Compare commits

..

2 Commits

Author SHA1 Message Date
95dcfa72d6 Remove Python code 2024-07-05 10:53:50 -04:00
3b7d88f279 Implment rough TUI version of new workflow
Merge in Python stuff and then we will probably get
rid of it and continue in Rust only.
2024-07-05 10:52:12 -04:00
14 changed files with 311 additions and 1096 deletions

146
.gitignore vendored
View File

@@ -1,147 +1,4 @@
config.yaml # Generated by Cargo will have compiled files and executables
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# ---> Rust
# Generated by Cargo
# will have compiled files and executables
debug/ debug/
target/ target/
@@ -154,4 +11,3 @@ Cargo.lock
# MSVC Windows builds of rustc generate these, which store debugging information # MSVC Windows builds of rustc generate these, which store debugging information
*.pdb *.pdb

9
Cargo.toml Normal file
View File

@@ -0,0 +1,9 @@
[package]
name = "antidrift"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0.86"
ratatui = "0.27.0"
regex = "1.10.5"

View File

@@ -1,60 +0,0 @@
# AntiDrift
Utilize your computer purposefully.
## Make executable and install
```
pip install pyinstaller --user
pyinstaller --onefile antidrift.py
sudo cp dist/antidrift /usr/bin
```
## Dependencies
- dbus-python
- glib
## Create sudoers configuration
Create a file `antidrift` in `/etc/sudoers.d`. This allows antidrift to run
itself with sudo ultimately making it unkillable from the regular user.
```
user hostname = (root) NOPASSWD: /usr/bin/antidrift
```
## Autostart with systemd
- Create `~/.config/systemd/user/antidrift.service`
- Add configuration below and save
- Run `systemctl --user enable antidrift.service`
```
[Unit]
Description=AntiDrift
[Service]
ExecStart=antidrift
[Install]
WantedBy=default.target
```
## Autostart via desktop file
Create a file `antidrift.desktop` in `/etc/xdg/autostart`.
Add the following content to the file.
```
[Desktop Entry]
Name=AntiDrift
Exec=antidrift
Terminal=false
Type=Application
StartupNotify=false
```
Your window manager will now start antidrift automatically.

View File

@@ -1,32 +0,0 @@
{
"folders": [
{
"path": "."
}
],
"launch": {
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
},
{
"name": "FocusFriend Debug",
"type": "python",
"request": "launch",
"program": "focusfriend.py",
"console": "integratedTerminal",
"args": ["--debug"]
}
]
},
"settings": {
"python.linting.pylintEnabled": true,
"python.linting.enabled": true,
"python.linting.pylintArgs": ["--errors-only"]
}
}

View File

@@ -1 +0,0 @@
""" AntiDrift module init """

View File

@@ -1,33 +0,0 @@
from dbus_next.auth import Authenticator, _AuthResponse
class AuthExternal(Authenticator):
"""An authenticator class for the external auth protocol for use with the
:class:`MessageBus <dbus_next.message_bus.BaseMessageBus>`.
:sealso: https://dbus.freedesktop.org/doc/dbus-specification.html#auth-protocol
"""
def __init__(self, user_uid):
self.user_uid = user_uid
self.negotiate_unix_fd = False
self.negotiating_fds = False
def _authentication_start(self, negotiate_unix_fd=False) -> str:
self.negotiate_unix_fd = negotiate_unix_fd
hex_uid = str(self.user_uid).encode().hex()
return f"AUTH EXTERNAL {hex_uid}"
def _receive_line(self, line: str):
response, args = _AuthResponse.parse(line)
if response is _AuthResponse.OK:
if self.negotiate_unix_fd:
self.negotiating_fds = True
return "NEGOTIATE_UNIX_FD"
else:
return "BEGIN"
if response is _AuthResponse.AGREE_UNIX_FD:
return "BEGIN"
raise AuthError(f"authentication failed: {response.value}: {args}")

View File

