"""AntiDrift daemon: D-Bus service that polls the active window and enforces blocks."""
import logging
|
|
import os
|
|
import sys
|
|
import pwd
|
|
import antidrift.xwindow as xwindow
|
|
from antidrift.config import Config
|
|
from gi.repository import GLib, Gio
|
|
from typing import List
|
|
import dbus
|
|
import dbus.service
|
|
|
|
# D-Bus identifiers under which the daemon is exported.
BUS_NAME = "com.antidrift"   # well-known bus name requested by the daemon
IFACE = "com.antidrift"      # interface name of the exported methods
OPATH = "/com/antidrift"     # object path of the exported daemon object
|
|
|
|
|
|
def reload_callback(m, f, o, event):
    """Restart the current process after a monitored file changed.

    Used as handler for the ``changed`` signal of the file monitors set up
    in ``AntiDriftDaemon.run``. Only ``f`` is used; the remaining arguments
    are accepted because the signal passes them positionally.

    Args:
        m: First signal argument (presumably the emitting monitor); unused.
        f: File object that changed; only its basename is read.
        o: Third signal argument (presumably the "other" file); unused.
        event: Event type of the change; unused, any event restarts.

    This function does not return: the process image is replaced.
    """
    filename = f.get_basename()
    message = (f"[dark_orange3]Restart after change in "
               f"'{filename}'.[/dark_orange3]")
    logging.warning(message)

    # exec replaces the process immediately and does not flush buffered
    # file objects, so push out pending stdio output first.
    for stream in (sys.stdout, sys.stderr):
        if stream is not None:
            stream.flush()

    os.execv(sys.executable, ['python3'] + sys.argv)
