Repurpose FocusFriend to get AntiDrift off the ground.

This commit is contained in:
2022-06-21 21:29:31 -04:00
parent 1d40434bf0
commit 0eed43c915
5 changed files with 293 additions and 183 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
config.yaml
# ---> Python # ---> Python
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/

View File

@@ -1,194 +1,82 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import os import logging
import re
import subprocess
import shutil import shutil
import sys import sys
import time import time
from typing import List, Tuple, Set import re
import psutil import xwindow
from config import Config from config import Config, Block
from typing import List
def is_window_blocked(window_name: str, blocked: List[re.Pattern]) -> bool: def check_for_xdotool():
for b in blocked: r = shutil.which("xdotool")
if b.findall(window_name): if not r:
return True logging.critical("Please install xdotool")
sys.exit(1)
def init_logging():
FORMAT = '%(levelname)-8s | %(message)s'
logging.basicConfig(format=FORMAT, level=logging.DEBUG)
def window_is_blocked(config: Config) -> bool:
# These should be selectable in the future (not all at the same time)
blackblocks = config.blackblocks
whiteblocks = config.whiteblocks
window = xwindow.XWindow()
if not window.keywords:
return False return False
for b in blackblocks:
for k in b.keywords:
if k in window.keywords:
xwindow.notify(f"{window.name[:30]} blocked by {b.name}.")
logging.warning(f"{window.name[:30]} blocked by {b.name}.")
return True
if config.blackblocks_only:
logging.debug(f"All non-blacklisted windows are allowed.")
return False
for w in whiteblocks:
for k in w.keywords:
if k in window.keywords:
logging.debug(f"{window.name[:30]} allowed by {w.name}.")
return False
xwindow.notify(f"{window.name[:30]} not on any whitelist.")
return True
def get_active_window_name_and_pid() -> Tuple[str, str]: def enforce(config: Config):
cmd = ["xdotool", "getactivewindow", "getwindowname", "getwindowpid"] if not window_is_blocked(config):
p = subprocess.run(cmd, capture_output=True)
if p.returncode != 0:
return ("", "")
window_name, window_pid, _ = p.stdout.decode().split("\n")
return (window_name, window_pid)
def kill_sequence(blocked: List[re.Pattern]) -> None:
def to_display(name: str) -> str:
return name if len(name) < 30 else name[:30] + "..."
for count in range(5, 0, -1):
window_name, window_pid = get_active_window_name_and_pid()
if not is_window_blocked(window_name, blocked):
notify(f"[okay] {to_display(window_name)}")
return return
notify(f"[kill {count}s] {to_display(window_name)}") xwindow.notify(f"AntiDrift will minimize the window in {config.minimize_delay} seconds.")
time.sleep(1) time.sleep(config.minimize_delay)
window = xwindow.XWindow()
p = psutil.Process(int(window_pid)) if window_is_blocked(config):
p.terminate() window = xwindow.XWindow()
xwindow.notify(f"Minimize {window.name[:30]}.")
window.minimize()
def notify(message: str) -> None:
""" Notify user via the Xorg notify-send command. """
env = {
**os.environ,
"DBUS_SESSION_BUS_ADDRESS": "unix:path=/run/user/1000/bus"
}
user = env.get("SUDO_USER", None)
if user is None:
cmd = ["notify-send", message]
else: else:
cmd = ["runuser", "-m", "-u", user, "notify-send", message] xwindow.notify(f"We are gucci again.")
subprocess.run(cmd, env=env)
def write_pid_file(config: Config) -> str: def session(config: Config):
p = psutil.Process() logging.info(f"Start session with {len(config.whiteblocks)} whiteblocks")
pid_file = os.path.join(config.directory, f"{p.pid}.pid")
with open(pid_file, "w") as f:
f.write(str(p.pid))
return pid_file
def terminate_existing_blocker(config: Config) -> None:
this_pid = psutil.Process().pid
pid_files = [f for f in config.directory if f.endswith(".pid")]
for pid_file in pid_files:
pid = int(pid_file.replace(".pid", ""))
if this_pid == pid:
continue
try:
p = psutil.Process(pid)
p.terminate()
except psutil.NoSuchProcess:
print(f"Could not terminate {p.pid=}.")
pid_file = os.path.join(config.directory, pid_file)
try:
os.remove(pid_file)
except PermissionError:
print(f"Could not remove {pid_file=}.")
def load_blocked_patterns(config: Config) -> List[re.Pattern]:
blocked = []
for block in config.blocks:
for item in block.items:
s = "".join([block.prefix, item, block.postfix])
r = re.compile(s, re.IGNORECASE)
blocked.append(r)
return blocked
def init(config: Config) -> None:
terminate_existing_blocker(config)
write_pid_file(config)
def run_checks(config: Config) -> None:
# shutil.which("xdotool")
# That's what we do if anything goes wrong:
# xkill --all
pass
def kill_window_if_blocked(blocked: List[re.Pattern]) -> None:
window_name, window_pid = get_active_window_name_and_pid()
if is_window_blocked(window_name, blocked):
kill_sequence(blocked)
def is_process_active(process_name: str) -> bool:
for proc in psutil.process_iter():
if proc.name() == process_name:
return True
return False
def enforce_aw_commit():
def aw_commit_active():
return is_process_active("aw-commit")
def to_display(name: str) -> str:
return name if len(name) < 30 else name[:30] + "..."
if aw_commit_active():
return
for _ in range(10, 0, -1):
notify(f"[warning] aw-commit not running")
time.sleep(1)
if aw_commit_active():
return
if aw_commit_active():
return
window_name, window_pid = get_active_window_name_and_pid()
if window_name:
notify(f"[kill aw-commit not running] {to_display(window_name)}")
p = psutil.Process(int(window_pid))
p.terminate()
return enforce_aw_commit()
def main_root(config: Config) -> None:
init(config)
blocked = load_blocked_patterns(config)
while True: while True:
time.sleep(config.sleep_time) enforce(config)
run_checks(config) time.sleep(config.check_delay)
kill_window_if_blocked(blocked)
if config.enforce_aw_commit:
enforce_aw_commit()
def main() -> None: def main() -> None:
""" Run main_root as root except while debugging. """ init_logging()
config_path = "~/.config/aw-focus/config.json" logging.info("AntiDrift running")
config = Config.load_config(config_path) check_for_xdotool()
config = Config.load("config.yaml")
if config.start_as_user: session(config)
terminate_existing_blocker(config)
main_root(config)
if os.geteuid() == 0:
newpid = os.fork()
if newpid == 0:
main_root(config)
else:
cmd = ["sudo", config.aw_focus_cmd] + sys.argv[1:]
subprocess.Popen(cmd)
if __name__ == "__main__": if __name__ == "__main__":
main() main()
def load_window_names(config: Config) -> Set[str]:
window_names_file = os.path.join(config.directory, config.window_names)
if not os.path.isfile(window_names_file):
return set()
with open(window_names_file, "r") as f:
return {l.strip() for l in f.readlines()}
def write_window_names(config: Config, window_names: Set[str]) -> None:
window_names_file = os.path.join(config.directory, config.window_names)
window_names = "\n".join(sorted(list(window_names)))
with open(window_names_file, "w") as f:
f.write(window_names)

