TagTimeBot/main.py

381 lines
14 KiB
Python

import logging
import timer
import sys
from collections import OrderedDict
from typing import Optional
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, PicklePersistence
logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger('apscheduler').setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
class UserData:
def __init__(self, user_data):
if not "pings" in user_data:
user_data["pings"] = OrderedDict()
if not "tags" in user_data:
user_data["tags"] = set()
if not "started" in user_data:
user_data["started"] = False
if not "pending_pings" in user_data:
user_data["pending_pings"] = {}
if not "reply_all" in user_data:
user_data["reply_all"] = None
self.user_data = user_data
@property
def started(self):
return self.user_data["started"]
@started.setter
def started(self, value):
self.user_data["started"] = value
@property
def tags(self):
return self.user_data["tags"]
@property
def pings(self):
return self.user_data["pings"]
@property
def reply_all(self) -> Optional[tuple[int, list[str]]]:
return self.user_data["reply_all"]
@reply_all.setter
def reply_all(self, value: Optional[tuple[int, list[str]]]):
self.user_data["reply_all"] = value
def add_tag(self, tag):
self.user_data["tags"].add(tag)
def delete_tag(self, tag):
"""Remove an item from the inventory."""
if tag in self.user_data["tags"]:
self.user_data["tags"].remove(tag)
def add_ping(self, chat_id, message_id, ping_id):
self.user_data["pending_pings"][(chat_id, message_id)] = (ping_id, [])
def get_ping_list(self, chat_id, message_id) -> Optional[list]:
try:
_, ping_list = self.user_data["pending_pings"][(chat_id, message_id)]
return ping_list
except KeyError:
return None
def submit_ping(self, chat_id, message_id):
ping_id, ping_list = self.user_data["pending_pings"][(chat_id, message_id)]
del self.user_data["pending_pings"][(chat_id, message_id)]
self.user_data["pings"][ping_id] = ping_list
def submit_pending_pings(self, tags: list[str]) -> list[tuple[int, int]]:
res = []
for (chat_id, message_id), (ping_id, _) in self.user_data["pending_pings"].items():
self.user_data["pings"][ping_id] = tags
res.append((chat_id, message_id))
for (chat_id, message_id) in res:
del self.user_data["pending_pings"][(chat_id, message_id)]
return res
def get_pending_pings(self):
return self.user_data["pending_pings"].values()
def get_ping_reply_markup(self):
keyboard = []
keyboard.append([InlineKeyboardButton("🎯 on-target", callback_data="on-target"),
InlineKeyboardButton("❌ off-target", callback_data="off-target"),
InlineKeyboardButton("⭕ no-target", callback_data="no-target"),])
tags_per_row = 5
tags = list(sorted(self.tags))
for i in range(0, len(tags), tags_per_row):
keyboard.append([])
for tag in tags[i:i + tags_per_row]:
keyboard[-1].append(InlineKeyboardButton(tag, callback_data=tag))
keyboard.append([InlineKeyboardButton("🗑️ clear", callback_data="clear"),
InlineKeyboardButton("🔄 reload", callback_data="reload"),
InlineKeyboardButton("📤 submit", callback_data="submit"),])
reply_markup = InlineKeyboardMarkup(keyboard)
return reply_markup
def ping_is_due(self) -> Optional[int]:
t = timer.Timer()
prev_ping = t.prev(t.time_now())
pending_pings = [ping for ping, _ in self.get_pending_pings()]
if prev_ping in self.pings or prev_ping in pending_pings:
return None
return prev_ping
async def callback_minute(context: ContextTypes.DEFAULT_TYPE):
assert context.job is not None
for chat_id, user_id in context.bot_data['chats'].items():
user_data = UserData(context.application.user_data[user_id])
if (ping := user_data.ping_is_due()) is not None:
reply_markup = user_data.get_ping_reply_markup()
text = f"Ping! {timer.time_to_str(ping)}"
message = await context.bot.send_message(chat_id=chat_id, text=text, reply_markup=reply_markup)
user_data.add_ping(chat_id, message.id, ping)
async def ping_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
assert context.user_data is not None
assert update.message is not None
user_data = UserData(context.user_data)
reply_markup = user_data.get_ping_reply_markup()
message = await update.message.reply_text("Ping! Select Tags.", reply_markup=reply_markup)
user_data.add_ping(update.message.chat_id, message.id, timer.now())
async def list_pings_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
assert update.message is not None
assert context.user_data is not None
pings = sorted(UserData(context.user_data).pings.items())
text = "There aren't any pings, yet. Use /ping to get pinged now."
n_pings = 20
if pings:
text = f"Recent {n_pings} pings:\n"
for ping, tags in pings[-n_pings:]:
ping_time = timer.time_to_str(ping)
text += f" {ping} {ping_time} {', '.join(sorted(tags))}\n"
await context.bot.send_message(chat_id=update.message.chat.id, text=text)
async def list_pending_pings_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
assert update.message is not None
assert context.user_data is not None
user_data = UserData(context.user_data)
pings = user_data.get_pending_pings()
text = "There are no pending pings."
if pings:
text = "Pending pings:\n"
for ping, tags in pings:
ping_time = timer.time_to_str(ping)
text += f" {ping_time}: {', '.join(sorted(tags))}\n"
await context.bot.send_message(chat_id=update.message.chat.id, text=text)
async def reply_all_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
assert update.message is not None
assert context.user_data is not None
user_data = UserData(context.user_data)
user_data.reply_all = None
pings = user_data.get_pending_pings()
if user_data.reply_all is not None:
text = "It seems like there is a pending reply all command."
reply_markup = None
elif len(pings) == 0:
text = "No pending pings to reply to."
reply_markup = None
else:
text = f"Reply to {len(pings)} pending pings."
reply_markup=user_data.get_ping_reply_markup()
message = await context.bot.send_message(chat_id=update.message.chat.id, text=text, reply_markup=reply_markup)
if reply_markup is not None and len(pings) > 0:
user_data.reply_all = (message.id, [])
async def callback_button(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Parses the CallbackQuery and updates the message text."""
assert update.callback_query is not None
assert context.chat_data is not None
assert update.callback_query.message is not None
query = update.callback_query
assert query.message is not None
assert isinstance(query.data, str)
# We have to always to do that because the PTB docs say so.
await query.answer()
user_data = UserData(context.user_data)
tags, text, reply_all_id = None, None, None
reply_markup = user_data.get_ping_reply_markup()
if user_data.reply_all is not None and query.message.id == user_data.reply_all[0]:
reply_all_id, tags = user_data.reply_all
else:
tags = user_data.get_ping_list(query.message.chat_id, query.message.id)
if tags is None:
text = "Invalid ping. Removed."
reply_markup = None
else:
match query.data:
case "submit" if len(tags) > 0:
text = f"☑️ Submitted {', '.join(tags)}."
reply_markup = None
if reply_all_id is None:
user_data.submit_ping(query.message.chat_id, query.message.id)
else:
for (chat_id, message_id) in user_data.submit_pending_pings(tags):
await context.bot.edit_message_text(chat_id=chat_id, message_id=message_id, text=text)
case "submit":
text = "Select at least one tag before submitting."
case "clear":
tags.clear()
case "reload":
pass
case tag if tag not in tags:
tags.append(tag)
case tag if tag in tags:
tags.remove(tag)
if text is None:
selected = ", ".join(tags)
if selected:
text = f"Selected {selected}."
else:
text = "Select tags."
if text is None:
text = "Something went wrong. Sorry!"
if query.message.text != text or query.message.reply_markup != reply_markup:
await query.edit_message_text(text=text, reply_markup=reply_markup)
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
assert update.message is not None
assert update.message.from_user is not None
assert context.user_data is not None
assert context.job_queue is not None
logging.info(f"Start from {update.message.from_user}.")
if not "chats" in context.bot_data:
context.bot_data["chats"] = {}
chat_id = update.message.chat_id
# Note it seems like chat_id == user_id but that's okay.
if not chat_id in context.bot_data["chats"]:
context.bot_data["chats"][chat_id] = update.message.from_user.id
user_data = UserData(context.user_data)
if not user_data.started:
user_data.started = True
text = ("TagTime has been started. You will receive pings per the official schedule.\n"
"\n"
"Run /help to see all commands.")
else:
text = "TagTimeBot is active. Run /help to see more commands."
await update.message.reply_text(text)
async def help_command(update: Update, _: ContextTypes.DEFAULT_TYPE) -> None:
"""Displays info on how to use the bot."""
assert update.message is not None
text = ("Commands:\n"
" /start to start receiving pings\n"
"\n"
" /listtags to see see currently defined tags\n"
" /addtags to add tags (provide tags separated by space)\n"
" /deletetags to delete tags (will not affect past pings)\n"
"\n"
" /ping to receive a demo ping\n"
" /listpings to list recently answered pings\n"
" /pending to list pending pings\n"
" /replyall to tag all pending pings\n"
"\n"
"🫂")
await update.message.reply_text(text)
async def add_tags_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
assert update.message is not None
assert context.user_data is not None
user_data = UserData(context.user_data)
if context.args is None or len(context.args) == 0:
text = "No tags were added because none were provided."
else:
tags = context.args
if len(tags) == 1:
text = f"Tag '{tags[0]}' added."
else:
t = "', '".join(tags)
text = f"Tags '{t}' added."
for tag in tags:
user_data.add_tag(tag)
await context.bot.send_message(chat_id=update.message.chat.id, text=text)
async def list_tags_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
assert update.message is not None
assert context.user_data is not None
tags_set = UserData(context.user_data).tags
tags = ', '.join(sorted(tags_set))
if tags != "":
text = f"Tags: {tags}"
else:
text = "No tags defined. Add with /add_tags."
await context.bot.send_message(chat_id=update.message.chat.id, text=text)
async def delete_tags_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
assert update.message is not None
assert context.user_data is not None
user_data = UserData(context.user_data)
if context.args is None or len(context.args) == 0:
text = "No tags were deleted because none were provided."
else:
tags = context.args
tags_deleted, tags_ignored = [], []
for tag in tags:
if tag in user_data.tags:
user_data.delete_tag(tag)
tags_deleted.append(tag)
else:
tags_ignored.append(tag)
if tags_deleted:
tags_deleted = "', '".join(tags_deleted)
text = f"Deleted '{tags_deleted}'."
else:
text = "No tags were deleted."
if tags_ignored:
tags_ignored = "', '".join(tags_ignored)
text += f" Ignored non-existent '{tags_ignored}'."
await context.bot.send_message(chat_id=update.message.chat.id, text=text)
def main() -> None:
"""Run the bot."""
token = open(sys.argv[1], "r").read().strip()
persistence = PicklePersistence(filepath='persistence.pkl')
application = Application.builder().token(token).persistence(persistence).build()
assert application.job_queue is not None
application.add_handler(CommandHandler("start", start_command))
application.add_handler(CommandHandler("help", help_command))
application.add_handler(CommandHandler("ping", ping_command))
application.add_handler(CommandHandler("listpings", list_pings_command))
application.add_handler(CommandHandler("pending", list_pending_pings_command))
application.add_handler(CommandHandler("replyall", reply_all_command))
application.add_handler(CallbackQueryHandler(callback_button))
application.add_handler(CommandHandler("addtags", add_tags_command))
application.add_handler(CommandHandler("listtags", list_tags_command))
application.add_handler(CommandHandler("deletetags", delete_tags_command))
application.job_queue.run_repeating(callback_minute, interval=60, first=5)
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
main()