@@ -1,60 +0,0 @@
import time
from dbus_next.aio import MessageBus
from dbus_next import BusType
from dbus_next.errors import DBusError
from antidrift.config import Config
from antidrift.evaluate import evaluate
from argparse import Namespace
from rich import print
from antidrift.daemon import IFACE, OPATH, BUS_NAME
async def get_dbus_interface():
try:
bus = await MessageBus(bus_type=BusType.SESSION).connect()
introspection = await bus.introspect(BUS_NAME, OPATH)
proxy_obj = bus.get_proxy_object(BUS_NAME, OPATH, introspection)
return proxy_obj.get_interface(IFACE)
except DBusError:
return None
async def run(args: Namespace, config: Config):
if args.evaluate:
evaluate(config)
return
interface = await get_dbus_interface()
reply = "🟡 ad daemon active but no command"
if interface is None:
reply = "🔴 ad inactive"
elif args.start:
reply = await interface.call_start(args.start)
elif args.stop:
reply = await interface.call_stop()
elif args.pause:
reply = await interface.call_pause()
elif args.block is not None:
reply = await interface.call_block(args.block)
elif args.intention is not None:
reply = await interface.call_intention(args.intention)
elif args.unpause:
reply = await interface.call_unpause()
elif args.schedule:
reply = await interface.call_schedule(args.schedule)
elif args.tailf:
tailf(config)
elif args.status:
reply = await interface.call_status()
print(reply)
def tailf(config):
with open(config.daemon_log_file, "r") as f:
f.seek(0, 2)
while True:
line = f.readline()
if not line:
time.sleep(0.1)
else:
print(line.strip())

View File

@@ -1,68 +0,0 @@
import os
import yaml
from pathlib import Path
from typing import List
from pydantic import BaseModel
class Block(BaseModel):
name: str
keywords: List[str]
kill: bool = False
delay: int = 0
class Config:
extra = "forbid"
class Config(BaseModel):
blackblocks: List[Block]
whiteblocks: List[Block]
window_log_file: Path = Path("~/tmp/antidrift/history.log")
daemon_log_file: Path = Path()
client_log_file: Path = Path()
config_file: Path = Path()
polling_cycle_ms: int = 2000
enforce_delay_ms: int = 5000
class Config:
extra = "forbid"
@classmethod
def load(cls, config_file: str) -> Config:
config_file = os.path.expanduser(config_file)
with open(config_file, "r") as f:
config_dict = yaml.safe_load(f)
config = cls(**config_dict)
config.config_file = Path(config_file)
# Expand the paths for the log files
config.window_log_file = Path(
os.path.expanduser(config.window_log_file))
config.daemon_log_file = Path(
os.path.expanduser(config.daemon_log_file))
config.client_log_file = Path(
os.path.expanduser(config.client_log_file))
return config
def save(self) -> None:
config_file = self.config_file
config_dict = self.dict()
# convert Path objects to strings
for key, value in config_dict.items():
if isinstance(value, Path):
config_dict[key] = str(value)
with open(config_file, "w") as f:
yaml.safe_dump(config_dict, f)
class State(BaseModel):
active_blackblocks: List[Block] = []
active_whiteblocks: List[Block] = []
inactive_blackblocks: List[Block] = []
pause: bool = False
intention: str = ""
class Config:
extra = "forbid"

View File

