import sys import math import time import datetime import logging class Timer: def __init__(self, seed: int, ping: int, gap: int): self.seed = seed self.ping = ping self.gap = gap self.ini_seed = seed # remember initial seed/ping for prev method self.ini_ping = ping self.ia = 16807 # const for RNG (p37 of Simulation by Ross) self.im = 2**31 - 1 # const for RNG def ran0(self) -> int: """ Returns a random integer in [1, self.im - 1]; changes self.seed, i.e., RNG state. (This is ran0 from Numerical Recipes and has a period of ~2 billion.) """ self.seed = (self.ia * self.seed) % self.im return self.seed def ran01(self) -> float: """Returns a U(0,1) random number. Changes seed.""" return self.ran0() / self.im def exprand(self) -> float: """ Returns a random number drawn from an exponential distribution with mean self.gap (defined in settings file). Changes seed. """ return -1 * self.gap * math.log(self.ran01()) def next(self) -> int: """Returns random next ping time in unix time. Changes seed.""" prev_ping = self.ping self.ping = max(prev_ping + 1, round(prev_ping + self.exprand())) return self.ping def prev(self, time: int) -> int: """ Computes the last scheduled ping time before time. This updates the internal state of Ping to the last seed/ping-pair before time. In other words, the next call to next_ping will return a ping >= time. Another name for this function would be `forward`. """ assert(self.ini_ping < time) self.seed = self.ini_seed self.ping = self.ini_ping # Compute new pings till self.ping >= time. while self.ping < time: last_ping = self.ping last_seed = self.seed self.next() # updates self.ping and self.seed # Restore ping/seed state to the values before time. self.ping = last_ping self.seed = last_seed # Return ping before time. return last_ping def time_now(self) -> int: return now() def now() -> int: """Returns the current unix time in seconds as an integer.""" return int(time.time()) def date_str_to_time(date_str: str) -> int: """Parse string into UNIX time.""" formats = ['%Y.%m.%d', '%Y-%m-%d', '%Y/%m/%d', '%d.%m.%Y'] time = None for f in formats: try: datetime_object = datetime.datetime.strptime(date_str, f) time = int(datetime_object.timestamp()) break except ValueError: pass if time is None: logging.error("Could not parse {} to date".format(date_str)) sys.exit(1) return time def time_to_str(time: int) -> str: """Transform UNIX time to human-readable string.""" strf = "%Y.%m.%d %H:%M:%S %a" return datetime.datetime.fromtimestamp(time).strftime(strf)