import logging
import os
import sys
from collections import Counter
from typing import List, Tuple, Iterable

import timer
from ping import Ping


class TagLog:
    """
    Lock-file-guarded abstraction of the log file.

    Creating an instance creates ``<log_file>.lock``; the lock file is
    removed again when the instance is destroyed. Only one instance per log
    file can hold the lock at a time. Enables easy implementation of a
    different log file type if desired.
    """

    def __init__(self, log_file: str) -> None:
        """Remember the log path and acquire its lock file.

        Exits the process with status 1 if the lock file already exists.
        """
        self.log_file = log_file
        self.lock_file = self.log_file + ".lock"
        # Must be set before acquire_lock(): __del__ reads it even when
        # acquisition fails.
        self.got_lock = False
        self.acquire_lock()

    def __del__(self):
        """Release the lock file when the object is destroyed."""
        self.release_lock()

    def acquire_lock(self):
        """
        Acquire lock file and terminate if it already exists.

        The lock file is created with exclusive-create mode ('x') so that
        the existence check and the creation are one atomic step; two
        processes can never both believe they hold the lock.
        """
        try:
            with open(self.lock_file, 'x'):
                pass  # An empty file is all that is needed.
        except FileExistsError:
            m = "Could not get lock {}".format(self.lock_file)
            logging.debug(m)
            sys.exit(1)
        self.got_lock = True
        logging.debug("Acquired lock_file={}".format(self.lock_file))

    def release_lock(self):
        """
        Release lock.

        Executed during object destruction, so no need to call explicitly.
        Does nothing if this instance does not hold the lock.
        """
        # getattr guards against __del__ running on a half-built instance.
        if not getattr(self, "got_lock", False):
            return
        try:
            os.remove(self.lock_file)
        except FileNotFoundError:
            logging.debug("Lock already released.")
        else:
            logging.debug("Released lock_file={}".format(self.lock_file))
        # Forget the lock so a later call (e.g. from __del__) cannot delete
        # a lock file that somebody else has created in the meantime.
        self.got_lock = False

    def exists(self):
        """
        True if the log file exists and its last line is a valid ping.

        Returns False when the file is missing, empty, or ends in a line
        that cannot be parsed.
        """
        try:
            self.last_ping()
        except (FileNotFoundError, IndexError, AttributeError):
            # IndexError: the file exists but contains no lines.
            return False
        return True

    def last_ping(self) -> Ping:
        """Returns last Ping if there is one. Exception otherwise."""
        with open(self.log_file, 'r') as f:
            ping = Ping.line_to_ping(f.readlines()[-1])
        return ping

    def last_ping_time(self) -> int:
        """Returns last ping time if there is one. Exception otherwise."""
        return self.last_ping().time

    def last_tags(self) -> List[str]:
        """Returns the tags of the last ping, or [] if there is none."""
        try:
            return self.last_ping().tags
        except Exception:
            # Best effort: any failure to read or parse means "no tags".
            # Unlike a bare except, this lets KeyboardInterrupt and
            # SystemExit propagate.
            return []

    def all_pings(self) -> Iterable[Ping]:
        """
        Returns an iterator over all pings in the log.

        Lines that cannot be parsed are logged as errors and skipped.
        """
        with open(self.log_file, 'r') as f:
            for line in f:
                try:
                    ping = Ping.line_to_ping(line)
                except AttributeError:
                    logging.error("Invalid line {}".format(line))
                else:
                    yield ping

    def grep(self, start: int, end: int) -> Iterable[Ping]:
        """Returns all pings between start and end (inclusive) as an iterator."""
        for ping in self.all_pings():
            if start <= ping.time <= end:
                yield ping

    def validate(self, verbose: bool = True) -> bool:
        """
        Iterates over log and returns True if all lines are valid Pings.
        Returns False otherwise.

        Lines whose time is not strictly greater than the previous ping's
        time are reported via logging.error but do not affect the result.
        If verbose is True (the default), some statistics are printed.
        Line numbers in error messages are 1-based.
        """
        unique_tags = set()
        total_tags, valid_pings, invalid_pings = 0, 0, 0
        previous_time, initial_time, last_time = 0, 0, 0

        with open(self.log_file, 'r') as f:
            for line_number, line in enumerate(f, start=1):
                # The attribute accesses stay inside the try block so that
                # an AttributeError raised by them is still counted as an
                # invalid ping, exactly like a failure in line_to_ping.
                try:
                    p = Ping.line_to_ping(line)
                    unique_tags |= set(p.tags)
                    valid_pings += 1
                    total_tags += len(p.tags)
                    if initial_time == 0:
                        initial_time = p.time
                    if p.time <= previous_time:
                        m = "Line {} not ascending. Please fix order."
                        logging.error(m.format(line_number))
                    else:
                        last_time = p.time
                    previous_time = p.time
                except AttributeError:
                    m = "Line {}: {} is not a valid ping."
                    logging.error(m.format(line_number, line.strip()))
                    invalid_pings += 1

        if verbose:
            start_time = timer.time_to_str(initial_time)
            print("You have started tagging on {}.".format(start_time))
            print("Your TagTime log contains {} valid pings.".format(valid_pings))
            if invalid_pings > 0:
                print("It also contains {} invalid pings.".format(invalid_pings))
            m = "You have logged {} unique tags and {} tags in total."
            print(m.format(len(unique_tags), total_tags))
            last_time = timer.time_to_str(last_time)
            print("You have last tagged on {}.".format(last_time))

        return invalid_pings == 0

    def get_tags(self) -> List[Tuple[int, str]]:
        """
        Returns a list of tuples where the first element is an integer and
        the second element is a tag-string sorted from highest to lowest
        number of occurrences. Tags with the same count are ordered in
        reverse alphabetical order. Lines that cannot be parsed are skipped.

        For example, when called on a log with this content:

            1600805511 quz bar [2020.09.22 16:11:51 TUE]
            1600806947 quz off [2020.09.22 16:35:47 TUE]

        The function would return:

            [(2, quz), (1, off), (1, bar)]

        Returns an empty list if exists() is False.
        """
        if not self.exists():
            return []

        tag_counts = Counter()
        with open(self.log_file, 'r') as f:
            for line in f:
                try:
                    p = Ping.line_to_ping(line)
                except Exception:
                    # Skip unparsable lines silently (best effort).
                    continue
                tag_counts.update(p.tags)

        pairs = [(count, tag) for tag, count in tag_counts.items()]
        return sorted(pairs, reverse=True)

    def log_ping(self, ping: Ping, annotate_time: bool = False, line_len: int = 79):
        """
        Creates a line from the ping via Ping.ping_to_line and appends it
        to the log, followed by a newline.

        annotate_time and line_len are passed through to Ping.ping_to_line
        unchanged.
        """
        line = Ping.ping_to_line(ping, annotate_time, line_len)
        with open(self.log_file, 'a') as f:
            logging.debug("Logged {}".format(line))
            f.write(line + "\n")

    def log_line(self, line: str):
        """
        Append line to log file. No questions asked.

        A trailing newline is added if the line does not already end with
        one.
        """
        with open(self.log_file, 'a') as f:
            if line.endswith("\n"):
                f.write(line)
                # Stripped only for the debug message below.
                line = line.strip()
            else:
                f.write(line + "\n")
        logging.debug("Logged {}".format(line))