@@ -1,313 +0,0 @@
from datetime import datetime
import csv
import logging
import os
import pwd
import re
import asyncio
import antidrift.xwindow as xwindow
from antidrift.xwindow import XWindow
from antidrift.config import Config, State, Block
from antidrift.auth import AuthExternal
from dbus_next.aio import MessageBus
from dbus_next.service import ServiceInterface, method
from dbus_next import BusType
BUS_NAME = "com.antidrift"
IFACE = "com.antidrift"
OPATH = "/com/antidrift"
s = "no pyflakes warning"
class AntiDriftDaemon(ServiceInterface):
def __init__(self, config: Config):
super().__init__(IFACE)
self.config = config
self.reset_block_state()
self.enforce_count = 0
self.enforce_value = int(
config.enforce_delay_ms / config.polling_cycle_ms)
async def init_bus(self):
"""
We are switching the effective UID to the target user's UID in order to
connect to the D-Bus session bus with the correct permissions and
authentication. Once the D-Bus connection is established, we restore
the original effective UID to maintain the appropriate privilege
levels.
"""
user_name = os.environ.get(
"SUDO_USER", pwd.getpwuid(os.getuid()).pw_name)
user_uid = pwd.getpwnam(user_name).pw_uid
euid = os.geteuid()
os.seteuid(user_uid)
auth = AuthExternal(user_uid)
bus_address = f"unix:path=/run/user/{user_uid}/bus"
bus = MessageBus(bus_address=bus_address,
bus_type=BusType.SESSION, auth=auth)
await bus.connect()
bus.export(OPATH, self)
await bus.request_name(BUS_NAME)
os.seteuid(euid)
return bus
async def run(self, debug: bool = False):
_ = await self.init_bus()
async def _enforce():
while True:
if self.state.pause is True:
await self.enforce_pause()
else:
await self.enforce()
await asyncio.sleep(self.config.polling_cycle_ms / 1000)
async def _log():
while True:
if self.state.pause is False:
self.log_window()
await asyncio.sleep(60) # Sleep for 60 seconds
# Start _enforce and _log as background tasks
asyncio.create_task(_enforce())
asyncio.create_task(_log())
xwindow.notify("✅ Antidrift running.")
stop = asyncio.Event()
await stop.wait()
def reset_block_state(self):
self.state = State(
active_blackblocks=self.config.blackblocks,
active_whiteblocks=[],
inactive_blackblocks=[],
)
@method()
def start(self, whiteblocks: "as") -> "s":
self.reset_block_state()
all_whiteblocks = {wb.name: wb for wb in self.config.whiteblocks}
success_wbs, fail_blocks = [], []
for block_name in whiteblocks:
if block_name in all_whiteblocks:
self.state.active_whiteblocks.append(
all_whiteblocks[block_name])
success_wbs.append(block_name)
else:
fail_blocks.append(block_name)
if success_wbs:
wbs = ", ".join(success_wbs)
r = f"Start whiteblocks [sky_blue3]{wbs}[/sky_blue3]."
logging.info(r)
else:
r = "No whiteblocks started."
if fail_blocks:
m = f"No whiteblocks [red3]{', '.join(fail_blocks)}[/red3]."
logging.warning(m)
return r
@method()
def schedule(self, blackblock_name: "s") -> "s":
"""Schedule blackblock based if it has a non-zero timeout value."""
all_blackblocks = {bb.name: bb for bb in self.config.blackblocks}
if blackblock_name not in all_blackblocks:
m = f"No blackblock [red3]{blackblock_name}[/red3]."
logging.warning(m)
return m
blackblock = all_blackblocks[blackblock_name]
if blackblock.delay == 0:
m = f"Blackblock [red3]{blackblock_name}[/red3] cannot be scheduled without delay."
logging.warning(m)
return m
def allow():
self.allow_blackblock(blackblock)
delay_sec = blackblock.delay * 60
loop = asyncio.get_event_loop()
loop.call_later(delay_sec, allow)
m = f"Scheduled [sky_blue3]{blackblock.name}[/sky_blue3] in {blackblock.delay} minutes."
logging.info(m)
return m
@method()
def stop(self) -> "s":
self.reset_block_state()
m = "Blacklist only mode."
logging.info(m)
return m
@method()
def pause(self) -> "s":
self.state.pause = True
m = "Antidrift paused."
logging.info(m)
return m
@method()
def unpause(self) -> "s":
self.state.pause = False
m = "Antidrift unpaused."
logging.info(m)
return m
@method()
def intention(self, intention: "s") -> "s":
self.state.intention = intention
m = f"Antidrift intention set to '{intention}'."
logging.info(m)
return m
@method()
def block(self, block: "s") -> "s":
# self.state.intention = intention
if self.state.active_blackblocks and \
block not in self.state.active_blackblocks[0].keywords:
self.state.active_blackblocks[0].keywords.append(block)
self.config.save()
m = f"✅Antidrift add block '{block}'."
logging.info(m)
return m
return f"⚠️ '{block}' not added."
@method()
def status(self) -> "s":
white_active = bool(self.state.active_whiteblocks)
black_active = bool(self.state.active_blackblocks)
m = "🟢 ad "
inactive_bbs = " ".join(
map(lambda b: "-" + b.name, self.state.inactive_blackblocks)
)
if self.state.pause is True:
return "🟡 ad paused"
match (white_active, black_active):
case (True, _):
m += "wb: "
m += " ".join(map(lambda b: b.name,
self.state.active_whiteblocks))
if inactive_bbs:
m += " "
m += inactive_bbs
case (False, True):
m += "bb"
if inactive_bbs:
m += ": "
m += inactive_bbs
case _:
m = "inactive"
if self.state.intention:
m += f" 🎯 {self.state.intention}"
return m
def allow_blackblock(self, blackblock: Block):
if blackblock in self.state.active_blackblocks:
self.state.active_blackblocks.remove(blackblock)
if blackblock not in self.state.inactive_blackblocks:
self.state.inactive_blackblocks.append(blackblock)
m = f"Blackblock [sky_blue3]{blackblock.name}[/sky_blue3] is now allowed."
logging.info(m)
def log_window(self):
window = XWindow()
utc_timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
# Remove time strings and numbers with decimal from window.name
window_name = re.sub(r"\b\d\d:\d\d\b", "", window.name)
window_name = re.sub(r"\b\d+.\d\d\b", "", window_name)
window_name = re.sub(r"[+-]\d+.\d+%", "", window_name)
intention = self.state.intention
log_line = [utc_timestamp, window_name, window.cls, intention]
with self.config.window_log_file.open("a", newline="") as f:
writer = csv.writer(f)
writer.writerow(log_line)
async def enforce_pause(self):
if XWindow().is_active() is False:
return
xwindow.notify("⚠️ No active windows during pause!")
for _ in range(8):
await asyncio.sleep(1)
window = XWindow()
if not window.is_active() or self.state.pause is False:
xwindow.notify("✅ Thanks.")
return
window = XWindow()
if window.is_active():
window.minimize()
async def enforce(self):
if not window_is_blocked(self.state):
return
delay = int(self.config.enforce_delay_ms / 1000)
for i in range(delay, 0, -1):
await asyncio.sleep(1)
xwindow.notify(f"⚠️ AntiDrift will minimize in {i}s.")
if not window_is_blocked(self.state, silent=True):
xwindow.notify("✅ We are gucci again.")
return
window = XWindow()
xwindow.notify(f"⛔ Minimize {window.name[:30]}.")
window.minimize()
def window_is_blocked(state: State, silent: bool = False) -> bool:
blackblocks = state.active_blackblocks
whiteblocks = state.active_whiteblocks
window = XWindow()
if not window.keywords:
return False
def keyword_matches_window(keyword: str, window: XWindow):
if keyword.startswith("/") and keyword.endswith("/"):
try:
r = re.compile(keyword[1:-1], re.IGNORECASE)
if r.findall(window.name):
return True
else:
return False
except re.error:
m = f"Invalid regex [red3]{keyword}[/red3]."
logging.warning(m)
return False
else:
if k in window.keywords:
return True
return False
for b in blackblocks:
for k in b.keywords:
if keyword_matches_window(k, window) and b.kill:
window.kill()
xwindow.notify(f"⛔ Kill for {k} on {b.name}.")
return True
elif keyword_matches_window(k, window):
if not silent:
xwindow.notify(f"{window.name[:30]} blocked by {b.name}.")
logging.warning(
f"[red]{window.name[:50]}[/red] "
f"blocked by [red]{b.name}[/red]."
)
return True
if not whiteblocks:
if not silent:
logging.debug(" All non-blackblock windows are allowed.")
return False
for w in whiteblocks:
for k in w.keywords:
if keyword_matches_window(k, window):
if not silent:
logging.debug(
f"[pale_green3]{window.name[:30]}[/pale_green3] "
f"allowed by [sky_blue3]{w.name}[/sky_blue3]."
)
return False
if not silent:
xwindow.notify(f"'{window.name[:30]}' not on any whiteblock.")
return True

