From bf491f5e1936a18d0ef501e7e83302302625dfee Mon Sep 17 00:00:00 2001 From: Felix Martin Date: Wed, 20 Oct 2021 22:09:00 -0400 Subject: [PATCH] Implement block list and window name logging. --- blocker.py | 121 --------------------------------------- focusfriend.py | 150 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 150 insertions(+), 121 deletions(-) delete mode 100755 blocker.py create mode 100755 focusfriend.py diff --git a/blocker.py b/blocker.py deleted file mode 100755 index 18aca70..0000000 --- a/blocker.py +++ /dev/null @@ -1,121 +0,0 @@ -#!/usr/bin/env python3 - -import os -import psutil -import re -import subprocess -import sys -import time - -BLOCKER = "blocker.py" -BLOCKER_CONFIG_DIR = "/home/{}/.config/focusfriend" -BLOCKED_BROWSER_WORDS = ["mogelpower", "DER SPIEGEL", "nitter"] - - -def is_window_blocked(window_name, blocked): - for b in blocked: - if type(b) is str and b == window_name: - return True - elif type(b) is re.Pattern and b.findall(window_name): - return True - return False - - -def get_active_window_name_and_pid(): - CMD = ["xdotool", "getactivewindow", "getwindowname", "getwindowpid"] - p = subprocess.run(CMD, capture_output=True) - if p.returncode != 0: - return "", "" - window_name, window_pid, _ = p.stdout.decode().split("\n") - return window_name, window_pid - - -def find_window_name(window_name): - CMD = ["xdotool", "search", window_name, "getwindowpid"] - p = subprocess.run(CMD, capture_output=True) - if p.returncode != 0: - return "" - l = p.stdout.decode().split("\n") - - -def init_kill_sequence(blocked): - count = 5 - while True: - window_name, window_pid = get_active_window_name_and_pid() - if not is_window_blocked(window_name, blocked): - notify(f"{window_name} is okay. Return from kill sequence.") - return - notify(f"{window_name} is blocked. Kill in {count} seconds.") - if count == 0: - p = psutil.Process(int(window_pid)) - p.kill() - return - time.sleep(1) - count -= 1 - - -def notify(message): - env = { - **os.environ, - "DBUS_SESSION_BUS_ADDRESS": "unix:path=/run/user/1000/bus" - } - user = env["SUDO_USER"] - CMD = ["runuser", "-m", "-u", user, "notify-send", message] - p = subprocess.run(CMD, env=env) - - -def get_config_dir() -> str: - user = os.environ["SUDO_USER"] - config_dir = BLOCKER_CONFIG_DIR.format(user) - assert(os.path.isdir(config_dir)) - return config_dir - - -def write_pid_file() -> str: - p = psutil.Process() - pid_file = os.path.join(get_config_dir(), f"{p.pid}.pid") - with open(pid_file, "w") as f: - f.write(pid_file) - return pid_file - - -def terminate_existing_blocker() -> None: - this_pid = psutil.Process().pid - config_dir = get_config_dir() - pid_files = [f for f in os.listdir(config_dir) if f.endswith(".pid")] - for pid_file in pid_files: - pid = int(pid_file.replace(".pid", "")) - if this_pid == pid: - continue - try: - p = psutil.Process(pid) - p.terminate() - except psutil.NoSuchProcess: - pass - os.remove(os.path.join(config_dir, pid_file)) - - -def init() -> None: - terminate_existing_blocker() - write_pid_file() - - -def main() -> None: - init() - blocked = [re.compile(f"{word}.*(Firefox|Chromium)", flags=re.IGNORECASE) - for word in BLOCKED_BROWSER_WORDS] - while True: - time.sleep(1) - window_name, window_pid = get_active_window_name_and_pid() - if is_window_blocked(window_name, blocked): - init_kill_sequence(blocked) - - -if __name__ == "__main__": - if os.geteuid() == 0: - newpid = os.fork() - if newpid == 0: - main() - else: - cmd = ["sudo", BLOCKER] + sys.argv[1:] - subprocess.Popen(cmd) diff --git a/focusfriend.py b/focusfriend.py new file mode 100755 index 0000000..34d821e --- /dev/null +++ b/focusfriend.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python3 + +import os +import psutil +import re +import subprocess +import sys +import time +from typing import List, Tuple, Set + +FOCUSFRIEND_PY = "focusfriend.py" +BLOCKER_CONFIG_DIR = "/home/{}/.config/focusfriend" +BLOCKED_BROWSER_WORDS_TXT = "blocked_browser_words.txt" +WINDOW_NAMES_TXT = "window_names.txt" + + +def is_window_blocked(window_name: str, blocked: List[re.Pattern]) -> bool: + for b in blocked: + if b.findall(window_name): + return True + return False + + +def get_active_window_name_and_pid() -> Tuple[str, str]: + CMD = ["xdotool", "getactivewindow", "getwindowname", "getwindowpid"] + p = subprocess.run(CMD, capture_output=True) + if p.returncode != 0: + return ("", "") + window_name, window_pid, _ = p.stdout.decode().split("\n") + return (window_name, window_pid) + + +def kill_sequence(blocked: List[re.Pattern]) -> None: + + def to_display(name: str) -> str: + return name if len(name) < 30 else name[:30] + "..." + + for count in range(5, 0, -1): + window_name, window_pid = get_active_window_name_and_pid() + if not is_window_blocked(window_name, blocked): + notify(f"[okay] {to_display(window_name)}") + return + notify(f"[kill {count}s] {to_display(window_name)}") + time.sleep(1) + + p = psutil.Process(int(window_pid)) + p.terminate() + + +def notify(message: str) -> None: + env = { + **os.environ, + "DBUS_SESSION_BUS_ADDRESS": "unix:path=/run/user/1000/bus" + } + user = env["SUDO_USER"] + CMD = ["runuser", "-m", "-u", user, "notify-send", message] + p = subprocess.run(CMD, env=env) + + +def get_config_dir() -> str: + user = os.environ["SUDO_USER"] + config_dir = BLOCKER_CONFIG_DIR.format(user) + assert(os.path.isdir(config_dir)) + return config_dir + + +def write_pid_file() -> str: + p = psutil.Process() + pid_file = os.path.join(get_config_dir(), f"{p.pid}.pid") + with open(pid_file, "w") as f: + f.write(pid_file) + return pid_file + + +def terminate_existing_blocker() -> None: + this_pid = psutil.Process().pid + config_dir = get_config_dir() + pid_files = [f for f in os.listdir(config_dir) if f.endswith(".pid")] + for pid_file in pid_files: + pid = int(pid_file.replace(".pid", "")) + if this_pid == pid: + continue + try: + p = psutil.Process(pid) + p.terminate() + except psutil.NoSuchProcess: + pass + os.remove(os.path.join(config_dir, pid_file)) + + +def init() -> None: + terminate_existing_blocker() + write_pid_file() + + +def load_blocked_browser_words() -> List[re.Pattern]: + config_dir = get_config_dir() + blocked_words_file = os.path.join(config_dir, BLOCKED_BROWSER_WORDS_TXT) + assert(os.path.isfile(blocked_words_file)) + blocked = [] + with open(blocked_words_file, "r") as f: + for line in f.readlines(): + line = line.strip() + r = re.compile(f"{line}.*(Firefox|Chromium)", flags=re.IGNORECASE) + blocked.append(r) + return blocked + + +def load_window_names() -> Set[str]: + config_dir = get_config_dir() + window_names_file = os.path.join(config_dir, WINDOW_NAMES_TXT) + if not os.path.isfile(window_names_file): + return set() + with open(window_names_file, "r") as f: + return {l.strip() for l in f.readlines()} + + +def write_window_names(window_names: Set[str]) -> None: + config_dir = get_config_dir() + window_names = sorted(list(window_names)) + window_names = "\n".join(window_names) + window_names_file = os.path.join(config_dir, WINDOW_NAMES_TXT) + with open(window_names_file, "w") as f: + f.write(window_names_file) + + +def main() -> None: + init() + blocked = load_blocked_browser_words() + window_names = load_window_names() + counter = 0 + while True: + time.sleep(1) + window_name, window_pid = get_active_window_name_and_pid() + window_names.add(window_name) + if is_window_blocked(window_name, blocked): + kill_sequence(blocked) + if counter % 60 == 59: + write_window_names(window_names) + counter += 1 + + +if __name__ == "__main__": + if os.geteuid() == 0: + newpid = os.fork() + if newpid == 0: + main() + else: + cmd = ["sudo", FOCUSFRIEND_PY] + sys.argv[1:] + subprocess.Popen(cmd)