Compare commits
15 Commits
d97a4885cb
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| a44d52b402 | |||
| 1a89906ae8 | |||
| d84b83c6d5 | |||
| d4aa36f9b9 | |||
| 10b5511ff9 | |||
| 4fbd53efa5 | |||
| 0d50da7ceb | |||
| 288ef6a9c4 | |||
| f13dc7f10e | |||
| abad26d9d2 | |||
| 49bafa2567 | |||
| 860fe615ab | |||
| 38a03f063d | |||
| 95dcfa72d6 | |||
| 3b7d88f279 |
146
.gitignore
vendored
146
.gitignore
vendored
@@ -1,147 +1,4 @@
|
||||
config.yaml
|
||||
# ---> Python
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
# C extensions
|
||||
*.so
|
||||
|
||||
# Distribution / packaging
|
||||
.Python
|
||||
build/
|
||||
develop-eggs/
|
||||
dist/
|
||||
downloads/
|
||||
eggs/
|
||||
.eggs/
|
||||
lib/
|
||||
lib64/
|
||||
parts/
|
||||
sdist/
|
||||
var/
|
||||
wheels/
|
||||
share/python-wheels/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
MANIFEST
|
||||
|
||||
# PyInstaller
|
||||
# Usually these files are written by a python script from a template
|
||||
# before PyInstaller builds the exe, so as to inject date/other infos into it.
|
||||
*.manifest
|
||||
*.spec
|
||||
|
||||
# Installer logs
|
||||
pip-log.txt
|
||||
pip-delete-this-directory.txt
|
||||
|
||||
# Unit test / coverage reports
|
||||
htmlcov/
|
||||
.tox/
|
||||
.nox/
|
||||
.coverage
|
||||
.coverage.*
|
||||
.cache
|
||||
nosetests.xml
|
||||
coverage.xml
|
||||
*.cover
|
||||
*.py,cover
|
||||
.hypothesis/
|
||||
.pytest_cache/
|
||||
cover/
|
||||
|
||||
# Translations
|
||||
*.mo
|
||||
*.pot
|
||||
|
||||
# Django stuff:
|
||||
*.log
|
||||
local_settings.py
|
||||
db.sqlite3
|
||||
db.sqlite3-journal
|
||||
|
||||
# Flask stuff:
|
||||
instance/
|
||||
.webassets-cache
|
||||
|
||||
# Scrapy stuff:
|
||||
.scrapy
|
||||
|
||||
# Sphinx documentation
|
||||
docs/_build/
|
||||
|
||||
# PyBuilder
|
||||
.pybuilder/
|
||||
target/
|
||||
|
||||
# Jupyter Notebook
|
||||
.ipynb_checkpoints
|
||||
|
||||
# IPython
|
||||
profile_default/
|
||||
ipython_config.py
|
||||
|
||||
# pyenv
|
||||
# For a library or package, you might want to ignore these files since the code is
|
||||
# intended to run in multiple environments; otherwise, check them in:
|
||||
# .python-version
|
||||
|
||||
# pipenv
|
||||
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
|
||||
# However, in case of collaboration, if having platform-specific dependencies or dependencies
|
||||
# having no cross-platform support, pipenv may install dependencies that don't work, or not
|
||||
# install all needed dependencies.
|
||||
#Pipfile.lock
|
||||
|
||||
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
|
||||
__pypackages__/
|
||||
|
||||
# Celery stuff
|
||||
celerybeat-schedule
|
||||
celerybeat.pid
|
||||
|
||||
# SageMath parsed files
|
||||
*.sage.py
|
||||
|
||||
# Environments
|
||||
.env
|
||||
.venv
|
||||
env/
|
||||
venv/
|
||||
ENV/
|
||||
env.bak/
|
||||
venv.bak/
|
||||
|
||||
# Spyder project settings
|
||||
.spyderproject
|
||||
.spyproject
|
||||
|
||||
# Rope project settings
|
||||
.ropeproject
|
||||
|
||||
# mkdocs documentation
|
||||
/site
|
||||
|
||||
# mypy
|
||||
.mypy_cache/
|
||||
.dmypy.json
|
||||
dmypy.json
|
||||
|
||||
# Pyre type checker
|
||||
.pyre/
|
||||
|
||||
# pytype static type analyzer
|
||||
.pytype/
|
||||
|
||||
# Cython debug symbols
|
||||
cython_debug/
|
||||
|
||||
# ---> Rust
|
||||
# Generated by Cargo
|
||||
# will have compiled files and executables
|
||||
# Generated by Cargo will have compiled files and executables
|
||||
debug/
|
||||
target/
|
||||
|
||||
@@ -154,4 +11,3 @@ Cargo.lock
|
||||
|
||||
# MSVC Windows builds of rustc generate these, which store debugging information
|
||||
*.pdb
|
||||
|
||||
|
||||
15
Cargo.toml
Normal file
15
Cargo.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
[package]
|
||||
name = "antidrift"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.86"
|
||||
ratatui = "0.27.0"
|
||||
regex = "1.10.5"
|
||||
shellexpand = "3.1.0"
|
||||
serde = { version = "1.0", features = ["derive", "rc"] }
|
||||
serde_json = "1.0"
|
||||
|
||||
[target.'cfg(windows)'.dependencies]
|
||||
winapi = { version = "0.3", features = ["winuser", "processthreadsapi"] }
|
||||
65
README.md
65
README.md
@@ -1,60 +1,13 @@
|
||||
# AntiDrift
|
||||
|
||||
Utilize your computer purposefully.
|
||||
Just my personal productivity tool. It asks me about my intention for the next
|
||||
work session. Until I have provided an intention, duration, and start a
|
||||
session, it forcefully minimizes all windows. It then records all active
|
||||
windows during the session. At the end, it allows me to rate how relevant each
|
||||
window was and calculates a session score from that.
|
||||
|
||||
## Make executable and install
|
||||
|
||||
```
|
||||
pip install pyinstaller --user
|
||||
pyinstaller --onefile antidrift.py
|
||||
sudo cp dist/antidrift /usr/bin
|
||||
```
|
||||
|
||||
## Dependencies
|
||||
|
||||
- dbus-python
|
||||
- glib
|
||||
|
||||
## Create sudoers configuration
|
||||
|
||||
Create a file `antidrift` in `/etc/sudoers.d`. This allows antidrift to run
|
||||
itself with sudo ultimately making it unkillable from the regular user.
|
||||
|
||||
```
|
||||
user hostname = (root) NOPASSWD: /usr/bin/antidrift
|
||||
```
|
||||
|
||||
## Autostart with systemd
|
||||
|
||||
- Create `~/.config/systemd/user/antidrift.service`
|
||||
- Add configuration below and save
|
||||
- Run `systemctl --user enable antidrift.service`
|
||||
|
||||
```
|
||||
[Unit]
|
||||
Description=AntiDrift
|
||||
|
||||
[Service]
|
||||
ExecStart=antidrift
|
||||
|
||||
[Install]
|
||||
WantedBy=default.target
|
||||
```
|
||||
|
||||
## Autostart via desktop file
|
||||
|
||||
Create a file `antidrift.desktop` in `/etc/xdg/autostart`.
|
||||
|
||||
Add the following content to the file.
|
||||
|
||||
```
|
||||
[Desktop Entry]
|
||||
Name=AntiDrift
|
||||
Exec=antidrift
|
||||
Terminal=false
|
||||
Type=Application
|
||||
StartupNotify=false
|
||||
```
|
||||
|
||||
Your window manager will now start antidrift automatically.
|
||||
To use AntiDrift, run `cargo run --release` directly, or `cargo build --release`
|
||||
and copy the binary into your `PATH`.
|
||||
|
||||
Under Linux, we use `xdotool` to get window titles and minimize windows. Under
|
||||
Windows, we use the package `winapi` for the same functionality.
|
||||
|
||||
@@ -1,32 +0,0 @@
|
||||
{
|
||||
"folders": [
|
||||
{
|
||||
"path": "."
|
||||
}
|
||||
],
|
||||
"launch": {
|
||||
"version": "0.2.0",
|
||||
"configurations": [
|
||||
{
|
||||
"name": "Python: Current File",
|
||||
"type": "python",
|
||||
"request": "launch",
|
||||
"program": "${file}",
|
||||
"console": "integratedTerminal"
|
||||
},
|
||||
{
|
||||
"name": "FocusFriend Debug",
|
||||
"type": "python",
|
||||
"request": "launch",
|
||||
"program": "focusfriend.py",
|
||||
"console": "integratedTerminal",
|
||||
"args": ["--debug"]
|
||||
}
|
||||
]
|
||||
},
|
||||
"settings": {
|
||||
"python.linting.pylintEnabled": true,
|
||||
"python.linting.enabled": true,
|
||||
"python.linting.pylintArgs": ["--errors-only"]
|
||||
}
|
||||
}
|
||||
@@ -1 +0,0 @@
|
||||
""" AntiDrift module init """
|
||||
@@ -1,33 +0,0 @@
|
||||
from dbus_next.auth import Authenticator, _AuthResponse
|
||||
|
||||
|
||||
class AuthExternal(Authenticator):
|
||||
"""An authenticator class for the external auth protocol for use with the
|
||||
:class:`MessageBus <dbus_next.message_bus.BaseMessageBus>`.
|
||||
:sealso: https://dbus.freedesktop.org/doc/dbus-specification.html#auth-protocol
|
||||
"""
|
||||
|
||||
def __init__(self, user_uid):
|
||||
self.user_uid = user_uid
|
||||
self.negotiate_unix_fd = False
|
||||
self.negotiating_fds = False
|
||||
|
||||
def _authentication_start(self, negotiate_unix_fd=False) -> str:
|
||||
self.negotiate_unix_fd = negotiate_unix_fd
|
||||
hex_uid = str(self.user_uid).encode().hex()
|
||||
return f"AUTH EXTERNAL {hex_uid}"
|
||||
|
||||
def _receive_line(self, line: str):
|
||||
response, args = _AuthResponse.parse(line)
|
||||
|
||||
if response is _AuthResponse.OK:
|
||||
if self.negotiate_unix_fd:
|
||||
self.negotiating_fds = True
|
||||
return "NEGOTIATE_UNIX_FD"
|
||||
else:
|
||||
return "BEGIN"
|
||||
|
||||
if response is _AuthResponse.AGREE_UNIX_FD:
|
||||
return "BEGIN"
|
||||
|
||||
raise AuthError(f"authentication failed: {response.value}: {args}")
|
||||
@@ -1,60 +0,0 @@
|
||||
import time
|
||||
from dbus_next.aio import MessageBus
|
||||
from dbus_next import BusType
|
||||
from dbus_next.errors import DBusError
|
||||
from antidrift.config import Config
|
||||
from antidrift.evaluate import evaluate
|
||||
from argparse import Namespace
|
||||
from rich import print
|
||||
from antidrift.daemon import IFACE, OPATH, BUS_NAME
|
||||
|
||||
|
||||
async def get_dbus_interface():
|
||||
try:
|
||||
bus = await MessageBus(bus_type=BusType.SESSION).connect()
|
||||
introspection = await bus.introspect(BUS_NAME, OPATH)
|
||||
proxy_obj = bus.get_proxy_object(BUS_NAME, OPATH, introspection)
|
||||
return proxy_obj.get_interface(IFACE)
|
||||
except DBusError:
|
||||
return None
|
||||
|
||||
|
||||
async def run(args: Namespace, config: Config):
|
||||
if args.evaluate:
|
||||
evaluate(config)
|
||||
return
|
||||
|
||||
interface = await get_dbus_interface()
|
||||
reply = "🟡 ad daemon active but no command"
|
||||
if interface is None:
|
||||
reply = "🔴 ad inactive"
|
||||
elif args.start:
|
||||
reply = await interface.call_start(args.start)
|
||||
elif args.stop:
|
||||
reply = await interface.call_stop()
|
||||
elif args.pause:
|
||||
reply = await interface.call_pause()
|
||||
elif args.block is not None:
|
||||
reply = await interface.call_block(args.block)
|
||||
elif args.intention is not None:
|
||||
reply = await interface.call_intention(args.intention)
|
||||
elif args.unpause:
|
||||
reply = await interface.call_unpause()
|
||||
elif args.schedule:
|
||||
reply = await interface.call_schedule(args.schedule)
|
||||
elif args.tailf:
|
||||
tailf(config)
|
||||
elif args.status:
|
||||
reply = await interface.call_status()
|
||||
print(reply)
|
||||
|
||||
|
||||
def tailf(config):
|
||||
with open(config.daemon_log_file, "r") as f:
|
||||
f.seek(0, 2)
|
||||
while True:
|
||||
line = f.readline()
|
||||
if not line:
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
print(line.strip())
|
||||
@@ -1,68 +0,0 @@
|
||||
import os
|
||||
import yaml
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class Block(BaseModel):
|
||||
name: str
|
||||
keywords: List[str]
|
||||
kill: bool = False
|
||||
delay: int = 0
|
||||
|
||||
class Config:
|
||||
extra = "forbid"
|
||||
|
||||
|
||||
class Config(BaseModel):
|
||||
blackblocks: List[Block]
|
||||
whiteblocks: List[Block]
|
||||
window_log_file: Path = Path("~/tmp/antidrift/history.log")
|
||||
daemon_log_file: Path = Path()
|
||||
client_log_file: Path = Path()
|
||||
config_file: Path = Path()
|
||||
polling_cycle_ms: int = 2000
|
||||
enforce_delay_ms: int = 5000
|
||||
|
||||
class Config:
|
||||
extra = "forbid"
|
||||
|
||||
@classmethod
|
||||
def load(cls, config_file: str) -> Config:
|
||||
config_file = os.path.expanduser(config_file)
|
||||
with open(config_file, "r") as f:
|
||||
config_dict = yaml.safe_load(f)
|
||||
config = cls(**config_dict)
|
||||
config.config_file = Path(config_file)
|
||||
# Expand the paths for the log files
|
||||
config.window_log_file = Path(
|
||||
os.path.expanduser(config.window_log_file))
|
||||
config.daemon_log_file = Path(
|
||||
os.path.expanduser(config.daemon_log_file))
|
||||
config.client_log_file = Path(
|
||||
os.path.expanduser(config.client_log_file))
|
||||
return config
|
||||
|
||||
def save(self) -> None:
|
||||
config_file = self.config_file
|
||||
config_dict = self.dict()
|
||||
|
||||
# convert Path objects to strings
|
||||
for key, value in config_dict.items():
|
||||
if isinstance(value, Path):
|
||||
config_dict[key] = str(value)
|
||||
|
||||
with open(config_file, "w") as f:
|
||||
yaml.safe_dump(config_dict, f)
|
||||
|
||||
|
||||
class State(BaseModel):
|
||||
active_blackblocks: List[Block] = []
|
||||
active_whiteblocks: List[Block] = []
|
||||
inactive_blackblocks: List[Block] = []
|
||||
pause: bool = False
|
||||
intention: str = ""
|
||||
|
||||
class Config:
|
||||
extra = "forbid"
|
||||
@@ -1,313 +0,0 @@
|
||||
from datetime import datetime
|
||||
import csv
|
||||
import logging
|
||||
import os
|
||||
import pwd
|
||||
import re
|
||||
import asyncio
|
||||
|
||||
import antidrift.xwindow as xwindow
|
||||
from antidrift.xwindow import XWindow
|
||||
from antidrift.config import Config, State, Block
|
||||
from antidrift.auth import AuthExternal
|
||||
|
||||
from dbus_next.aio import MessageBus
|
||||
from dbus_next.service import ServiceInterface, method
|
||||
from dbus_next import BusType
|
||||
|
||||
BUS_NAME = "com.antidrift"
|
||||
IFACE = "com.antidrift"
|
||||
OPATH = "/com/antidrift"
|
||||
s = "no pyflakes warning"
|
||||
|
||||
|
||||
class AntiDriftDaemon(ServiceInterface):
|
||||
def __init__(self, config: Config):
|
||||
super().__init__(IFACE)
|
||||
self.config = config
|
||||
self.reset_block_state()
|
||||
self.enforce_count = 0
|
||||
self.enforce_value = int(
|
||||
config.enforce_delay_ms / config.polling_cycle_ms)
|
||||
|
||||
async def init_bus(self):
|
||||
"""
|
||||
We are switching the effective UID to the target user's UID in order to
|
||||
connect to the D-Bus session bus with the correct permissions and
|
||||
authentication. Once the D-Bus connection is established, we restore
|
||||
the original effective UID to maintain the appropriate privilege
|
||||
levels.
|
||||
"""
|
||||
user_name = os.environ.get(
|
||||
"SUDO_USER", pwd.getpwuid(os.getuid()).pw_name)
|
||||
user_uid = pwd.getpwnam(user_name).pw_uid
|
||||
euid = os.geteuid()
|
||||
os.seteuid(user_uid)
|
||||
auth = AuthExternal(user_uid)
|
||||
bus_address = f"unix:path=/run/user/{user_uid}/bus"
|
||||
bus = MessageBus(bus_address=bus_address,
|
||||
bus_type=BusType.SESSION, auth=auth)
|
||||
await bus.connect()
|
||||
bus.export(OPATH, self)
|
||||
await bus.request_name(BUS_NAME)
|
||||
os.seteuid(euid)
|
||||
return bus
|
||||
|
||||
async def run(self, debug: bool = False):
|
||||
_ = await self.init_bus()
|
||||
|
||||
async def _enforce():
|
||||
while True:
|
||||
if self.state.pause is True:
|
||||
await self.enforce_pause()
|
||||
else:
|
||||
await self.enforce()
|
||||
await asyncio.sleep(self.config.polling_cycle_ms / 1000)
|
||||
|
||||
async def _log():
|
||||
while True:
|
||||
if self.state.pause is False:
|
||||
self.log_window()
|
||||
await asyncio.sleep(60) # Sleep for 60 seconds
|
||||
|
||||
# Start _enforce and _log as background tasks
|
||||
asyncio.create_task(_enforce())
|
||||
asyncio.create_task(_log())
|
||||
|
||||
xwindow.notify("✅ Antidrift running.")
|
||||
stop = asyncio.Event()
|
||||
await stop.wait()
|
||||
|
||||
def reset_block_state(self):
|
||||
self.state = State(
|
||||
active_blackblocks=self.config.blackblocks,
|
||||
active_whiteblocks=[],
|
||||
inactive_blackblocks=[],
|
||||
)
|
||||
|
||||
@method()
|
||||
def start(self, whiteblocks: "as") -> "s":
|
||||
self.reset_block_state()
|
||||
all_whiteblocks = {wb.name: wb for wb in self.config.whiteblocks}
|
||||
success_wbs, fail_blocks = [], []
|
||||
for block_name in whiteblocks:
|
||||
if block_name in all_whiteblocks:
|
||||
self.state.active_whiteblocks.append(
|
||||
all_whiteblocks[block_name])
|
||||
success_wbs.append(block_name)
|
||||
else:
|
||||
fail_blocks.append(block_name)
|
||||
if success_wbs:
|
||||
wbs = ", ".join(success_wbs)
|
||||
r = f"Start whiteblocks [sky_blue3]{wbs}[/sky_blue3]."
|
||||
logging.info(r)
|
||||
else:
|
||||
r = "No whiteblocks started."
|
||||
if fail_blocks:
|
||||
m = f"No whiteblocks [red3]{', '.join(fail_blocks)}[/red3]."
|
||||
logging.warning(m)
|
||||
return r
|
||||
|
||||
@method()
|
||||
def schedule(self, blackblock_name: "s") -> "s":
|
||||
"""Schedule blackblock based if it has a non-zero timeout value."""
|
||||
all_blackblocks = {bb.name: bb for bb in self.config.blackblocks}
|
||||
if blackblock_name not in all_blackblocks:
|
||||
m = f"No blackblock [red3]{blackblock_name}[/red3]."
|
||||
logging.warning(m)
|
||||
return m
|
||||
|
||||
blackblock = all_blackblocks[blackblock_name]
|
||||
if blackblock.delay == 0:
|
||||
m = f"Blackblock [red3]{blackblock_name}[/red3] cannot be scheduled without delay."
|
||||
logging.warning(m)
|
||||
return m
|
||||
|
||||
def allow():
|
||||
self.allow_blackblock(blackblock)
|
||||
|
||||
delay_sec = blackblock.delay * 60
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.call_later(delay_sec, allow)
|
||||
m = f"Scheduled [sky_blue3]{blackblock.name}[/sky_blue3] in {blackblock.delay} minutes."
|
||||
logging.info(m)
|
||||
return m
|
||||
|
||||
@method()
|
||||
def stop(self) -> "s":
|
||||
self.reset_block_state()
|
||||
m = "Blacklist only mode."
|
||||
logging.info(m)
|
||||
return m
|
||||
|
||||
@method()
|
||||
def pause(self) -> "s":
|
||||
self.state.pause = True
|
||||
m = "Antidrift paused."
|
||||
logging.info(m)
|
||||
return m
|
||||
|
||||
@method()
|
||||
def unpause(self) -> "s":
|
||||
self.state.pause = False
|
||||
m = "Antidrift unpaused."
|
||||
logging.info(m)
|
||||
return m
|
||||
|
||||
@method()
|
||||
def intention(self, intention: "s") -> "s":
|
||||
self.state.intention = intention
|
||||
m = f"Antidrift intention set to '{intention}'."
|
||||
logging.info(m)
|
||||
return m
|
||||
|
||||
@method()
|
||||
def block(self, block: "s") -> "s":
|
||||
# self.state.intention = intention
|
||||
if self.state.active_blackblocks and \
|
||||
block not in self.state.active_blackblocks[0].keywords:
|
||||
self.state.active_blackblocks[0].keywords.append(block)
|
||||
self.config.save()
|
||||
m = f"✅Antidrift add block '{block}'."
|
||||
logging.info(m)
|
||||
return m
|
||||
return f"⚠️ '{block}' not added."
|
||||
|
||||
@method()
|
||||
def status(self) -> "s":
|
||||
white_active = bool(self.state.active_whiteblocks)
|
||||
black_active = bool(self.state.active_blackblocks)
|
||||
m = "🟢 ad "
|
||||
inactive_bbs = " ".join(
|
||||
map(lambda b: "-" + b.name, self.state.inactive_blackblocks)
|
||||
)
|
||||
if self.state.pause is True:
|
||||
return "🟡 ad paused"
|
||||
match (white_active, black_active):
|
||||
case (True, _):
|
||||
m += "wb: "
|
||||
m += " ".join(map(lambda b: b.name,
|
||||
self.state.active_whiteblocks))
|
||||
if inactive_bbs:
|
||||
m += " "
|
||||
m += inactive_bbs
|
||||
case (False, True):
|
||||
m += "bb"
|
||||
if inactive_bbs:
|
||||
m += ": "
|
||||
m += inactive_bbs
|
||||
case _:
|
||||
m = "inactive"
|
||||
if self.state.intention:
|
||||
m += f" 🎯 {self.state.intention}"
|
||||
return m
|
||||
|
||||
def allow_blackblock(self, blackblock: Block):
|
||||
if blackblock in self.state.active_blackblocks:
|
||||
self.state.active_blackblocks.remove(blackblock)
|
||||
if blackblock not in self.state.inactive_blackblocks:
|
||||
self.state.inactive_blackblocks.append(blackblock)
|
||||
m = f"Blackblock [sky_blue3]{blackblock.name}[/sky_blue3] is now allowed."
|
||||
logging.info(m)
|
||||
|
||||
def log_window(self):
|
||||
window = XWindow()
|
||||
utc_timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
|
||||
|
||||
# Remove time strings and numbers with decimal from window.name
|
||||
window_name = re.sub(r"\b\d\d:\d\d\b", "", window.name)
|
||||
window_name = re.sub(r"\b\d+.\d\d\b", "", window_name)
|
||||
window_name = re.sub(r"[+-]\d+.\d+%", "", window_name)
|
||||
|
||||
intention = self.state.intention
|
||||
log_line = [utc_timestamp, window_name, window.cls, intention]
|
||||
with self.config.window_log_file.open("a", newline="") as f:
|
||||
writer = csv.writer(f)
|
||||
writer.writerow(log_line)
|
||||
|
||||
async def enforce_pause(self):
|
||||
if XWindow().is_active() is False:
|
||||
return
|
||||
xwindow.notify("⚠️ No active windows during pause!")
|
||||
for _ in range(8):
|
||||
await asyncio.sleep(1)
|
||||
window = XWindow()
|
||||
if not window.is_active() or self.state.pause is False:
|
||||
xwindow.notify("✅ Thanks.")
|
||||
return
|
||||
|
||||
window = XWindow()
|
||||
if window.is_active():
|
||||
window.minimize()
|
||||
|
||||
async def enforce(self):
|
||||
if not window_is_blocked(self.state):
|
||||
return
|
||||
|
||||
delay = int(self.config.enforce_delay_ms / 1000)
|
||||
for i in range(delay, 0, -1):
|
||||
await asyncio.sleep(1)
|
||||
xwindow.notify(f"⚠️ AntiDrift will minimize in {i}s.")
|
||||
if not window_is_blocked(self.state, silent=True):
|
||||
xwindow.notify("✅ We are gucci again.")
|
||||
return
|
||||
window = XWindow()
|
||||
xwindow.notify(f"⛔ Minimize {window.name[:30]}.")
|
||||
window.minimize()
|
||||
|
||||
|
||||
def window_is_blocked(state: State, silent: bool = False) -> bool:
|
||||
blackblocks = state.active_blackblocks
|
||||
whiteblocks = state.active_whiteblocks
|
||||
|
||||
window = XWindow()
|
||||
if not window.keywords:
|
||||
return False
|
||||
|
||||
def keyword_matches_window(keyword: str, window: XWindow):
|
||||
if keyword.startswith("/") and keyword.endswith("/"):
|
||||
try:
|
||||
r = re.compile(keyword[1:-1], re.IGNORECASE)
|
||||
if r.findall(window.name):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except re.error:
|
||||
m = f"Invalid regex [red3]{keyword}[/red3]."
|
||||
logging.warning(m)
|
||||
return False
|
||||
else:
|
||||
if k in window.keywords:
|
||||
return True
|
||||
return False
|
||||
|
||||
for b in blackblocks:
|
||||
for k in b.keywords:
|
||||
if keyword_matches_window(k, window) and b.kill:
|
||||
window.kill()
|
||||
xwindow.notify(f"⛔ Kill for {k} on {b.name}.")
|
||||
return True
|
||||
elif keyword_matches_window(k, window):
|
||||
if not silent:
|
||||
xwindow.notify(f"{window.name[:30]} blocked by {b.name}.")
|
||||
logging.warning(
|
||||
f"[red]{window.name[:50]}[/red] "
|
||||
f"blocked by [red]{b.name}[/red]."
|
||||
)
|
||||
return True
|
||||
if not whiteblocks:
|
||||
if not silent:
|
||||
logging.debug("ℹ️ All non-blackblock windows are allowed.")
|
||||
return False
|
||||
for w in whiteblocks:
|
||||
for k in w.keywords:
|
||||
if keyword_matches_window(k, window):
|
||||
if not silent:
|
||||
logging.debug(
|
||||
f"[pale_green3]{window.name[:30]}[/pale_green3] "
|
||||
f"allowed by [sky_blue3]{w.name}[/sky_blue3]."
|
||||
)
|
||||
return False
|
||||
if not silent:
|
||||
xwindow.notify(f"'{window.name[:30]}…' not on any whiteblock.")
|
||||
return True
|
||||
@@ -1,193 +0,0 @@
|
||||
import csv
|
||||
import keyring
|
||||
import requests
|
||||
import json
|
||||
import antidrift.xwindow as xwindow
|
||||
from antidrift.config import Config
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timedelta
|
||||
from dataclasses import dataclass
|
||||
from functools import lru_cache
|
||||
from typing import List, Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
class Datapoint:
|
||||
timestamp: datetime
|
||||
title: str
|
||||
tool: str
|
||||
intention: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class Evaluation:
|
||||
level: str
|
||||
reason: str
|
||||
|
||||
|
||||
def filter_today(datapoints: List[Datapoint]) -> List[Datapoint]:
|
||||
today = datetime.now().date()
|
||||
return [d for d in datapoints if d.timestamp.date() == today]
|
||||
|
||||
|
||||
def filter_last_hour(datapoints: List[Datapoint]) -> List[Datapoint]:
|
||||
one_hour_ago = datetime.now() - timedelta(minutes=50)
|
||||
return [d for d in datapoints if d.timestamp >= one_hour_ago]
|
||||
|
||||
|
||||
def evaluate(config: Config):
|
||||
log_file = config.window_log_file
|
||||
datapoints: List[Datapoint] = []
|
||||
|
||||
with open(log_file, "r") as file:
|
||||
reader = csv.reader(file)
|
||||
for row in reader:
|
||||
timestamp_str, title, tool, intention = row
|
||||
if title != "":
|
||||
timestamp = datetime.fromisoformat(timestamp_str)
|
||||
datapoint = Datapoint(timestamp, title, tool, intention)
|
||||
datapoints.append(datapoint)
|
||||
|
||||
datapoints = filter_last_hour(datapoints)
|
||||
durations = defaultdict(timedelta)
|
||||
prev_datapoint = None
|
||||
prev_evaluation = None
|
||||
for d in datapoints:
|
||||
if d.title == "":
|
||||
continue
|
||||
|
||||
# Get evaluation of current datapoint
|
||||
result = evaluate_datapoint(d.title, d.tool, d.intention)
|
||||
evaluation = parse_result(result)
|
||||
|
||||
# If there was a previous datapoint and evaluation
|
||||
if prev_datapoint and prev_evaluation:
|
||||
# Calculate time difference between current and previous datapoint
|
||||
time_diff = d.timestamp - prev_datapoint.timestamp
|
||||
# Add this time difference to the corresponding level's duration
|
||||
durations[prev_evaluation.level] += time_diff
|
||||
|
||||
# Update previous datapoint and evaluation
|
||||
prev_datapoint = d
|
||||
prev_evaluation = evaluation
|
||||
|
||||
# Print durations for each level
|
||||
for level, duration in durations.items():
|
||||
print(f"Level: {level}, Duration: {duration}")
|
||||
|
||||
|
||||
def parse_result(result: str) -> Optional[Evaluation]:
|
||||
try:
|
||||
content = json.loads(result.strip())
|
||||
return Evaluation(content["level"], content["reason"])
|
||||
except (ValueError, KeyError):
|
||||
return None
|
||||
|
||||
|
||||
@lru_cache
|
||||
def evaluate_datapoint(title, tool, intention) -> Optional[str]:
|
||||
messages = []
|
||||
api_key = keyring.get_password("openai-api-key", "felixm")
|
||||
prompt = get_prompt(title, tool, intention)
|
||||
|
||||
instruction = "You are productivity rater GPT and classify work sessions."
|
||||
messages.append({"role": "system", "content": instruction})
|
||||
messages.append({"role": "user", "content": prompt})
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Bearer {api_key}",
|
||||
}
|
||||
|
||||
BASE_ENDPOINT = "https://api.openai.com/v1"
|
||||
body = {"model": "gpt-4", "messages": messages}
|
||||
|
||||
try:
|
||||
r = requests.post(
|
||||
f"{BASE_ENDPOINT}/chat/completions", headers=headers, json=body, timeout=10
|
||||
)
|
||||
except requests.ConnectionError:
|
||||
xwindow.notify("Antidrift - GPT - Connection error")
|
||||
return None
|
||||
except requests.Timeout:
|
||||
xwindow.notify("Antidrift - GPT - Timeout")
|
||||
return None
|
||||
|
||||
if r.status_code == 200:
|
||||
response = r.json()
|
||||
message_response = response["choices"][0]["message"]
|
||||
return message_response["content"]
|
||||
else:
|
||||
xwindow.notify(f"Antidrift - GPT - Response error status code {r.status_code}")
|
||||
return None
|
||||
|
||||
|
||||
def get_prompt(title: str, tool: str, intention: str) -> str:
|
||||
return f"""
|
||||
Rate how well that title and tool matches the intention.
|
||||
|
||||
Use one of the following levels:
|
||||
|
||||
deep work, shallow work, good media, bad media, inappropriate
|
||||
|
||||
Return your response as JSON object with the attributes 'level' and 'reason'.
|
||||
|
||||
Adult or other inappropriate NSFW content always scores 'inappropriate'.
|
||||
|
||||
Examples:
|
||||
|
||||
Intention: Work on coding.
|
||||
Tool: VS Code
|
||||
Title: main.py - GoalGuard - Code - OSS
|
||||
|
||||
Response:
|
||||
{{
|
||||
"level": "deep work",
|
||||
"reason": "The user uses VS code to work on coding."
|
||||
}}
|
||||
|
||||
Intention: Watch educational video.
|
||||
Tool: Firefox
|
||||
Title: World's hardest jigsaw puzzle - YouTube - Mozilla Firefox
|
||||
|
||||
Response:
|
||||
{{
|
||||
"level": "good media",
|
||||
"reason": "The user does the desired activity, and it seems educational."
|
||||
}}
|
||||
|
||||
Intention: no intention
|
||||
Tool: Firefox
|
||||
Title: Reddit - Mozilla Firefox
|
||||
|
||||
Response:
|
||||
{{
|
||||
"level": "bad media",
|
||||
"reason": "The user does not have an intention and wastes time on reddit."
|
||||
}}
|
||||
|
||||
Intention: Watch educational video.
|
||||
Tool: Firefox
|
||||
Title: 8tube.com - Mozilla Firefox Private Browsing
|
||||
|
||||
Response:
|
||||
{{
|
||||
"level": "inapproriate",
|
||||
"reason": "The user consumes adult content."
|
||||
}}
|
||||
|
||||
Intention: no intention
|
||||
Tool: Firefox
|
||||
Title: Amazing Marvin - Daily Tasks — Mozilla Firefox
|
||||
|
||||
Response:
|
||||
{{
|
||||
"level": "shallow work",
|
||||
"reason": "The user works on their task list but does not engage in deep work."
|
||||
}}
|
||||
|
||||
Intention: {intention}
|
||||
Tool: {tool}
|
||||
Title: {title}
|
||||
|
||||
Response:
|
||||
"""
|
||||
@@ -1,59 +0,0 @@
|
||||
import os
|
||||
import re
|
||||
import pwd
|
||||
import subprocess
|
||||
import logging
|
||||
|
||||
|
||||
class XWindow:
|
||||
def __init__(self):
|
||||
self.window = self._run(["getactivewindow"])
|
||||
if not self.window:
|
||||
self.name = ""
|
||||
self.cls = ""
|
||||
self.pid = ""
|
||||
else:
|
||||
self.name = self._run(["getwindowname", self.window])
|
||||
self.cls = self._run(["getwindowclassname", self.window])
|
||||
self.pid = self._run(["getwindowpid", self.window])
|
||||
self.keywords = list(re.findall(r"\w+", self.name.lower()))
|
||||
|
||||
def __repr__(self):
|
||||
return (
|
||||
f"<XWindow '{self.name[:20]}' '{self.cls[:20]}' active: {self.is_active()}>"
|
||||
)
|
||||
|
||||
def _run(self, cmd) -> str:
|
||||
cmd = ["xdotool"] + cmd
|
||||
p = subprocess.run(cmd, capture_output=True)
|
||||
if p.returncode != 0:
|
||||
return ""
|
||||
return p.stdout.decode().strip()
|
||||
|
||||
def minimize(self):
|
||||
self._run(["windowminimize", self.window])
|
||||
|
||||
def quit(self):
|
||||
self._run(["windowquit", self.window])
|
||||
|
||||
def kill(self):
|
||||
self._run(["windowkill", self.window])
|
||||
|
||||
def is_active(self) -> bool:
|
||||
current_desktop = self._run(["get_desktop"])
|
||||
window_desktop = self._run(["get_desktop_for_window", self.window])
|
||||
return True if self.name and current_desktop == window_desktop else False
|
||||
|
||||
|
||||
def notify(message: str) -> None:
|
||||
"""Notify user via the Xorg notify-send command."""
|
||||
logging.debug(f"{message} - [grey]notify[/grey]")
|
||||
env = dict(os.environ)
|
||||
user = env.get("SUDO_USER", None)
|
||||
if user is None:
|
||||
cmd = ["notify-send", message]
|
||||
else:
|
||||
uid = pwd.getpwnam(user)[2]
|
||||
env["DBUS_SESSION_BUS_ADDRESS"] = f"unix:path=/run/user/{uid}/bus"
|
||||
cmd = ["runuser", "-m", "-u", user, "notify-send", message]
|
||||
subprocess.run(cmd, env=env)
|
||||
132
main.py
132
main.py
@@ -1,132 +0,0 @@
|
||||
import logging
|
||||
import shutil
|
||||
import sys
|
||||
import os
|
||||
import subprocess
|
||||
import argparse
|
||||
import psutil
|
||||
import asyncio
|
||||
from rich.logging import RichHandler
|
||||
from pathlib import Path
|
||||
|
||||
import antidrift.client
|
||||
from antidrift.daemon import AntiDriftDaemon
|
||||
from antidrift.config import Config
|
||||
|
||||
|
||||
def get_args():
|
||||
parser = argparse.ArgumentParser(description="AntiDrift CLI.")
|
||||
parser.add_argument("--daemon", action="store_true", help="run daemon")
|
||||
parser.add_argument("--evaluate", action="store_true", help="evaluate day")
|
||||
parser.add_argument(
|
||||
"--intention", metavar="intention", help="set intention", default=None, type=str
|
||||
)
|
||||
parser.add_argument(
|
||||
"--block", metavar="block", help="add to block", default=None, type=str
|
||||
)
|
||||
parser.add_argument("--pause", action="store_true", help="pause antidrift")
|
||||
parser.add_argument("--schedule", metavar="blackblock", help="schedule blackblock")
|
||||
parser.add_argument(
|
||||
"--start", metavar="whiteblock", nargs="+", help="start whiteblocks"
|
||||
)
|
||||
parser.add_argument("--status", action="store_true", help="get status from daemon")
|
||||
parser.add_argument("--stop", action="store_true", help="stop session")
|
||||
parser.add_argument("--tailf", action="store_true", help="tail -f log file")
|
||||
parser.add_argument("--unpause", action="store_true", help="unpause antidrift")
|
||||
args = parser.parse_args()
|
||||
return args
|
||||
|
||||
|
||||
def init_logging(log_file: Path, dev_mode: bool = False):
|
||||
class DuplicateFilter(logging.Filter):
|
||||
def filter(self, record) -> bool:
|
||||
current_log = (record.module, record.levelno, record.msg)
|
||||
if current_log != getattr(self, "last_log", None):
|
||||
self.last_log = current_log
|
||||
return True
|
||||
return False
|
||||
|
||||
if dev_mode:
|
||||
format_str = "%(message)s" # RichHandler will handle the formatting
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG,
|
||||
format=format_str,
|
||||
datefmt="%a %H:%M:%S",
|
||||
handlers=[RichHandler(rich_tracebacks=True, markup=True)],
|
||||
)
|
||||
else:
|
||||
format_str = (
|
||||
"[bold pale_green3]%(asctime)s[/bold pale_green3] | "
|
||||
"[light_steel_blue]%(levelname)-8s[/light_steel_blue] | "
|
||||
"%(message)s"
|
||||
)
|
||||
logging.basicConfig(
|
||||
filename=log_file,
|
||||
format=format_str,
|
||||
datefmt="%a %H:%M:%S",
|
||||
encoding="utf-8",
|
||||
level=logging.DEBUG,
|
||||
)
|
||||
|
||||
logger = logging.getLogger()
|
||||
logger.addFilter(DuplicateFilter())
|
||||
|
||||
|
||||
def check_for_xdotool():
|
||||
"""Check if xdotool is in path and exit if not"""
|
||||
result = shutil.which("xdotool")
|
||||
if not result:
|
||||
logging.critical("Please install xdotool")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def kill_existing_antidrift():
|
||||
current_pid = os.getpid()
|
||||
for proc in psutil.process_iter(["pid", "name", "exe"]):
|
||||
if (
|
||||
proc.info["name"] == "/usr/bin/antidrift"
|
||||
or proc.info["exe"] == "/usr/bin/antidrift"
|
||||
):
|
||||
if proc.info["pid"] == current_pid:
|
||||
continue # don't this process
|
||||
try:
|
||||
proc.kill()
|
||||
except psutil.AccessDenied:
|
||||
pass
|
||||
except psutil.NoSuchProcess:
|
||||
pass
|
||||
|
||||
|
||||
def main_daemon():
|
||||
if os.geteuid() == 0:
|
||||
newpid = os.fork()
|
||||
if newpid == 0:
|
||||
config = Config.load(os.path.expanduser("~/.config/antidrift.yaml"))
|
||||
init_logging(config.daemon_log_file)
|
||||
daemon = AntiDriftDaemon(config)
|
||||
asyncio.run(daemon.run())
|
||||
else:
|
||||
if sys.argv[0] == "antidrift":
|
||||
kill_existing_antidrift()
|
||||
cmd = ["sudo", "antidrift", "--daemon"]
|
||||
subprocess.Popen(cmd)
|
||||
else:
|
||||
config = Config.load(os.path.expanduser("~/.config/antidrift.yaml"))
|
||||
init_logging(config.daemon_log_file, dev_mode=True)
|
||||
daemon = AntiDriftDaemon(config)
|
||||
asyncio.run(daemon.run(debug=True))
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Main routine that dispatches to client or daemon"""
|
||||
check_for_xdotool()
|
||||
args = get_args()
|
||||
if args.daemon:
|
||||
main_daemon()
|
||||
else:
|
||||
config = Config.load(os.path.expanduser("~/.config/antidrift.yaml"))
|
||||
asyncio.run(antidrift.client.run(args, config))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
20
src/constants.rs
Normal file
20
src/constants.rs
Normal file
@@ -0,0 +1,20 @@
|
||||
pub const APP_TITLE: &str = "AntiDrift";
|
||||
pub const DEFAULT_DURATION: &str = "25";
|
||||
pub const DURATION_TITLE: &str = "Duration";
|
||||
pub const INTENTION_TITLE: &str = "Intention";
|
||||
pub const PAUSED: &str = "paused";
|
||||
pub const PREVIOUS_SESSIONS_TITLE: &str = "Previous Sessions";
|
||||
pub const PROVIDE_INTENTION: &str = "Provide intention! ";
|
||||
pub const PROVIDE_VALID_DURATION: &str = "Provide valid duration in minutes! ";
|
||||
pub const RATE_TITLES: &str = "Press 1, 2, 3 to rate titles!";
|
||||
pub const READY_TO_START: &str = "Ready to start next session.";
|
||||
pub const SESSION_IN_PROGRESS: &str = "Session In-Progress";
|
||||
pub const SESSION_PAUSED: &str = "Session is paused. Unpause with 'p'.";
|
||||
pub const SESSION_STATS_TITLE: &str = "Session Stats";
|
||||
pub const STATUS_FILE: &str = "~/.antidrift_status";
|
||||
pub const HISTORY_FILE: &str = "~/.antidrift_history.jsonl";
|
||||
pub const STATUS_TITLE: &str = "Status";
|
||||
pub const STATUS_CONFIGURE: &str = "🔄 antidrift configure session";
|
||||
pub const STATUS_PAUSED: &str = "⏸ antidrift paused";
|
||||
pub const STATUS_RATE_SESSION: &str = "☑ rate antidrift session!";
|
||||
pub const STATUS_QUIT: &str = "antidrift shutting down";
|
||||
560
src/main.rs
Normal file
560
src/main.rs
Normal file
@@ -0,0 +1,560 @@
|
||||
use anyhow::Result;
|
||||
use serde::{Serialize, Serializer};
|
||||
use shellexpand;
|
||||
use std::collections::HashMap;
|
||||
use std::io::{stdout, Write};
|
||||
use std::rc::Rc;
|
||||
use std::time::{Duration, Instant}; // <--- Add this
|
||||
mod constants;
|
||||
mod window;
|
||||
|
||||
use ratatui::{
|
||||
crossterm::{
|
||||
event::{self, Event, KeyCode},
|
||||
execute,
|
||||
terminal::{
|
||||
disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen, SetTitle,
|
||||
},
|
||||
ExecutableCommand,
|
||||
},
|
||||
prelude::*,
|
||||
style::Color,
|
||||
widgets::*,
|
||||
};
|
||||
|
||||
#[derive(PartialEq)]
|
||||
enum State {
|
||||
InputIntention,
|
||||
InputDuration,
|
||||
InProgress,
|
||||
Paused,
|
||||
End,
|
||||
ShouldQuit,
|
||||
}
|
||||
|
||||
fn serialize_duration<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
serializer.serialize_u64(duration.as_secs())
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct SessionResult {
|
||||
intention: String,
|
||||
#[serde(serialize_with = "serialize_duration")]
|
||||
duration: Duration,
|
||||
session_ratings: Vec<SessionRating>,
|
||||
rating: u8,
|
||||
rating_f64: f64,
|
||||
}
|
||||
|
||||
impl SessionResult {
|
||||
fn rate(&mut self) {
|
||||
let mut rating = 0_f64;
|
||||
let total_duration = self
|
||||
.session_ratings
|
||||
.iter()
|
||||
.map(|r| r.duration)
|
||||
.fold(Duration::ZERO, |acc, dur| acc.saturating_add(dur));
|
||||
|
||||
for session_rating in &self.session_ratings {
|
||||
let ratio: f64 = session_rating.duration.as_secs_f64() / total_duration.as_secs_f64();
|
||||
rating += (session_rating.rating as f64) * ratio;
|
||||
}
|
||||
|
||||
self.rating_f64 = rating;
|
||||
|
||||
if rating > 2.5 {
|
||||
self.rating = 3;
|
||||
} else if rating > 2.0 {
|
||||
self.rating = 2;
|
||||
} else if rating > 1.0 {
|
||||
self.rating = 1;
|
||||
} else {
|
||||
self.rating = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize)]
|
||||
struct SessionRating {
|
||||
window_title: Rc<String>,
|
||||
duration: Duration,
|
||||
rating: u8,
|
||||
}
|
||||
|
||||
struct App {
|
||||
state: State,
|
||||
user_intention: String,
|
||||
user_duration_str: String,
|
||||
user_duration: Duration,
|
||||
current_window_title: Rc<String>,
|
||||
current_window_time: Instant,
|
||||
session_start: Instant,
|
||||
session_stats: HashMap<Rc<String>, Duration>,
|
||||
session_remaining: Duration,
|
||||
session_ratings: Vec<SessionRating>,
|
||||
session_ratings_index: usize,
|
||||
session_results: Vec<SessionResult>,
|
||||
last_tick_50ms: Instant,
|
||||
last_tick_1s: Instant,
|
||||
}
|
||||
|
||||
fn save_session(session: &SessionResult) {
|
||||
let path = shellexpand::tilde(constants::HISTORY_FILE).to_string();
|
||||
|
||||
// 1. Open file in append mode, create if missing
|
||||
let file = std::fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(path);
|
||||
|
||||
if let Ok(mut f) = file {
|
||||
// 2. Serialize to JSON
|
||||
if let Ok(json) = serde_json::to_string(session) {
|
||||
// 3. Write newline appended
|
||||
let _ = writeln!(f, "{}", json);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl App {
|
||||
fn new() -> Self {
|
||||
let window_title = window::get_title_clean();
|
||||
|
||||
App {
|
||||
state: State::InputIntention,
|
||||
user_intention: String::new(),
|
||||
user_duration_str: constants::DEFAULT_DURATION.to_string(),
|
||||
user_duration: Duration::ZERO,
|
||||
current_window_title: window_title.into(),
|
||||
current_window_time: Instant::now(),
|
||||
session_start: Instant::now(),
|
||||
session_stats: HashMap::new(),
|
||||
session_remaining: Duration::ZERO,
|
||||
session_ratings: Vec::new(),
|
||||
session_ratings_index: 0,
|
||||
session_results: Vec::new(),
|
||||
last_tick_50ms: Instant::now(),
|
||||
last_tick_1s: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_ticks(&mut self) {
|
||||
if self.last_tick_50ms.elapsed() >= Duration::from_millis(50) {
|
||||
self.tick_50ms();
|
||||
}
|
||||
if self.last_tick_1s.elapsed() >= Duration::from_secs(1) {
|
||||
self.tick_1s();
|
||||
}
|
||||
}
|
||||
|
||||
fn to_in_progress(&mut self) {
|
||||
if self.user_intention.len() == 0 || self.user_duration == Duration::ZERO {
|
||||
return;
|
||||
}
|
||||
self.state = State::InProgress;
|
||||
self.current_window_time = Instant::now();
|
||||
self.session_start = self.current_window_time;
|
||||
self.session_stats = HashMap::new();
|
||||
self.session_ratings_index = 0;
|
||||
}
|
||||
|
||||
fn to_end(&mut self) {
|
||||
self.state = State::End;
|
||||
self.session_ratings = session_stats_as_vec(&self.session_stats);
|
||||
}
|
||||
|
||||
fn cleanup(&self) {
|
||||
let path = shellexpand::tilde(constants::STATUS_FILE).to_string();
|
||||
let _ = std::fs::remove_file(path);
|
||||
}
|
||||
|
||||
fn write_status(&self) {
|
||||
let status = match self.state {
|
||||
State::InputIntention | State::InputDuration => constants::STATUS_CONFIGURE.to_string(),
|
||||
State::InProgress => format!(
|
||||
"🎯 {} - {}",
|
||||
self.user_intention,
|
||||
duration_as_str(&self.session_remaining)
|
||||
),
|
||||
State::Paused => format!(
|
||||
"{} - {}",
|
||||
constants::STATUS_PAUSED,
|
||||
duration_as_str(&self.session_remaining)
|
||||
),
|
||||
State::End => constants::STATUS_RATE_SESSION.to_string(),
|
||||
State::ShouldQuit => constants::STATUS_QUIT.to_string(),
|
||||
};
|
||||
|
||||
let path = shellexpand::tilde(constants::STATUS_FILE).to_string();
|
||||
if let Ok(mut file) = std::fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(true)
|
||||
.open(&path)
|
||||
{
|
||||
let _ = file.write_all(status.as_bytes());
|
||||
}
|
||||
}
|
||||
|
||||
fn tick_50ms(&mut self) {
|
||||
match self.state {
|
||||
State::InputIntention | State::InputDuration => {
|
||||
if let Ok(user_duration_mins) = self.user_duration_str.parse::<u64>() {
|
||||
self.user_duration = Duration::from_secs(user_duration_mins * 60);
|
||||
} else {
|
||||
self.user_duration = Duration::ZERO;
|
||||
}
|
||||
|
||||
window::minimize_other(&constants::APP_TITLE);
|
||||
}
|
||||
State::InProgress => {
|
||||
let elapsed = self.session_start.elapsed();
|
||||
self.session_remaining = self.user_duration.saturating_sub(elapsed);
|
||||
update_session_stats(self);
|
||||
|
||||
if self.session_remaining.is_zero() {
|
||||
self.to_end();
|
||||
}
|
||||
}
|
||||
State::ShouldQuit => {}
|
||||
State::Paused => {
|
||||
window::minimize_other(&constants::APP_TITLE);
|
||||
update_session_stats(self);
|
||||
}
|
||||
State::End => {
|
||||
window::minimize_other(&constants::APP_TITLE);
|
||||
}
|
||||
}
|
||||
|
||||
self.last_tick_50ms = Instant::now();
|
||||
}
|
||||
|
||||
fn tick_1s(&mut self) {
|
||||
self.last_tick_1s = Instant::now();
|
||||
self.write_status();
|
||||
|
||||
if self.state == State::Paused {
|
||||
self.user_duration = self.user_duration.saturating_add(Duration::from_secs(1));
|
||||
}
|
||||
}
|
||||
|
||||
fn timeout(&self) -> Duration {
|
||||
Duration::from_millis(50).saturating_sub(self.last_tick_50ms.elapsed())
|
||||
}
|
||||
|
||||
fn get_session_results(&self) -> Vec<Line<'_>> {
|
||||
self.session_results
|
||||
.iter()
|
||||
.map(|r| {
|
||||
Line::from(Span::styled(
|
||||
format!("{} {}", duration_as_str(&r.duration), r.intention,),
|
||||
match r.rating {
|
||||
2 => Style::new().fg(Color::LightYellow),
|
||||
3 => Style::new().fg(Color::LightGreen),
|
||||
_ => Style::new().fg(Color::LightRed),
|
||||
},
|
||||
))
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn get_session_stats(&self) -> Vec<Line<'_>> {
|
||||
let mut zero_encountered = if self.state != State::End {
|
||||
true
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
self.session_ratings
|
||||
.iter()
|
||||
.map(|s| {
|
||||
Line::from(Span::styled(
|
||||
format!("{}: {}", duration_as_str(&s.duration), s.window_title),
|
||||
match s.rating {
|
||||
0 if !zero_encountered => {
|
||||
zero_encountered = true;
|
||||
Style::new().fg(Color::LightBlue)
|
||||
}
|
||||
0 if zero_encountered => Style::new(),
|
||||
1 => Style::new().fg(Color::LightRed),
|
||||
2 => Style::new().fg(Color::LightYellow),
|
||||
3 => Style::new().fg(Color::LightGreen),
|
||||
_ => Style::new().fg(Color::LightBlue),
|
||||
},
|
||||
))
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
fn duration_as_str(duration: &Duration) -> String {
|
||||
format!(
|
||||
"{:3}:{:02}",
|
||||
duration.as_secs() / 60,
|
||||
duration.as_secs() % 60
|
||||
)
|
||||
}
|
||||
|
||||
fn session_stats_as_vec(session_stats: &HashMap<Rc<String>, Duration>) -> Vec<SessionRating> {
|
||||
let mut stats: Vec<_> = session_stats
|
||||
.iter()
|
||||
.filter(|(_, duration)| duration.as_secs() > 30)
|
||||
.map(|(title, duration)| SessionRating {
|
||||
window_title: title.clone(),
|
||||
duration: duration.clone(),
|
||||
rating: 0,
|
||||
})
|
||||
.collect();
|
||||
stats.sort_by(|a, b| b.duration.cmp(&a.duration));
|
||||
stats
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
enable_raw_mode()?;
|
||||
stdout().execute(EnterAlternateScreen)?;
|
||||
execute!(stdout(), SetTitle(constants::APP_TITLE))?;
|
||||
let mut terminal = Terminal::new(CrosstermBackend::new(stdout()))?;
|
||||
let mut app = App::new();
|
||||
|
||||
while app.state != State::ShouldQuit {
|
||||
terminal.draw(|frame| ui(frame, &app))?;
|
||||
handle_events(&mut app)?;
|
||||
app.handle_ticks();
|
||||
}
|
||||
|
||||
app.cleanup();
|
||||
disable_raw_mode()?;
|
||||
stdout().execute(LeaveAlternateScreen)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_events(app: &mut App) -> Result<()> {
|
||||
if !event::poll(app.timeout())? {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let Event::Key(key) = event::read()? else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
if key.kind != event::KeyEventKind::Press {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if key.code == KeyCode::Esc {
|
||||
app.state = State::ShouldQuit;
|
||||
}
|
||||
|
||||
match app.state {
|
||||
State::InputIntention => match key.code {
|
||||
KeyCode::Enter => app.state = State::InputDuration,
|
||||
KeyCode::Tab => app.state = State::InputDuration,
|
||||
KeyCode::Backspace => {
|
||||
let _ = app.user_intention.pop();
|
||||
}
|
||||
KeyCode::Char(c) => {
|
||||
app.user_intention.push(c);
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
State::InputDuration => match key.code {
|
||||
KeyCode::Enter => app.to_in_progress(),
|
||||
KeyCode::Tab => app.state = State::InputIntention,
|
||||
KeyCode::Backspace => {
|
||||
let _ = app.user_duration_str.pop();
|
||||
}
|
||||
KeyCode::Char(c) if c.is_ascii_digit() => {
|
||||
app.user_duration_str.push(c);
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
State::InProgress => match key.code {
|
||||
KeyCode::Char('q') => {
|
||||
app.to_end();
|
||||
}
|
||||
KeyCode::Char('p') => {
|
||||
app.state = State::Paused;
|
||||
}
|
||||
KeyCode::Char('a') => {
|
||||
app.user_duration = app.user_duration.saturating_add(Duration::from_secs(60));
|
||||
}
|
||||
KeyCode::Char('x') => {
|
||||
app.user_duration = app.user_duration.saturating_sub(Duration::from_secs(60));
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
State::Paused => {
|
||||
if key.code == KeyCode::Char('p') {
|
||||
app.state = State::InProgress;
|
||||
}
|
||||
}
|
||||
State::ShouldQuit => (),
|
||||
State::End => {
|
||||
let code = match key.code {
|
||||
KeyCode::Char('1') => 1,
|
||||
KeyCode::Char('2') => 2,
|
||||
KeyCode::Char('3') => 3,
|
||||
_ => 0,
|
||||
};
|
||||
|
||||
if app.session_ratings_index < app.session_ratings.len() && code != 0 {
|
||||
app.session_ratings[app.session_ratings_index].rating = code;
|
||||
app.session_ratings_index += 1;
|
||||
}
|
||||
|
||||
if app.session_ratings_index >= app.session_ratings.len() {
|
||||
app.state = State::InputIntention;
|
||||
let mut session_result = SessionResult {
|
||||
intention: app.user_intention.clone(),
|
||||
duration: app.session_start.elapsed(),
|
||||
session_ratings: std::mem::take(&mut app.session_ratings),
|
||||
rating: 0,
|
||||
rating_f64: 0.0,
|
||||
};
|
||||
session_result.rate();
|
||||
save_session(&session_result);
|
||||
app.session_results.push(session_result);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn update_session_stats(app: &mut App) {
|
||||
let window_title = if app.state == State::Paused {
|
||||
constants::PAUSED.to_string().into()
|
||||
} else {
|
||||
window::get_title_clean().into()
|
||||
};
|
||||
|
||||
let delta = app.current_window_time.elapsed();
|
||||
if app.current_window_title != window_title || (delta > Duration::from_secs(1)) {
|
||||
let entry = app
|
||||
.session_stats
|
||||
.entry(app.current_window_title.clone())
|
||||
.or_insert_with(|| Duration::default());
|
||||
*entry += app.current_window_time.elapsed();
|
||||
app.current_window_time = Instant::now();
|
||||
app.current_window_title = window_title;
|
||||
app.session_ratings = session_stats_as_vec(&app.session_stats);
|
||||
}
|
||||
}
|
||||
|
||||
fn ui(frame: &mut Frame, app: &App) {
|
||||
let layout = Layout::vertical([
|
||||
Constraint::Min(3),
|
||||
Constraint::Min(3),
|
||||
Constraint::Percentage(100),
|
||||
Constraint::Min(3),
|
||||
]);
|
||||
let [layout_intention, layout_duration, layout_titles, layout_status] =
|
||||
layout.areas(frame.size());
|
||||
|
||||
let border_type_intention = if app.state == State::InputIntention {
|
||||
BorderType::Thick
|
||||
} else {
|
||||
BorderType::Plain
|
||||
};
|
||||
let input_intention = Line::from(Span::raw(&app.user_intention));
|
||||
frame.render_widget(
|
||||
Paragraph::new(input_intention).block(
|
||||
Block::bordered()
|
||||
.border_type(border_type_intention)
|
||||
.title(constants::INTENTION_TITLE),
|
||||
),
|
||||
layout_intention,
|
||||
);
|
||||
|
||||
let input_duration: Vec<Line> = match app.state {
|
||||
State::InProgress | State::Paused => {
|
||||
let s = duration_as_str(&app.session_remaining);
|
||||
vec![Line::from(Span::raw(s))]
|
||||
}
|
||||
_ => {
|
||||
vec![Line::from(Span::raw(&app.user_duration_str))]
|
||||
}
|
||||
};
|
||||
let border_type_duration = if app.state == State::InputDuration {
|
||||
BorderType::Thick
|
||||
} else {
|
||||
BorderType::Plain
|
||||
};
|
||||
frame.render_widget(
|
||||
Paragraph::new(input_duration).block(
|
||||
Block::bordered()
|
||||
.border_type(border_type_duration)
|
||||
.title(constants::DURATION_TITLE),
|
||||
),
|
||||
layout_duration,
|
||||
);
|
||||
|
||||
if app.state == State::InputIntention || app.state == State::InputDuration {
|
||||
let results = app.get_session_results();
|
||||
frame.render_widget(
|
||||
Paragraph::new(results)
|
||||
.block(Block::bordered().title(constants::PREVIOUS_SESSIONS_TITLE)),
|
||||
layout_titles,
|
||||
);
|
||||
} else {
|
||||
let stats = app.get_session_stats();
|
||||
frame.render_widget(
|
||||
Paragraph::new(stats).block(Block::bordered().title(constants::SESSION_STATS_TITLE)),
|
||||
layout_titles,
|
||||
);
|
||||
}
|
||||
|
||||
let mut spans: Vec<Span> = Vec::new();
|
||||
if app.user_intention.len() == 0 {
|
||||
let span = Span::styled(
|
||||
constants::PROVIDE_INTENTION,
|
||||
Style::new().fg(Color::LightRed),
|
||||
);
|
||||
spans.push(span);
|
||||
}
|
||||
|
||||
if app.user_duration.is_zero() {
|
||||
let span = Span::styled(
|
||||
constants::PROVIDE_VALID_DURATION,
|
||||
Style::new().fg(Color::LightRed),
|
||||
);
|
||||
spans.push(span);
|
||||
}
|
||||
|
||||
match app.state {
|
||||
State::InputIntention | State::InputDuration => {
|
||||
if spans.len() == 0 {
|
||||
let span = Span::styled(
|
||||
constants::READY_TO_START,
|
||||
Style::new().fg(Color::LightGreen),
|
||||
);
|
||||
spans.push(span);
|
||||
}
|
||||
}
|
||||
State::InProgress => {
|
||||
let span = Span::styled(
|
||||
constants::SESSION_IN_PROGRESS,
|
||||
Style::new().fg(Color::LightGreen),
|
||||
);
|
||||
spans.push(span);
|
||||
}
|
||||
State::Paused => {
|
||||
let span = Span::styled(constants::SESSION_PAUSED, Style::new().fg(Color::LightBlue));
|
||||
spans.push(span);
|
||||
}
|
||||
State::ShouldQuit => {}
|
||||
State::End => {
|
||||
let span = Span::styled(constants::RATE_TITLES, Style::new().fg(Color::LightBlue));
|
||||
spans.push(span);
|
||||
}
|
||||
}
|
||||
|
||||
let input_status: Vec<Line> = vec![Line::from(spans)];
|
||||
frame.render_widget(
|
||||
Paragraph::new(input_status).block(Block::bordered().title(constants::STATUS_TITLE)),
|
||||
layout_status,
|
||||
);
|
||||
}
|
||||
70
src/window/linux.rs
Normal file
70
src/window/linux.rs
Normal file
@@ -0,0 +1,70 @@
|
||||
use regex::Regex;
|
||||
use std::{process::Command, process::Output, str};
|
||||
|
||||
pub fn get_title_clean() -> String {
|
||||
let title = get_window_info().title;
|
||||
let re = Regex::new(r"-?\d+([:.]\d+)+%?").unwrap();
|
||||
re.replace_all(&title, "").to_string()
|
||||
}
|
||||
|
||||
pub fn minimize_other(title: &str) {
|
||||
let window_info = get_window_info();
|
||||
if &window_info.title != title {
|
||||
run(&format!("xdotool windowminimize {}", window_info.wid));
|
||||
}
|
||||
}
|
||||
|
||||
struct WindowInfo {
|
||||
title: String,
|
||||
_class: String,
|
||||
wid: String,
|
||||
}
|
||||
|
||||
fn run(cmd: &str) -> Option<String> {
|
||||
let output = Command::new("sh").arg("-c").arg(cmd).output();
|
||||
|
||||
let Ok(Output {
|
||||
status,
|
||||
stdout,
|
||||
stderr: _,
|
||||
}) = output
|
||||
else {
|
||||
return None;
|
||||
};
|
||||
|
||||
if status.code() != Some(0) {
|
||||
return None;
|
||||
}
|
||||
|
||||
let Ok(output_str) = str::from_utf8(&stdout) else {
|
||||
return None;
|
||||
};
|
||||
|
||||
Some(output_str.trim().to_string())
|
||||
}
|
||||
|
||||
fn get_window_info() -> WindowInfo {
|
||||
let none = WindowInfo {
|
||||
title: "none".to_string(),
|
||||
_class: "none".to_string(),
|
||||
wid: "".to_string(),
|
||||
};
|
||||
|
||||
let Some(wid) = run("xdotool getactivewindow") else {
|
||||
return none;
|
||||
};
|
||||
|
||||
let Some(class) = run(&format!("xdotool getwindowclassname {wid}")) else {
|
||||
return none;
|
||||
};
|
||||
|
||||
let Some(title) = run(&format!("xdotool getwindowname {wid}")) else {
|
||||
return none;
|
||||
};
|
||||
|
||||
WindowInfo {
|
||||
title,
|
||||
_class: class,
|
||||
wid,
|
||||
}
|
||||
}
|
||||
9
src/window/mod.rs
Normal file
9
src/window/mod.rs
Normal file
@@ -0,0 +1,9 @@
|
||||
#[cfg(target_os = "windows")]
|
||||
mod windows;
|
||||
#[cfg(target_os = "windows")]
|
||||
pub use windows::*;
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
mod linux;
|
||||
#[cfg(target_os = "linux")]
|
||||
pub use linux::*;
|
||||
36
src/window/windows.rs
Normal file
36
src/window/windows.rs
Normal file
@@ -0,0 +1,36 @@
|
||||
use regex::Regex;
|
||||
use std::{ffi::OsString, os::windows::ffi::OsStringExt};
|
||||
use winapi::shared::windef::HWND;
|
||||
use winapi::um::winuser::{GetForegroundWindow, GetWindowTextW, ShowWindow, SW_MINIMIZE};
|
||||
|
||||
pub fn get_title_clean() -> String {
|
||||
let title = get_window_info().title;
|
||||
let re = Regex::new(r"-?\d+([:.]\d+)+%?").unwrap();
|
||||
re.replace_all(&title, "").to_string()
|
||||
}
|
||||
|
||||
pub fn minimize_other(title: &str) {
|
||||
let window_info = get_window_info();
|
||||
if window_info.title != title {
|
||||
unsafe {
|
||||
ShowWindow(window_info.hwnd, SW_MINIMIZE);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct WindowInfo {
|
||||
title: String,
|
||||
hwnd: HWND,
|
||||
}
|
||||
|
||||
fn get_window_info() -> WindowInfo {
|
||||
unsafe {
|
||||
let hwnd = GetForegroundWindow();
|
||||
let mut text: [u16; 512] = [0; 512];
|
||||
let len = GetWindowTextW(hwnd, text.as_mut_ptr(), text.len() as i32) as usize;
|
||||
let title = OsString::from_wide(&text[..len])
|
||||
.to_string_lossy()
|
||||
.into_owned();
|
||||
WindowInfo { title, hwnd }
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user