From db8802dd37cf3891efec75de08a7097ec51822bd Mon Sep 17 00:00:00 2001 From: Felix Martin Date: Thu, 20 May 2021 14:27:51 -0400 Subject: [PATCH] Add my existing implementation --- Makefile | 24 +++++ merger.py | 0 ping.py | 78 ++++++++++++++ prompter.py | 219 ++++++++++++++++++++++++++++++++++++++++ pyqt/README | 2 + pyqt/pingdialog.ui | 54 ++++++++++ pyqt/tagtime.py | 129 +++++++++++++++++++++++ taglog.py | 189 ++++++++++++++++++++++++++++++++++ tagtime.py | 105 +++++++++++++++++++ tagtimerc.py | 98 ++++++++++++++++++ tests/tagtimerc | 67 ++++++++++++ tests/test_ping.py | 121 ++++++++++++++++++++++ tests/test_taglog.py | 67 ++++++++++++ tests/test_tagtimerc.py | 26 +++++ tests/test_timer.py | 77 ++++++++++++++ tests/user1.log | 8 ++ tests/user2.log | 9 ++ timer.py | 94 +++++++++++++++++ 18 files changed, 1367 insertions(+) create mode 100644 Makefile create mode 100644 merger.py create mode 100644 ping.py create mode 100644 prompter.py create mode 100644 pyqt/README create mode 100644 pyqt/pingdialog.ui create mode 100644 pyqt/tagtime.py create mode 100644 taglog.py create mode 100644 tagtime.py create mode 100644 tagtimerc.py create mode 100644 tests/tagtimerc create mode 100644 tests/test_ping.py create mode 100644 tests/test_taglog.py create mode 100644 tests/test_tagtimerc.py create mode 100644 tests/test_timer.py create mode 100644 tests/user1.log create mode 100644 tests/user2.log create mode 100644 timer.py diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..35c251f --- /dev/null +++ b/Makefile @@ -0,0 +1,24 @@ +PY?=python3 + +default: daemon + +help: + @echo 'Makefile for TagTimePy ' + @echo ' ' + @echo 'Usage: ' + @echo ' make clean remove temporary files ' + @echo ' make test run tests ' + @echo ' make daemon run TagTime daemon ' + @echo ' make autoflake run autoflake on py files ' + +clean: + @rm -rf __pycache__ + @rm -rf **/__pycache__ + +test: + @$(PY) -m unittest tests/*.py + @$(PY) -m doctest tests/*.py + +daemon: + @$(PY) tagtime.py daemon + diff --git a/merger.py b/merger.py new file mode 100644 index 0000000..e69de29 diff --git a/ping.py b/ping.py new file mode 100644 index 0000000..8128a9b --- /dev/null +++ b/ping.py @@ -0,0 +1,78 @@ +import re +import datetime +from dataclasses import dataclass +from typing import List + + +@dataclass +class Ping: + """ + A ping is a single line in the log file consisting of a UNIX time stamp, + followed by tags (where each individual string surrounded by spaces is a + tag), followed by optional comments between square brackets or parentheses. + The original Perl implementation puts a human readable representation of + the UNIX time stamp into square brackets. Here are two example pings: + + 1601557948 morning_pages [2020.10.01 09:12:28 THU] + 1601560369 work_call another_tag [2020.10.01 09:52:49 THU] + """ + time: int # UNIX time stamp + tags: List[str] # each separate word is a tag + comments: List[str] # each string between [] or () is a comment + line: str = "" # the whole line as found in the log file + + r_time = re.compile("^\s*\d{9,11}") + r_spaces = re.compile("\s+") + r_comment_parens = re.compile("\(([^\)]*)\)") + r_comment_square = re.compile("\[([^\]]*)\]") + + def line_to_ping(line: str): + """ + Parses a string into a Ping object. Raises Exception on failure. + + >>> line_to_ping(" 1601557948 t (c)") + Ping(time=1601557948, tags=['t'], comments=['c'], line=' 1601557948 t (c)') + """ + time = int(Ping.r_time.match(line).group()) + tags = Ping.get_tags(line) + comments = Ping.r_comment_parens.findall(line) + \ + Ping.r_comment_square.findall(line) + return Ping(time, tags, comments, line) + + def get_tags(line): + """Extracts the tags from a tag line.""" + line = Ping.r_time.sub("", line) + line = Ping.r_comment_parens.sub("", line) + line = Ping.r_comment_square.sub("", line) + line = Ping.r_spaces.sub(" ", line) + return line.split() + + def ping_to_line(ping, annotate_time=False, line_length=79) -> str: + tags = " ".join([t.strip() for t in ping.tags]) + comments = " ".join(["[" + c.strip() + "]" for c in ping.comments]) + line = "{} {} {}".format(ping.time, tags, comments) + if annotate_time: + line = Ping.add_time_annotation(ping.time, line, line_length) + return line + + def add_time_annotation(time: int, line: str, line_length: int) -> str: + """Appends human readable date/time in square brackets to line.""" + remaining_length = line_length - len(line) + if remaining_length > 24: + strf = " [%Y.%m.%d %H:%M:%S %a]" + elif remaining_length > 18: + strf = " [%m.%d %H:%M:%S %a]" + elif remaining_length > 15: + strf = " [%d %H:%M:%S %a]" + elif remaining_length > 12: + strf = " [%H:%M:%S %a]" + elif remaining_length > 9: + strf = " [%H:%M %a]" + elif remaining_length > 5: + strf = " [%H:%M]" + else: + strf = " [%M]" + time_comment = datetime.datetime.fromtimestamp(time).strftime(strf) + remaining_length -= len(time_comment) + line += " " * remaining_length + time_comment + return line diff --git a/prompter.py b/prompter.py new file mode 100644 index 0000000..a455459 --- /dev/null +++ b/prompter.py @@ -0,0 +1,219 @@ +import os +import time +import logging +import subprocess +import tempfile + +from typing import List +from tagtimerc import TagTimeRc +from timer import Timer +from ping import Ping +from taglog import TagLog + + +class Prompter: + """ + Prompter tries to replicate the behavior of the original TagTime Perl + implementation. + """ + + def __init__(self, tagtimerc: TagTimeRc, gui: bool = False): + self.rc = tagtimerc + self.timer = Timer(self.rc.seed, self.rc.urping, self.rc.gap) + self.taglog = TagLog(self.rc.log_file) + self.gui = gui + + def _get_prompt(self, time: int, previous_tags: str) -> str: + """Create prompt string for terminal mode.""" + age = self.timer.time_now() - time + time_str = Ping.add_time_annotation(time, "", 13).strip() + if age < 0: + prompt_str = "You are from the future? Good job!\n" \ + "What will you be doing then? {}\n" + elif age < 10: + prompt_str = "It's tag time! What are you doing RIGHT NOW {}?\n" + else: + warning = "WARNING This popup is {} seconds late WARNING\n".format( + age) + len_warning = len(warning) - 1 + prompt_str = "-" * len_warning + "\n" + prompt_str += warning + prompt_str += "-" * len_warning + "\n" + prompt_str += "It's tag time! What were you doing {}?\n" + prompt_str = prompt_str.format(time_str) + if previous_tags: + m = "Ditto (\") to repeat prev tags: {}\n".format(previous_tags) + prompt_str += m + prompt_str += "> " + return prompt_str + + def ping(self, time: int, previous_tags: str = ""): + """Ask the user for tags and log them.""" + prompt_str = self._get_prompt(time, previous_tags) + input_str = input(prompt_str) + if input_str == '"': + tags = previous_tags.split() + else: + tags = input_str.split() + p = Ping(time, tags, []) + self.taglog.log_ping(p, True, self.rc.linelen) + + def _get_next_ping_time(self) -> int: + """ + Figure out the next ping after the last one that's in the log file. + + A ping is on schedule if it was generated from the seed/urping pair + used by self.timer and off schedule, otherwise.There are three cases. + + 1. If no ping exists yet, next_ping_time is the latest time on schedule + before launch time. + 2. If the last ping is on schedule, next_ping_time is the time after + the time of the last ping. + 3. If the last ping is not on schedule, next_ping_time is the next + time after the last ping that is on schedule. + + Note that the third case is different from the original implementation: + + $nxtping = prevping($launchTime); # original implementation in Perl + $nxtping = nextping($lstping); # this implementation in Perl + + The original implementation could have the effect that next_ping has a + lower value than last_ping which means the resulting pings would not be + sorted by ascending time-stamp anymore. + """ + launch_time = self.timer.time_now() + if not self.taglog.exists(): + next_time = self.timer.prev(launch_time) + else: + last_ping = self.taglog.last_ping() + last_time = last_ping.time + self.timer.prev(last_time) + next_time = self.timer.next() + if last_time == next_time: + next_time = self.timer.next() + else: + line = last_ping.line.strip() + m = "{}".format(line) + logging.warning(m) + return next_time + + def _call(self, args: List[str]): + """ + Create subprocess with the provided args. Use PIPEs to avoid "read from + shell input/output error" leaking into the stderr of this process. + """ + subprocess.call(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + def _prompt_and_log_xt(self, next_time: int): + """ + Open a second instance of TagTime with the ping command to prompt the + user for their current tags. The response is written into a temporary + file. Read out the temporary file here and log the ping into the main + log file. + """ + + # Create a temporary file and get it's name. Then delete the object so + # that the other process can access the file. According to the + # documentation it depends on the platform whether the file could be + # accessed from other processes while it is open [1]. + # [1] https://docs.python.org/3.8/library/tempfile.html + temp_file = tempfile.NamedTemporaryFile() + temp_file_name = temp_file.name + del temp_file + + args = self.rc.xt.split() # Launch terminal + args += ["-T", "TagTimer"] # Title is TagTime + args += ["-e"] + self.rc.tagtimecmd # Execute tagtimecmd in terminal + args += ["ping"] # Ping command + args += ["--time", str(next_time)] # Log time + args += ["--log_file"] # Explicitely provide log file + args += [temp_file_name] # Log into the temporary file + args += ["--previous_tags"] # Provide perevious tags for ditto + args += [" ".join(self.taglog.last_tags())] + + self._call(args) + + # Read the ping from the temporary file + ping = None + try: + with open(temp_file_name, "r") as f: + line = f.read() + ping = Ping.line_to_ping(line) + os.remove(temp_file_name) + except EnvironmentError: + logging.warning("Cannot open temporary file. Log error.") + except AttributeError: + logging.warning("Could not read Ping from temporary file.") + + if ping: + # If we got a ping log the line (don't mess with the order of the + # tags/comments. + self.taglog.log_line(ping.line) + else: + # Otherwise, create a new ping object with error tags. + ping = Ping(next_time, self.rc.tags_err, []) + self.taglog.log_ping(ping, True, self.rc.linelen) + + def _prompt_and_log(self, next_time: int): + if self.gui: + self._prompt_and_log_tk(next_time) + else: + self._prompt_and_log_xt(next_time) + + def _prompt_and_log_tk(self, next_time: int): + """Open GUI prompt via TK and log tags.""" + import tkinter as tk + from tkinter import simpledialog + + root = tk.Tk() + root.withdraw() + + previous_tags = " ".join(self.taglog.last_tags()) + prompt_str = self._get_prompt(next_time, previous_tags) + input_str = simpledialog.askstring(title="TagTime", + prompt=prompt_str) + if input_str == "": + tags = self.rc.tags_err + elif input_str == '"': + tags = previous_tags.split() + else: + tags = input_str.split() + + ping = Ping(next_time, tags, []) + self.taglog.log_ping(ping, True, self.rc.linelen) + + def _open_editor(self): + if not self.rc.ed: + return + args = self.rc.ed.split() # Launch terminal + args += [self.rc.log_file] # and open the log file + self._call(args) + + def daemon(self): + next_time = self._get_next_ping_time() + launch_time = self.timer.time_now() + open_editor = False + + # If we missed any pings by more than $retrothresh seconds for no + # apparent reason, then assume the computer was off and auto-log them. + while next_time < (launch_time - self.rc.retrothresh): + p = Ping(next_time, self.rc.tags_off, []) + self.taglog.log_ping(p, True, self.rc.linelen) + next_time = self.timer.next() + open_editor = True + + while True: + now = self.timer.time_now() + if next_time < (now - self.rc.retrothresh): + p = Ping(next_time, self.rc.tags_afk, []) + self.taglog.log_ping(p, True, self.rc.linelen) + next_time = self.timer.next() + open_editor = True + elif next_time < now: + self._prompt_and_log(next_time) + next_time = self.timer.next() + else: + if open_editor: + self._open_editor() + open_editor = False + time.sleep(30) diff --git a/pyqt/README b/pyqt/README new file mode 100644 index 0000000..3256d6a --- /dev/null +++ b/pyqt/README @@ -0,0 +1,2 @@ +PyQT implementation of TagTime by Arthur Breitman. + diff --git a/pyqt/pingdialog.ui b/pyqt/pingdialog.ui new file mode 100644 index 0000000..3ee41f6 --- /dev/null +++ b/pyqt/pingdialog.ui @@ -0,0 +1,54 @@ + + +PingDialog + + + + + 0 + 0 + 400 + 150 + + + + + 0 + 0 + + + + TagTime + + + Tag your current activity + + + + + + + + + + + + + + + + + + &Tag + + + + + + + + + + + + diff --git a/pyqt/tagtime.py b/pyqt/tagtime.py new file mode 100644 index 0000000..65d24b0 --- /dev/null +++ b/pyqt/tagtime.py @@ -0,0 +1,129 @@ +import ctypes +import sqlite3 +from datetime import datetime, timedelta +import math +import sys +from PyQt4 import QtCore, QtGui, uic + +#todo: a gui to save the tags in the popup +#todo: a main window that goes in the status bar with configuration for account / sync + +def xor_shift(x): + x ^= (ctypes.c_uint64(x).value << 21) + x ^= (ctypes.c_uint64(x).value >> 35) + x ^= (ctypes.c_uint64(x).value << 4) + return ctypes.c_uint64(x).value + +def clean_tag(tag): + return tag.lower().strip() + +class Storage: + create_pings_query = """Create TABLE if not exists pings + (seed INTEGER PRIMARY KEY, time NUMERIC, answered NUMERIC)""" + create_tags_query = """Create TABLE if not exists tags + (id INTEGER PRIMARY KEY, tag TEXT UNIQUE)""" + create_tagged_query = """Create TABLE if not exists tagged + (id INTEGER PRIMARY KEY, tag_id INTEGER, seed INTEGER)""" + + insert_ping_query = """INSERT INTO pings VALUES(?,?,?)""" + insert_tag_query = """INSERT OR IGNORE INTO tags (tag) VALUES(?)""" + insert_tagged_query = """INSERT INTO tagged (tag_id, seed) SELECT id, ? FROM tags WHERE tag = ?""" + + def __init__(self): + self.conn = sqlite3.connect('storage.db') + self.cursor = self.conn.cursor() + self.cursor.execute( Storage.create_pings_query ) + self.cursor.execute( Storage.create_tags_query ) + self.cursor.execute( Storage.create_tagged_query) + self.conn.commit() + + def save_ping(self, ping): + #sqllite needs to store the seed as a signed 64 bit integer + seed = ctypes.c_int64(ping.seed).value + print "saving ping #%(seed)d for time #%(time)s" % {'seed':seed, 'time':datetime.now()} + + self.cursor.execute( Storage.insert_ping_query, + (seed, ping.time, ping.answered) ) + self.cursor.executemany( Storage.insert_tag_query, [[t] for t in ping.tags if len(t) > 0] ) + self.cursor.executemany( Storage.insert_tagged_query, [[seed, tag] for tag in ping.tags if len(tag) > 0] ) + self.conn.commit() + +class Ping: + def __init__(self, seed, time): + self.seed = seed + self.time = time + self.answered = False + + def reply(self, tags): + self.tags = map( clean_tag, tags ) + self.answered = True + + def next_ping(self): + dt = timedelta( minutes=-45*math.log( 1 - self.seed / float(1<<64) ) ) + if timedelta < 0: + print timedelta, self.seed + return Ping( xor_shift(self.seed), self.time + dt ) + + +class PingDialog(QtGui.QWidget): + + def onTagButtonClicked(self): + self.hide() + tags = self.tagEdit.text().__str__().split(',') + self.ping.reply(tags) + app.storage.save_ping(self.ping) + + def __init__(self, ping): + super(PingDialog, self).__init__() + self.ping = ping + uic.loadUi('pingdialog.ui', self) + self.label.setText( datetime.now().strftime("%c") + ": what are you doing right now?" ) + self.tagButton.clicked.connect(self.onTagButtonClicked) + self.show() + self.activateWindow() + self.raise_() + +class Control(QtGui.QApplication): + + def wake(self): + print "time activated" + p = self.current_ping + self.current_ping = p.next_ping() + dt = self.current_ping.time - datetime.now() + self.timer.singleShot(dt.total_seconds() * 1000, self.wake) + print "Setting timer in %f seconds" % dt.total_seconds() + self.ping_dialog = PingDialog(p) + + + def __init__(self, *args): + QtGui.QApplication.__init__(self, *args) + + #ping storage + self.storage = Storage() + + # find current ping by iterating until finding the next ping + #TODO: use latest ping from web database or local storage! + p = Ping( 1234, datetime.fromtimestamp(1.335e9)) + while p.time < datetime.now(): + p = p.next_ping() + self.current_ping = p + + #set alarm for ping + #TODO: check potential race condition + self.timer = QtCore.QTimer() + dt = self.current_ping.time - datetime.now() + print "Setting timer in %f seconds" % dt.total_seconds() + + self.timer.singleShot(dt.total_seconds() * 1000, self.wake) + + +def main(): + global app + app = Control(sys.argv) + app.setQuitOnLastWindowClosed(False) + sys.exit(app.exec_()) + +if __name__ == '__main__': + main() + + diff --git a/taglog.py b/taglog.py new file mode 100644 index 0000000..15e7727 --- /dev/null +++ b/taglog.py @@ -0,0 +1,189 @@ +import os +import sys +import logging +import timer +from typing import List, Tuple, Iterable +from ping import Ping + + +class TagLog: + """ + Thread-safe abstraction of log file. Enables easy implementation of + different log file type if desired. + """ + + def __init__(self, log_file: str) -> None: + self.log_file = log_file + self.lock_file = self.log_file + ".lock" + self.got_lock = False + self.acquire_lock() + + def __del__(self): + self.release_lock() + + def acquire_lock(self): + """ Acquire lock file and terminate if it already exists. """ + if os.path.isfile(self.lock_file): + m = "Could not get lock {}".format(self.lock_file) + logging.debug(m) + sys.exit(1) + else: + with open(self.lock_file, 'w') as f: + f.write("") + self.got_lock = True + logging.debug("Acquired lock_file={}".format(self.lock_file)) + + def release_lock(self): + """ + Release lock. Executed during object destruction, so no need to call + explicitly. + """ + if not self.got_lock: + return + + if os.path.isfile(self.lock_file): + os.remove(self.lock_file) + logging.debug("Released lock_file={}".format(self.lock_file)) + else: + logging.debug("Lock already released.") + + def exists(self): + """True if log file exists and contains at least one valid ping.""" + try: + self.last_ping() + except (FileNotFoundError, AttributeError): + return False + return True + + def last_ping(self) -> Ping: + """Returns last Ping if there is one. Exception otherwise.""" + with open(self.log_file, 'r') as f: + ping = Ping.line_to_ping(f.readlines()[-1]) + return ping + + def last_ping_time(self) -> int: + """Returns last ping time f there is one. Exception otherwise.""" + return self.last_ping().time + + def last_tags(self) -> List[str]: + """Returs the tags of the last ping.""" + try: + return self.last_ping().tags + except: + return [] + + def all_pings(self) -> Iterable[Ping]: + """Returns an iterator over all pings in the log.""" + with open(self.log_file, 'r') as f: + for line in f.readlines(): + try: + yield Ping.line_to_ping(line) + except AttributeError: + logging.error("Invalid line {}".format(line)) + + def grep(self, start: int, end: int) -> List[Ping]: + """Returns all pings between start and end as an iterator.""" + for ping in self.all_pings(): + if ping.time >= start and ping.time <= end: + yield ping + + def validate(self, verbose: bool = True) -> bool: + """ + Iterates over log and returns True if all lines are valid Pings. + Returns False otherwise. Also prints some statistics. + """ + unique_tags = set() + total_tags, valid_pings, invalid_pings = 0, 0, 0 + previous_time, initial_time, last_time = 0, 0, 0 + with open(self.log_file, 'r') as f: + for line_number, line in enumerate(f.readlines()): + try: + p = Ping.line_to_ping(line) + unique_tags |= set(p.tags) + valid_pings += 1 + total_tags += len(p.tags) + if initial_time == 0: + initial_time = p.time + if p.time <= previous_time: + m = "Line {} not ascending. Please fix order." + logging.error(m.format(line_number)) + else: + last_time = p.time + previous_time = p.time + except AttributeError: + m = "Line {}: {} is not a valid ping." + logging.error(m.format(line_number, line.strip())) + invalid_pings += 1 + + start_time = timer.time_to_str(initial_time) + print("You have started tagging on {}.".format(start_time)) + print("Your TagTime log contains {} valid pings.".format(valid_pings)) + if invalid_pings > 0: + print("It also contains {} invalid pings.".format(invalid_pings)) + m = "You have logged {} unique tags and {} tags in total." + print(m.format(len(unique_tags), total_tags)) + last_time = timer.time_to_str(last_time) + print("You have last tagged on {}.".format(last_time)) + + if invalid_pings > 0: + return False + return True + + + def get_tags(self) -> List[Tuple[int, str]]: + """ + Returns a list of tuples where the first element is an integer and the + second element is a tag-string sorted from highest to lowest number of + occurences. For example, when called on a log with this content: + + 1600805511 quz bar [2020.09.22 16:11:51 TUE] + 1600806947 quz off [2020.09.22 16:35:47 TUE] + + The function would return: + + [(2, quz), (1, bar), (1, off)] + """ + if not self.exists(): + return [] + + def add_tags(new_tags, tag_bag): + for t in new_tags: + try: + tag_bag[t] += 1 + except KeyError: + tag_bag[t] = 1 + + tags = {} + with open(self.log_file, 'r') as f: + for line in f.readlines(): + try: + p = Ping.line_to_ping(line) + except: + continue + add_tags(p.tags, tags) + tags = [(value, key) for key, value in tags.items()] + return sorted(tags, reverse=True) + + def log_ping(self, ping: Ping, annotate_time: bool = False, line_len: int = 79): + """ + Creates a line from time, tags, and comments, and writes it into the + log. + + If linelen is provided log_ping replicates the behavior of the original + Perl implementation and creates human-readable date/time annotation + comment right aligned to linelen. + """ + line = Ping.ping_to_line(ping, annotate_time, line_len) + with open(self.log_file, 'a') as f: + logging.debug("Logged {}".format(line)) + f.write(line + "\n") + + def log_line(self, line: str): + """Append line to log file. No questions asked.""" + with open(self.log_file, 'a') as f: + if line.endswith("\n"): + f.write(line) + line = line.strip() + else: + f.write(line + "\n") + logging.debug("Logged {}".format(line)) diff --git a/tagtime.py b/tagtime.py new file mode 100644 index 0000000..8d5f242 --- /dev/null +++ b/tagtime.py @@ -0,0 +1,105 @@ +import sys +import logging +import argparse + +import prompter +import tagtimerc +import timer +import taglog + + +def get_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + prog="TagTime", + description=("TagTime compatible with " + "the original Perl implementation."), + formatter_class=argparse.RawTextHelpFormatter) + + parser.add_argument('--rc', + metavar='tagtimerc', + default='~/.tagtimerc', + help="""Path to tagtimerc (default: %(default)s)""", + type=str) + parser.set_defaults(func=None) + subparsers = parser.add_subparsers() + + p1 = subparsers.add_parser('daemon', help="Run in daemon mode") + p1.set_defaults(func=daemon) + p1.add_argument('--prompt', metavar="mode", + choices=["legacy", "gui"], + default="legacy", type=str, + help='Prompt via another terminal or via GUI popup.') + + p2 = subparsers.add_parser('ping', help="Answer a single ping") + p2.add_argument('--time', type=int, help="Ping time") + p2.add_argument('--log_file', type=str, help='Ping into this file') + p2.add_argument('--previous_tags', type=str, + help='String of previous tags for ditto feature.') + p2.set_defaults(func=ping) + + p3 = subparsers.add_parser('merge', help="Merge two or more logs") + p3.set_defaults(func=merge) + + p4 = subparsers.add_parser('validate', help="Validate log file") + p4.add_argument('--log_file', type=str, help='Grep from this log') + p4.set_defaults(func=validate) + + p5 = subparsers.add_parser('grep', help='Get pings between start and end') + p5.add_argument('--log_file', type=str, help='Grep from this log') + p5.set_defaults(func=grep) + p5.add_argument('start', type=str, help='Start in %Y.%m.%d') + p5.add_argument('end', type=str, help='End in %Y.%m.%d') + + return parser.parse_args() + + +def daemon(rc: tagtimerc.TagTimeRc, args: argparse.Namespace): + if args.prompt == "gui": + p = prompter.Prompter(rc, True) + else: + p = prompter.Prompter(rc) + p.daemon() + + +def ping(rc: tagtimerc.TagTimeRc, args: argparse.Namespace): + rc.log_file = args.log_file if args.log_file else rc.log_file + p = prompter.Prompter(rc) + args.time = args.time if args.time else p.timer.time_now() + p.ping(args.time, args.previous_tags) + + +def merge(rc: tagtimerc.TagTimeRc, args: argparse.Namespace): + raise Exception() + + +def validate(rc: tagtimerc.TagTimeRc, args: argparse.Namespace): + if args.log_file: + rc.log_file = args.log_file + log = taglog.TagLog(rc.log_file) + log.validate() + + +def grep(rc: tagtimerc.TagTimeRc, args: argparse.Namespace): + if args.log_file: + rc.log_file = args.log_file + log = taglog.TagLog(rc.log_file) + start = timer.date_str_to_time(args.start) + end = timer.date_str_to_time(args.end) + for p in log.grep(start, end): + print(p.line.strip()) + + +def main(): + logging.basicConfig(format='%(message)s', level=logging.DEBUG) + args = get_args() + rc = tagtimerc.parse_tagtimerc(args.rc) + + if args.func: + args.func(rc, args) + else: + logging.error("Usage: TagTime Option (or --help)") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/tagtimerc.py b/tagtimerc.py new file mode 100644 index 0000000..f799029 --- /dev/null +++ b/tagtimerc.py @@ -0,0 +1,98 @@ +import os +import re +import sys +from dataclasses import dataclass, field +from typing import List + + +def get_tagtime_command() -> List[str]: + """ + The original Perl implementation spawns another Perl script in a terminal + to ask the user for their current tags. To replicate this behavior we find + out the python executable and the location of the tagtime.py script. + """ + python_exe = sys.executable + tagtimerc_py = os.path.abspath(__file__) + tagtime_py = tagtimerc_py.replace("tagtimerc.py", "tagtime.py") + return [python_exe, tagtime_py] + + +@dataclass +class TagTimeRc: + log_file: str + ed: str + xt: str + retrothresh: int + gap: int + urping: int + seed: int + linelen: int + catchup: int + tags_off: List[str] = field(default_factory=lambda: + ["afk", "off", "RETRO"]) + tags_afk: List[str] = field(default_factory=lambda: ["afk", "RETRO"]) + tags_err: List[str] = field(default_factory=lambda: ["err"]) + tagtimecmd: List[str] = field(default_factory=get_tagtime_command) + + +def value_to_int(value: str) -> int: + """ + The tagtimerc file is itself a Perl script. That means the assignments to + the configuration parameters can be Perl expressions. We could replicate + that behavior via `eval`, but we don't want to do that because it allows + arbitrary code execution. Instead we only support multiplication for now + (`45*60`). + """ + try: + return int(value) + except ValueError: + pass + result = 1 + for v in value.split("*"): + result *= int(v) + return result + + +def parse_tagtimerc(tagtimerc_path: str) -> TagTimeRc: + """ + Parses the configuration attributes from a tagtimerc file into Python. + + All lines that start with a dollar sign are configuration lines. These + lines are than split into key and value. This function only considers + attributes that are part of TagTimeRc. + """ + s = ("^\s*" # potential leading whitespace + "\$(\w+)" # key as group (variables in Perl start with $) + "\s*=\s*" # equal sign including potential whitespaces around it + "\"?" # potential opening quote + "([^\";]+)" # value (everything that is not a semicolon or quote) + "\"?" # potential closing quote + ";") # semicolon to terminate key value pair + re_config_line = re.compile(s) + tagtimerc_path = os.path.expanduser(tagtimerc_path) + with open(tagtimerc_path, 'r') as f: + key_value_pairs = {m.groups()[0].lower(): m.groups()[1] + for line in f.readlines() + if (m := re_config_line.match(line))} + + # Get dictionary of the TagTimeRc attribute types. + tagtimerc_types = TagTimeRc.__annotations__ + + # Process the key-value-pairs. Keep expected pairs and store them into + # kwarsg. + kwargs = {} + for key, value in key_value_pairs.items(): + if key in tagtimerc_types and tagtimerc_types[key] is int: + kwargs[key] = value_to_int(value) + elif key == "logf": + # Give special treatman to logf, because it may contain other + # variables (`$logf = "$path$usr.log";`) Iterate over those + # variables and replace them with their value. + for variable in re.findall("\$\w+", value): + var = variable.replace("$", "") + value = value.replace(variable, key_value_pairs[var]) + kwargs["log_file"] = value + elif key in tagtimerc_types: + kwargs[key] = value + + return TagTimeRc(**kwargs) diff --git a/tests/tagtimerc b/tests/tagtimerc new file mode 100644 index 0000000..89a3579 --- /dev/null +++ b/tests/tagtimerc @@ -0,0 +1,67 @@ +# Settings for TagTime. +# This file must be in your home directory, called .tagtimerc +# NB: restart the daemon (tagtimed.pl) if you change this file. + +$usr = "auser"; # CHANGEME to your username +$path = "/home/auser/dev/TagTime/"; # CHANGEME to your path to tagtime +$logf = "$path$usr.log"; # log file for pings + +# If you're using windows, you'll need cygwin and to set this flag to 1: +$cygwin = 0; # CHANGEME to 1 if you're using windows/cygwin. + +$ED = "/usr/bin/vim +"; # CHANGEME if you don't like vi (eg: /usr/bin/pico) +$XT = "/usr/bin/st"; # CHANGEME to your path to xterm + +# Get your personal Beeminder auth token (after signing in) from +# https://www.beeminder.com/api/v1/auth_token.json +$beemauth = "abc123"; # CHANGEME to your personal beeminder auth token + +# WARNING WARNING WARNING: +# if you point this at a beeminder goal with data that was not generated from +# this tagtime log, it will DELETE ALL OF YOUR DATA +# CHANGEME by adding entries for each beeminder graph you want to auto-update: +%beeminder = ( + #"alice/work" => "job", # all "job" pings get added to bmndr.com/alice/work + #"bob/play" => ["fun","whee"], # pings w/ "fun" and/or "whee" sent to bob/play + + # ADVANCED USAGE: regular expressions + # pings tagged like "eat1", "eat2", "eat3" get added to carol/food: + #"carol/food" => qr/\beat\d+\b/, + + # ADVANCED USAGE: plug-in functions + # pings tagged anything except "afk" get added to "dan/nafk": + #"dan/nafk" => sub { return shift() !~ /\bafk\b/; } + # pings tagged "workout" get added to dave/tueworkouts, but only on tuesdays: + #"dave/tueworkouts" => sub { my @now = localtime(); + # return shift() =~/\bworkout\b/ && $now[6] == 2; + #} +); + +# Pings from more than this many seconds ago get autologged with tags "afk" and +# "RETRO". (Pings can be overdue either because the computer was off or tagtime +# was waiting for you to answer a previous ping. If the computer was off, the +# tag "off" is also added.) +$retrothresh = 60; + +# If you want the universal ping schedule, don't touch these 3 settings... +$gap = 45*60; # Average number of seconds between pings (eg, 60*60 = 1 hour). +$URPING = 1184097393; # Ur-ping, ie, the birth of timepie/tagtime! (unixtime) +$seed = 11193462; + +$linelen = 79; # Try to keep log lines at most this long. + +$catchup = 1; # Whether it beeps for old pings, ie, should it beep a bunch + # of times in a row when the computer wakes from sleep. + +$enforcenums = 0; # Whether it forces you to include a number in your + # ping response (include tag non or nonXX where XX is day + # of month to override). This is for task editor integration. + +# System command that will play a sound for pings. +# Often "play" or "playsound" on Linux, or "afplay" on Mac osx. +# $playsound = "afplay ${path}sound/blip-twang.wav"; +# $playsound = "echo -e '\a'"; # this is the default if $playsound not defined. +# $playsound = ""; # makes tagtime stay quiet. + + +1; # When requiring a library in perl it has to return 1. diff --git a/tests/test_ping.py b/tests/test_ping.py new file mode 100644 index 0000000..5ccadb0 --- /dev/null +++ b/tests/test_ping.py @@ -0,0 +1,121 @@ +import unittest +from ping import Ping + + +class TestLineToPing(unittest.TestCase): + def test_line_to_ping(self): + l = "1600790846 afk off RETRO [2020.09.22 12:07:26 TUE]" + p1 = Ping.line_to_ping(l) + p2 = Ping(1600790846, + ["afk", "off", "RETRO"], + ["2020.09.22 12:07:26 TUE"], + l) + self.assertEqual(p1, p2) + + l = " 1600790846 [2020.09.22 12:07:26 TUE] foo bar (lol)" + p1 = Ping.line_to_ping(l) + p2 = Ping(1600790846, + ["foo", "bar"], + ["lol", "2020.09.22 12:07:26 TUE"], + l) + self.assertEqual(p1, p2) + + l = "1600790846 12weird #tags :should work" + p1 = Ping.line_to_ping(l) + p2 = Ping(1600790846, + ["12weird", "#tags", ":should", "work"], + [], + l) + self.assertEqual(p1, p2) + + def test_line_to_ping_no_timestamp(self): + l = " 846 [2020.09.22 12:07:26 TUE] foo bar (lol)" + with self.assertRaises(AttributeError): + Ping.line_to_ping(l) + + l = "foo bar (lol)" + with self.assertRaises(AttributeError): + Ping.line_to_ping(l) + + def test_line_to_ping_no_tags(self): + l = "1600790846 [2020.09.22 12:07:26 TUE] (foo) [baar]" + p = Ping.line_to_ping(l) + self.assertEqual([], p.tags) + + +class TestGetTags(unittest.TestCase): + def test_get_tags(self): + tags = Ping.get_tags("123456789 foo #bar quz [sd ab)") + expected = ["foo", "#bar", "quz", "[sd", "ab)"] + self.assertEqual(tags, expected) + + +class TestPingToLine(unittest.TestCase): + + def test_ping_to_line(self): + p = Ping(1600790846, + ["afk", "off", "RETRO"], + ["qul", "2020.09.22 12:07:26 TUE"]) + line = Ping.ping_to_line(p) + expected = "1600790846 afk off RETRO [qul] [2020.09.22 12:07:26 TUE]" + self.assertEqual(line, expected) + + def test_ping_to_line_with_annotation(self): + p = Ping(1600790846, + ["afk", "off", "RETRO"], + ["qul"]) + line = Ping.ping_to_line(p, True, 57) + expected = "1600790846 afk off RETRO [qul] [2020.09.22 12:07:26 Tue]" + self.assertEqual(line, expected) + + +class TestAddTimeAnnotation(unittest.TestCase): + + def test_add_time_annotation_24(self): + time = 1600790846 + line = str(time) + new_line = Ping.add_time_annotation(time, line, 40) + exp_line = line + " [2020.09.22 12:07:26 Tue]" + self.assertEqual(new_line, exp_line) + + def test_add_time_annotation_18(self): + time = 1600790846 + line = str(time) + new_line = Ping.add_time_annotation(time, line, 29) + exp_line = line + " [09.22 12:07:26 Tue]" + self.assertEqual(new_line, exp_line) + + def test_add_time_annotation_15(self): + time = 1600790846 + line = str(time) + new_line = Ping.add_time_annotation(time, line, 26) + exp_line = line + " [22 12:07:26 Tue]" + self.assertEqual(new_line, exp_line) + + def test_add_time_annotation_12(self): + time = 1600790846 + line = str(time) + new_line = Ping.add_time_annotation(time, line, 23) + exp_line = line + " [12:07:26 Tue]" + self.assertEqual(new_line, exp_line) + + def test_add_time_annotation_9(self): + time = 1600790846 + line = str(time) + new_line = Ping.add_time_annotation(time, line, 20) + exp_line = line + " [12:07 Tue]" + self.assertEqual(new_line, exp_line) + + def test_add_time_annotation_5(self): + time = 1600790846 + line = str(time) + new_line = Ping.add_time_annotation(time, line, 16) + exp_line = line + " [12:07]" + self.assertEqual(new_line, exp_line) + + def test_add_time_annotation_else(self): + time = 1600790846 + line = str(time) + new_line = Ping.add_time_annotation(time, line, 10) + exp_line = line + " [07]" + self.assertEqual(new_line, exp_line) diff --git a/tests/test_taglog.py b/tests/test_taglog.py new file mode 100644 index 0000000..d90d23e --- /dev/null +++ b/tests/test_taglog.py @@ -0,0 +1,67 @@ +import os +import unittest +import taglog + + +class TestTagLog(unittest.TestCase): + TEST_LOG_1 = "tests/user1.log" + TEST_LOG_2 = "tests/user2.log" + TEST_LOG_3 = "tests/user3.log" + TEST_LOG_1_LOCK = TEST_LOG_1 + ".lock" + TEST_LOG_2_LOCK = TEST_LOG_2 + ".lock" + + def test_tag_log_locking(self): + tl1 = taglog.TagLog(self.TEST_LOG_1) + tl2 = taglog.TagLog(self.TEST_LOG_2) + # Make sure that lock files are created. + self.assertTrue(os.path.isfile(self.TEST_LOG_1_LOCK)) + self.assertTrue(os.path.isfile(self.TEST_LOG_2_LOCK)) + # Make sure that lock files are deleted on object destruction. + del tl2 + self.assertFalse(os.path.isfile(self.TEST_LOG_2_LOCK)) + del tl1 + self.assertFalse(os.path.isfile(self.TEST_LOG_1_LOCK)) + + def test_tag_log_locking_conflict(self): + tl1 = taglog.TagLog(self.TEST_LOG_1) + self.assertTrue(os.path.isfile(self.TEST_LOG_1_LOCK)) + + # Creating another TagLog object for the same log caues a system exit. + with self.assertRaises(SystemExit): + tl2 = taglog.TagLog(self.TEST_LOG_1) + + # Create another TagLog after destroying the first one. + del tl1 + tl2 = taglog.TagLog(self.TEST_LOG_1) + self.assertTrue(os.path.isfile(self.TEST_LOG_1_LOCK)) + del tl2 + + def test_exists(self): + tl = taglog.TagLog(self.TEST_LOG_1) + self.assertTrue(tl.exists()) + tl = taglog.TagLog(self.TEST_LOG_3) + self.assertFalse(tl.exists()) + + def test_last_ping(self): + Ping = taglog.Ping + p1 = taglog.TagLog(self.TEST_LOG_1).last_ping() + line = "1600808609 (lol) afk bar [lulz]\n" + p2 = Ping(1600808609, ["afk", "bar"], ["lol", "lulz"], line) + self.assertEqual(p1, p2) + + def test_last_ping_time(self): + time = taglog.TagLog(self.TEST_LOG_1).last_ping_time() + self.assertEqual(time, 1600808609) + + time = taglog.TagLog(self.TEST_LOG_2).last_ping_time() + self.assertEqual(time, 1601543391) + + def test_get_tags(self): + tags = taglog.TagLog(self.TEST_LOG_1).get_tags() + expected_tags = [ + (6, "afk"), + (5, "off"), + (3, "bar"), + (1, "quz"), + (1, "foo")] + self.assertEqual(tags, expected_tags) diff --git a/tests/test_tagtimerc.py b/tests/test_tagtimerc.py new file mode 100644 index 0000000..2a6045d --- /dev/null +++ b/tests/test_tagtimerc.py @@ -0,0 +1,26 @@ +import unittest +import tagtimerc + + +class TestTagTimeRc(unittest.TestCase): + + def test_parse_tagtimerc(self): + test_rc = "./tests/tagtimerc" + ttrc = tagtimerc.parse_tagtimerc(test_rc) + self.assertEqual(ttrc.log_file, "/home/auser/dev/TagTime/auser.log") + self.assertEqual(ttrc.ed, "/usr/bin/vim +") + self.assertEqual(ttrc.xt, "/usr/bin/st") + self.assertEqual(ttrc.retrothresh, 60) + self.assertEqual(ttrc.gap, 45 * 60) + self.assertEqual(ttrc.urping, 1184097393) + self.assertEqual(ttrc.seed, 11193462) + self.assertEqual(ttrc.linelen, 79) + self.assertEqual(ttrc.catchup, 1) + + def test_value_to_int(self): + value_to_int = tagtimerc.value_to_int + self.assertEqual(value_to_int("312"), 312) + with self.assertRaises(ValueError): + value_to_int("") + self.assertEqual(value_to_int("12 * 3*2"), 72) + diff --git a/tests/test_timer.py b/tests/test_timer.py new file mode 100644 index 0000000..24edc8a --- /dev/null +++ b/tests/test_timer.py @@ -0,0 +1,77 @@ +import unittest +import timer + + +class TestTimer(unittest.TestCase): + + def create_timer(self): + seed, urping, gap = 11193462, 1184097393, 45 * 60 + return timer.Timer(seed, urping, gap) + + def test_create_timer(self): + p = self.create_timer() + self.assertEqual(p.seed, 11193462) + self.assertEqual(p.ping, 1184097393) + self.assertEqual(p.gap, 45 * 60) + self.assertEqual(p.ia, 16807) + self.assertEqual(p.im, 2147483647) + + def test_ran0(self): + p = self.create_timer() + self.assertEqual(p.ran0(), 1297438545) + self.assertEqual(p.ran0(), 500674177) + self.assertEqual(p.ran0(), 989963893) + + def test_ran01(self): + p = self.create_timer() + self.assertEqual(p.ran01(), 0.6041669033487174) + self.assertEqual(p.ran01(), 0.2331445818921293) + self.assertEqual(p.ran01(), 0.4609878610172252) + + def test_exprand(self): + p = self.create_timer() + self.assertEqual(p.exprand(), 1360.5429307641098) + self.assertEqual(p.exprand(), 3931.4605357416476) + self.assertEqual(p.exprand(), 2090.8356340914393) + + def test_next_at_urping(self): + p = self.create_timer() + self.assertEqual(p.next(), 1184098754) + self.assertEqual(p.next(), 1184102685) + self.assertEqual(p.next(), 1184104776) + self.assertEqual(p.next(), 1184105302) + + def test_next_in_2020(self): + """ I took the following ping times from the Perl implementation for + this test. + + 1600803512 afk off RETRO [2020.09.22 15:38:32 TUE] + 1600805511 afk off RETRO [2020.09.22 16:11:51 TUE] + 1600806947 afk off RETRO [2020.09.22 16:35:47 TUE] + """ + p = self.create_timer() + self.assertEqual(p.prev(1600803600), 1600803512) + self.assertEqual(p.next(), 1600805511) + self.assertEqual(p.next(), 1600806947) + + def test_prev(self): + p = self.create_timer() + time = 1601502077 # 2020/09/30 17:40:26 ET + self.assertEqual(p.prev(time), 1601502026) + self.assertEqual(p.seed, 1953937112) + + def test_prev_repeat(self): + """ Calling prev multiple times should work even if the timer has + already moved past time. """ + p = self.create_timer() + self.assertEqual(p.prev(1600803600), 1600803512) + self.assertEqual(p.next(), 1600805511) + self.assertEqual(p.next(), 1600806947) + self.assertEqual(p.prev(1600803600), 1600803512) + self.assertEqual(p.next(), 1600805511) + self.assertEqual(p.next(), 1600806947) + + def test_time_now(self): + p = self.create_timer() + self.assertEqual(type(p.time_now()), int) + diff --git a/tests/user1.log b/tests/user1.log new file mode 100644 index 0000000..f1fd111 --- /dev/null +++ b/tests/user1.log @@ -0,0 +1,8 @@ +1600790846 foo bar [2020.09.22 12:07:26 TUE] +1600791982 afk off [2020.09.22 12:26:22 TUE] +1600793495 afk off [2020.09.22 12:51:35 TUE] +1600797909 afk off [2020.09.22 14:05:09 TUE] +1600803512 afk off [2020.09.22 15:38:32 TUE] +1600805511 quz bar [2020.09.22 16:11:51 TUE] +1600806947 afk off [2020.09.22 16:35:47 TUE] +1600808609 (lol) afk bar [lulz] diff --git a/tests/user2.log b/tests/user2.log new file mode 100644 index 0000000..357d6ca --- /dev/null +++ b/tests/user2.log @@ -0,0 +1,9 @@ +1601534943 afk off RETRO [2020.10.01 02:49:03 THU] +1601535769 afk off RETRO [2020.10.01 03:02:49 THU] +1601535868 afk off RETRO [2020.10.01 03:04:28 THU] +1601537458 afk off RETRO [2020.10.01 03:30:58 THU] +1601538061 afk off RETRO [2020.10.01 03:41:01 THU] +1601538726 afk off RETRO [2020.10.01 03:52:06 THU] +1601538750 afk off RETRO [2020.10.01 03:52:30 THU] +1601542553 afk off RETRO [2020.10.01 04:55:53 THU] +1601543391 afk off RETRO [2020.10.01 05:09:51 THU] diff --git a/timer.py b/timer.py new file mode 100644 index 0000000..361a829 --- /dev/null +++ b/timer.py @@ -0,0 +1,94 @@ +import sys +import math +import time +import datetime +import logging + + +class Timer: + def __init__(self, seed: int, ping: int, gap: int): + self.seed = seed + self.ping = ping + self.gap = gap + self.ini_seed = seed # remember initial seed/ping for prev method + self.ini_ping = ping + self.ia = 16807 # const for RNG (p37 of Simulation by Ross) + self.im = 2**31 - 1 # const for RNG + + def ran0(self) -> int: + """ Returns a random integer in [1, self.im - 1]; changes self.seed, + i.e., RNG state. (This is ran0 from Numerical Recipes and has a period + of ~2 billion.) """ + self.seed = (self.ia * self.seed) % self.im + return self.seed + + def ran01(self) -> float: + """Returns a U(0,1) random number. Changes seed.""" + return self.ran0() / self.im + + def exprand(self) -> float: + """ Returns a random number drawn from an exponential distribution with + mean self.gap (defined in settings file). Changes seed. """ + return -1 * self.gap * math.log(self.ran01()) + + def next(self) -> int: + """Returns random next ping time in unix time. Changes seed.""" + prev_ping = self.ping + self.ping = max(prev_ping + 1, round(prev_ping + self.exprand())) + return self.ping + + def prev(self, time: int) -> int: + """ + Computes the last scheduled ping time before time. This updates the + internal state of Ping to the last seed/ping-pair before time. In other + words, the next call to next_ping will return a ping >= time. Another + name for this function would be `forward`. + """ + assert(self.ini_ping < time) + self.seed = self.ini_seed + self.ping = self.ini_ping + + # Compute new pings till self.ping >= time. + while self.ping < time: + last_ping = self.ping + last_seed = self.seed + self.next() # updates self.ping and self.seed + + # Restore ping/seed state to the values before time. + self.ping = last_ping + self.seed = last_seed + + # Return ping before time. + return last_ping + + def time_now(self) -> int: + return now() + + +def now() -> int: + """Returns the current unix time in seconds as an integer.""" + return int(time.time()) + + +def date_str_to_time(date_str: str) -> int: + """Parse string into UNIX time.""" + + formats = ['%Y.%m.%d', '%Y-%m-%d', '%Y/%m/%d', '%d.%m.%Y'] + time = None + for f in formats: + try: + datetime_object = datetime.datetime.strptime(date_str, f) + time = int(datetime_object.timestamp()) + break + except ValueError: + pass + if time is None: + logging.error("Could not parse {} to date".format(date_str)) + sys.exit(1) + return time + + +def time_to_str(time: int) -> str: + """Transform UNIX time to human-readable string.""" + strf = "%Y.%m.%d %H:%M:%S %a" + return datetime.datetime.fromtimestamp(time).strftime(strf)