#!/usr/bin/env python3 import os import re import subprocess import shutil import sys import time from typing import List, Tuple, Set import psutil from config import Config def is_window_blocked(window_name: str, blocked: List[re.Pattern]) -> bool: for b in blocked: if b.findall(window_name): return True return False def get_active_window_name_and_pid() -> Tuple[str, str]: cmd = ["xdotool", "getactivewindow", "getwindowname", "getwindowpid"] p = subprocess.run(cmd, capture_output=True) if p.returncode != 0: return ("", "") window_name, window_pid, _ = p.stdout.decode().split("\n") return (window_name, window_pid) def kill_sequence(blocked: List[re.Pattern]) -> None: def to_display(name: str) -> str: return name if len(name) < 30 else name[:30] + "..." for count in range(5, 0, -1): window_name, window_pid = get_active_window_name_and_pid() if not is_window_blocked(window_name, blocked): notify(f"[okay] {to_display(window_name)}") return notify(f"[kill {count}s] {to_display(window_name)}") time.sleep(1) p = psutil.Process(int(window_pid)) p.terminate() def notify(message: str) -> None: """ Notify user via the Xorg notify-send command. """ env = { **os.environ, "DBUS_SESSION_BUS_ADDRESS": "unix:path=/run/user/1000/bus" } user = env.get("SUDO_USER", None) if user is None: cmd = ["notify-send", message] else: cmd = ["runuser", "-m", "-u", user, "notify-send", message] subprocess.run(cmd, env=env) def write_pid_file(config: Config) -> str: p = psutil.Process() pid_file = os.path.join(config.directory, f"{p.pid}.pid") with open(pid_file, "w") as f: f.write(str(p.pid)) return pid_file def terminate_existing_blocker(config: Config) -> None: this_pid = psutil.Process().pid pid_files = [f for f in config.directory if f.endswith(".pid")] for pid_file in pid_files: pid = int(pid_file.replace(".pid", "")) if this_pid == pid: continue try: p = psutil.Process(pid) p.terminate() except psutil.NoSuchProcess: print(f"Could not terminate {p.pid=}.") pid_file = os.path.join(config.directory, pid_file) try: os.remove(pid_file) except PermissionError: print(f"Could not remove {pid_file=}.") def load_blocked_patterns(config: Config) -> List[re.Pattern]: blocked = [] for block in config.blocks: for item in block.items: s = "".join([block.prefix, item, block.postfix]) r = re.compile(s, re.IGNORECASE) blocked.append(r) return blocked def init(config: Config) -> None: terminate_existing_blocker(config) write_pid_file(config) def run_checks(config: Config) -> None: # shutil.which("xdotool") # That's what we do if anything goes wrong: # xkill --all pass def kill_window_if_blocked(blocked: List[re.Pattern]) -> None: window_name, window_pid = get_active_window_name_and_pid() if is_window_blocked(window_name, blocked): kill_sequence(blocked) def is_process_active(process_name: str) -> bool: for proc in psutil.process_iter(): if proc.name() == process_name: return True return False def enforce_aw_commit(): def aw_commit_active(): return is_process_active("aw-commit") def to_display(name: str) -> str: return name if len(name) < 30 else name[:30] + "..." if aw_commit_active(): return for _ in range(10, 0, -1): notify(f"[warning] aw-commit not running") time.sleep(1) if aw_commit_active(): return if aw_commit_active(): return window_name, window_pid = get_active_window_name_and_pid() if window_name: notify(f"[kill aw-commit not running] {to_display(window_name)}") p = psutil.Process(int(window_pid)) p.terminate() return enforce_aw_commit() def main_root(config: Config) -> None: init(config) blocked = load_blocked_patterns(config) while True: time.sleep(config.sleep_time) run_checks(config) kill_window_if_blocked(blocked) if config.enforce_aw_commit: enforce_aw_commit() def main() -> None: """ Run main_root as root except while debugging. """ config_path = "~/.config/aw-focus/config.json" config = Config.load_config(config_path) if config.start_as_user: terminate_existing_blocker(config) main_root(config) if os.geteuid() == 0: newpid = os.fork() if newpid == 0: main_root(config) else: cmd = ["sudo", config.aw_focus_cmd] + sys.argv[1:] subprocess.Popen(cmd) if __name__ == "__main__": main() def load_window_names(config: Config) -> Set[str]: window_names_file = os.path.join(config.directory, config.window_names) if not os.path.isfile(window_names_file): return set() with open(window_names_file, "r") as f: return {l.strip() for l in f.readlines()} def write_window_names(config: Config, window_names: Set[str]) -> None: window_names_file = os.path.join(config.directory, config.window_names) window_names = "\n".join(sorted(list(window_names))) with open(window_names_file, "w") as f: f.write(window_names)