Files
antidrift/antidrift/daemon.py

262 lines
9.4 KiB
Python

import dbus
import dbus.service
import logging
import os
import pwd
import re
import sys
import time
import antidrift.xwindow as xwindow
from antidrift.xwindow import XWindow
from antidrift.config import Config, State, Block
from gi.repository import GLib, Gio
from typing import List, Optional
BUS_NAME = "com.antidrift"
IFACE = "com.antidrift"
OPATH = "/com/antidrift"
def reload_callback(m, f, o, event):
filename = f.get_basename()
m = f"[dark_orange3]Restart after change in '{filename}'.[/dark_orange3]"
logging.warning(m)
os.execv(sys.executable, ["python3"] + sys.argv)
def get_dbus_interface() -> Optional[dbus.Interface]:
try:
bus = dbus.SessionBus()
bus_object = bus.get_object(BUS_NAME, OPATH)
interface = dbus.Interface(bus_object, IFACE)
return interface
except dbus.exceptions.DBusException:
return None
class AntiDriftDaemon(dbus.service.Object):
def __init__(self, config: Config):
user_name = os.environ.get("SUDO_USER", pwd.getpwuid(os.getuid()).pw_name)
user_uid = pwd.getpwnam(user_name).pw_uid
euid = os.geteuid()
os.seteuid(user_uid)
bus = dbus.bus.BusConnection(f"unix:path=/run/user/{user_uid}/bus")
os.seteuid(euid)
bus.request_name(BUS_NAME)
bus_name = dbus.service.BusName(BUS_NAME, bus=bus)
dbus.service.Object.__init__(self, bus_name, OPATH)
self.config = config
self.reset_block_state()
self.enforce_count = 0
self.enforce_value = int(config.enforce_delay_ms / config.polling_cycle_ms)
def reset_block_state(self):
self.state = State(
active_blackblocks=self.config.blackblocks,
active_whiteblocks=[],
inactive_blackblocks=[],
)
@dbus.service.method(dbus_interface=IFACE, in_signature="as", out_signature="s")
def start(self, whiteblocks: List[str]) -> str:
self.reset_block_state()
all_whiteblocks = {wb.name: wb for wb in self.config.whiteblocks}
success_wbs, fail_blocks = [], []
for block_name in whiteblocks:
if block_name in all_whiteblocks:
self.state.active_whiteblocks.append(all_whiteblocks[block_name])
success_wbs.append(block_name)
else:
fail_blocks.append(block_name)
if success_wbs:
wbs = ", ".join(success_wbs)
r = f"Start whiteblocks [sky_blue3]{wbs}[/sky_blue3]."
logging.info(r)
else:
r = "No whiteblocks started."
if fail_blocks:
m = f"No whiteblocks [red3]{', '.join(fail_blocks)}[/red3]."
logging.warning(m)
return r
@dbus.service.method(dbus_interface=IFACE, in_signature="s", out_signature="s")
def schedule(self, blackblock_name: str) -> str:
"""Schedule blackblock based if it has a non-zero timeout value."""
all_blackblocks = {bb.name: bb for bb in self.config.blackblocks}
if blackblock_name not in all_blackblocks:
m = f"No blackblock [red3]{blackblock_name}[/red3]."
logging.warning(m)
return m
blackblock = all_blackblocks[blackblock_name]
if blackblock.delay == 0:
m = f"Blackblock [red3]{blackblock_name}[/red3] cannot be scheduled without delay."
logging.warning(m)
return m
def allow():
self.allow_blackblock(blackblock)
delay_ms = blackblock.delay * 1000 * 60
GLib.timeout_add(delay_ms, allow)
m = f"Scheduled [sky_blue3]{blackblock_name}[/sky_blue3] in {blackblock.delay} minutes."
logging.info(m)
return m
@dbus.service.method(dbus_interface=IFACE, in_signature="", out_signature="s")
def stop(self) -> str:
self.reset_block_state()
m = "Blacklist only mode."
logging.info(m)
return m
@dbus.service.method(dbus_interface=IFACE, in_signature="", out_signature="s")
def status(self) -> str:
white_active = bool(self.state.active_whiteblocks)
black_active = bool(self.state.active_blackblocks)
m = "🟢 ad "
inactive_bbs = " ".join(
map(lambda b: "-" + b.name, self.state.inactive_blackblocks)
)
match (white_active, black_active):
case (True, _):
m += "wb: "
m += " ".join(map(lambda b: b.name, self.state.active_whiteblocks))
if inactive_bbs:
m += " "
m += inactive_bbs
case (False, True):
m += "bb"
if inactive_bbs:
m += ": "
m += inactive_bbs
case _:
m = "inactive"
return m
def allow_blackblock(self, blackblock: Block):
if blackblock in self.state.active_blackblocks:
self.state.active_blackblocks.remove(blackblock)
if blackblock not in self.state.inactive_blackblocks:
self.state.inactive_blackblocks.append(blackblock)
m = f"Blackblock [sky_blue3]{blackblock.name}[/sky_blue3] is now allowed."
logging.info(m)
def get_intention(self) -> str:
s = " ".join(map(lambda b: b.name, self.state.active_whiteblocks))
return f"intention is {s} work" if s else "no intention"
def run(self, debug: bool = False):
def _enforce():
self.enforce()
GLib.timeout_add(self.config.polling_cycle_ms, _enforce)
def _log():
self.config.window_log_file.parent.mkdir(parents=True, exist_ok=True)
window = XWindow()
ts = int(time.time())
intention = self.get_intention()
log_line = f"{ts}, {window.name}, {window.cls}, {intention}\n"
with self.config.window_log_file.open('a') as f:
f.write(log_line)
ONE_MINUTE_IN_MS = 60 * 1000
GLib.timeout_add(ONE_MINUTE_IN_MS, _log)
# autorestart on file change for development
monitors = []
files = [
"antidrift.py",
"antidrift/daemon.py",
"antidrift/client.py",
"antidrift/config.py",
]
if debug:
logging.warning("[red]Running in debug mode.[/red]")
for filename in files:
gio_file = Gio.File.new_for_path(filename)
monitor = gio_file.monitor_file(Gio.FileMonitorFlags.NONE, None)
monitor.connect("changed", reload_callback)
monitors.append(monitor)
_enforce()
_log()
mainloop = GLib.MainLoop()
mainloop.run()
xwindow.notify(f"AntiDrift running.")
def enforce(self):
if self.enforce_count >= self.enforce_value:
window = XWindow()
xwindow.notify(f"Minimize {window.name[:30]}.")
window.minimize()
self.enforce_count = 0
elif self.enforce_count > 0 and window_is_blocked(self.state, True):
self.enforce_count += 1
elif self.enforce_count == 0 and window_is_blocked(self.state):
self.enforce_count += 1
delay = int(self.config.enforce_delay_ms / 1000)
xwindow.notify(f"AntiDrift will minimize in {delay}s.")
elif self.enforce_count > 0:
xwindow.notify("We are gucci again.")
self.enforce_count = 0
def window_is_blocked(state: State, silent: bool = False) -> bool:
blackblocks = state.active_blackblocks
whiteblocks = state.active_whiteblocks
window = XWindow()
if not window.keywords:
return False
def keyword_matches_window(keyword: str, window: XWindow):
if keyword.startswith("/") and keyword.endswith("/"):
try:
r = re.compile(keyword[1:-1], re.IGNORECASE)
if r.findall(window.name):
return True
else:
return False
except re.error:
m = f"Invalid regex [red3]{keyword}[/red3]."
logging.warning(m)
return False
else:
if k in window.keywords:
return True
return False
for b in blackblocks:
for k in b.keywords:
if keyword_matches_window(k, window) and b.kill:
window.kill()
xwindow.notify(f"Kill for {k} on {b.name}.")
logging.warning(f"Kill for [red]{k}[/red] on [red]{b.name}[/red].")
return True
elif keyword_matches_window(k, window):
if not silent:
xwindow.notify(f"{window.name[:30]} blocked by {b.name}.")
logging.warning(
f"[red]{window.name[:50]}[/red] "
f"blocked by [red]{b.name}[/red]."
)
return True
if not whiteblocks:
if not silent:
logging.debug("All non-blackblock windows are allowed.")
return False
for w in whiteblocks:
for k in w.keywords:
if keyword_matches_window(k, window):
if not silent:
logging.debug(
f"[pale_green3]{window.name[:30]}[/pale_green3] "
f"allowed by [sky_blue3]{w.name}[/sky_blue3]."
)
return False
if not silent:
xwindow.notify(f"'{window.name[:30]}' not on any whiteblock.")
return True