|
|
|
|
|
|
class AntiDriftDaemon(dbus.service.Object):
|
|
def __init__(self, config: Config):
|
|
|
|
user_name = os.environ["SUDO_USER"]
|
|
user_uid = pwd.getpwnam(user_name)[2]
|
|
euid = os.geteuid()
|
|
os.seteuid(user_uid)
|
|
bus = dbus.bus.BusConnection(f"unix:path=/run/user/{user_uid}/bus")
|
|
os.seteuid(euid)
|
|
|
|
bus.request_name(BUS_NAME)
|
|
bus_name = dbus.service.BusName(BUS_NAME, bus=bus)
|
|
dbus.service.Object.__init__(self, bus_name, OPATH)
|
|
self.config = config
|
|
self.config.active_whiteblocks = []
|
|
self.config.active_blackblocks = self.config.blackblocks
|
|
self.enforce_count = 0
|
|
self.enforce_value = int(config.enforce_delay_ms / config.polling_cycle_ms)
|
|
|
|
@dbus.service.method(dbus_interface=IFACE,
|
|
in_signature="as", out_signature="s")
|
|
def start(self, whiteblocks: List[str]) -> str:
|
|
self.config.active_whiteblocks = []
|
|
all_whiteblocks = {wb.name: wb for wb in self.config.whiteblocks}
|
|
success_wbs = []
|
|
fail_wbs = []
|
|
for wb_name in whiteblocks:
|
|
if wb_name in all_whiteblocks:
|
|
self.config.active_whiteblocks.append(all_whiteblocks[wb_name])
|
|
success_wbs.append(wb_name)
|
|
else:
|
|
fail_wbs.append(wb_name)
|
|
if success_wbs:
|
|
wbs = ', '.join(success_wbs)
|
|
r = f"Start whiteblocks [sky_blue3]{wbs}[/sky_blue3]."
|
|
logging.info(r)
|
|
else:
|
|
r = "No whiteblocks started."
|
|
if fail_wbs:
|
|
m = f"No blackblocks [red]{', '.join(fail_wbs)}[/red]."
|
|
logging.warning(m)
|
|
return r
|
|
|
|
@dbus.service.method(dbus_interface=IFACE,
|
|
in_signature="", out_signature="s")
|
|
def stop(self) -> str:
|
|
self.config.active_whiteblocks = []
|
|
self.config.active_blackblocks = self.config.blackblocks
|
|
m = '[red]Stop[/red] all whitelists. Blacklist only mode.'
|
|
logging.info(m)
|
|
return m
|
|
|
|
@dbus.service.method(dbus_interface=IFACE,
|
|
in_signature="", out_signature="s")
|
|
def status(self) -> str:
|
|
white_active = bool(self.config.active_whiteblocks)
|
|
black_active = bool(self.config.active_blackblocks)
|
|
m = 'ad '
|
|
match (white_active, black_active):
|
|
case (True, _):
|
|
m += 'wb: '
|
|
m += ' '.join(map(lambda b: b.name, self.config.active_whiteblocks))
|
|
case (False, True):
|
|
m += 'bb'
|
|
case _:
|
|
m = 'inactive'
|
|
return m
|
|
|
|
def run(self):
|
|
def _enforce():
|
|
self.enforce()
|
|
GLib.timeout_add(self.config.polling_cycle_ms, _enforce)
|
|
|
|
# autorestart on file change for development
|
|
monitors = []
|
|
files = ["antidrift.py", "antidrift/daemon.py", "antidrift/client.py",
|
|
"antidrift/config.py"]
|
|
for filename in files:
|
|
gio_file = Gio.File.new_for_path(filename)
|
|
monitor = gio_file.monitor_file(Gio.FileMonitorFlags.NONE, None)
|
|
monitor.connect("changed", reload_callback)
|
|
monitors.append(monitor)
|
|
|
|
logging.info("[rosy_brown]Start.[/rosy_brown]")
|
|
_enforce()
|
|
mainloop = GLib.MainLoop()
|
|
mainloop.run()
|
|
|
|
def enforce(self):
|
|
config = self.config
|
|
# logging.debug(f"{self.enforce_count=} {self.enforce_value=}")
|
|
|
|
if self.enforce_count >= self.enforce_value:
|
|
window = xwindow.XWindow()
|
|
xwindow.notify(f"Minimize {window.name[:30]}.")
|
|
window.minimize()
|
|
self.enforce_count = 0
|
|
elif self.enforce_count > 0 and window_is_blocked(config, True):
|
|
self.enforce_count += 1
|
|
elif self.enforce_count == 0 and window_is_blocked(config):
|
|
self.enforce_count += 1
|
|
delay = int(config.enforce_delay_ms / 1000)
|
|
xwindow.notify(f"AntiDrift will minimize in {delay}s.")
|
|
elif self.enforce_count > 0:
|
|
xwindow.notify("We are gucci again.")
|
|
self.enforce_count = 0
|
|
|
|
|
|
def window_is_blocked(config: Config, silent: bool = False) -> bool:
    """Check the currently active window against the active block lists.

    A window without keywords is never blocked. Otherwise the active
    blackblocks are consulted first: a keyword match blocks the window, and
    additionally kills it when the matching blackblock has ``kill`` set.
    If no blackblock matches and whiteblocks are active, the window is
    blocked unless one of its keywords appears in an active whiteblock.

    Args:
        config: Configuration providing ``active_blackblocks`` and
            ``active_whiteblocks``.
        silent: When true, suppress notifications and log output, except
            for the kill case which always notifies and logs.

    Returns:
        ``True`` if the active window is blocked, ``False`` otherwise.
    """
    # These should be selectable in the future (not all at the same time)
    active_black = config.active_blackblocks
    active_white = config.active_whiteblocks

    window = xwindow.XWindow()
    if not window.keywords:
        return False

    for blackblock in active_black:
        for keyword in blackblock.keywords:
            if keyword not in window.keywords:
                continue
            if blackblock.kill:
                window.kill()
                xwindow.notify(f"Kill for {keyword} on {blackblock.name}.")
                logging.warning(f"Kill for [red]{keyword}[/red] "
                                f"on [red]{blackblock.name}[/red].")
                return True
            if not silent:
                xwindow.notify(
                    f"{window.name[:30]} blocked by {blackblock.name}.")
                logging.warning(f"[red]{window.name[:50]}[/red] "
                                f"blocked by [red]{blackblock.name}[/red].")
            return True

    if not active_white:
        if not silent:
            logging.debug("All non-blackblock windows are allowed.")
        return False

    # First active whiteblock that shares a keyword with the window, if any.
    allowing = next(
        (whiteblock for whiteblock in active_white
         if any(kw in window.keywords for kw in whiteblock.keywords)),
        None,
    )
    if allowing is not None:
        if not silent:
            logging.debug(
                f"[pale_green3]{window.name[:30]}[/pale_green3] "
                f"allowed by [sky_blue3]{allowing.name}[/sky_blue3].")
        return False

    if not silent:
        xwindow.notify(f"'{window.name[:30]}…' not on any whiteblock.")
    return True
|