Implement block list and window name logging.
This commit is contained in:
121
blocker.py
121
blocker.py
@@ -1,121 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
|
|
||||||
import os
|
|
||||||
import psutil
|
|
||||||
import re
|
|
||||||
import subprocess
|
|
||||||
import sys
|
|
||||||
import time
|
|
||||||
|
|
||||||
BLOCKER = "blocker.py"
|
|
||||||
BLOCKER_CONFIG_DIR = "/home/{}/.config/focusfriend"
|
|
||||||
BLOCKED_BROWSER_WORDS = ["mogelpower", "DER SPIEGEL", "nitter"]
|
|
||||||
|
|
||||||
|
|
||||||
def is_window_blocked(window_name, blocked):
|
|
||||||
for b in blocked:
|
|
||||||
if type(b) is str and b == window_name:
|
|
||||||
return True
|
|
||||||
elif type(b) is re.Pattern and b.findall(window_name):
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def get_active_window_name_and_pid():
|
|
||||||
CMD = ["xdotool", "getactivewindow", "getwindowname", "getwindowpid"]
|
|
||||||
p = subprocess.run(CMD, capture_output=True)
|
|
||||||
if p.returncode != 0:
|
|
||||||
return "", ""
|
|
||||||
window_name, window_pid, _ = p.stdout.decode().split("\n")
|
|
||||||
return window_name, window_pid
|
|
||||||
|
|
||||||
|
|
||||||
def find_window_name(window_name):
|
|
||||||
CMD = ["xdotool", "search", window_name, "getwindowpid"]
|
|
||||||
p = subprocess.run(CMD, capture_output=True)
|
|
||||||
if p.returncode != 0:
|
|
||||||
return ""
|
|
||||||
l = p.stdout.decode().split("\n")
|
|
||||||
|
|
||||||
|
|
||||||
def init_kill_sequence(blocked):
|
|
||||||
count = 5
|
|
||||||
while True:
|
|
||||||
window_name, window_pid = get_active_window_name_and_pid()
|
|
||||||
if not is_window_blocked(window_name, blocked):
|
|
||||||
notify(f"{window_name} is okay. Return from kill sequence.")
|
|
||||||
return
|
|
||||||
notify(f"{window_name} is blocked. Kill in {count} seconds.")
|
|
||||||
if count == 0:
|
|
||||||
p = psutil.Process(int(window_pid))
|
|
||||||
p.kill()
|
|
||||||
return
|
|
||||||
time.sleep(1)
|
|
||||||
count -= 1
|
|
||||||
|
|
||||||
|
|
||||||
def notify(message):
|
|
||||||
env = {
|
|
||||||
**os.environ,
|
|
||||||
"DBUS_SESSION_BUS_ADDRESS": "unix:path=/run/user/1000/bus"
|
|
||||||
}
|
|
||||||
user = env["SUDO_USER"]
|
|
||||||
CMD = ["runuser", "-m", "-u", user, "notify-send", message]
|
|
||||||
p = subprocess.run(CMD, env=env)
|
|
||||||
|
|
||||||
|
|
||||||
def get_config_dir() -> str:
|
|
||||||
user = os.environ["SUDO_USER"]
|
|
||||||
config_dir = BLOCKER_CONFIG_DIR.format(user)
|
|
||||||
assert(os.path.isdir(config_dir))
|
|
||||||
return config_dir
|
|
||||||
|
|
||||||
|
|
||||||
def write_pid_file() -> str:
|
|
||||||
p = psutil.Process()
|
|
||||||
pid_file = os.path.join(get_config_dir(), f"{p.pid}.pid")
|
|
||||||
with open(pid_file, "w") as f:
|
|
||||||
f.write(pid_file)
|
|
||||||
return pid_file
|
|
||||||
|
|
||||||
|
|
||||||
def terminate_existing_blocker() -> None:
|
|
||||||
this_pid = psutil.Process().pid
|
|
||||||
config_dir = get_config_dir()
|
|
||||||
pid_files = [f for f in os.listdir(config_dir) if f.endswith(".pid")]
|
|
||||||
for pid_file in pid_files:
|
|
||||||
pid = int(pid_file.replace(".pid", ""))
|
|
||||||
if this_pid == pid:
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
p = psutil.Process(pid)
|
|
||||||
p.terminate()
|
|
||||||
except psutil.NoSuchProcess:
|
|
||||||
pass
|
|
||||||
os.remove(os.path.join(config_dir, pid_file))
|
|
||||||
|
|
||||||
|
|
||||||
def init() -> None:
|
|
||||||
terminate_existing_blocker()
|
|
||||||
write_pid_file()
|
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
|
||||||
init()
|
|
||||||
blocked = [re.compile(f"{word}.*(Firefox|Chromium)", flags=re.IGNORECASE)
|
|
||||||
for word in BLOCKED_BROWSER_WORDS]
|
|
||||||
while True:
|
|
||||||
time.sleep(1)
|
|
||||||
window_name, window_pid = get_active_window_name_and_pid()
|
|
||||||
if is_window_blocked(window_name, blocked):
|
|
||||||
init_kill_sequence(blocked)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
if os.geteuid() == 0:
|
|
||||||
newpid = os.fork()
|
|
||||||
if newpid == 0:
|
|
||||||
main()
|
|
||||||
else:
|
|
||||||
cmd = ["sudo", BLOCKER] + sys.argv[1:]
|
|
||||||
subprocess.Popen(cmd)
|
|
||||||
150
focusfriend.py
Executable file
150
focusfriend.py
Executable file
@@ -0,0 +1,150 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import os
|
||||||
|
import psutil
|
||||||
|
import re
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from typing import List, Tuple, Set
|
||||||
|
|
||||||
|
FOCUSFRIEND_PY = "focusfriend.py"
|
||||||
|
BLOCKER_CONFIG_DIR = "/home/{}/.config/focusfriend"
|
||||||
|
BLOCKED_BROWSER_WORDS_TXT = "blocked_browser_words.txt"
|
||||||
|
WINDOW_NAMES_TXT = "window_names.txt"
|
||||||
|
|
||||||
|
|
||||||
|
def is_window_blocked(window_name: str, blocked: List[re.Pattern]) -> bool:
|
||||||
|
for b in blocked:
|
||||||
|
if b.findall(window_name):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def get_active_window_name_and_pid() -> Tuple[str, str]:
|
||||||
|
CMD = ["xdotool", "getactivewindow", "getwindowname", "getwindowpid"]
|
||||||
|
p = subprocess.run(CMD, capture_output=True)
|
||||||
|
if p.returncode != 0:
|
||||||
|
return ("", "")
|
||||||
|
window_name, window_pid, _ = p.stdout.decode().split("\n")
|
||||||
|
return (window_name, window_pid)
|
||||||
|
|
||||||
|
|
||||||
|
def kill_sequence(blocked: List[re.Pattern]) -> None:
|
||||||
|
|
||||||
|
def to_display(name: str) -> str:
|
||||||
|
return name if len(name) < 30 else name[:30] + "..."
|
||||||
|
|
||||||
|
for count in range(5, 0, -1):
|
||||||
|
window_name, window_pid = get_active_window_name_and_pid()
|
||||||
|
if not is_window_blocked(window_name, blocked):
|
||||||
|
notify(f"[okay] {to_display(window_name)}")
|
||||||
|
return
|
||||||
|
notify(f"[kill {count}s] {to_display(window_name)}")
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
p = psutil.Process(int(window_pid))
|
||||||
|
p.terminate()
|
||||||
|
|
||||||
|
|
||||||
|
def notify(message: str) -> None:
|
||||||
|
env = {
|
||||||
|
**os.environ,
|
||||||
|
"DBUS_SESSION_BUS_ADDRESS": "unix:path=/run/user/1000/bus"
|
||||||
|
}
|
||||||
|
user = env["SUDO_USER"]
|
||||||
|
CMD = ["runuser", "-m", "-u", user, "notify-send", message]
|
||||||
|
p = subprocess.run(CMD, env=env)
|
||||||
|
|
||||||
|
|
||||||
|
def get_config_dir() -> str:
|
||||||
|
user = os.environ["SUDO_USER"]
|
||||||
|
config_dir = BLOCKER_CONFIG_DIR.format(user)
|
||||||
|
assert(os.path.isdir(config_dir))
|
||||||
|
return config_dir
|
||||||
|
|
||||||
|
|
||||||
|
def write_pid_file() -> str:
|
||||||
|
p = psutil.Process()
|
||||||
|
pid_file = os.path.join(get_config_dir(), f"{p.pid}.pid")
|
||||||
|
with open(pid_file, "w") as f:
|
||||||
|
f.write(pid_file)
|
||||||
|
return pid_file
|
||||||
|
|
||||||
|
|
||||||
|
def terminate_existing_blocker() -> None:
|
||||||
|
this_pid = psutil.Process().pid
|
||||||
|
config_dir = get_config_dir()
|
||||||
|
pid_files = [f for f in os.listdir(config_dir) if f.endswith(".pid")]
|
||||||
|
for pid_file in pid_files:
|
||||||
|
pid = int(pid_file.replace(".pid", ""))
|
||||||
|
if this_pid == pid:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
p = psutil.Process(pid)
|
||||||
|
p.terminate()
|
||||||
|
except psutil.NoSuchProcess:
|
||||||
|
pass
|
||||||
|
os.remove(os.path.join(config_dir, pid_file))
|
||||||
|
|
||||||
|
|
||||||
|
def init() -> None:
|
||||||
|
terminate_existing_blocker()
|
||||||
|
write_pid_file()
|
||||||
|
|
||||||
|
|
||||||
|
def load_blocked_browser_words() -> List[re.Pattern]:
|
||||||
|
config_dir = get_config_dir()
|
||||||
|
blocked_words_file = os.path.join(config_dir, BLOCKED_BROWSER_WORDS_TXT)
|
||||||
|
assert(os.path.isfile(blocked_words_file))
|
||||||
|
blocked = []
|
||||||
|
with open(blocked_words_file, "r") as f:
|
||||||
|
for line in f.readlines():
|
||||||
|
line = line.strip()
|
||||||
|
r = re.compile(f"{line}.*(Firefox|Chromium)", flags=re.IGNORECASE)
|
||||||
|
blocked.append(r)
|
||||||
|
return blocked
|
||||||
|
|
||||||
|
|
||||||
|
def load_window_names() -> Set[str]:
|
||||||
|
config_dir = get_config_dir()
|
||||||
|
window_names_file = os.path.join(config_dir, WINDOW_NAMES_TXT)
|
||||||
|
if not os.path.isfile(window_names_file):
|
||||||
|
return set()
|
||||||
|
with open(window_names_file, "r") as f:
|
||||||
|
return {l.strip() for l in f.readlines()}
|
||||||
|
|
||||||
|
|
||||||
|
def write_window_names(window_names: Set[str]) -> None:
|
||||||
|
config_dir = get_config_dir()
|
||||||
|
window_names = sorted(list(window_names))
|
||||||
|
window_names = "\n".join(window_names)
|
||||||
|
window_names_file = os.path.join(config_dir, WINDOW_NAMES_TXT)
|
||||||
|
with open(window_names_file, "w") as f:
|
||||||
|
f.write(window_names_file)
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
init()
|
||||||
|
blocked = load_blocked_browser_words()
|
||||||
|
window_names = load_window_names()
|
||||||
|
counter = 0
|
||||||
|
while True:
|
||||||
|
time.sleep(1)
|
||||||
|
window_name, window_pid = get_active_window_name_and_pid()
|
||||||
|
window_names.add(window_name)
|
||||||
|
if is_window_blocked(window_name, blocked):
|
||||||
|
kill_sequence(blocked)
|
||||||
|
if counter % 60 == 59:
|
||||||
|
write_window_names(window_names)
|
||||||
|
counter += 1
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
if os.geteuid() == 0:
|
||||||
|
newpid = os.fork()
|
||||||
|
if newpid == 0:
|
||||||
|
main()
|
||||||
|
else:
|
||||||
|
cmd = ["sudo", FOCUSFRIEND_PY] + sys.argv[1:]
|
||||||
|
subprocess.Popen(cmd)
|
||||||
Reference in New Issue
Block a user