diff --git a/config.json b/config.json new file mode 120000 index 0000000..b59d268 --- /dev/null +++ b/config.json @@ -0,0 +1 @@ +/home/felixm/.config/focusfriend/config.json \ No newline at end of file diff --git a/config.py b/config.py index b3e2e63..b01361e 100644 --- a/config.py +++ b/config.py @@ -1,9 +1,10 @@ import json +import os from typing import List from pydantic import BaseModel -class BlockList(BaseModel): +class Block(BaseModel): name: str = '' prefix: str = '' postfix: str = '' @@ -11,14 +12,21 @@ class BlockList(BaseModel): class Config(BaseModel): + directory: str + blocks: List[Block] + start_as_user: bool = True + sleep_time: int = 1 + focusfriend_py: str = "focusfriend.py" + window_names: str = "window_names.txt" + class Config: extra = 'forbid' - blocklists: List[BlockList] - - -def load_config(config_file: str) -> Config: - with open(config_file, 'r', encoding='utf8') as f: - config_dict = json.load(f) - return Config(**config_dict) - + @classmethod + def load_config(cls, config_file: str) -> Config: + config_file = os.path.expanduser(config_file) + with open(config_file, 'r', encoding='utf8') as f: + config_dict = json.load(f) + config = cls(**config_dict) + config.directory = os.path.expanduser(config.directory) + return config diff --git a/focusfriend.code-workspace b/focusfriend.code-workspace index ab68550..9f23043 100644 --- a/focusfriend.code-workspace +++ b/focusfriend.code-workspace @@ -1,31 +1,32 @@ { - "python.linting.pylintEnabled": false, - "python.linting.enabled": false, - "folders": [ - { - "path": "." - } - ], - "launch": { - "version": "0.2.0", - "configurations": [ - { - "name": "Python: Current File", - "type": "python", - "request": "launch", - "program": "${file}", - "console": "integratedTerminal" - }, - { - "name": "FocusFriend Debug", - "type": "python", - "request": "launch", - "program": "focusfriend.py", - "console": "integratedTerminal", - "args": [ - "--debug", - ], - } - ] - } -} \ No newline at end of file + "folders": [ + { + "path": "." + } + ], + "launch": { + "version": "0.2.0", + "configurations": [ + { + "name": "Python: Current File", + "type": "python", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal" + }, + { + "name": "FocusFriend Debug", + "type": "python", + "request": "launch", + "program": "focusfriend.py", + "console": "integratedTerminal", + "args": ["--debug"] + } + ] + }, + "settings": { + "python.linting.pylintEnabled": true, + "python.linting.enabled": true, + "python.linting.pylintArgs": ["--errors-only"] + } +} diff --git a/focusfriend.py b/focusfriend.py index 9ddf9db..1b04c5b 100755 --- a/focusfriend.py +++ b/focusfriend.py @@ -3,18 +3,12 @@ import os import re import subprocess +import shutil import sys import time from typing import List, Tuple, Set import psutil -import config - - -FOCUSFRIEND_PY = "focusfriend.py" -CONFIG_FILE = "config.json" -BLOCKER_CONFIG_DIR = "/home/{}/.config/focusfriend" -BLOCKED_BROWSER_WORDS_TXT = "blocked_browser_words.txt" -WINDOW_NAMES_TXT = "window_names.txt" +from config import Config def is_window_blocked(window_name: str, blocked: List[re.Pattern]) -> bool: @@ -26,13 +20,14 @@ def is_window_blocked(window_name: str, blocked: List[re.Pattern]) -> bool: def get_active_window_name_and_pid() -> Tuple[str, str]: cmd = ["xdotool", "getactivewindow", "getwindowname", "getwindowpid"] - p = subprocess.run(cmd, capture_output=True, check=True) + p = subprocess.run(cmd, capture_output=True) + if p.returncode != 0: + return ("", "") window_name, window_pid, _ = p.stdout.decode().split("\n") return (window_name, window_pid) def kill_sequence(blocked: List[re.Pattern]) -> None: - def to_display(name: str) -> str: return name if len(name) < 30 else name[:30] + "..." @@ -49,39 +44,30 @@ def kill_sequence(blocked: List[re.Pattern]) -> None: def notify(message: str) -> None: - """Notify user via the Xorg notify-send command. - - Args: - message (str): Message shown to the user. - """ + """ Notify user via the Xorg notify-send command. """ env = { **os.environ, "DBUS_SESSION_BUS_ADDRESS": "unix:path=/run/user/1000/bus" } - user = env["SUDO_USER"] - cmd = ["runuser", "-m", "-u", user, "notify-send", message] + user = env.get("SUDO_USER", None) + if user is None: + cmd = ["notify-send", message] + else: + cmd = ["runuser", "-m", "-u", user, "notify-send", message] subprocess.run(cmd, env=env) -def get_config_dir() -> str: - user = os.environ.get("SUDO_USER", False) or os.environ["USER"] - config_dir = BLOCKER_CONFIG_DIR.format(user) - assert(os.path.isdir(config_dir)) - return config_dir - - -def write_pid_file() -> str: +def write_pid_file(config: Config) -> str: p = psutil.Process() - pid_file = os.path.join(get_config_dir(), f"{p.pid}.pid") + pid_file = os.path.join(config.directory, f"{p.pid}.pid") with open(pid_file, "w") as f: - f.write(pid_file) + f.write(str(p.pid)) return pid_file -def terminate_existing_blocker() -> None: +def terminate_existing_blocker(config: Config) -> None: this_pid = psutil.Process().pid - config_dir = get_config_dir() - pid_files = [f for f in os.listdir(config_dir) if f.endswith(".pid")] + pid_files = [f for f in config.directory if f.endswith(".pid")] for pid_file in pid_files: pid = int(pid_file.replace(".pid", "")) if this_pid == pid: @@ -90,87 +76,83 @@ def terminate_existing_blocker() -> None: p = psutil.Process(pid) p.terminate() except psutil.NoSuchProcess: - pass - os.remove(os.path.join(config_dir, pid_file)) + print(f"Could not terminate {p.pid=}.") + pid_file = os.path.join(config.directory, pid_file) + try: + os.remove(pid_file) + except PermissionError: + print(f"Could not remove {pid_file=}.") -def init() -> None: - terminate_existing_blocker() - write_pid_file() - - -def load_blocked_browser_words() -> List[re.Pattern]: - config_dir = get_config_dir() - blocked_words_file = os.path.join(config_dir, BLOCKED_BROWSER_WORDS_TXT) - assert(os.path.isfile(blocked_words_file)) +def load_blocked_patterns(config: Config) -> List[re.Pattern]: blocked = [] - with open(blocked_words_file, "r") as f: - for line in f.readlines(): - line = line.strip() - r = re.compile(f"{line}.*(Firefox|Chromium)", flags=re.IGNORECASE) + for block in config.blocks: + for item in block.items: + s = "".join([block.prefix, item, block.postfix]) + r = re.compile(s, re.IGNORECASE) blocked.append(r) return blocked -def load_window_names() -> Set[str]: - config_dir = get_config_dir() - window_names_file = os.path.join(config_dir, WINDOW_NAMES_TXT) +def init(config: Config) -> None: + terminate_existing_blocker(config) + write_pid_file(config) + + +def run_checks(config: Config) -> None: + # shutil.which("xdotool") + # That's what we do if anything goes wrong: + # xkill --all + pass + + +def kill_window_if_blocked(blocked: List[re.Pattern]) -> None: + window_name, window_pid = get_active_window_name_and_pid() + if is_window_blocked(window_name, blocked): + kill_sequence(blocked) + + +def main_root(config: Config) -> None: + init(config) + blocked = load_blocked_patterns(config) + while True: + time.sleep(config.sleep_time) + run_checks(config) + kill_window_if_blocked(blocked) + + +def main() -> None: + """ Run main_root as root except while debugging. """ + config_path = "~/.config/focusfriend/config.json" + config = Config.load_config(config_path) + + if config.start_as_user: + terminate_existing_blocker(config) + main_root(config) + + if os.geteuid() == 0: + newpid = os.fork() + if newpid == 0: + main_root(config) + else: + cmd = ["sudo", config.focusfriend_py] + sys.argv[1:] + subprocess.Popen(cmd) + + +if __name__ == "__main__": + main() + + +def load_window_names(config: Config) -> Set[str]: + window_names_file = os.path.join(config.directory, config.window_names) if not os.path.isfile(window_names_file): return set() with open(window_names_file, "r") as f: return {l.strip() for l in f.readlines()} -def write_window_names(window_names: Set[str]) -> None: - config_dir = get_config_dir() - window_names_file = os.path.join(config_dir, WINDOW_NAMES_TXT) +def write_window_names(config: Config, window_names: Set[str]) -> None: + window_names_file = os.path.join(config.directory, config.window_names) window_names = "\n".join(sorted(list(window_names))) with open(window_names_file, "w") as f: - f.write(window_names) - - -def load_config() -> config.Config: - config_dir = get_config_dir() - config_file = os.path.join(config_dir, CONFIG_FILE) - config_obj = config.load_config(config_file) - - -def main() -> None: - # config = load_config() - init() - blocked = load_blocked_browser_words() - window_names = load_window_names() - counter = 0 - while True: - time.sleep(1) - window_name, window_pid = get_active_window_name_and_pid() - window_names.add(window_name) - if is_window_blocked(window_name, blocked): - kill_sequence(blocked) - if counter % 60 == 59: - write_window_names(window_names) - counter += 1 - - -def sudo_run() -> None: - """Run main as root except while debugging. - """ - try: - if sys.argv[1] == "--debug": - terminate_existing_blocker() - main() - return - except IndexError: - pass - - if os.geteuid() == 0: - newpid = os.fork() - if newpid == 0: - main() - else: - cmd = ["sudo", FOCUSFRIEND_PY] + sys.argv[1:] - subprocess.Popen(cmd) - - -if __name__ == "__main__": - sudo_run() + f.write(window_names) \ No newline at end of file