View File

@@ -1,33 +1,36 @@
import json import json
import os import os
import yaml
from pathlib import Path
from typing import List from typing import List
from pydantic import BaseModel from pydantic import BaseModel
class Block(BaseModel): class Block(BaseModel):
name: str = '' name: str
prefix: str = '' keywords: List[str]
postfix: str = ''
items: List[str]
def load(filename: str) -> List[Block]:
result = [Block(path=filename, **block) for block in block_list]
return result
class Config(BaseModel): class Config(BaseModel):
directory: str blackblocks: List[Block]
blocks: List[Block] whiteblocks: List[Block]
start_as_user: bool = True check_delay: int = 1
sleep_time: int = 1 minimize_delay: int = 5
aw_focus_cmd: str = "aw-focus" blackblocks_only: bool = True
window_names: str = "window_names.txt"
enforce_aw_commit: bool = True
class Config: class Config:
extra = 'forbid' extra = 'forbid'
@classmethod @classmethod
def load_config(cls, config_file: str) -> Config: def load(cls, config_file: str) -> Config:
config_file = os.path.expanduser(config_file) config_file = os.path.expanduser(config_file)
with open(config_file, 'r', encoding='utf8') as f: with open(config_file, "r") as f:
config_dict = json.load(f) config_dict = yaml.safe_load(f)
config = cls(**config_dict) config = cls(**config_dict)
config.directory = os.path.expanduser(config.directory)
return config return config

171
obsolete.py Normal file
View File