View File

@@ -1,193 +0,0 @@
import csv
import keyring
import requests
import json
import antidrift.xwindow as xwindow
from antidrift.config import Config
from collections import defaultdict
from datetime import datetime, timedelta
from dataclasses import dataclass
from functools import lru_cache
from typing import List, Optional
@dataclass
class Datapoint:
timestamp: datetime
title: str
tool: str
intention: str
@dataclass
class Evaluation:
level: str
reason: str
def filter_today(datapoints: List[Datapoint]) -> List[Datapoint]:
today = datetime.now().date()
return [d for d in datapoints if d.timestamp.date() == today]
def filter_last_hour(datapoints: List[Datapoint]) -> List[Datapoint]:
one_hour_ago = datetime.now() - timedelta(minutes=50)
return [d for d in datapoints if d.timestamp >= one_hour_ago]
def evaluate(config: Config):
log_file = config.window_log_file
datapoints: List[Datapoint] = []
with open(log_file, "r") as file:
reader = csv.reader(file)
for row in reader:
timestamp_str, title, tool, intention = row
if title != "":
timestamp = datetime.fromisoformat(timestamp_str)
datapoint = Datapoint(timestamp, title, tool, intention)
datapoints.append(datapoint)
datapoints = filter_last_hour(datapoints)
durations = defaultdict(timedelta)
prev_datapoint = None
prev_evaluation = None
for d in datapoints:
if d.title == "":
continue
# Get evaluation of current datapoint
result = evaluate_datapoint(d.title, d.tool, d.intention)
evaluation = parse_result(result)
# If there was a previous datapoint and evaluation
if prev_datapoint and prev_evaluation:
# Calculate time difference between current and previous datapoint
time_diff = d.timestamp - prev_datapoint.timestamp
# Add this time difference to the corresponding level's duration
durations[prev_evaluation.level] += time_diff
# Update previous datapoint and evaluation
prev_datapoint = d
prev_evaluation = evaluation
# Print durations for each level
for level, duration in durations.items():
print(f"Level: {level}, Duration: {duration}")
def parse_result(result: str) -> Optional[Evaluation]:
try:
content = json.loads(result.strip())
return Evaluation(content["level"], content["reason"])
except (ValueError, KeyError):
return None
@lru_cache
def evaluate_datapoint(title, tool, intention) -> Optional[str]:
messages = []
api_key = keyring.get_password("openai-api-key", "felixm")
prompt = get_prompt(title, tool, intention)
instruction = "You are productivity rater GPT and classify work sessions."
messages.append({"role": "system", "content": instruction})
messages.append({"role": "user", "content": prompt})
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}",
}
BASE_ENDPOINT = "https://api.openai.com/v1"
body = {"model": "gpt-4", "messages": messages}
try:
r = requests.post(
f"{BASE_ENDPOINT}/chat/completions", headers=headers, json=body, timeout=10
)
except requests.ConnectionError:
xwindow.notify("Antidrift - GPT - Connection error")
return None
except requests.Timeout:
xwindow.notify("Antidrift - GPT - Timeout")
return None
if r.status_code == 200:
response = r.json()
message_response = response["choices"][0]["message"]
return message_response["content"]
else:
xwindow.notify(f"Antidrift - GPT - Response error status code {r.status_code}")
return None
def get_prompt(title: str, tool: str, intention: str) -> str:
return f"""
Rate how well that title and tool matches the intention.
Use one of the following levels:
deep work, shallow work, good media, bad media, inappropriate
Return your response as JSON object with the attributes 'level' and 'reason'.
Adult or other inappropriate NSFW content always scores 'inappropriate'.
Examples:
Intention: Work on coding.
Tool: VS Code
Title: main.py - GoalGuard - Code - OSS
Response:
{{
"level": "deep work",
"reason": "The user uses VS code to work on coding."
}}
Intention: Watch educational video.
Tool: Firefox
Title: World's hardest jigsaw puzzle - YouTube - Mozilla Firefox
Response:
{{
"level": "good media",
"reason": "The user does the desired activity, and it seems educational."
}}
Intention: no intention
Tool: Firefox
Title: Reddit - Mozilla Firefox
Response:
{{
"level": "bad media",
"reason": "The user does not have an intention and wastes time on reddit."
}}
Intention: Watch educational video.
Tool: Firefox
Title: 8tube.com - Mozilla Firefox Private Browsing
Response:
{{
"level": "inapproriate",
"reason": "The user consumes adult content."
}}
Intention: no intention
Tool: Firefox
Title: Amazing Marvin - Daily Tasks — Mozilla Firefox
Response:
{{
"level": "shallow work",
"reason": "The user works on their task list but does not engage in deep work."
}}
Intention: {intention}
Tool: {tool}
Title: {title}
Response:
"""

