Initial commit.
commit
e6473513f0
|
@ -0,0 +1,4 @@
|
|||
token.txt
|
||||
persistence.pkl
|
||||
tagtimebot.log
|
||||
__pycache__
|
|
@ -0,0 +1,25 @@
|
|||
GLWT(Good Luck With That) Public License
|
||||
Copyright (c) Everyone, except Author
|
||||
|
||||
Everyone is permitted to copy, distribute, modify, merge, sell, publish,
|
||||
sublicense or whatever they want with this software but at their OWN RISK.
|
||||
|
||||
Preamble
|
||||
|
||||
The author has absolutely no clue what the code in this project does.
|
||||
It might just work or not, there is no third option.
|
||||
|
||||
|
||||
GOOD LUCK WITH THAT PUBLIC LICENSE
|
||||
TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION, AND MODIFICATION
|
||||
|
||||
0. You just DO WHATEVER YOU WANT TO as long as you NEVER LEAVE A
|
||||
TRACE TO TRACK THE AUTHOR of the original product to blame for or hold
|
||||
responsible.
|
||||
|
||||
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
DEALINGS IN THE SOFTWARE.
|
||||
|
||||
Good luck and Godspeed.
|
|
@ -0,0 +1,12 @@
|
|||
[[source]]
|
||||
url = "https://pypi.org/simple"
|
||||
verify_ssl = true
|
||||
name = "pypi"
|
||||
|
||||
[packages]
|
||||
python-telegram-bot = "*"
|
||||
|
||||
[dev-packages]
|
||||
|
||||
[requires]
|
||||
python_version = "3.11"
|
|
@ -0,0 +1,86 @@
|
|||
{
|
||||
"_meta": {
|
||||
"hash": {
|
||||
"sha256": "15b16e5040c50964d35e9f2bbd1c34ceaf470a211f825474912702dd4d76ecf3"
|
||||
},
|
||||
"pipfile-spec": 6,
|
||||
"requires": {
|
||||
"python_version": "3.11"
|
||||
},
|
||||
"sources": [
|
||||
{
|
||||
"name": "pypi",
|
||||
"url": "https://pypi.org/simple",
|
||||
"verify_ssl": true
|
||||
}
|
||||
]
|
||||
},
|
||||
"default": {
|
||||
"anyio": {
|
||||
"hashes": [
|
||||
"sha256:745843b39e829e108e518c489b31dc757de7d2131d53fac32bd8df268227bfee",
|
||||
"sha256:e1875bb4b4e2de1669f4bc7869b6d3f54231cdced71605e6e64c9be77e3be50f"
|
||||
],
|
||||
"markers": "python_version >= '3.8'",
|
||||
"version": "==4.2.0"
|
||||
},
|
||||
"certifi": {
|
||||
"hashes": [
|
||||
"sha256:9b469f3a900bf28dc19b8cfbf8019bf47f7fdd1a65a1d4ffb98fc14166beb4d1",
|
||||
"sha256:e036ab49d5b79556f99cfc2d9320b34cfbe5be05c5871b51de9329f0603b0474"
|
||||
],
|
||||
"markers": "python_version >= '3.6'",
|
||||
"version": "==2023.11.17"
|
||||
},
|
||||
"h11": {
|
||||
"hashes": [
|
||||
"sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d",
|
||||
"sha256:e3fe4ac4b851c468cc8363d500db52c2ead036020723024a109d37346efaa761"
|
||||
],
|
||||
"markers": "python_version >= '3.7'",
|
||||
"version": "==0.14.0"
|
||||
},
|
||||
"httpcore": {
|
||||
"hashes": [
|
||||
"sha256:096cc05bca73b8e459a1fc3dcf585148f63e534eae4339559c9b8a8d6399acc7",
|
||||
"sha256:9fc092e4799b26174648e54b74ed5f683132a464e95643b226e00c2ed2fa6535"
|
||||
],
|
||||
"markers": "python_version >= '3.8'",
|
||||
"version": "==1.0.2"
|
||||
},
|
||||
"httpx": {
|
||||
"hashes": [
|
||||
"sha256:8b8fcaa0c8ea7b05edd69a094e63a2094c4efcb48129fb757361bc423c0ad9e8",
|
||||
"sha256:a05d3d052d9b2dfce0e3896636467f8a5342fb2b902c819428e1ac65413ca118"
|
||||
],
|
||||
"markers": "python_version >= '3.8'",
|
||||
"version": "==0.25.2"
|
||||
},
|
||||
"idna": {
|
||||
"hashes": [
|
||||
"sha256:9ecdbbd083b06798ae1e86adcbfe8ab1479cf864e4ee30fe4e46a003d12491ca",
|
||||
"sha256:c05567e9c24a6b9faaa835c4821bad0590fbb9d5779e7caa6e1cc4978e7eb24f"
|
||||
],
|
||||
"markers": "python_version >= '3.5'",
|
||||
"version": "==3.6"
|
||||
},
|
||||
"python-telegram-bot": {
|
||||
"hashes": [
|
||||
"sha256:462326c65671c8c39e76c8c96756ee918be6797d225f8db84d2ec0f883383b8c",
|
||||
"sha256:4f146c39de5f5e0b3723c2abedaf78046ebd30a6a49d2281ee4b3af5eb116b68"
|
||||
],
|
||||
"index": "pypi",
|
||||
"markers": "python_version >= '3.8'",
|
||||
"version": "==20.7"
|
||||
},
|
||||
"sniffio": {
|
||||
"hashes": [
|
||||
"sha256:e60305c5e5d314f5389259b7f22aaa33d8f7dee49763119234af3755c55b9101",
|
||||
"sha256:eecefdce1e5bbfb7ad2eeaabf7c1eeb404d7757c379bd1f7e5cce9d8bf425384"
|
||||
],
|
||||
"markers": "python_version >= '3.7'",
|
||||
"version": "==1.3.0"
|
||||
}
|
||||
},
|
||||
"develop": {}
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
A Python Telegram [TagTime](https://doc.beeminder.com/tagtime) bot.
|
||||
|
||||
Place Telegram bot API key into `token.txt`.
|
||||
|
||||
```
|
||||
pipenv update
|
||||
pipenv shell
|
||||
python main.py
|
||||
```
|
||||
|
||||
Currently deployed as `@tagtime_bot`.
|
|
@ -0,0 +1,320 @@
|
|||
import logging
|
||||
import timer
|
||||
from collections import OrderedDict
|
||||
from typing import Optional
|
||||
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
|
||||
from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, PicklePersistence
|
||||
|
||||
logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO)
|
||||
logging.getLogger("httpx").setLevel(logging.WARNING)
|
||||
logging.getLogger('apscheduler').setLevel(logging.WARNING)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class UserData:
|
||||
def __init__(self, user_data):
|
||||
if not "pings" in user_data:
|
||||
user_data["pings"] = OrderedDict()
|
||||
|
||||
if not "tags" in user_data:
|
||||
user_data["tags"] = set()
|
||||
|
||||
if not "started" in user_data:
|
||||
user_data["started"] = False
|
||||
|
||||
if not "pending_pings" in user_data:
|
||||
user_data["pending_pings"] = {}
|
||||
|
||||
self.user_data = user_data
|
||||
|
||||
@property
|
||||
def started(self):
|
||||
return self.user_data["started"]
|
||||
|
||||
@started.setter
|
||||
def started(self, value):
|
||||
self.user_data["started"] = value
|
||||
|
||||
@property
|
||||
def tags(self):
|
||||
return self.user_data["tags"]
|
||||
|
||||
@property
|
||||
def pings(self):
|
||||
return self.user_data["pings"]
|
||||
|
||||
def add_tag(self, tag):
|
||||
self.user_data["tags"].add(tag)
|
||||
|
||||
def delete_tag(self, tag):
|
||||
"""Remove an item from the inventory."""
|
||||
if tag in self.user_data["tags"]:
|
||||
self.user_data["tags"].remove(tag)
|
||||
|
||||
def add_ping(self, chat_id, message_id, ping_id):
|
||||
self.user_data["pending_pings"][(chat_id, message_id)] = (ping_id, [])
|
||||
|
||||
def get_ping_list(self, chat_id, message_id) -> Optional[list]:
|
||||
try:
|
||||
_, ping_list = self.user_data["pending_pings"][(chat_id, message_id)]
|
||||
return ping_list
|
||||
except KeyError:
|
||||
return None
|
||||
|
||||
def submit_ping(self, chat_id, message_id):
|
||||
ping_id, ping_list = self.user_data["pending_pings"][(chat_id, message_id)]
|
||||
del self.user_data["pending_pings"][(chat_id, message_id)]
|
||||
self.user_data["pings"][ping_id] = ping_list
|
||||
|
||||
def get_pending_pings(self):
|
||||
return self.user_data["pending_pings"].values()
|
||||
|
||||
def get_ping_reply_markup(self):
|
||||
keyboard = []
|
||||
tags_per_row = 5
|
||||
tags = list(sorted(self.tags))
|
||||
for i in range(0, len(tags), tags_per_row):
|
||||
keyboard.append([])
|
||||
for tag in tags[i:i + tags_per_row]:
|
||||
keyboard[-1].append(InlineKeyboardButton(tag, callback_data=tag))
|
||||
keyboard.append([InlineKeyboardButton("🗑️ clear", callback_data="clear"),
|
||||
InlineKeyboardButton("🔄 reload", callback_data="reload"),
|
||||
InlineKeyboardButton("📤 submit", callback_data="submit"),])
|
||||
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||
return reply_markup
|
||||
|
||||
def ping_is_due(self) -> Optional[int]:
|
||||
t = timer.Timer()
|
||||
prev_ping = t.prev(t.time_now())
|
||||
pending_pings = [ping for ping, _ in self.get_pending_pings()]
|
||||
if prev_ping in self.pings or prev_ping in pending_pings:
|
||||
return None
|
||||
return prev_ping
|
||||
|
||||
|
||||
async def callback_minute(context: ContextTypes.DEFAULT_TYPE):
|
||||
assert context.job is not None
|
||||
for chat_id, user_id in context.bot_data['chats'].items():
|
||||
user_data = UserData(context.application.user_data[user_id])
|
||||
if (ping := user_data.ping_is_due()) is not None:
|
||||
reply_markup = user_data.get_ping_reply_markup()
|
||||
text = f"Ping! {timer.time_to_str(ping)}"
|
||||
message = await context.bot.send_message(chat_id=chat_id, text=text, reply_markup=reply_markup)
|
||||
user_data.add_ping(chat_id, message.id, ping)
|
||||
|
||||
|
||||
async def ping_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
assert context.user_data is not None
|
||||
assert update.message is not None
|
||||
|
||||
user_data = UserData(context.user_data)
|
||||
reply_markup = user_data.get_ping_reply_markup()
|
||||
message = await update.message.reply_text("Ping! Select Tags.", reply_markup=reply_markup)
|
||||
user_data.add_ping(update.message.chat_id, message.id, timer.now())
|
||||
|
||||
|
||||
async def list_pings_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
assert update.message is not None
|
||||
assert context.user_data is not None
|
||||
|
||||
pings = sorted(UserData(context.user_data).pings.items())
|
||||
text = "There aren't any pings, yet. Use /ping to get pinged now."
|
||||
n_pings = 20
|
||||
if pings:
|
||||
text = f"Recent {n_pings} pings:\n"
|
||||
for ping, tags in pings[-n_pings:]:
|
||||
ping_time = timer.time_to_str(ping)
|
||||
text += f" {ping} {ping_time} {', '.join(sorted(tags))}\n"
|
||||
await context.bot.send_message(chat_id=update.message.chat.id, text=text)
|
||||
|
||||
|
||||
async def list_pending_pings_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
assert update.message is not None
|
||||
assert context.user_data is not None
|
||||
|
||||
user_data = UserData(context.user_data)
|
||||
pings = user_data.get_pending_pings()
|
||||
text = "There are no pending pings."
|
||||
if pings:
|
||||
text = "Pending pings:\n"
|
||||
for ping, tags in pings:
|
||||
ping_time = timer.time_to_str(ping)
|
||||
text += f" {ping_time}: {', '.join(sorted(tags))}\n"
|
||||
await context.bot.send_message(chat_id=update.message.chat.id, text=text)
|
||||
|
||||
|
||||
async def callback_button(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
"""Parses the CallbackQuery and updates the message text."""
|
||||
assert update.callback_query is not None
|
||||
assert context.chat_data is not None
|
||||
query = update.callback_query
|
||||
assert query.message is not None
|
||||
await query.answer()
|
||||
|
||||
user_data = UserData(context.user_data)
|
||||
tags = user_data.get_ping_list(query.message.chat_id, query.message.id)
|
||||
tag = update.callback_query.data
|
||||
text = None
|
||||
reply_markup = user_data.get_ping_reply_markup()
|
||||
if tags is None:
|
||||
text = "Invalid ping. Removed."
|
||||
reply_markup = None
|
||||
elif tag == "submit" and len(tags) > 0:
|
||||
user_data.submit_ping(query.message.chat_id, query.message.id)
|
||||
tags_text = "', '".join(tags)
|
||||
text = f"☑️ Submitted '{tags_text}'."
|
||||
reply_markup = None
|
||||
elif tag == "submit":
|
||||
text = "Select at least one tag before submitting."
|
||||
elif tag == "clear":
|
||||
tags.clear()
|
||||
elif tag == "reload":
|
||||
pass
|
||||
else:
|
||||
if tag not in tags:
|
||||
tags.append(tag)
|
||||
else:
|
||||
tags.remove(tag)
|
||||
|
||||
if text is None:
|
||||
if tags is not None:
|
||||
selected = ", ".join(tags)
|
||||
if selected:
|
||||
text = f"Selected: {selected}."
|
||||
else:
|
||||
text = "Select tags."
|
||||
else:
|
||||
text = "No text or tags."
|
||||
|
||||
if query.message.text != text or query.message.reply_markup != reply_markup:
|
||||
await query.edit_message_text(text=text, reply_markup=reply_markup)
|
||||
|
||||
|
||||
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
assert update.message is not None
|
||||
assert update.message.from_user is not None
|
||||
assert context.user_data is not None
|
||||
assert context.job_queue is not None
|
||||
logging.info(f"Start from {update.message.from_user}.")
|
||||
|
||||
if not "chats" in context.bot_data:
|
||||
context.bot_data["chats"] = {}
|
||||
chat_id = update.message.chat_id
|
||||
|
||||
# Note it seems like chat_id == user_id but that's okay.
|
||||
if not chat_id in context.bot_data["chats"]:
|
||||
context.bot_data["chats"][chat_id] = update.message.from_user.id
|
||||
|
||||
user_data = UserData(context.user_data)
|
||||
if not user_data.started:
|
||||
user_data.started = True
|
||||
text = ("TagTime has been started. You will receive pings per the official schedule.\n"
|
||||
"\n"
|
||||
"Run /help to see all commands.")
|
||||
else:
|
||||
text = "TagTimeBot is active. Run /help to see more commands."
|
||||
|
||||
await update.message.reply_text(text)
|
||||
|
||||
|
||||
async def help_command(update: Update, _: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
"""Displays info on how to use the bot."""
|
||||
assert update.message is not None
|
||||
text = ("Commands:\n"
|
||||
" /start to start receiving pings\n"
|
||||
"\n"
|
||||
" /list_tags to see see currently defined tags\n"
|
||||
" /add_tags to add tags (provide tags separated by space)\n"
|
||||
" /delete_tags to delete tags (will not affect past pings)\n"
|
||||
"\n"
|
||||
" /ping to receive a demo ping\n"
|
||||
" /list_pings to list recently answered pings\n"
|
||||
" /pending_pings to list pending pings\n"
|
||||
"")
|
||||
await update.message.reply_text(text)
|
||||
|
||||
|
||||
async def add_tags_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
assert update.message is not None
|
||||
assert context.user_data is not None
|
||||
|
||||
user_data = UserData(context.user_data)
|
||||
if context.args is None or len(context.args) == 0:
|
||||
text = "No tags were added because none were provided."
|
||||
else:
|
||||
tags = context.args
|
||||
if len(tags) == 1:
|
||||
text = f"Tag '{tags[0]}' added."
|
||||
else:
|
||||
t = "', '".join(tags)
|
||||
text = f"Tags '{t}' added."
|
||||
for tag in tags:
|
||||
user_data.add_tag(tag)
|
||||
await context.bot.send_message(chat_id=update.message.chat.id, text=text)
|
||||
|
||||
|
||||
async def list_tags_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
assert update.message is not None
|
||||
assert context.user_data is not None
|
||||
|
||||
tags_set = UserData(context.user_data).tags
|
||||
tags = ', '.join(sorted(tags_set))
|
||||
if tags != "":
|
||||
text = f"Tags: {tags}"
|
||||
else:
|
||||
text = "No tags defined. Add with /add_tags."
|
||||
await context.bot.send_message(chat_id=update.message.chat.id, text=text)
|
||||
|
||||
|
||||
async def delete_tags_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
assert update.message is not None
|
||||
assert context.user_data is not None
|
||||
|
||||
user_data = UserData(context.user_data)
|
||||
if context.args is None or len(context.args) == 0:
|
||||
text = "No tags were deleted because none were provided."
|
||||
else:
|
||||
tags = context.args
|
||||
tags_deleted, tags_ignored = [], []
|
||||
for tag in tags:
|
||||
if tag in user_data.tags:
|
||||
user_data.delete_tag(tag)
|
||||
tags_deleted.append(tag)
|
||||
else:
|
||||
tags_ignored.append(tag)
|
||||
if tags_deleted:
|
||||
tags_deleted = "', '".join(tags_deleted)
|
||||
text = f"Deleted '{tags_deleted}'."
|
||||
else:
|
||||
text = "No tags were deleted."
|
||||
|
||||
if tags_ignored:
|
||||
tags_ignored = "', '".join(tags_ignored)
|
||||
text += f" Ignored non-existent '{tags_ignored}'."
|
||||
await context.bot.send_message(chat_id=update.message.chat.id, text=text)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Run the bot."""
|
||||
token = open("token.txt", "r").read().strip()
|
||||
persistence = PicklePersistence(filepath='persistence.pkl')
|
||||
application = Application.builder().token(token).persistence(persistence).build()
|
||||
assert application.job_queue is not None
|
||||
|
||||
application.add_handler(CommandHandler("start", start_command))
|
||||
application.add_handler(CommandHandler("help", help_command))
|
||||
application.add_handler(CommandHandler("ping", ping_command))
|
||||
application.add_handler(CommandHandler("list_pings", list_pings_command))
|
||||
application.add_handler(CommandHandler("pending_pings", list_pending_pings_command))
|
||||
application.add_handler(CallbackQueryHandler(callback_button))
|
||||
|
||||
application.add_handler(CommandHandler("add_tags", add_tags_command))
|
||||
application.add_handler(CommandHandler("list_tags", list_tags_command))
|
||||
application.add_handler(CommandHandler("delete_tags", delete_tags_command))
|
||||
|
||||
application.job_queue.run_repeating(callback_minute, interval=60, first=5)
|
||||
application.run_polling(allowed_updates=Update.ALL_TYPES)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -0,0 +1,37 @@
|
|||
import hashlib
|
||||
import time
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
|
||||
def get_file_hash(filename):
|
||||
hasher = hashlib.sha256()
|
||||
with open(filename, 'rb') as f:
|
||||
hasher.update(f.read())
|
||||
return hasher.hexdigest()
|
||||
|
||||
|
||||
def main(script_name, interval=1):
|
||||
last_hash = None
|
||||
process = None
|
||||
|
||||
while True:
|
||||
try:
|
||||
current_hash = get_file_hash(script_name)
|
||||
if current_hash != last_hash:
|
||||
last_hash = current_hash
|
||||
if process and process.poll() is None:
|
||||
process.terminate()
|
||||
print(f"Detected change in {script_name}, running script...")
|
||||
process = subprocess.Popen(["python", script_name], shell=False)
|
||||
time.sleep(interval)
|
||||
except KeyboardInterrupt:
|
||||
if process:
|
||||
process.terminate()
|
||||
break
|
||||
except FileNotFoundError:
|
||||
print("The file was not found. Make sure the script name is correct.")
|
||||
break
|
||||
|
||||
if __name__ == "__main__":
|
||||
main(sys.argv[1])
|
|
@ -0,0 +1,82 @@
|
|||
import math
|
||||
import time
|
||||
import datetime
|
||||
import zoneinfo
|
||||
|
||||
|
||||
class Timer:
|
||||
def __init__(self, seed: int = 11193462, ping: int = 1184097393, gap: int = 45*60):
|
||||
self.seed = seed
|
||||
self.ping = ping
|
||||
self.gap = gap
|
||||
self.ini_seed = seed # remember initial seed/ping for prev method
|
||||
self.ini_ping = ping
|
||||
self.ia = 16807 # const for RNG (p37 of Simulation by Ross)
|
||||
self.im = 2**31 - 1 # const for RNG
|
||||
|
||||
def ran0(self) -> int:
|
||||
""" Returns a random integer in [1, self.im - 1]; changes self.seed,
|
||||
i.e., RNG state. (This is ran0 from Numerical Recipes and has a period
|
||||
of ~2 billion.) """
|
||||
self.seed = (self.ia * self.seed) % self.im
|
||||
return self.seed
|
||||
|
||||
def ran01(self) -> float:
|
||||
"""Returns a U(0,1) random number. Changes seed."""
|
||||
return self.ran0() / self.im
|
||||
|
||||
def exprand(self) -> float:
|
||||
""" Returns a random number drawn from an exponential distribution with
|
||||
mean self.gap (defined in settings file). Changes seed. """
|
||||
return -1 * self.gap * math.log(self.ran01())
|
||||
|
||||
def next(self) -> int:
|
||||
"""Returns random next ping time in unix time. Changes seed."""
|
||||
prev_ping = self.ping
|
||||
self.ping = max(prev_ping + 1, round(prev_ping + self.exprand()))
|
||||
return self.ping
|
||||
|
||||
def prev(self, time: int) -> int:
|
||||
"""
|
||||
Computes the last scheduled ping time before time. This updates the
|
||||
internal state of Ping to the last seed/ping-pair before time. In other
|
||||
words, the next call to next_ping will return a ping >= time. Another
|
||||
name for this function would be `forward`.
|
||||
"""
|
||||
assert(self.ini_ping < time)
|
||||
self.seed = self.ini_seed
|
||||
self.ping = self.ini_ping
|
||||
|
||||
last_ping, last_seed = None, None
|
||||
# Compute new pings till self.ping >= time.
|
||||
while self.ping < time:
|
||||
last_ping = self.ping
|
||||
last_seed = self.seed
|
||||
self.next() # updates self.ping and self.seed
|
||||
|
||||
if last_ping is None or last_seed is None:
|
||||
raise RuntimeError("Could not forward ping/seed to given time.")
|
||||
|
||||
# Restore ping/seed state to the values before time.
|
||||
self.ping = last_ping
|
||||
self.seed = last_seed
|
||||
|
||||
# Return ping before time.
|
||||
return last_ping
|
||||
|
||||
def time_now(self) -> int:
|
||||
return now()
|
||||
|
||||
|
||||
def now() -> int:
|
||||
"""Returns the current unix time in seconds as an integer."""
|
||||
return int(time.time())
|
||||
|
||||
|
||||
def time_to_str(unix_timestamp: int, timezone="America/Detroit") -> str:
|
||||
"""Transform UNIX time to human-readable string."""
|
||||
utc_datetime = datetime.datetime.fromtimestamp(unix_timestamp, tz=datetime.timezone.utc)
|
||||
target_timezone = zoneinfo.ZoneInfo(timezone)
|
||||
local_datetime = utc_datetime.astimezone(target_timezone)
|
||||
strf = "[%d %H:%M:%S %a]"
|
||||
return local_datetime.strftime(strf)
|
Loading…
Reference in New Issue