From 981709e96588b07a037113fc6882ff86ec2f4c63 Mon Sep 17 00:00:00 2001 From: felixm Date: Sat, 27 May 2023 17:11:46 -0400 Subject: [PATCH] Switch to asyncio with dbus-next --- antidrift/auth.py | 33 +++++++ antidrift/client.py | 62 +++++++------ antidrift/config.py | 3 +- antidrift/daemon.py | 204 +++++++++++++++++++++++-------------------- antidrift/xwindow.py | 3 + main.py | 15 ++-- 6 files changed, 188 insertions(+), 132 deletions(-) create mode 100644 antidrift/auth.py diff --git a/antidrift/auth.py b/antidrift/auth.py new file mode 100644 index 0000000..e62b979 --- /dev/null +++ b/antidrift/auth.py @@ -0,0 +1,33 @@ +from dbus_next.auth import Authenticator, _AuthResponse + + +class AuthExternal(Authenticator): + """An authenticator class for the external auth protocol for use with the + :class:`MessageBus `. + :sealso: https://dbus.freedesktop.org/doc/dbus-specification.html#auth-protocol + """ + def __init__(self, user_uid): + self.user_uid = user_uid + self.negotiate_unix_fd = False + self.negotiating_fds = False + + def _authentication_start(self, negotiate_unix_fd=False) -> str: + self.negotiate_unix_fd = negotiate_unix_fd + hex_uid = str(self.user_uid).encode().hex() + return f'AUTH EXTERNAL {hex_uid}' + + def _receive_line(self, line: str): + response, args = _AuthResponse.parse(line) + + if response is _AuthResponse.OK: + if self.negotiate_unix_fd: + self.negotiating_fds = True + return "NEGOTIATE_UNIX_FD" + else: + return "BEGIN" + + if response is _AuthResponse.AGREE_UNIX_FD: + return "BEGIN" + + raise AuthError(f'authentication failed: {response.value}: {args}') + diff --git a/antidrift/client.py b/antidrift/client.py index 50a8b74..cfb0d55 100644 --- a/antidrift/client.py +++ b/antidrift/client.py @@ -1,19 +1,43 @@ -import time -import antidrift.daemon +import asyncio +from dbus_next.aio import MessageBus +from dbus_next import BusType +from dbus_next.errors import DBusError from antidrift.config import Config from argparse import Namespace from rich import print +from antidrift.daemon import IFACE, OPATH, BUS_NAME -def antidrift_daemon_is_running() -> bool: - """Check if AntiDrift is running via the DBUS""" - interface = antidrift.daemon.get_dbus_interface() +async def get_dbus_interface(): + try: + bus = await MessageBus(bus_type=BusType.SESSION).connect() + introspection = await bus.introspect(BUS_NAME, OPATH) + proxy_obj = bus.get_proxy_object(BUS_NAME, OPATH, introspection) + return proxy_obj.get_interface(IFACE) + except DBusError: + return None + + +async def run(args: Namespace, config: Config): + interface = await get_dbus_interface() + reply = "🟡 ad daemon active but no command" if interface is None: - return False - reply = interface.status() - if reply: - return True - return False + reply = "🔴 ad inactive" + elif args.start: + reply = await interface.call_start(args.start) + elif args.stop: + reply = await interface.call_stop() + elif args.pause: + reply = await interface.call_pause() + elif args.unpause: + reply = await interface.call_unpause() + elif args.schedule: + reply = await interface.call_schedule(args.schedule) + elif args.tailf: + tailf(config) + elif args.status: + reply = await interface.call_status() + print(reply) def tailf(config): @@ -25,21 +49,3 @@ def tailf(config): time.sleep(0.1) else: print(line.strip()) - - -def run(args: Namespace, config: Config): - interface = antidrift.daemon.get_dbus_interface() - reply = "🟡 ad daemon active but no command" - if interface is None: - reply = "🔴 ad inactive" - elif args.start: - reply = interface.start(args.start) - elif args.stop: - reply = interface.stop() - elif args.schedule: - reply = interface.schedule(args.schedule) - elif args.tailf: - tailf(config) - elif args.status: - reply = interface.status() - print(reply) diff --git a/antidrift/config.py b/antidrift/config.py index 4177a93..7db41fb 100644 --- a/antidrift/config.py +++ b/antidrift/config.py @@ -22,7 +22,7 @@ class Config(BaseModel): daemon_log_file: Path = Path() client_log_file: Path = Path() config_file: Path = Path() - polling_cycle_ms: int = 500 + polling_cycle_ms: int = 2000 enforce_delay_ms: int = 5000 class Config: @@ -46,6 +46,7 @@ class State(BaseModel): active_blackblocks: List[Block] = [] active_whiteblocks: List[Block] = [] inactive_blackblocks: List[Block] = [] + pause: bool = False class Config: extra = "forbid" diff --git a/antidrift/daemon.py b/antidrift/daemon.py index cda0f4d..a09ceac 100644 --- a/antidrift/daemon.py +++ b/antidrift/daemon.py @@ -1,59 +1,82 @@ from datetime import datetime import csv -import dbus -import dbus.service import logging import os import pwd import re import sys import time +import asyncio import antidrift.xwindow as xwindow from antidrift.xwindow import XWindow from antidrift.config import Config, State, Block -from gi.repository import GLib, Gio +from antidrift.auth import AuthExternal from typing import List, Optional +from dbus_next.aio import MessageBus +from dbus_next.service import ServiceInterface, method +from dbus_next import Variant, BusType + BUS_NAME = "com.antidrift" IFACE = "com.antidrift" OPATH = "/com/antidrift" -def reload_callback(m, f, o, event): - filename = f.get_basename() - m = f"[dark_orange3]Restart after change in '{filename}'.[/dark_orange3]" - logging.warning(m) - os.execv(sys.executable, ["python3"] + sys.argv) - - -def get_dbus_interface() -> Optional[dbus.Interface]: - try: - bus = dbus.SessionBus() - bus_object = bus.get_object(BUS_NAME, OPATH) - interface = dbus.Interface(bus_object, IFACE) - return interface - except dbus.exceptions.DBusException: - return None - - -class AntiDriftDaemon(dbus.service.Object): +class AntiDriftDaemon(ServiceInterface): def __init__(self, config: Config): - user_name = os.environ.get("SUDO_USER", pwd.getpwuid(os.getuid()).pw_name) - user_uid = pwd.getpwnam(user_name).pw_uid - euid = os.geteuid() - os.seteuid(user_uid) - bus = dbus.bus.BusConnection(f"unix:path=/run/user/{user_uid}/bus") - os.seteuid(euid) - - bus.request_name(BUS_NAME) - bus_name = dbus.service.BusName(BUS_NAME, bus=bus) - dbus.service.Object.__init__(self, bus_name, OPATH) + super().__init__(IFACE) self.config = config self.reset_block_state() self.enforce_count = 0 self.enforce_value = int(config.enforce_delay_ms / config.polling_cycle_ms) + async def init_bus(self): + """ + We are switching the effective UID to the target user's UID in order to + connect to the D-Bus session bus with the correct permissions and + authentication. Once the D-Bus connection is established, we restore + the original effective UID to maintain the appropriate privilege + levels. + """ + user_name = os.environ.get("SUDO_USER", pwd.getpwuid(os.getuid()).pw_name) + user_uid = pwd.getpwnam(user_name).pw_uid + euid = os.geteuid() + os.seteuid(user_uid) + auth = AuthExternal(user_uid) + bus_address = f"unix:path=/run/user/{user_uid}/bus" + bus = MessageBus(bus_address=bus_address, bus_type=BusType.SESSION, auth=auth) + await bus.connect() + bus.export(OPATH, self) + await bus.request_name(BUS_NAME) + os.seteuid(euid) + return bus + + async def run(self, debug: bool = False): + bus = await self.init_bus() + + async def _enforce(): + while True: + if self.state.pause is True: + await self.enforce_pause() + else: + await self.enforce() + await asyncio.sleep(self.config.polling_cycle_ms / 1000) + + async def _log(): + while True: + if self.state.pause is False: + self.log_window() + await asyncio.sleep(60) # Sleep for 60 seconds + + # Start _enforce and _log as background tasks + asyncio.create_task(_enforce()) + asyncio.create_task(_log()) + + xwindow.notify(f"AntiDrift running.") + stop = asyncio.Event() + await stop.wait() + def reset_block_state(self): self.state = State( active_blackblocks=self.config.blackblocks, @@ -61,8 +84,8 @@ class AntiDriftDaemon(dbus.service.Object): inactive_blackblocks=[], ) - @dbus.service.method(dbus_interface=IFACE, in_signature="as", out_signature="s") - def start(self, whiteblocks: List[str]) -> str: + @method() + def start(self, whiteblocks: 'as') -> 's': self.reset_block_state() all_whiteblocks = {wb.name: wb for wb in self.config.whiteblocks} success_wbs, fail_blocks = [], [] @@ -83,8 +106,8 @@ class AntiDriftDaemon(dbus.service.Object): logging.warning(m) return r - @dbus.service.method(dbus_interface=IFACE, in_signature="s", out_signature="s") - def schedule(self, blackblock_name: str) -> str: + @method() + def schedule(self, blackblock_name: 's') -> 's': """Schedule blackblock based if it has a non-zero timeout value.""" all_blackblocks = {bb.name: bb for bb in self.config.blackblocks} if blackblock_name not in all_blackblocks: @@ -101,27 +124,46 @@ class AntiDriftDaemon(dbus.service.Object): def allow(): self.allow_blackblock(blackblock) - delay_ms = blackblock.delay * 1000 * 60 - GLib.timeout_add(delay_ms, allow) - m = f"Scheduled [sky_blue3]{blackblock_name}[/sky_blue3] in {blackblock.delay} minutes." + delay_sec = blackblock.delay * 60 + + delay_sec = blackblock.delay * 60 + loop = asyncio.get_event_loop() + loop.call_later(delay_sec, allow) + m = f"Scheduled [sky_blue3]{blackblock.name}[/sky_blue3] in {blackblock.delay} minutes." logging.info(m) return m - @dbus.service.method(dbus_interface=IFACE, in_signature="", out_signature="s") - def stop(self) -> str: + @method() + def stop(self) -> 's': self.reset_block_state() m = "Blacklist only mode." logging.info(m) return m - @dbus.service.method(dbus_interface=IFACE, in_signature="", out_signature="s") - def status(self) -> str: + @method() + def pause(self) -> 's': + self.state.pause = True + m = "Antidrift paused." + logging.info(m) + return m + + @method() + def unpause(self) -> 's': + self.state.pause = False + m = "Antidrift unpaused." + logging.info(m) + return m + + @method() + def status(self) -> 's': white_active = bool(self.state.active_whiteblocks) black_active = bool(self.state.active_blackblocks) m = "🟢 ad " inactive_bbs = " ".join( map(lambda b: "-" + b.name, self.state.inactive_blackblocks) ) + if self.state.pause is True: + return "🟡 ad paused" match (white_active, black_active): case (True, _): m += "wb: " @@ -150,15 +192,6 @@ class AntiDriftDaemon(dbus.service.Object): s = " ".join(map(lambda b: b.name, self.state.active_whiteblocks)) return f"intention is {s} work" if s else "no intention" - def log_window(self): - self.config.window_log_file.parent.mkdir(parents=True, exist_ok=True) - window = XWindow() - ts = int(time.time()) - intention = self.get_intention() - log_line = f"{ts}, {window.name}, {window.cls}, {intention}\n" - with self.config.window_log_file.open('a') as f: - f.write(log_line) - def log_window(self): window = XWindow() utc_timestamp = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') @@ -174,54 +207,31 @@ class AntiDriftDaemon(dbus.service.Object): writer = csv.writer(f) writer.writerow(log_line) - def run(self, debug: bool = False): - def _enforce(): - self.enforce() - GLib.timeout_add(self.config.polling_cycle_ms, _enforce) + async def enforce_pause(self): + xwindow.notify("Goint to minimize window because of pause...") + for _ in range(8): + await asyncio.sleep(1) + if not XWindow().is_active() or self.state.pause is False: + return - def _log(): - self.log_window() - ONE_MINUTE_IN_MS = 60 * 1000 - GLib.timeout_add(ONE_MINUTE_IN_MS, _log) - - # autorestart on file change for development - monitors = [] - files = [ - "antidrift.py", - "antidrift/daemon.py", - "antidrift/client.py", - "antidrift/config.py", - ] - if debug: - logging.warning("[red]Running in debug mode.[/red]") - for filename in files: - gio_file = Gio.File.new_for_path(filename) - monitor = gio_file.monitor_file(Gio.FileMonitorFlags.NONE, None) - monitor.connect("changed", reload_callback) - monitors.append(monitor) - - self.config.window_log_file.parent.mkdir(parents=True, exist_ok=True) - _enforce() - _log() - mainloop = GLib.MainLoop() - mainloop.run() - xwindow.notify(f"AntiDrift running.") - - def enforce(self): - if self.enforce_count >= self.enforce_value: - window = XWindow() - xwindow.notify(f"Minimize {window.name[:30]}.") + window = XWindow() + if window.is_active(): window.minimize() - self.enforce_count = 0 - elif self.enforce_count > 0 and window_is_blocked(self.state, True): - self.enforce_count += 1 - elif self.enforce_count == 0 and window_is_blocked(self.state): - self.enforce_count += 1 - delay = int(self.config.enforce_delay_ms / 1000) - xwindow.notify(f"AntiDrift will minimize in {delay}s.") - elif self.enforce_count > 0: - xwindow.notify("We are gucci again.") - self.enforce_count = 0 + + async def enforce(self): + if not window_is_blocked(self.state): + return + + delay = int(self.config.enforce_delay_ms / 1000) + for i in range(delay, 0, -1): + await asyncio.sleep(1) + xwindow.notify(f"AntiDrift will minimize in {i}s.") + if not window_is_blocked(self.state, silent=True): + xwindow.notify("We are gucci again.") + return + window = XWindow() + xwindow.notify(f"Minimize {window.name[:30]}.") + window.minimize() def window_is_blocked(state: State, silent: bool = False) -> bool: diff --git a/antidrift/xwindow.py b/antidrift/xwindow.py index 6526594..7e7cc4f 100644 --- a/antidrift/xwindow.py +++ b/antidrift/xwindow.py @@ -37,6 +37,9 @@ class XWindow: def kill(self): self._run(["windowkill", self.window]) + def is_active(self): + return True if self.name else False + def notify(message: str) -> None: """Notify user via the Xorg notify-send command.""" diff --git a/main.py b/main.py index 8f54b06..b5115ba 100644 --- a/main.py +++ b/main.py @@ -7,6 +7,7 @@ import subprocess import argparse import psutil import rich +import asyncio from rich.logging import RichHandler from pathlib import Path @@ -25,10 +26,10 @@ def get_args(): parser.add_argument("--daemon", action="store_true", help="run daemon") parser.add_argument("--status", action="store_true", help="get status from daemon") parser.add_argument("--tailf", action="store_true", help="tail -f log file") - parser.add_argument( - "--start", metavar="whiteblock", nargs="+", help="start whiteblocks" - ) + parser.add_argument("--start", metavar="whiteblock", nargs="+", help="start whiteblocks") parser.add_argument("--stop", action="store_true", help="stop session") + parser.add_argument("--pause", action="store_true", help="pause antidrift") + parser.add_argument("--unpause", action="store_true", help="unpause antidrift") parser.add_argument("--schedule", metavar="blackblock", help="schedule blackblock") args = parser.parse_args() return args @@ -97,7 +98,8 @@ def main_daemon(): if newpid == 0: config = Config.load(os.path.expanduser("~/.config/antidrift.yaml")) init_logging(config.daemon_log_file) - AntiDriftDaemon(config).run() + daemon = AntiDriftDaemon(config) + asyncio.run(daemon.run()) else: if sys.argv[0] == "antidrift": kill_existing_antidrift() @@ -106,7 +108,8 @@ def main_daemon(): else: config = Config.load(os.path.expanduser("~/.config/antidrift.yaml")) init_logging(config.daemon_log_file, dev_mode=True) - AntiDriftDaemon(config).run(debug=True) + daemon = AntiDriftDaemon(config) + asyncio.run(daemon.run(debug=True)) def main() -> None: @@ -117,7 +120,7 @@ def main() -> None: main_daemon() else: config = Config.load(os.path.expanduser("~/.config/antidrift.yaml")) - antidrift.client.run(args, config) + asyncio.run(antidrift.client.run(args, config)) if __name__ == "__main__":