"""Lock-file guarded access to the TagTime ping log (class TagLog)."""

import logging
import os
import sys
from collections import Counter, deque
from typing import List, Tuple, Iterable

import timer
from ping import Ping


class TagLog:
    """
    Abstraction of the ping log file, guarded by a lock file.

    Creating a TagLog creates ``<log_file>.lock``; if that file already
    exists the process exits with status 1. The lock file is removed by
    release_lock(), which also runs when the object is destroyed or when a
    ``with TagLog(...)`` block is left. Enables easy implementation of a
    different log file type if desired.
    """

    def __init__(self, log_file: str) -> None:
        """Remember the log path and take the lock (may exit the process)."""
        # Set first so release_lock() (called from __del__) stays safe even
        # if one of the following statements raises.
        self.got_lock = False
        self.log_file = log_file
        self.lock_file = self.log_file + ".lock"
        self.acquire_lock()

    def __del__(self):
        self.release_lock()

    def __enter__(self) -> "TagLog":
        """Allow ``with TagLog(path) as log:`` for deterministic release."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.release_lock()

    def acquire_lock(self):
        """Acquire lock file and terminate if it already exists."""
        try:
            # Mode 'x' fails when the file exists, so the existence check and
            # the creation are a single step; a separate isfile() check
            # followed by open('w') lets two processes both "win".
            with open(self.lock_file, 'x'):
                pass
        except FileExistsError:
            logging.debug("Could not get lock %s", self.lock_file)
            sys.exit(1)
        self.got_lock = True
        logging.debug("Acquired lock_file=%s", self.lock_file)

    def release_lock(self):
        """
        Release lock. Executed during object destruction, so no need to call
        explicitly. Does nothing if this object does not hold the lock.
        """
        # getattr: the attribute may be missing on a half-constructed object.
        if not getattr(self, "got_lock", False):
            return

        # Give up ownership before touching the file, so a later call (for
        # example from __del__ after an explicit release) cannot delete a
        # lock file that somebody else has created in the meantime.
        self.got_lock = False
        try:
            os.remove(self.lock_file)
        except FileNotFoundError:
            logging.debug("Lock already released.")
        else:
            logging.debug("Released lock_file=%s", self.lock_file)

    def exists(self) -> bool:
        """True if log file exists and contains at least one valid ping."""
        try:
            self.last_ping()
        except (FileNotFoundError, IndexError, AttributeError):
            # IndexError: the file is there but empty.
            # AttributeError: the last line did not parse (this is the
            # exception the rest of this class expects from line_to_ping).
            return False
        return True

    def last_ping(self) -> Ping:
        """
        Returns last Ping if there is one. Exception otherwise.

        Raises FileNotFoundError if the log file is missing, IndexError if it
        is empty, and whatever Ping.line_to_ping raises for a line it cannot
        parse.
        """
        with open(self.log_file, 'r') as f:
            # Keep only the final line instead of loading the whole log.
            tail = deque(f, maxlen=1)
        last_line = tail[-1]
        return Ping.line_to_ping(last_line)

    def last_ping_time(self) -> int:
        """Returns last ping time if there is one. Exception otherwise."""
        return self.last_ping().time

    def last_tags(self) -> List[str]:
        """Returns the tags of the last ping, or [] if it cannot be read."""
        try:
            return self.last_ping().tags
        except Exception:
            # Deliberately best-effort, but unlike a bare ``except`` this
            # lets KeyboardInterrupt and SystemExit through.
            return []

    def all_pings(self) -> Iterable[Ping]:
        """
        Returns an iterator over all pings in the log. Lines that fail to
        parse (AttributeError) are reported via logging.error and skipped.
        """
        with open(self.log_file, 'r') as f:
            for line in f:
                try:
                    ping = Ping.line_to_ping(line)
                except AttributeError:
                    logging.error("Invalid line %s", line)
                else:
                    yield ping

    def grep(self, start: int, end: int) -> Iterable[Ping]:
        """Returns all pings with start <= time <= end as an iterator."""
        for ping in self.all_pings():
            if start <= ping.time <= end:
                yield ping

    def validate(self, verbose: bool = True) -> bool:
        """
        Iterates over log and returns True if all lines are valid Pings.
        Returns False otherwise. Lines whose time is not strictly greater
        than the previous ping's time are reported via logging.error but do
        not affect the return value. Line numbers in messages are 1-based.

        If verbose is true (the default), also prints some statistics.
        """
        unique_tags = set()
        total_tags, valid_pings, invalid_pings = 0, 0, 0
        previous_time, initial_time, last_time = 0, 0, 0
        with open(self.log_file, 'r') as f:
            for line_number, line in enumerate(f, start=1):
                try:
                    p = Ping.line_to_ping(line)
                    # Attribute reads stay inside the try so that a parse
                    # result lacking them still counts as an invalid line.
                    tags, time = p.tags, p.time
                except AttributeError:
                    logging.error("Line %s: %s is not a valid ping.",
                                  line_number, line.strip())
                    invalid_pings += 1
                    continue

                unique_tags.update(tags)
                valid_pings += 1
                total_tags += len(tags)
                if initial_time == 0:
                    initial_time = time
                if time <= previous_time:
                    logging.error("Line %s not ascending. Please fix order.",
                                  line_number)
                else:
                    last_time = time
                previous_time = time

        if verbose:
            start_str = timer.time_to_str(initial_time)
            print("You have started tagging on {}.".format(start_str))
            print("Your TagTime log contains {} valid pings.".format(valid_pings))
            if invalid_pings > 0:
                print("It also contains {} invalid pings.".format(invalid_pings))
            m = "You have logged {} unique tags and {} tags in total."
            print(m.format(len(unique_tags), total_tags))
            last_str = timer.time_to_str(last_time)
            print("You have last tagged on {}.".format(last_str))

        return invalid_pings == 0

    def get_tags(self) -> List[Tuple[int, str]]:
        """
        Returns a list of tuples where the first element is an integer and the
        second element is a tag-string sorted from highest to lowest number of
        occurrences. Tags with equal counts come in reverse alphabetical
        order, because the (count, tag) tuples are sorted in reverse. For
        example, when called on a log with this content:

        1600805511 quz bar [2020.09.22 16:11:51 TUE]
        1600806947 quz off [2020.09.22 16:35:47 TUE]

        The function would return:

        [(2, 'quz'), (1, 'off'), (1, 'bar')]

        Returns [] if exists() is false. Lines that fail to parse are
        skipped silently.
        """
        if not self.exists():
            return []

        counts = Counter()
        with open(self.log_file, 'r') as f:
            for line in f:
                try:
                    p = Ping.line_to_ping(line)
                except Exception:
                    # Best-effort skip of unparsable lines; no longer
                    # swallows KeyboardInterrupt/SystemExit.
                    continue
                counts.update(p.tags)
        return sorted(((n, tag) for tag, n in counts.items()), reverse=True)

    def log_ping(self, ping: Ping, annotate_time: bool = False,
                 line_len: int = 79) -> None:
        """
        Creates a line from time, tags, and comments, and writes it into the
        log.

        annotate_time and line_len are passed through to Ping.ping_to_line.
        Per the original author's note, this replicates the behavior of the
        original Perl implementation and creates a human-readable date/time
        annotation comment right aligned to line_len.
        """
        line = Ping.ping_to_line(ping, annotate_time, line_len)
        with open(self.log_file, 'a') as f:
            f.write(line + "\n")
        # Report only after the write has actually happened.
        logging.debug("Logged %s", line)

    def log_line(self, line: str) -> None:
        """
        Append line to log file. No questions asked.

        A trailing newline is added unless line already ends with one.
        """
        has_newline = line.endswith("\n")
        with open(self.log_file, 'a') as f:
            f.write(line if has_newline else line + "\n")
        logging.debug("Logged %s", line.strip() if has_newline else line)