Fix lock when trying to stop while enforce sequence is active.
This commit is contained in:
29
antidrift.py
29
antidrift.py
@@ -14,7 +14,7 @@ DBusGMainLoop(set_as_default=True)
|
||||
signal.signal(signal.SIGINT, signal.SIG_DFL)
|
||||
|
||||
|
||||
def init_logging(log_file: Path, mode: str):
|
||||
def init_logging(log_file: Path):
|
||||
class DuplicateFilter(logging.Filter):
|
||||
def filter(self, record) -> bool:
|
||||
current_log = (record.module, record.levelno, record.msg)
|
||||
@@ -22,10 +22,12 @@ def init_logging(log_file: Path, mode: str):
|
||||
self.last_log = current_log
|
||||
return True
|
||||
return False
|
||||
format_str = f'%(asctime)s | %(levelname)-8s | {mode} | %(message)s'
|
||||
format_str = '[bold pale_green3]%(asctime)s[/bold pale_green3] | ' \
|
||||
'[light_steel_blue]%(levelname)-8s[/light_steel_blue] | ' \
|
||||
'%(message)s'
|
||||
logging.basicConfig(filename=log_file,
|
||||
format=format_str,
|
||||
datefmt='%H:%M:%S',
|
||||
datefmt='%a %H:%M:%S',
|
||||
encoding='utf-8',
|
||||
level=logging.DEBUG)
|
||||
logger = logging.getLogger()
|
||||
@@ -40,33 +42,16 @@ def check_for_xdotool():
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def tailf(config):
|
||||
import time
|
||||
from rich import print
|
||||
with open(config.log_file, 'r') as f:
|
||||
f.seek(0, 2)
|
||||
while True:
|
||||
line = f.readline()
|
||||
if not line:
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
print(line.strip())
|
||||
|
||||
|
||||
def main() -> None:
|
||||
""" Main routine that dispatches to client or daemon """
|
||||
config = Config.load("config.yaml")
|
||||
|
||||
if len(sys.argv) > 1 and sys.argv[1] == "tailf":
|
||||
tailf(config)
|
||||
return
|
||||
|
||||
check_for_xdotool()
|
||||
if client.antidrift_daemon_is_running():
|
||||
init_logging(config.log_file, "[blue]client[/blue]")
|
||||
init_logging(config.client_log_file)
|
||||
client.client_mode(config)
|
||||
else:
|
||||
init_logging(config.log_file, "[red]server[/red]")
|
||||
init_logging(config.daemon_log_file)
|
||||
add = AntiDriftDaemon(config)
|
||||
add.run()
|
||||
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
from antidrift.config import Config
|
||||
from typing import Optional, List
|
||||
from typing import Optional
|
||||
from rich import print
|
||||
import argparse
|
||||
import sys
|
||||
import logging
|
||||
import time
|
||||
import dbus
|
||||
import dbus.service
|
||||
|
||||
@@ -25,21 +24,37 @@ def antidrift_daemon_is_running() -> bool:
|
||||
return False
|
||||
reply = interface.status()
|
||||
if reply:
|
||||
logging.info(reply)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def client_mode(_config: Config):
|
||||
def tailf(config):
|
||||
with open(config.daemon_log_file, 'r') as f:
|
||||
f.seek(0, 2)
|
||||
while True:
|
||||
line = f.readline()
|
||||
if not line:
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
print(line.strip())
|
||||
|
||||
|
||||
def client_mode(config: Config):
|
||||
parser = argparse.ArgumentParser(description='AntiDrift CLI.')
|
||||
parser.add_argument('--start', metavar='whiteblock', nargs='+',
|
||||
help='start session with whiteblocks')
|
||||
parser.add_argument('--stop', action='store_true',
|
||||
help='stop session')
|
||||
parser.add_argument('--tailf', action='store_true',
|
||||
help='tail -f log file')
|
||||
args = parser.parse_args()
|
||||
interface = get_dbus_interface()
|
||||
if args.start:
|
||||
reply = interface.start(args.start)
|
||||
elif args.stop:
|
||||
reply = interface.stop()
|
||||
elif args.tailf:
|
||||
tailf(config)
|
||||
else:
|
||||
reply = '[red]no command[/red]'
|
||||
print(reply)
|
||||
|
||||
@@ -15,10 +15,11 @@ class Config(BaseModel):
|
||||
whiteblocks: List[Block]
|
||||
active_blackblocks: List[Block] = []
|
||||
active_whiteblocks: List[Block] = []
|
||||
log_file: Path = Path()
|
||||
daemon_log_file: Path = Path()
|
||||
client_log_file: Path = Path()
|
||||
config_file: Path = Path()
|
||||
check_delay: int = 1000
|
||||
minimize_delay: int = 5
|
||||
polling_cycle_ms: int = 500
|
||||
enforce_delay_ms: int = 10000
|
||||
|
||||
class Config:
|
||||
extra = 'forbid'
|
||||
|
||||
@@ -1,12 +1,9 @@
|
||||
import logging
|
||||
import time
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
import antidrift.xwindow as xwindow
|
||||
from antidrift.config import Config
|
||||
from gi.repository import GLib, Gio
|
||||
from enum import Enum
|
||||
from typing import List
|
||||
import dbus
|
||||
import dbus.service
|
||||
@@ -17,8 +14,9 @@ OPATH = "/com/antidrift"
|
||||
|
||||
|
||||
def reload_callback(m, f, o, event):
|
||||
# Without this check, multiple 'ok's will be printed for each file change
|
||||
logging.warning("[bold green]Restart.[/bold green]")
|
||||
filename = f.get_basename()
|
||||
m = f"[dark_orange3]Restart after change in '{filename}'.[/dark_orange3]"
|
||||
logging.warning(m)
|
||||
os.execv(sys.executable, ['python3'] + sys.argv)
|
||||
|
||||
|
||||
@@ -31,13 +29,14 @@ class AntiDriftDaemon(dbus.service.Object):
|
||||
self.config = config
|
||||
self.config.active_whiteblocks = []
|
||||
self.config.active_blackblocks = self.config.blackblocks
|
||||
self.enforce_count = 0
|
||||
self.enforce_value = int(config.enforce_delay_ms / config.polling_cycle_ms)
|
||||
|
||||
@dbus.service.method(dbus_interface=IFACE,
|
||||
in_signature="", out_signature="s")
|
||||
def status(self) -> str:
|
||||
return "active"
|
||||
|
||||
|
||||
@dbus.service.method(dbus_interface=IFACE,
|
||||
in_signature="as", out_signature="s")
|
||||
def start(self, whiteblocks: List[str]) -> str:
|
||||
@@ -52,7 +51,8 @@ class AntiDriftDaemon(dbus.service.Object):
|
||||
else:
|
||||
fail_wbs.append(wb_name)
|
||||
if success_wbs:
|
||||
r = f"Start whiteblocks [blue]{', '.join(success_wbs)}[/blue]."
|
||||
wbs = ', '.join(success_wbs)
|
||||
r = f"Start whiteblocks [sky_blue3]{wbs}[/sky_blue3]."
|
||||
logging.info(r)
|
||||
else:
|
||||
r = "No whiteblocks started."
|
||||
@@ -70,26 +70,45 @@ class AntiDriftDaemon(dbus.service.Object):
|
||||
logging.info(m)
|
||||
return m
|
||||
|
||||
|
||||
def run(self):
|
||||
def _enforce():
|
||||
enforce(self.config)
|
||||
GLib.timeout_add(self.config.check_delay, _enforce)
|
||||
self.enforce()
|
||||
GLib.timeout_add(self.config.polling_cycle_ms, _enforce)
|
||||
|
||||
# autorestart on file change for development
|
||||
monitors = []
|
||||
files = ["antidrift.py", "antidrift/daemon.py", "antidrift/client.py"]
|
||||
files = ["antidrift.py", "antidrift/daemon.py", "antidrift/client.py",
|
||||
"antidrift/config.py"]
|
||||
for filename in files:
|
||||
gio_file = Gio.File.new_for_path(filename)
|
||||
monitor = gio_file.monitor_file(Gio.FileMonitorFlags.NONE, None)
|
||||
monitor.connect("changed", reload_callback)
|
||||
monitors.append(monitor)
|
||||
|
||||
logging.info("Start AntiDriftDaemon.")
|
||||
logging.info("[rosy_brown]Start.[/rosy_brown]")
|
||||
_enforce()
|
||||
mainloop = GLib.MainLoop()
|
||||
mainloop.run()
|
||||
|
||||
def enforce(self):
|
||||
config = self.config
|
||||
# logging.debug(f"{self.enforce_count=} {self.enforce_value=}")
|
||||
|
||||
if self.enforce_count >= self.enforce_value:
|
||||
window = xwindow.XWindow()
|
||||
xwindow.notify(f"Minimize {window.name[:30]}.")
|
||||
window.minimize()
|
||||
self.enforce_count = 0
|
||||
elif self.enforce_count > 0 and window_is_blocked(config, True):
|
||||
self.enforce_count += 1
|
||||
elif self.enforce_count == 0 and window_is_blocked(config):
|
||||
self.enforce_count += 1
|
||||
delay = config.enforce_delay_ms
|
||||
xwindow.notify(f"AntiDrift will minimize in {delay} ms.")
|
||||
elif self.enforce_count > 0:
|
||||
xwindow.notify("We are gucci again.")
|
||||
self.enforce_count = 0
|
||||
|
||||
|
||||
def window_is_blocked(config: Config, silent: bool = False) -> bool:
|
||||
# These should be selectable in the future (not all at the same time)
|
||||
@@ -121,23 +140,3 @@ def window_is_blocked(config: Config, silent: bool = False) -> bool:
|
||||
if not silent:
|
||||
xwindow.notify(f"[red]{window.name[:30]}[/red] not on any whiteblock.")
|
||||
return True
|
||||
|
||||
|
||||
def enforce(config: Config):
|
||||
if not window_is_blocked(config):
|
||||
return
|
||||
delay = config.minimize_delay
|
||||
xwindow.notify(f"AntiDrift will minimize in {delay} seconds.")
|
||||
timer, timer_step = 0.0, 0.5
|
||||
while timer < config.minimize_delay:
|
||||
time.sleep(timer_step)
|
||||
timer += timer_step
|
||||
if not window_is_blocked(config, True):
|
||||
break
|
||||
window = xwindow.XWindow()
|
||||
if window_is_blocked(config):
|
||||
window = xwindow.XWindow()
|
||||
xwindow.notify(f"Minimize {window.name[:30]}.")
|
||||
window.minimize()
|
||||
else:
|
||||
xwindow.notify("We are gucci again.")
|
||||
|
||||
@@ -33,7 +33,7 @@ class XWindow:
|
||||
|
||||
def notify(message: str) -> None:
|
||||
""" Notify user via the Xorg notify-send command. """
|
||||
logging.debug(f"[yellow]notify[/yellow] {message}")
|
||||
logging.debug(f"{message} - [grey]notify[/grey]")
|
||||
env = {
|
||||
**os.environ,
|
||||
"DBUS_SESSION_BUS_ADDRESS": "unix:path=/run/user/1000/bus"
|
||||
|
||||
Reference in New Issue
Block a user