View File

@@ -1,59 +0,0 @@
import os
import re
import pwd
import subprocess
import logging
class XWindow:
def __init__(self):
self.window = self._run(["getactivewindow"])
if not self.window:
self.name = ""
self.cls = ""
self.pid = ""
else:
self.name = self._run(["getwindowname", self.window])
self.cls = self._run(["getwindowclassname", self.window])
self.pid = self._run(["getwindowpid", self.window])
self.keywords = list(re.findall(r"\w+", self.name.lower()))
def __repr__(self):
return (
f"<XWindow '{self.name[:20]}' '{self.cls[:20]}' active: {self.is_active()}>"
)
def _run(self, cmd) -> str:
cmd = ["xdotool"] + cmd
p = subprocess.run(cmd, capture_output=True)
if p.returncode != 0:
return ""
return p.stdout.decode().strip()
def minimize(self):
self._run(["windowminimize", self.window])
def quit(self):
self._run(["windowquit", self.window])
def kill(self):
self._run(["windowkill", self.window])
def is_active(self) -> bool:
current_desktop = self._run(["get_desktop"])
window_desktop = self._run(["get_desktop_for_window", self.window])
return True if self.name and current_desktop == window_desktop else False
def notify(message: str) -> None:
"""Notify user via the Xorg notify-send command."""
logging.debug(f"{message} - [grey]notify[/grey]")
env = dict(os.environ)
user = env.get("SUDO_USER", None)
if user is None:
cmd = ["notify-send", message]
else:
uid = pwd.getpwnam(user)[2]
env["DBUS_SESSION_BUS_ADDRESS"] = f"unix:path=/run/user/{uid}/bus"
cmd = ["runuser", "-m", "-u", user, "notify-send", message]
subprocess.run(cmd, env=env)

