tagtimepy/taglog.py

190 lines
6.5 KiB
Python

import os
import sys
import logging
import timer
from typing import List, Tuple, Iterable
from ping import Ping
class TagLog:
"""
Thread-safe abstraction of log file. Enables easy implementation of
different log file type if desired.
"""
def __init__(self, log_file: str) -> None:
self.log_file = log_file
self.lock_file = self.log_file + ".lock"
self.got_lock = False
self.acquire_lock()
def __del__(self):
self.release_lock()
def acquire_lock(self):
""" Acquire lock file and terminate if it already exists. """
if os.path.isfile(self.lock_file):
m = "Could not get lock {}".format(self.lock_file)
logging.debug(m)
sys.exit(1)
else:
with open(self.lock_file, 'w') as f:
f.write("")
self.got_lock = True
logging.debug("Acquired lock_file={}".format(self.lock_file))
def release_lock(self):
"""
Release lock. Executed during object destruction, so no need to call
explicitly.
"""
if not self.got_lock:
return
if os.path.isfile(self.lock_file):
os.remove(self.lock_file)
logging.debug("Released lock_file={}".format(self.lock_file))
else:
logging.debug("Lock already released.")
def exists(self):
"""True if log file exists and contains at least one valid ping."""
try:
self.last_ping()
except (FileNotFoundError, AttributeError):
return False
return True
def last_ping(self) -> Ping:
"""Returns last Ping if there is one. Exception otherwise."""
with open(self.log_file, 'r') as f:
ping = Ping.line_to_ping(f.readlines()[-1])
return ping
def last_ping_time(self) -> int:
"""Returns last ping time f there is one. Exception otherwise."""
return self.last_ping().time
def last_tags(self) -> List[str]:
"""Returs the tags of the last ping."""
try:
return self.last_ping().tags
except:
return []
def all_pings(self) -> Iterable[Ping]:
"""Returns an iterator over all pings in the log."""
with open(self.log_file, 'r') as f:
for line in f.readlines():
try:
yield Ping.line_to_ping(line)
except AttributeError:
logging.error("Invalid line {}".format(line))
def grep(self, start: int, end: int) -> List[Ping]:
"""Returns all pings between start and end as an iterator."""
for ping in self.all_pings():
if ping.time >= start and ping.time <= end:
yield ping
def validate(self, verbose: bool = True) -> bool:
"""
Iterates over log and returns True if all lines are valid Pings.
Returns False otherwise. Also prints some statistics.
"""
unique_tags = set()
total_tags, valid_pings, invalid_pings = 0, 0, 0
previous_time, initial_time, last_time = 0, 0, 0
with open(self.log_file, 'r') as f:
for line_number, line in enumerate(f.readlines()):
try:
p = Ping.line_to_ping(line)
unique_tags |= set(p.tags)
valid_pings += 1
total_tags += len(p.tags)
if initial_time == 0:
initial_time = p.time
if p.time <= previous_time:
m = "Line {} not ascending. Please fix order."
logging.error(m.format(line_number))
else:
last_time = p.time
previous_time = p.time
except AttributeError:
m = "Line {}: {} is not a valid ping."
logging.error(m.format(line_number, line.strip()))
invalid_pings += 1
start_time = timer.time_to_str(initial_time)
print("You have started tagging on {}.".format(start_time))
print("Your TagTime log contains {} valid pings.".format(valid_pings))
if invalid_pings > 0:
print("It also contains {} invalid pings.".format(invalid_pings))
m = "You have logged {} unique tags and {} tags in total."
print(m.format(len(unique_tags), total_tags))
last_time = timer.time_to_str(last_time)
print("You have last tagged on {}.".format(last_time))
if invalid_pings > 0:
return False
return True
def get_tags(self) -> List[Tuple[int, str]]:
"""
Returns a list of tuples where the first element is an integer and the
second element is a tag-string sorted from highest to lowest number of
occurences. For example, when called on a log with this content:
1600805511 quz bar [2020.09.22 16:11:51 TUE]
1600806947 quz off [2020.09.22 16:35:47 TUE]
The function would return:
[(2, quz), (1, bar), (1, off)]
"""
if not self.exists():
return []
def add_tags(new_tags, tag_bag):
for t in new_tags:
try:
tag_bag[t] += 1
except KeyError:
tag_bag[t] = 1
tags = {}
with open(self.log_file, 'r') as f:
for line in f.readlines():
try:
p = Ping.line_to_ping(line)
except:
continue
add_tags(p.tags, tags)
tags = [(value, key) for key, value in tags.items()]
return sorted(tags, reverse=True)
def log_ping(self, ping: Ping, annotate_time: bool = False, line_len: int = 79):
"""
Creates a line from time, tags, and comments, and writes it into the
log.
If linelen is provided log_ping replicates the behavior of the original
Perl implementation and creates human-readable date/time annotation
comment right aligned to linelen.
"""
line = Ping.ping_to_line(ping, annotate_time, line_len)
with open(self.log_file, 'a') as f:
logging.debug("Logged {}".format(line))
f.write(line + "\n")
def log_line(self, line: str):
"""Append line to log file. No questions asked."""
with open(self.log_file, 'a') as f:
if line.endswith("\n"):
f.write(line)
line = line.strip()
else:
f.write(line + "\n")
logging.debug("Logged {}".format(line))