diff --git a/.gitignore b/.gitignore index 0bf8259..794845b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +config.yaml # ---> Python # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/antidrift.py b/antidrift.py index 5da6aa3..6c59b96 100644 --- a/antidrift.py +++ b/antidrift.py @@ -1,194 +1,82 @@ #!/usr/bin/env python3 -import os -import re -import subprocess +import logging import shutil import sys import time -from typing import List, Tuple, Set -import psutil -from config import Config +import re +import xwindow +from config import Config, Block +from typing import List -def is_window_blocked(window_name: str, blocked: List[re.Pattern]) -> bool: - for b in blocked: - if b.findall(window_name): - return True - return False +def check_for_xdotool(): + r = shutil.which("xdotool") + if not r: + logging.critical("Please install xdotool") + sys.exit(1) -def get_active_window_name_and_pid() -> Tuple[str, str]: - cmd = ["xdotool", "getactivewindow", "getwindowname", "getwindowpid"] - p = subprocess.run(cmd, capture_output=True) - if p.returncode != 0: - return ("", "") - window_name, window_pid, _ = p.stdout.decode().split("\n") - return (window_name, window_pid) +def init_logging(): + FORMAT = '%(levelname)-8s | %(message)s' + logging.basicConfig(format=FORMAT, level=logging.DEBUG) -def kill_sequence(blocked: List[re.Pattern]) -> None: - def to_display(name: str) -> str: - return name if len(name) < 30 else name[:30] + "..." +def window_is_blocked(config: Config) -> bool: + # These should be selectable in the future (not all at the same time) + blackblocks = config.blackblocks + whiteblocks = config.whiteblocks - for count in range(5, 0, -1): - window_name, window_pid = get_active_window_name_and_pid() - if not is_window_blocked(window_name, blocked): - notify(f"[okay] {to_display(window_name)}") - return - notify(f"[kill {count}s] {to_display(window_name)}") - time.sleep(1) - - p = psutil.Process(int(window_pid)) - p.terminate() + window = xwindow.XWindow() + if not window.keywords: + return False + for b in blackblocks: + for k in b.keywords: + if k in window.keywords: + xwindow.notify(f"{window.name[:30]} blocked by {b.name}.") + logging.warning(f"{window.name[:30]} blocked by {b.name}.") + return True + if config.blackblocks_only: + logging.debug(f"All non-blacklisted windows are allowed.") + return False + for w in whiteblocks: + for k in w.keywords: + if k in window.keywords: + logging.debug(f"{window.name[:30]} allowed by {w.name}.") + return False + xwindow.notify(f"{window.name[:30]} not on any whitelist.") + return True -def notify(message: str) -> None: - """ Notify user via the Xorg notify-send command. """ - env = { - **os.environ, - "DBUS_SESSION_BUS_ADDRESS": "unix:path=/run/user/1000/bus" - } - user = env.get("SUDO_USER", None) - if user is None: - cmd = ["notify-send", message] +def enforce(config: Config): + if not window_is_blocked(config): + return + xwindow.notify(f"AntiDrift will minimize the window in {config.minimize_delay} seconds.") + time.sleep(config.minimize_delay) + window = xwindow.XWindow() + if window_is_blocked(config): + window = xwindow.XWindow() + xwindow.notify(f"Minimize {window.name[:30]}.") + window.minimize() else: - cmd = ["runuser", "-m", "-u", user, "notify-send", message] - subprocess.run(cmd, env=env) + xwindow.notify(f"We are gucci again.") -def write_pid_file(config: Config) -> str: - p = psutil.Process() - pid_file = os.path.join(config.directory, f"{p.pid}.pid") - with open(pid_file, "w") as f: - f.write(str(p.pid)) - return pid_file - - -def terminate_existing_blocker(config: Config) -> None: - this_pid = psutil.Process().pid - pid_files = [f for f in config.directory if f.endswith(".pid")] - for pid_file in pid_files: - pid = int(pid_file.replace(".pid", "")) - if this_pid == pid: - continue - try: - p = psutil.Process(pid) - p.terminate() - except psutil.NoSuchProcess: - print(f"Could not terminate {p.pid=}.") - pid_file = os.path.join(config.directory, pid_file) - try: - os.remove(pid_file) - except PermissionError: - print(f"Could not remove {pid_file=}.") - - -def load_blocked_patterns(config: Config) -> List[re.Pattern]: - blocked = [] - for block in config.blocks: - for item in block.items: - s = "".join([block.prefix, item, block.postfix]) - r = re.compile(s, re.IGNORECASE) - blocked.append(r) - return blocked - - -def init(config: Config) -> None: - terminate_existing_blocker(config) - write_pid_file(config) - - -def run_checks(config: Config) -> None: - # shutil.which("xdotool") - # That's what we do if anything goes wrong: - # xkill --all - pass - - -def kill_window_if_blocked(blocked: List[re.Pattern]) -> None: - window_name, window_pid = get_active_window_name_and_pid() - if is_window_blocked(window_name, blocked): - kill_sequence(blocked) - - -def is_process_active(process_name: str) -> bool: - for proc in psutil.process_iter(): - if proc.name() == process_name: - return True - return False - - -def enforce_aw_commit(): - def aw_commit_active(): - return is_process_active("aw-commit") - - def to_display(name: str) -> str: - return name if len(name) < 30 else name[:30] + "..." - - if aw_commit_active(): - return - - for _ in range(10, 0, -1): - notify(f"[warning] aw-commit not running") - time.sleep(1) - if aw_commit_active(): - return - - if aw_commit_active(): - return - - window_name, window_pid = get_active_window_name_and_pid() - if window_name: - notify(f"[kill aw-commit not running] {to_display(window_name)}") - p = psutil.Process(int(window_pid)) - p.terminate() - return enforce_aw_commit() - - -def main_root(config: Config) -> None: - init(config) - blocked = load_blocked_patterns(config) +def session(config: Config): + logging.info(f"Start session with {len(config.whiteblocks)} whiteblocks") while True: - time.sleep(config.sleep_time) - run_checks(config) - kill_window_if_blocked(blocked) - if config.enforce_aw_commit: - enforce_aw_commit() + enforce(config) + time.sleep(config.check_delay) def main() -> None: - """ Run main_root as root except while debugging. """ - config_path = "~/.config/aw-focus/config.json" - config = Config.load_config(config_path) - - if config.start_as_user: - terminate_existing_blocker(config) - main_root(config) - - if os.geteuid() == 0: - newpid = os.fork() - if newpid == 0: - main_root(config) - else: - cmd = ["sudo", config.aw_focus_cmd] + sys.argv[1:] - subprocess.Popen(cmd) + init_logging() + logging.info("AntiDrift running") + check_for_xdotool() + config = Config.load("config.yaml") + session(config) if __name__ == "__main__": main() - -def load_window_names(config: Config) -> Set[str]: - window_names_file = os.path.join(config.directory, config.window_names) - if not os.path.isfile(window_names_file): - return set() - with open(window_names_file, "r") as f: - return {l.strip() for l in f.readlines()} - - -def write_window_names(config: Config, window_names: Set[str]) -> None: - window_names_file = os.path.join(config.directory, config.window_names) - window_names = "\n".join(sorted(list(window_names))) - with open(window_names_file, "w") as f: - f.write(window_names) \ No newline at end of file diff --git a/config.py b/config.py index 013590b..fe0f059 100644 --- a/config.py +++ b/config.py @@ -1,33 +1,36 @@ import json import os +import yaml +from pathlib import Path from typing import List from pydantic import BaseModel class Block(BaseModel): - name: str = '' - prefix: str = '' - postfix: str = '' - items: List[str] + name: str + keywords: List[str] + + +def load(filename: str) -> List[Block]: + result = [Block(path=filename, **block) for block in block_list] + return result class Config(BaseModel): - directory: str - blocks: List[Block] - start_as_user: bool = True - sleep_time: int = 1 - aw_focus_cmd: str = "aw-focus" - window_names: str = "window_names.txt" - enforce_aw_commit: bool = True + blackblocks: List[Block] + whiteblocks: List[Block] + check_delay: int = 1 + minimize_delay: int = 5 + blackblocks_only: bool = True class Config: extra = 'forbid' @classmethod - def load_config(cls, config_file: str) -> Config: + def load(cls, config_file: str) -> Config: config_file = os.path.expanduser(config_file) - with open(config_file, 'r', encoding='utf8') as f: - config_dict = json.load(f) + with open(config_file, "r") as f: + config_dict = yaml.safe_load(f) config = cls(**config_dict) - config.directory = os.path.expanduser(config.directory) return config + diff --git a/obsolete.py b/obsolete.py new file mode 100644 index 0000000..19234b5 --- /dev/null +++ b/obsolete.py @@ -0,0 +1,171 @@ + +def is_window_blocked(window_name: str, blocked: List[re.Pattern]) -> bool: + for b in blocked: + if b.findall(window_name): + return True + return False + + +def kill_sequence(blocked: List[re.Pattern]) -> None: + def to_display(name: str) -> str: + return name if len(name) < 30 else name[:30] + "..." + + for count in range(5, 0, -1): + window_name, window_pid = get_active_window_name_and_pid() + if not is_window_blocked(window_name, blocked): + notify(f"[okay] {to_display(window_name)}") + return + notify(f"[kill {count}s] {to_display(window_name)}") + time.sleep(1) + + p = psutil.Process(int(window_pid)) + p.terminate() + + +def notify(message: str) -> None: + """ Notify user via the Xorg notify-send command. """ + env = { + **os.environ, + "DBUS_SESSION_BUS_ADDRESS": "unix:path=/run/user/1000/bus" + } + user = env.get("SUDO_USER", None) + if user is None: + cmd = ["notify-send", message] + else: + cmd = ["runuser", "-m", "-u", user, "notify-send", message] + subprocess.run(cmd, env=env) + + +def write_pid_file(config: Config) -> str: + p = psutil.Process() + pid_file = os.path.join(config.directory, f"{p.pid}.pid") + with open(pid_file, "w") as f: + f.write(str(p.pid)) + return pid_file + + +def terminate_existing_blocker(config: Config) -> None: + this_pid = psutil.Process().pid + pid_files = [f for f in config.directory if f.endswith(".pid")] + for pid_file in pid_files: + pid = int(pid_file.replace(".pid", "")) + if this_pid == pid: + continue + try: + p = psutil.Process(pid) + p.terminate() + except psutil.NoSuchProcess: + print(f"Could not terminate {p.pid=}.") + pid_file = os.path.join(config.directory, pid_file) + try: + os.remove(pid_file) + except PermissionError: + print(f"Could not remove {pid_file=}.") + + +def load_blocked_patterns(config: Config) -> List[re.Pattern]: + blocked = [] + for block in config.blocks: + for item in block.items: + s = "".join([block.prefix, item, block.postfix]) + r = re.compile(s, re.IGNORECASE) + blocked.append(r) + return blocked + + +def init(config: Config) -> None: + terminate_existing_blocker(config) + write_pid_file(config) + + +def run_checks(config: Config) -> None: + # shutil.which("xdotool") + # That's what we do if anything goes wrong: + # xkill --all + pass + + +def kill_window_if_blocked(blocked: List[re.Pattern]) -> None: + window_name, window_pid = get_active_window_name_and_pid() + if is_window_blocked(window_name, blocked): + kill_sequence(blocked) + + +def is_process_active(process_name: str) -> bool: + for proc in psutil.process_iter(): + if proc.name() == process_name: + return True + return False + + +def enforce_aw_commit(): + def aw_commit_active(): + return is_process_active("aw-commit") + + def to_display(name: str) -> str: + return name if len(name) < 30 else name[:30] + "..." + + if aw_commit_active(): + return + + for _ in range(10, 0, -1): + notify(f"[warning] aw-commit not running") + time.sleep(1) + if aw_commit_active(): + return + + if aw_commit_active(): + return + + window_name, window_pid = get_active_window_name_and_pid() + if window_name: + notify(f"[kill aw-commit not running] {to_display(window_name)}") + p = psutil.Process(int(window_pid)) + p.terminate() + return enforce_aw_commit() + + +def load_window_names(config: Config) -> Set[str]: + window_names_file = os.path.join(config.directory, config.window_names) + if not os.path.isfile(window_names_file): + return set() + with open(window_names_file, "r") as f: + return {l.strip() for l in f.readlines()} + + +def write_window_names(config: Config, window_names: Set[str]) -> None: + window_names_file = os.path.join(config.directory, config.window_names) + window_names = "\n".join(sorted(list(window_names))) + with open(window_names_file, "w") as f: + f.write(window_names) + + +def main() -> None: + """ Run main_root as root except while debugging. """ + config_path = "~/.config/aw-focus/config.json" + config = Config.load_config(config_path) + + if config.start_as_user: + terminate_existing_blocker(config) + main_root(config) + + if os.geteuid() == 0: + newpid = os.fork() + if newpid == 0: + main_root(config) + else: + cmd = ["sudo", config.aw_focus_cmd] + sys.argv[1:] + subprocess.Popen(cmd) + + +def main_root(config: Config) -> None: + init(config) + blocked = load_blocked_patterns(config) + while True: + time.sleep(config.sleep_time) + run_checks(config) + kill_window_if_blocked(blocked) + if config.enforce_aw_commit: + enforce_aw_commit() + + diff --git a/xwindow.py b/xwindow.py new file mode 100644 index 0000000..91cb8d9 --- /dev/null +++ b/xwindow.py @@ -0,0 +1,47 @@ +import os +import re +import subprocess +import logging + + +class XWindow: + def __init__(self): + self.window = self._run(["getactivewindow"]) + if not self.window: + self.name = "" + self.cls = "" + self.pid = "" + else: + self.name = self._run(["getwindowname", self.window]) + self.cls = self._run(["getwindowclassname", self.window]) + self.pid = self._run(["getwindowpid", self.window]) + self.keywords = list(re.findall("\w+", self.name.lower())) + + def _run(self, cmd) -> str: + cmd = ["xdotool"] + cmd + p = subprocess.run(cmd, capture_output=True) + if p.returncode != 0: + return "" + return p.stdout.decode().strip() + + def minimize(self): + self._run(["windowminimize", self.window]) + + def quit(self): + self._run(["windowquit", self.window]) + + +def notify(message: str) -> None: + """ Notify user via the Xorg notify-send command. """ + logging.debug(f"notify: {message}") + env = { + **os.environ, + "DBUS_SESSION_BUS_ADDRESS": "unix:path=/run/user/1000/bus" + } + user = env.get("SUDO_USER", None) + if user is None: + cmd = ["notify-send", message] + else: + cmd = ["runuser", "-m", "-u", user, "notify-send", message] + subprocess.run(cmd, env=env) +