132
main.py
View File

@@ -1,132 +0,0 @@
import logging
import shutil
import sys
import os
import subprocess
import argparse
import psutil
import asyncio
from rich.logging import RichHandler
from pathlib import Path
import antidrift.client
from antidrift.daemon import AntiDriftDaemon
from antidrift.config import Config
def get_args():
parser = argparse.ArgumentParser(description="AntiDrift CLI.")
parser.add_argument("--daemon", action="store_true", help="run daemon")
parser.add_argument("--evaluate", action="store_true", help="evaluate day")
parser.add_argument(
"--intention", metavar="intention", help="set intention", default=None, type=str
)
parser.add_argument(
"--block", metavar="block", help="add to block", default=None, type=str
)
parser.add_argument("--pause", action="store_true", help="pause antidrift")
parser.add_argument("--schedule", metavar="blackblock", help="schedule blackblock")
parser.add_argument(
"--start", metavar="whiteblock", nargs="+", help="start whiteblocks"
)
parser.add_argument("--status", action="store_true", help="get status from daemon")
parser.add_argument("--stop", action="store_true", help="stop session")
parser.add_argument("--tailf", action="store_true", help="tail -f log file")
parser.add_argument("--unpause", action="store_true", help="unpause antidrift")
args = parser.parse_args()
return args
def init_logging(log_file: Path, dev_mode: bool = False):
class DuplicateFilter(logging.Filter):
def filter(self, record) -> bool:
current_log = (record.module, record.levelno, record.msg)
if current_log != getattr(self, "last_log", None):
self.last_log = current_log
return True
return False
if dev_mode:
format_str = "%(message)s" # RichHandler will handle the formatting
logging.basicConfig(
level=logging.DEBUG,
format=format_str,
datefmt="%a %H:%M:%S",
handlers=[RichHandler(rich_tracebacks=True, markup=True)],
)
else:
format_str = (
"[bold pale_green3]%(asctime)s[/bold pale_green3] | "
"[light_steel_blue]%(levelname)-8s[/light_steel_blue] | "
"%(message)s"
)
logging.basicConfig(
filename=log_file,
format=format_str,
datefmt="%a %H:%M:%S",
encoding="utf-8",
level=logging.DEBUG,
)
logger = logging.getLogger()
logger.addFilter(DuplicateFilter())
def check_for_xdotool():
"""Check if xdotool is in path and exit if not"""
result = shutil.which("xdotool")
if not result:
logging.critical("Please install xdotool")
sys.exit(1)
def kill_existing_antidrift():
current_pid = os.getpid()
for proc in psutil.process_iter(["pid", "name", "exe"]):
if (
proc.info["name"] == "/usr/bin/antidrift"
or proc.info["exe"] == "/usr/bin/antidrift"
):
if proc.info["pid"] == current_pid:
continue # don't this process
try:
proc.kill()
except psutil.AccessDenied:
pass
except psutil.NoSuchProcess:
pass
def main_daemon():
if os.geteuid() == 0:
newpid = os.fork()
if newpid == 0:
config = Config.load(os.path.expanduser("~/.config/antidrift.yaml"))
init_logging(config.daemon_log_file)
daemon = AntiDriftDaemon(config)
asyncio.run(daemon.run())
else:
if sys.argv[0] == "antidrift":
kill_existing_antidrift()
cmd = ["sudo", "antidrift", "--daemon"]
subprocess.Popen(cmd)
else:
config = Config.load(os.path.expanduser("~/.config/antidrift.yaml"))
init_logging(config.daemon_log_file, dev_mode=True)
daemon = AntiDriftDaemon(config)
asyncio.run(daemon.run(debug=True))
def main() -> None:
"""Main routine that dispatches to client or daemon"""
check_for_xdotool()
args = get_args()
if args.daemon:
main_daemon()
else:
config = Config.load(os.path.expanduser("~/.config/antidrift.yaml"))
asyncio.run(antidrift.client.run(args, config))
if __name__ == "__main__":
main()

