diff --git a/antidrift/client.py b/antidrift/client.py index cda5cf3..65a9898 100644 --- a/antidrift/client.py +++ b/antidrift/client.py @@ -42,9 +42,11 @@ def tailf(config): def client_mode(config: Config): parser = argparse.ArgumentParser(description='AntiDrift CLI.') parser.add_argument('--start', metavar='whiteblock', nargs='+', - help='start session with whiteblocks') + help='start whiteblocks') parser.add_argument('--stop', action='store_true', help='stop session') + parser.add_argument('--schedule', metavar='blackblock', + help='schedule blackblock') parser.add_argument('--status', action='store_true', help='get status from daemon') parser.add_argument('--tailf', action='store_true', @@ -55,6 +57,8 @@ def client_mode(config: Config): reply = interface.start(args.start) elif args.stop: reply = interface.stop() + elif args.schedule: + reply = interface.schedule(args.schedule) elif args.tailf: tailf(config) elif args.status: diff --git a/antidrift/config.py b/antidrift/config.py index f3fbba3..0e653bc 100644 --- a/antidrift/config.py +++ b/antidrift/config.py @@ -9,13 +9,12 @@ class Block(BaseModel): name: str keywords: List[str] kill: bool = False + delay: int = 0 class Config(BaseModel): blackblocks: List[Block] whiteblocks: List[Block] - active_blackblocks: List[Block] = [] - active_whiteblocks: List[Block] = [] daemon_log_file: Path = Path() client_log_file: Path = Path() config_file: Path = Path() @@ -33,3 +32,9 @@ class Config(BaseModel): config = cls(**config_dict) config.config_file = Path(config_file) return config + + +class State(BaseModel): + active_blackblocks: List[Block] = [] + active_whiteblocks: List[Block] = [] + inactive_blackblocks: List[Block] = [] diff --git a/antidrift/daemon.py b/antidrift/daemon.py index 2263388..57c0950 100644 --- a/antidrift/daemon.py +++ b/antidrift/daemon.py @@ -3,7 +3,7 @@ import os import sys import pwd import antidrift.xwindow as xwindow -from antidrift.config import Config +from antidrift.config import Config, State, Block from gi.repository import GLib, Gio from typing import List import dbus @@ -24,8 +24,8 @@ def reload_callback(m, f, o, event): class AntiDriftDaemon(dbus.service.Object): def __init__(self, config: Config): - user_name = os.environ["SUDO_USER"] - user_uid = pwd.getpwnam(user_name)[2] + user_name = os.environ.get("SUDO_USER", pwd.getpwuid(os.getuid()).pw_name) + user_uid = pwd.getpwnam(user_name).pw_uid euid = os.geteuid() os.seteuid(user_uid) bus = dbus.bus.BusConnection(f"unix:path=/run/user/{user_uid}/bus") @@ -35,60 +35,102 @@ class AntiDriftDaemon(dbus.service.Object): bus_name = dbus.service.BusName(BUS_NAME, bus=bus) dbus.service.Object.__init__(self, bus_name, OPATH) self.config = config - self.config.active_whiteblocks = [] - self.config.active_blackblocks = self.config.blackblocks + self.reset_block_state() self.enforce_count = 0 self.enforce_value = int(config.enforce_delay_ms / config.polling_cycle_ms) + def reset_block_state(self): + self.state = State( + active_blackblocks=self.config.blackblocks, + active_whiteblocks=[], + inactive_blackblocks=[]) + @dbus.service.method(dbus_interface=IFACE, in_signature="as", out_signature="s") def start(self, whiteblocks: List[str]) -> str: - self.config.active_whiteblocks = [] + self.reset_block_state() all_whiteblocks = {wb.name: wb for wb in self.config.whiteblocks} - success_wbs = [] - fail_wbs = [] - for wb_name in whiteblocks: - if wb_name in all_whiteblocks: - self.config.active_whiteblocks.append(all_whiteblocks[wb_name]) - success_wbs.append(wb_name) + success_wbs, fail_blocks = [], [] + for block_name in whiteblocks: + if block_name in all_whiteblocks: + self.state.active_whiteblocks.append(all_whiteblocks[block_name]) + success_wbs.append(block_name) else: - fail_wbs.append(wb_name) + fail_blocks.append(block_name) if success_wbs: wbs = ', '.join(success_wbs) r = f"Start whiteblocks [sky_blue3]{wbs}[/sky_blue3]." logging.info(r) else: r = "No whiteblocks started." - if fail_wbs: - m = f"No blackblocks [red]{', '.join(fail_wbs)}[/red]." + if fail_blocks: + m = f"No whiteblocks [red3]{', '.join(fail_blocks)}[/red3]." logging.warning(m) return r + @dbus.service.method(dbus_interface=IFACE, + in_signature="s", out_signature="s") + def schedule(self, blackblock_name: str) -> str: + """ Schedule blackblock based if it has a non-zero timeout value. """ + all_blackblocks = {bb.name: bb for bb in self.config.blackblocks} + if blackblock_name not in all_blackblocks: + m = f"No blackblock [red3]{blackblock_name}[/red3]." + logging.warning(m) + return m + + blackblock = all_blackblocks[blackblock_name] + if blackblock.delay == 0: + m = f"Blackblock [red3]{blackblock_name}[/red3] cannot be scheduled without delay." + logging.warning(m) + return m + + def allow(): + self.allow_blackblock(blackblock) + delay_ms = blackblock.delay * 1000 * 60 + GLib.timeout_add(delay_ms, allow) + m = f"Scheduled [sky_blue3]{blackblock_name}[/sky_blue3] in {blackblock.delay} minutes." + logging.info(m) + return m + @dbus.service.method(dbus_interface=IFACE, in_signature="", out_signature="s") def stop(self) -> str: - self.config.active_whiteblocks = [] - self.config.active_blackblocks = self.config.blackblocks - m = '[red]Stop[/red] all whitelists. Blacklist only mode.' + self.reset_block_state() + m = 'Blacklist only mode.' logging.info(m) return m @dbus.service.method(dbus_interface=IFACE, in_signature="", out_signature="s") def status(self) -> str: - white_active = bool(self.config.active_whiteblocks) - black_active = bool(self.config.active_blackblocks) + white_active = bool(self.state.active_whiteblocks) + black_active = bool(self.state.active_blackblocks) m = 'ad ' + inactive_bbs = ' '.join(map(lambda b: "-" + b.name, self.state.inactive_blackblocks)) match (white_active, black_active): case (True, _): m += 'wb: ' - m += ' '.join(map(lambda b: b.name, self.config.active_whiteblocks)) + m += ' '.join(map(lambda b: b.name, self.state.active_whiteblocks)) + if inactive_bbs: + m += ' ' + m += inactive_bbs case (False, True): m += 'bb' + if inactive_bbs: + m += ': ' + m += inactive_bbs case _: m = 'inactive' return m + def allow_blackblock(self, blackblock: Block): + if blackblock in self.state.active_blackblocks: + self.state.active_blackblocks.remove(blackblock) + if blackblock not in self.state.inactive_blackblocks: + self.state.inactive_blackblocks.append(blackblock) + m = f"Blackblock [sky_blue3]{blackblock.name}[/sky_blue3] is now allowed." + logging.info(m) + def run(self): def _enforce(): self.enforce() @@ -110,29 +152,25 @@ class AntiDriftDaemon(dbus.service.Object): mainloop.run() def enforce(self): - config = self.config - # logging.debug(f"{self.enforce_count=} {self.enforce_value=}") - if self.enforce_count >= self.enforce_value: window = xwindow.XWindow() xwindow.notify(f"Minimize {window.name[:30]}.") window.minimize() self.enforce_count = 0 - elif self.enforce_count > 0 and window_is_blocked(config, True): + elif self.enforce_count > 0 and window_is_blocked(self.state, True): self.enforce_count += 1 - elif self.enforce_count == 0 and window_is_blocked(config): + elif self.enforce_count == 0 and window_is_blocked(self.state): self.enforce_count += 1 - delay = int(config.enforce_delay_ms / 1000) + delay = int(self.config.enforce_delay_ms / 1000) xwindow.notify(f"AntiDrift will minimize in {delay}s.") elif self.enforce_count > 0: xwindow.notify("We are gucci again.") self.enforce_count = 0 -def window_is_blocked(config: Config, silent: bool = False) -> bool: - # These should be selectable in the future (not all at the same time) - blackblocks = config.active_blackblocks - whiteblocks = config.active_whiteblocks +def window_is_blocked(state: State, silent: bool = False) -> bool: + blackblocks = state.active_blackblocks + whiteblocks = state.active_whiteblocks window = xwindow.XWindow() if not window.keywords: diff --git a/antidrift/xwindow.py b/antidrift/xwindow.py index b1d4ccc..497ac30 100644 --- a/antidrift/xwindow.py +++ b/antidrift/xwindow.py @@ -32,7 +32,7 @@ class XWindow: self._run(["windowquit", self.window]) def kill(self): - self._run(["windowkill", self.window]) + self._run(["windowclose", self.window]) def notify(message: str) -> None: