Implement scheduling of blackbox sessions.
This commit is contained in:
@@ -42,9 +42,11 @@ def tailf(config):
|
||||
def client_mode(config: Config):
|
||||
parser = argparse.ArgumentParser(description='AntiDrift CLI.')
|
||||
parser.add_argument('--start', metavar='whiteblock', nargs='+',
|
||||
help='start session with whiteblocks')
|
||||
help='start whiteblocks')
|
||||
parser.add_argument('--stop', action='store_true',
|
||||
help='stop session')
|
||||
parser.add_argument('--schedule', metavar='blackblock',
|
||||
help='schedule blackblock')
|
||||
parser.add_argument('--status', action='store_true',
|
||||
help='get status from daemon')
|
||||
parser.add_argument('--tailf', action='store_true',
|
||||
@@ -55,6 +57,8 @@ def client_mode(config: Config):
|
||||
reply = interface.start(args.start)
|
||||
elif args.stop:
|
||||
reply = interface.stop()
|
||||
elif args.schedule:
|
||||
reply = interface.schedule(args.schedule)
|
||||
elif args.tailf:
|
||||
tailf(config)
|
||||
elif args.status:
|
||||
|
||||
@@ -9,13 +9,12 @@ class Block(BaseModel):
|
||||
name: str
|
||||
keywords: List[str]
|
||||
kill: bool = False
|
||||
delay: int = 0
|
||||
|
||||
|
||||
class Config(BaseModel):
|
||||
blackblocks: List[Block]
|
||||
whiteblocks: List[Block]
|
||||
active_blackblocks: List[Block] = []
|
||||
active_whiteblocks: List[Block] = []
|
||||
daemon_log_file: Path = Path()
|
||||
client_log_file: Path = Path()
|
||||
config_file: Path = Path()
|
||||
@@ -33,3 +32,9 @@ class Config(BaseModel):
|
||||
config = cls(**config_dict)
|
||||
config.config_file = Path(config_file)
|
||||
return config
|
||||
|
||||
|
||||
class State(BaseModel):
|
||||
active_blackblocks: List[Block] = []
|
||||
active_whiteblocks: List[Block] = []
|
||||
inactive_blackblocks: List[Block] = []
|
||||
|
||||
@@ -3,7 +3,7 @@ import os
|
||||
import sys
|
||||
import pwd
|
||||
import antidrift.xwindow as xwindow
|
||||
from antidrift.config import Config
|
||||
from antidrift.config import Config, State, Block
|
||||
from gi.repository import GLib, Gio
|
||||
from typing import List
|
||||
import dbus
|
||||
@@ -24,8 +24,8 @@ def reload_callback(m, f, o, event):
|
||||
class AntiDriftDaemon(dbus.service.Object):
|
||||
def __init__(self, config: Config):
|
||||
|
||||
user_name = os.environ["SUDO_USER"]
|
||||
user_uid = pwd.getpwnam(user_name)[2]
|
||||
user_name = os.environ.get("SUDO_USER", pwd.getpwuid(os.getuid()).pw_name)
|
||||
user_uid = pwd.getpwnam(user_name).pw_uid
|
||||
euid = os.geteuid()
|
||||
os.seteuid(user_uid)
|
||||
bus = dbus.bus.BusConnection(f"unix:path=/run/user/{user_uid}/bus")
|
||||
@@ -35,60 +35,102 @@ class AntiDriftDaemon(dbus.service.Object):
|
||||
bus_name = dbus.service.BusName(BUS_NAME, bus=bus)
|
||||
dbus.service.Object.__init__(self, bus_name, OPATH)
|
||||
self.config = config
|
||||
self.config.active_whiteblocks = []
|
||||
self.config.active_blackblocks = self.config.blackblocks
|
||||
self.reset_block_state()
|
||||
self.enforce_count = 0
|
||||
self.enforce_value = int(config.enforce_delay_ms / config.polling_cycle_ms)
|
||||
|
||||
def reset_block_state(self):
|
||||
self.state = State(
|
||||
active_blackblocks=self.config.blackblocks,
|
||||
active_whiteblocks=[],
|
||||
inactive_blackblocks=[])
|
||||
|
||||
@dbus.service.method(dbus_interface=IFACE,
|
||||
in_signature="as", out_signature="s")
|
||||
def start(self, whiteblocks: List[str]) -> str:
|
||||
self.config.active_whiteblocks = []
|
||||
self.reset_block_state()
|
||||
all_whiteblocks = {wb.name: wb for wb in self.config.whiteblocks}
|
||||
success_wbs = []
|
||||
fail_wbs = []
|
||||
for wb_name in whiteblocks:
|
||||
if wb_name in all_whiteblocks:
|
||||
self.config.active_whiteblocks.append(all_whiteblocks[wb_name])
|
||||
success_wbs.append(wb_name)
|
||||
success_wbs, fail_blocks = [], []
|
||||
for block_name in whiteblocks:
|
||||
if block_name in all_whiteblocks:
|
||||
self.state.active_whiteblocks.append(all_whiteblocks[block_name])
|
||||
success_wbs.append(block_name)
|
||||
else:
|
||||
fail_wbs.append(wb_name)
|
||||
fail_blocks.append(block_name)
|
||||
if success_wbs:
|
||||
wbs = ', '.join(success_wbs)
|
||||
r = f"Start whiteblocks [sky_blue3]{wbs}[/sky_blue3]."
|
||||
logging.info(r)
|
||||
else:
|
||||
r = "No whiteblocks started."
|
||||
if fail_wbs:
|
||||
m = f"No blackblocks [red]{', '.join(fail_wbs)}[/red]."
|
||||
if fail_blocks:
|
||||
m = f"No whiteblocks [red3]{', '.join(fail_blocks)}[/red3]."
|
||||
logging.warning(m)
|
||||
return r
|
||||
|
||||
@dbus.service.method(dbus_interface=IFACE,
|
||||
in_signature="s", out_signature="s")
|
||||
def schedule(self, blackblock_name: str) -> str:
|
||||
""" Schedule blackblock based if it has a non-zero timeout value. """
|
||||
all_blackblocks = {bb.name: bb for bb in self.config.blackblocks}
|
||||
if blackblock_name not in all_blackblocks:
|
||||
m = f"No blackblock [red3]{blackblock_name}[/red3]."
|
||||
logging.warning(m)
|
||||
return m
|
||||
|
||||
blackblock = all_blackblocks[blackblock_name]
|
||||
if blackblock.delay == 0:
|
||||
m = f"Blackblock [red3]{blackblock_name}[/red3] cannot be scheduled without delay."
|
||||
logging.warning(m)
|
||||
return m
|
||||
|
||||
def allow():
|
||||
self.allow_blackblock(blackblock)
|
||||
delay_ms = blackblock.delay * 1000 * 60
|
||||
GLib.timeout_add(delay_ms, allow)
|
||||
m = f"Scheduled [sky_blue3]{blackblock_name}[/sky_blue3] in {blackblock.delay} minutes."
|
||||
logging.info(m)
|
||||
return m
|
||||
|
||||
@dbus.service.method(dbus_interface=IFACE,
|
||||
in_signature="", out_signature="s")
|
||||
def stop(self) -> str:
|
||||
self.config.active_whiteblocks = []
|
||||
self.config.active_blackblocks = self.config.blackblocks
|
||||
m = '[red]Stop[/red] all whitelists. Blacklist only mode.'
|
||||
self.reset_block_state()
|
||||
m = 'Blacklist only mode.'
|
||||
logging.info(m)
|
||||
return m
|
||||
|
||||
@dbus.service.method(dbus_interface=IFACE,
|
||||
in_signature="", out_signature="s")
|
||||
def status(self) -> str:
|
||||
white_active = bool(self.config.active_whiteblocks)
|
||||
black_active = bool(self.config.active_blackblocks)
|
||||
white_active = bool(self.state.active_whiteblocks)
|
||||
black_active = bool(self.state.active_blackblocks)
|
||||
m = 'ad '
|
||||
inactive_bbs = ' '.join(map(lambda b: "-" + b.name, self.state.inactive_blackblocks))
|
||||
match (white_active, black_active):
|
||||
case (True, _):
|
||||
m += 'wb: '
|
||||
m += ' '.join(map(lambda b: b.name, self.config.active_whiteblocks))
|
||||
m += ' '.join(map(lambda b: b.name, self.state.active_whiteblocks))
|
||||
if inactive_bbs:
|
||||
m += ' '
|
||||
m += inactive_bbs
|
||||
case (False, True):
|
||||
m += 'bb'
|
||||
if inactive_bbs:
|
||||
m += ': '
|
||||
m += inactive_bbs
|
||||
case _:
|
||||
m = 'inactive'
|
||||
return m
|
||||
|
||||
def allow_blackblock(self, blackblock: Block):
|
||||
if blackblock in self.state.active_blackblocks:
|
||||
self.state.active_blackblocks.remove(blackblock)
|
||||
if blackblock not in self.state.inactive_blackblocks:
|
||||
self.state.inactive_blackblocks.append(blackblock)
|
||||
m = f"Blackblock [sky_blue3]{blackblock.name}[/sky_blue3] is now allowed."
|
||||
logging.info(m)
|
||||
|
||||
def run(self):
|
||||
def _enforce():
|
||||
self.enforce()
|
||||
@@ -110,29 +152,25 @@ class AntiDriftDaemon(dbus.service.Object):
|
||||
mainloop.run()
|
||||
|
||||
def enforce(self):
|
||||
config = self.config
|
||||
# logging.debug(f"{self.enforce_count=} {self.enforce_value=}")
|
||||
|
||||
if self.enforce_count >= self.enforce_value:
|
||||
window = xwindow.XWindow()
|
||||
xwindow.notify(f"Minimize {window.name[:30]}.")
|
||||
window.minimize()
|
||||
self.enforce_count = 0
|
||||
elif self.enforce_count > 0 and window_is_blocked(config, True):
|
||||
elif self.enforce_count > 0 and window_is_blocked(self.state, True):
|
||||
self.enforce_count += 1
|
||||
elif self.enforce_count == 0 and window_is_blocked(config):
|
||||
elif self.enforce_count == 0 and window_is_blocked(self.state):
|
||||
self.enforce_count += 1
|
||||
delay = int(config.enforce_delay_ms / 1000)
|
||||
delay = int(self.config.enforce_delay_ms / 1000)
|
||||
xwindow.notify(f"AntiDrift will minimize in {delay}s.")
|
||||
elif self.enforce_count > 0:
|
||||
xwindow.notify("We are gucci again.")
|
||||
self.enforce_count = 0
|
||||
|
||||
|
||||
def window_is_blocked(config: Config, silent: bool = False) -> bool:
|
||||
# These should be selectable in the future (not all at the same time)
|
||||
blackblocks = config.active_blackblocks
|
||||
whiteblocks = config.active_whiteblocks
|
||||
def window_is_blocked(state: State, silent: bool = False) -> bool:
|
||||
blackblocks = state.active_blackblocks
|
||||
whiteblocks = state.active_whiteblocks
|
||||
|
||||
window = xwindow.XWindow()
|
||||
if not window.keywords:
|
||||
|
||||
@@ -32,7 +32,7 @@ class XWindow:
|
||||
self._run(["windowquit", self.window])
|
||||
|
||||
def kill(self):
|
||||
self._run(["windowkill", self.window])
|
||||
self._run(["windowclose", self.window])
|
||||
|
||||
|
||||
def notify(message: str) -> None:
|
||||
|
||||
Reference in New Issue
Block a user