235
src/main.rs Normal file
View File

@@ -0,0 +1,235 @@
use anyhow::Result;
use std::collections::HashMap;
use std::io::stdout;
use std::rc::Rc;
use std::time::{Duration, Instant};
mod window;
use ratatui::{
crossterm::{
event::{self, Event, KeyCode},
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
ExecutableCommand,
},
prelude::*,
widgets::*,
};
#[derive(PartialEq)]
enum State {
InputIntention,
InputDuration,
InProgress,
ShouldQuit,
}
struct App {
state: State,
user_intention: String,
user_duration_str: String,
user_duration: Duration,
current_window_title: Rc<String>,
current_window_time: Instant,
session_start: Instant,
session_stats: HashMap<Rc<String>, Duration>,
last_tick_50ms: Instant,
last_tick_1s: Instant,
}
impl App {
fn new() -> Self {
let window_title = window::get_title_clean();
App {
state: State::InputIntention,
user_intention: String::new(),
user_duration_str: String::new(),
user_duration: Duration::new(0, 0),
current_window_title: window_title.into(),
current_window_time: Instant::now(),
session_start: Instant::now(),
session_stats: HashMap::new(),
last_tick_50ms: Instant::now(),
last_tick_1s: Instant::now(),
}
}
fn handle_ticks(&mut self) {
if self.last_tick_50ms.elapsed() >= Duration::from_millis(50) {
self.tick_50ms();
}
if self.last_tick_1s.elapsed() >= Duration::from_secs(1) {
self.tick_1s();
}
}
fn to_in_progress(&mut self) {
let Ok(user_duration) = self.user_duration_str.parse::<u64>() else {
// TODO: Print error to the user
return;
};
self.user_duration = Duration::from_secs(user_duration * 60);
self.state = State::InProgress;
self.current_window_time = Instant::now();
self.session_start = self.current_window_time;
self.session_stats = HashMap::new();
}
fn tick_50ms(&mut self) {
match self.state {
State::InputIntention | State::InputDuration => {
window::minimize_other("kitty");
}
State::InProgress => {
update_session_stats(self);
}
State::ShouldQuit => {},
}
self.last_tick_50ms = Instant::now();
}
fn tick_1s(&mut self) {
self.last_tick_1s = Instant::now();
}
fn timeout(&self) -> Duration {
Duration::from_millis(50).saturating_sub(self.last_tick_50ms.elapsed())
}
}
fn duration_as_str(duration: &Duration) -> String {
format!(
"{:3}:{:02}",
duration.as_secs() / 60,
duration.as_secs() % 60
)
}
fn session_stats_to_lines(session_stats: &HashMap<Rc<String>, Duration>) -> Vec<Line> {
let mut stats: Vec<_> = session_stats
.iter()
.map(|(title, duration)| (title.clone(), *duration))
.collect();
stats.sort_by(|a, b| b.1.cmp(&a.1));
stats
.iter()
.map(|(title, duration)| {
Line::from(Span::raw(format!(
"{}: {}",
duration_as_str(&duration),
title
)))
})
.collect()
}
fn main() -> Result<()> {
enable_raw_mode()?;
stdout().execute(EnterAlternateScreen)?;
let mut terminal = Terminal::new(CrosstermBackend::new(stdout()))?;
let mut app = App::new();
while app.state != State::ShouldQuit {
terminal.draw(|frame| ui(frame, &app))?;
handle_events(&mut app)?;
app.handle_ticks();
}
disable_raw_mode()?;
stdout().execute(LeaveAlternateScreen)?;
Ok(())
}
fn handle_events(app: &mut App) -> Result<()> {
if !event::poll(app.timeout())? {
return Ok(());
}
let Event::Key(key) = event::read()? else {
return Ok(());
};
if key.kind != event::KeyEventKind::Press {
return Ok(());
}
if key.code == KeyCode::Esc {
app.state = State::ShouldQuit;
}
if app.state == State::InputIntention {
match key.code {
KeyCode::Enter => app.state = State::InputDuration,
KeyCode::Backspace => {
let _ = app.user_intention.pop();
}
KeyCode::Char(c) => {
app.user_intention.push(c);
}
_ => {}
}
} else if app.state == State::InputDuration {
match key.code {
KeyCode::Enter => app.to_in_progress(),
KeyCode::Backspace => {
let _ = app.user_duration_str.pop();
}
KeyCode::Char(c) if c.is_ascii_digit() => {
app.user_duration_str.push(c);
}
_ => {}
}
}
Ok(())
}
fn update_session_stats(app: &mut App) {
let window_title = window::get_title_clean().into();
let delta = app.current_window_time.elapsed();
if app.current_window_title != window_title || (delta > Duration::from_secs(1)) {
let entry = app
.session_stats
.entry(app.current_window_title.clone())
.or_insert_with(|| Duration::default());
*entry += app.current_window_time.elapsed();
app.current_window_time = Instant::now();
app.current_window_title = window_title;
}
}
fn ui(frame: &mut Frame, app: &App) {
let layout = Layout::vertical([
Constraint::Min(3),
Constraint::Min(3),
Constraint::Percentage(100),
]);
let [layout_intention, layout_countdown, layout_titles] = layout.areas(frame.size());
let input: Vec<Line> = vec![Line::from(Span::raw(&app.user_intention))];
frame.render_widget(
Paragraph::new(input).block(Block::bordered().title("Intention")),
layout_intention,
);
let input: Vec<Line> = if app.state == State::InProgress {
let remaining = app.user_duration.saturating_sub(app.session_start.elapsed());
let s = duration_as_str(&remaining);
vec![Line::from(Span::raw(s))]
} else {
vec![Line::from(Span::raw(&app.user_duration_str))]
};
frame.render_widget(
Paragraph::new(input).block(Block::bordered().title("Duration")),
layout_countdown,
);
let stats = session_stats_to_lines(&app.session_stats);
frame.render_widget(
Paragraph::new(stats).block(Block::bordered().title("Session")),
layout_titles,
);
}

