FocusFriend is now AntiDrift.

This commit is contained in:
2022-06-18 15:51:24 -04:00
parent 488aee46ab
commit 1d40434bf0
4 changed files with 2 additions and 11 deletions

194
antidrift.py Normal file
View File

@@ -0,0 +1,194 @@
#!/usr/bin/env python3
import os
import re
import subprocess
import shutil
import sys
import time
from typing import List, Tuple, Set
import psutil
from config import Config
def is_window_blocked(window_name: str, blocked: List[re.Pattern]) -> bool:
for b in blocked:
if b.findall(window_name):
return True
return False
def get_active_window_name_and_pid() -> Tuple[str, str]:
cmd = ["xdotool", "getactivewindow", "getwindowname", "getwindowpid"]
p = subprocess.run(cmd, capture_output=True)
if p.returncode != 0:
return ("", "")
window_name, window_pid, _ = p.stdout.decode().split("\n")
return (window_name, window_pid)
def kill_sequence(blocked: List[re.Pattern]) -> None:
def to_display(name: str) -> str:
return name if len(name) < 30 else name[:30] + "..."
for count in range(5, 0, -1):
window_name, window_pid = get_active_window_name_and_pid()
if not is_window_blocked(window_name, blocked):
notify(f"[okay] {to_display(window_name)}")
return
notify(f"[kill {count}s] {to_display(window_name)}")
time.sleep(1)
p = psutil.Process(int(window_pid))
p.terminate()
def notify(message: str) -> None:
""" Notify user via the Xorg notify-send command. """
env = {
**os.environ,
"DBUS_SESSION_BUS_ADDRESS": "unix:path=/run/user/1000/bus"
}
user = env.get("SUDO_USER", None)
if user is None:
cmd = ["notify-send", message]
else:
cmd = ["runuser", "-m", "-u", user, "notify-send", message]
subprocess.run(cmd, env=env)
def write_pid_file(config: Config) -> str:
p = psutil.Process()
pid_file = os.path.join(config.directory, f"{p.pid}.pid")
with open(pid_file, "w") as f:
f.write(str(p.pid))
return pid_file
def terminate_existing_blocker(config: Config) -> None:
this_pid = psutil.Process().pid
pid_files = [f for f in config.directory if f.endswith(".pid")]
for pid_file in pid_files:
pid = int(pid_file.replace(".pid", ""))
if this_pid == pid:
continue
try:
p = psutil.Process(pid)
p.terminate()
except psutil.NoSuchProcess:
print(f"Could not terminate {p.pid=}.")
pid_file = os.path.join(config.directory, pid_file)
try:
os.remove(pid_file)
except PermissionError:
print(f"Could not remove {pid_file=}.")
def load_blocked_patterns(config: Config) -> List[re.Pattern]:
blocked = []
for block in config.blocks:
for item in block.items:
s = "".join([block.prefix, item, block.postfix])
r = re.compile(s, re.IGNORECASE)
blocked.append(r)
return blocked
def init(config: Config) -> None:
terminate_existing_blocker(config)
write_pid_file(config)
def run_checks(config: Config) -> None:
# shutil.which("xdotool")
# That's what we do if anything goes wrong:
# xkill --all
pass
def kill_window_if_blocked(blocked: List[re.Pattern]) -> None:
window_name, window_pid = get_active_window_name_and_pid()
if is_window_blocked(window_name, blocked):
kill_sequence(blocked)
def is_process_active(process_name: str) -> bool:
for proc in psutil.process_iter():
if proc.name() == process_name:
return True
return False
def enforce_aw_commit():
def aw_commit_active():
return is_process_active("aw-commit")
def to_display(name: str) -> str:
return name if len(name) < 30 else name[:30] + "..."
if aw_commit_active():
return
for _ in range(10, 0, -1):
notify(f"[warning] aw-commit not running")
time.sleep(1)
if aw_commit_active():
return
if aw_commit_active():
return
window_name, window_pid = get_active_window_name_and_pid()
if window_name:
notify(f"[kill aw-commit not running] {to_display(window_name)}")
p = psutil.Process(int(window_pid))
p.terminate()
return enforce_aw_commit()
def main_root(config: Config) -> None:
init(config)
blocked = load_blocked_patterns(config)
while True:
time.sleep(config.sleep_time)
run_checks(config)
kill_window_if_blocked(blocked)
if config.enforce_aw_commit:
enforce_aw_commit()
def main() -> None:
""" Run main_root as root except while debugging. """
config_path = "~/.config/aw-focus/config.json"
config = Config.load_config(config_path)
if config.start_as_user:
terminate_existing_blocker(config)
main_root(config)
if os.geteuid() == 0:
newpid = os.fork()
if newpid == 0:
main_root(config)
else:
cmd = ["sudo", config.aw_focus_cmd] + sys.argv[1:]
subprocess.Popen(cmd)
if __name__ == "__main__":
main()
def load_window_names(config: Config) -> Set[str]:
window_names_file = os.path.join(config.directory, config.window_names)
if not os.path.isfile(window_names_file):
return set()
with open(window_names_file, "r") as f:
return {l.strip() for l in f.readlines()}
def write_window_names(config: Config, window_names: Set[str]) -> None:
window_names_file = os.path.join(config.directory, config.window_names)
window_names = "\n".join(sorted(list(window_names)))
with open(window_names_file, "w") as f:
f.write(window_names)