Switch to new configuration file. Fix bug when finding window title.
This commit is contained in:
1
config.json
Symbolic link
1
config.json
Symbolic link
@@ -0,0 +1 @@
|
||||
/home/felixm/.config/focusfriend/config.json
|
||||
26
config.py
26
config.py
@@ -1,9 +1,10 @@
|
||||
import json
|
||||
import os
|
||||
from typing import List
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class BlockList(BaseModel):
|
||||
class Block(BaseModel):
|
||||
name: str = ''
|
||||
prefix: str = ''
|
||||
postfix: str = ''
|
||||
@@ -11,14 +12,21 @@ class BlockList(BaseModel):
|
||||
|
||||
|
||||
class Config(BaseModel):
|
||||
directory: str
|
||||
blocks: List[Block]
|
||||
start_as_user: bool = True
|
||||
sleep_time: int = 1
|
||||
focusfriend_py: str = "focusfriend.py"
|
||||
window_names: str = "window_names.txt"
|
||||
|
||||
class Config:
|
||||
extra = 'forbid'
|
||||
|
||||
blocklists: List[BlockList]
|
||||
|
||||
|
||||
def load_config(config_file: str) -> Config:
|
||||
with open(config_file, 'r', encoding='utf8') as f:
|
||||
config_dict = json.load(f)
|
||||
return Config(**config_dict)
|
||||
|
||||
@classmethod
|
||||
def load_config(cls, config_file: str) -> Config:
|
||||
config_file = os.path.expanduser(config_file)
|
||||
with open(config_file, 'r', encoding='utf8') as f:
|
||||
config_dict = json.load(f)
|
||||
config = cls(**config_dict)
|
||||
config.directory = os.path.expanduser(config.directory)
|
||||
return config
|
||||
|
||||
@@ -1,31 +1,32 @@
|
||||
{
|
||||
"python.linting.pylintEnabled": false,
|
||||
"python.linting.enabled": false,
|
||||
"folders": [
|
||||
{
|
||||
"path": "."
|
||||
}
|
||||
],
|
||||
"launch": {
|
||||
"version": "0.2.0",
|
||||
"configurations": [
|
||||
{
|
||||
"name": "Python: Current File",
|
||||
"type": "python",
|
||||
"request": "launch",
|
||||
"program": "${file}",
|
||||
"console": "integratedTerminal"
|
||||
},
|
||||
{
|
||||
"name": "FocusFriend Debug",
|
||||
"type": "python",
|
||||
"request": "launch",
|
||||
"program": "focusfriend.py",
|
||||
"console": "integratedTerminal",
|
||||
"args": [
|
||||
"--debug",
|
||||
],
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
"folders": [
|
||||
{
|
||||
"path": "."
|
||||
}
|
||||
],
|
||||
"launch": {
|
||||
"version": "0.2.0",
|
||||
"configurations": [
|
||||
{
|
||||
"name": "Python: Current File",
|
||||
"type": "python",
|
||||
"request": "launch",
|
||||
"program": "${file}",
|
||||
"console": "integratedTerminal"
|
||||
},
|
||||
{
|
||||
"name": "FocusFriend Debug",
|
||||
"type": "python",
|
||||
"request": "launch",
|
||||
"program": "focusfriend.py",
|
||||
"console": "integratedTerminal",
|
||||
"args": ["--debug"]
|
||||
}
|
||||
]
|
||||
},
|
||||
"settings": {
|
||||
"python.linting.pylintEnabled": true,
|
||||
"python.linting.enabled": true,
|
||||
"python.linting.pylintArgs": ["--errors-only"]
|
||||
}
|
||||
}
|
||||
|
||||
180
focusfriend.py
180
focusfriend.py
@@ -3,18 +3,12 @@
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import shutil
|
||||
import sys
|
||||
import time
|
||||
from typing import List, Tuple, Set
|
||||
import psutil
|
||||
import config
|
||||
|
||||
|
||||
FOCUSFRIEND_PY = "focusfriend.py"
|
||||
CONFIG_FILE = "config.json"
|
||||
BLOCKER_CONFIG_DIR = "/home/{}/.config/focusfriend"
|
||||
BLOCKED_BROWSER_WORDS_TXT = "blocked_browser_words.txt"
|
||||
WINDOW_NAMES_TXT = "window_names.txt"
|
||||
from config import Config
|
||||
|
||||
|
||||
def is_window_blocked(window_name: str, blocked: List[re.Pattern]) -> bool:
|
||||
@@ -26,13 +20,14 @@ def is_window_blocked(window_name: str, blocked: List[re.Pattern]) -> bool:
|
||||
|
||||
def get_active_window_name_and_pid() -> Tuple[str, str]:
|
||||
cmd = ["xdotool", "getactivewindow", "getwindowname", "getwindowpid"]
|
||||
p = subprocess.run(cmd, capture_output=True, check=True)
|
||||
p = subprocess.run(cmd, capture_output=True)
|
||||
if p.returncode != 0:
|
||||
return ("", "")
|
||||
window_name, window_pid, _ = p.stdout.decode().split("\n")
|
||||
return (window_name, window_pid)
|
||||
|
||||
|
||||
def kill_sequence(blocked: List[re.Pattern]) -> None:
|
||||
|
||||
def to_display(name: str) -> str:
|
||||
return name if len(name) < 30 else name[:30] + "..."
|
||||
|
||||
@@ -49,39 +44,30 @@ def kill_sequence(blocked: List[re.Pattern]) -> None:
|
||||
|
||||
|
||||
def notify(message: str) -> None:
|
||||
"""Notify user via the Xorg notify-send command.
|
||||
|
||||
Args:
|
||||
message (str): Message shown to the user.
|
||||
"""
|
||||
""" Notify user via the Xorg notify-send command. """
|
||||
env = {
|
||||
**os.environ,
|
||||
"DBUS_SESSION_BUS_ADDRESS": "unix:path=/run/user/1000/bus"
|
||||
}
|
||||
user = env["SUDO_USER"]
|
||||
cmd = ["runuser", "-m", "-u", user, "notify-send", message]
|
||||
user = env.get("SUDO_USER", None)
|
||||
if user is None:
|
||||
cmd = ["notify-send", message]
|
||||
else:
|
||||
cmd = ["runuser", "-m", "-u", user, "notify-send", message]
|
||||
subprocess.run(cmd, env=env)
|
||||
|
||||
|
||||
def get_config_dir() -> str:
|
||||
user = os.environ.get("SUDO_USER", False) or os.environ["USER"]
|
||||
config_dir = BLOCKER_CONFIG_DIR.format(user)
|
||||
assert(os.path.isdir(config_dir))
|
||||
return config_dir
|
||||
|
||||
|
||||
def write_pid_file() -> str:
|
||||
def write_pid_file(config: Config) -> str:
|
||||
p = psutil.Process()
|
||||
pid_file = os.path.join(get_config_dir(), f"{p.pid}.pid")
|
||||
pid_file = os.path.join(config.directory, f"{p.pid}.pid")
|
||||
with open(pid_file, "w") as f:
|
||||
f.write(pid_file)
|
||||
f.write(str(p.pid))
|
||||
return pid_file
|
||||
|
||||
|
||||
def terminate_existing_blocker() -> None:
|
||||
def terminate_existing_blocker(config: Config) -> None:
|
||||
this_pid = psutil.Process().pid
|
||||
config_dir = get_config_dir()
|
||||
pid_files = [f for f in os.listdir(config_dir) if f.endswith(".pid")]
|
||||
pid_files = [f for f in config.directory if f.endswith(".pid")]
|
||||
for pid_file in pid_files:
|
||||
pid = int(pid_file.replace(".pid", ""))
|
||||
if this_pid == pid:
|
||||
@@ -90,87 +76,83 @@ def terminate_existing_blocker() -> None:
|
||||
p = psutil.Process(pid)
|
||||
p.terminate()
|
||||
except psutil.NoSuchProcess:
|
||||
pass
|
||||
os.remove(os.path.join(config_dir, pid_file))
|
||||
print(f"Could not terminate {p.pid=}.")
|
||||
pid_file = os.path.join(config.directory, pid_file)
|
||||
try:
|
||||
os.remove(pid_file)
|
||||
except PermissionError:
|
||||
print(f"Could not remove {pid_file=}.")
|
||||
|
||||
|
||||
def init() -> None:
|
||||
terminate_existing_blocker()
|
||||
write_pid_file()
|
||||
|
||||
|
||||
def load_blocked_browser_words() -> List[re.Pattern]:
|
||||
config_dir = get_config_dir()
|
||||
blocked_words_file = os.path.join(config_dir, BLOCKED_BROWSER_WORDS_TXT)
|
||||
assert(os.path.isfile(blocked_words_file))
|
||||
def load_blocked_patterns(config: Config) -> List[re.Pattern]:
|
||||
blocked = []
|
||||
with open(blocked_words_file, "r") as f:
|
||||
for line in f.readlines():
|
||||
line = line.strip()
|
||||
r = re.compile(f"{line}.*(Firefox|Chromium)", flags=re.IGNORECASE)
|
||||
for block in config.blocks:
|
||||
for item in block.items:
|
||||
s = "".join([block.prefix, item, block.postfix])
|
||||
r = re.compile(s, re.IGNORECASE)
|
||||
blocked.append(r)
|
||||
return blocked
|
||||
|
||||
|
||||
def load_window_names() -> Set[str]:
|
||||
config_dir = get_config_dir()
|
||||
window_names_file = os.path.join(config_dir, WINDOW_NAMES_TXT)
|
||||
def init(config: Config) -> None:
|
||||
terminate_existing_blocker(config)
|
||||
write_pid_file(config)
|
||||
|
||||
|
||||
def run_checks(config: Config) -> None:
|
||||
# shutil.which("xdotool")
|
||||
# That's what we do if anything goes wrong:
|
||||
# xkill --all
|
||||
pass
|
||||
|
||||
|
||||
def kill_window_if_blocked(blocked: List[re.Pattern]) -> None:
|
||||
window_name, window_pid = get_active_window_name_and_pid()
|
||||
if is_window_blocked(window_name, blocked):
|
||||
kill_sequence(blocked)
|
||||
|
||||
|
||||
def main_root(config: Config) -> None:
|
||||
init(config)
|
||||
blocked = load_blocked_patterns(config)
|
||||
while True:
|
||||
time.sleep(config.sleep_time)
|
||||
run_checks(config)
|
||||
kill_window_if_blocked(blocked)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
""" Run main_root as root except while debugging. """
|
||||
config_path = "~/.config/focusfriend/config.json"
|
||||
config = Config.load_config(config_path)
|
||||
|
||||
if config.start_as_user:
|
||||
terminate_existing_blocker(config)
|
||||
main_root(config)
|
||||
|
||||
if os.geteuid() == 0:
|
||||
newpid = os.fork()
|
||||
if newpid == 0:
|
||||
main_root(config)
|
||||
else:
|
||||
cmd = ["sudo", config.focusfriend_py] + sys.argv[1:]
|
||||
subprocess.Popen(cmd)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
|
||||
def load_window_names(config: Config) -> Set[str]:
|
||||
window_names_file = os.path.join(config.directory, config.window_names)
|
||||
if not os.path.isfile(window_names_file):
|
||||
return set()
|
||||
with open(window_names_file, "r") as f:
|
||||
return {l.strip() for l in f.readlines()}
|
||||
|
||||
|
||||
def write_window_names(window_names: Set[str]) -> None:
|
||||
config_dir = get_config_dir()
|
||||
window_names_file = os.path.join(config_dir, WINDOW_NAMES_TXT)
|
||||
def write_window_names(config: Config, window_names: Set[str]) -> None:
|
||||
window_names_file = os.path.join(config.directory, config.window_names)
|
||||
window_names = "\n".join(sorted(list(window_names)))
|
||||
with open(window_names_file, "w") as f:
|
||||
f.write(window_names)
|
||||
|
||||
|
||||
def load_config() -> config.Config:
|
||||
config_dir = get_config_dir()
|
||||
config_file = os.path.join(config_dir, CONFIG_FILE)
|
||||
config_obj = config.load_config(config_file)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
# config = load_config()
|
||||
init()
|
||||
blocked = load_blocked_browser_words()
|
||||
window_names = load_window_names()
|
||||
counter = 0
|
||||
while True:
|
||||
time.sleep(1)
|
||||
window_name, window_pid = get_active_window_name_and_pid()
|
||||
window_names.add(window_name)
|
||||
if is_window_blocked(window_name, blocked):
|
||||
kill_sequence(blocked)
|
||||
if counter % 60 == 59:
|
||||
write_window_names(window_names)
|
||||
counter += 1
|
||||
|
||||
|
||||
def sudo_run() -> None:
|
||||
"""Run main as root except while debugging.
|
||||
"""
|
||||
try:
|
||||
if sys.argv[1] == "--debug":
|
||||
terminate_existing_blocker()
|
||||
main()
|
||||
return
|
||||
except IndexError:
|
||||
pass
|
||||
|
||||
if os.geteuid() == 0:
|
||||
newpid = os.fork()
|
||||
if newpid == 0:
|
||||
main()
|
||||
else:
|
||||
cmd = ["sudo", FOCUSFRIEND_PY] + sys.argv[1:]
|
||||
subprocess.Popen(cmd)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sudo_run()
|
||||
f.write(window_names)
|
||||
Reference in New Issue
Block a user