66
src/window.rs Normal file
View File

@@ -0,0 +1,66 @@
use regex::Regex;
use std::{process::Command, process::Output, str};
pub fn get_title_clean() -> String {
let title = get_window_info().title;
let re = Regex::new(r"-?\d+([:.]\d+)+%?").unwrap();
re.replace_all(&title, "").to_string()
}
pub fn minimize_other(class: &str) {
let window_info = get_window_info();
if &window_info.class != class {
run(&format!("xdotool windowminimize {}", window_info.wid));
}
}
struct WindowInfo {
title: String,
class: String,
wid: String,
}
fn run(cmd: &str) -> Option<String> {
let output = Command::new("sh").arg("-c").arg(cmd).output();
let Ok(Output {
status,
stdout,
stderr: _,
}) = output
else {
return None;
};
if status.code() != Some(0) {
return None;
}
let Ok(output_str) = str::from_utf8(&stdout) else {
return None;
};
Some(output_str.trim().to_string())
}
fn get_window_info() -> WindowInfo {
let none = WindowInfo {
title: "none".to_string(),
class: "none".to_string(),
wid: "".to_string(),
};
let Some(wid) = run("xdotool getactivewindow") else {
return none;
};
let Some(class) = run(&format!("xdotool getwindowclassname {wid}")) else {
return none;
};
let Some(title) = run(&format!("xdotool getwindowname {wid}")) else {
return none;
};
WindowInfo { title, class, wid }
}