@@ -0,0 +1,171 @@
def is_window_blocked(window_name: str, blocked: List[re.Pattern]) -> bool:
for b in blocked:
if b.findall(window_name):
return True
return False
def kill_sequence(blocked: List[re.Pattern]) -> None:
def to_display(name: str) -> str:
return name if len(name) < 30 else name[:30] + "..."
for count in range(5, 0, -1):
window_name, window_pid = get_active_window_name_and_pid()
if not is_window_blocked(window_name, blocked):
notify(f"[okay] {to_display(window_name)}")
return
notify(f"[kill {count}s] {to_display(window_name)}")
time.sleep(1)
p = psutil.Process(int(window_pid))
p.terminate()
def notify(message: str) -> None:
""" Notify user via the Xorg notify-send command. """
env = {
**os.environ,
"DBUS_SESSION_BUS_ADDRESS": "unix:path=/run/user/1000/bus"
}
user = env.get("SUDO_USER", None)
if user is None:
cmd = ["notify-send", message]
else:
cmd = ["runuser", "-m", "-u", user, "notify-send", message]
subprocess.run(cmd, env=env)
def write_pid_file(config: Config) -> str:
p = psutil.Process()
pid_file = os.path.join(config.directory, f"{p.pid}.pid")
with open(pid_file, "w") as f:
f.write(str(p.pid))
return pid_file
def terminate_existing_blocker(config: Config) -> None:
this_pid = psutil.Process().pid
pid_files = [f for f in config.directory if f.endswith(".pid")]
for pid_file in pid_files:
pid = int(pid_file.replace(".pid", ""))
if this_pid == pid:
continue
try:
p = psutil.Process(pid)
p.terminate()
except psutil.NoSuchProcess:
print(f"Could not terminate {p.pid=}.")
pid_file = os.path.join(config.directory, pid_file)
try:
os.remove(pid_file)
except PermissionError:
print(f"Could not remove {pid_file=}.")
def load_blocked_patterns(config: Config) -> List[re.Pattern]:
blocked = []
for block in config.blocks:
for item in block.items:
s = "".join([block.prefix, item, block.postfix])
r = re.compile(s, re.IGNORECASE)
blocked.append(r)
return blocked
def init(config: Config) -> None:
terminate_existing_blocker(config)
write_pid_file(config)
def run_checks(config: Config) -> None:
# shutil.which("xdotool")
# That's what we do if anything goes wrong:
# xkill --all
pass
def kill_window_if_blocked(blocked: List[re.Pattern]) -> None:
window_name, window_pid = get_active_window_name_and_pid()
if is_window_blocked(window_name, blocked):
kill_sequence(blocked)
def is_process_active(process_name: str) -> bool:
for proc in psutil.process_iter():
if proc.name() == process_name:
return True
return False
def enforce_aw_commit():
def aw_commit_active():
return is_process_active("aw-commit")
def to_display(name: str) -> str:
return name if len(name) < 30 else name[:30] + "..."
if aw_commit_active():
return
for _ in range(10, 0, -1):
notify(f"[warning] aw-commit not running")
time.sleep(1)
if aw_commit_active():
return
if aw_commit_active():
return
window_name, window_pid = get_active_window_name_and_pid()
if window_name:
notify(f"[kill aw-commit not running] {to_display(window_name)}")
p = psutil.Process(int(window_pid))
p.terminate()
return enforce_aw_commit()
def load_window_names(config: Config) -> Set[str]:
window_names_file = os.path.join(config.directory, config.window_names)
if not os.path.isfile(window_names_file):
return set()
with open(window_names_file, "r") as f:
return {l.strip() for l in f.readlines()}
def write_window_names(config: Config, window_names: Set[str]) -> None:
window_names_file = os.path.join(config.directory, config.window_names)
window_names = "\n".join(sorted(list(window_names)))
with open(window_names_file, "w") as f:
f.write(window_names)
def main() -> None:
""" Run main_root as root except while debugging. """
config_path = "~/.config/aw-focus/config.json"
config = Config.load_config(config_path)
if config.start_as_user:
terminate_existing_blocker(config)
main_root(config)
if os.geteuid() == 0:
newpid = os.fork()
if newpid == 0:
main_root(config)
else:
cmd = ["sudo", config.aw_focus_cmd] + sys.argv[1:]
subprocess.Popen(cmd)
def main_root(config: Config) -> None:
init(config)
blocked = load_blocked_patterns(config)
while True:
time.sleep(config.sleep_time)
run_checks(config)
kill_window_if_blocked(blocked)
if config.enforce_aw_commit:
enforce_aw_commit()

47
xwindow.py Normal file
View File

@@ -0,0 +1,47 @@
import os
import re
import subprocess
import logging
class XWindow:
def __init__(self):
self.window = self._run(["getactivewindow"])
if not self.window:
self.name = ""
self.cls = ""
self.pid = ""
else:
self.name = self._run(["getwindowname", self.window])
self.cls = self._run(["getwindowclassname", self.window])
self.pid = self._run(["getwindowpid", self.window])
self.keywords = list(re.findall("\w+", self.name.lower()))
def _run(self, cmd) -> str:
cmd = ["xdotool"] + cmd
p = subprocess.run(cmd, capture_output=True)
if p.returncode != 0:
return ""
return p.stdout.decode().strip()
def minimize(self):
self._run(["windowminimize", self.window])
def quit(self):
self._run(["windowquit", self.window])
def notify(message: str) -> None:
""" Notify user via the Xorg notify-send command. """
logging.debug(f"notify: {message}")
env = {
**os.environ,
"DBUS_SESSION_BUS_ADDRESS": "unix:path=/run/user/1000/bus"
}
user = env.get("SUDO_USER", None)
if user is None:
cmd = ["notify-send", message]
else:
cmd = ["runuser", "-m", "-u", user, "notify-send", message]
subprocess.run(cmd, env=env)