import logging import timer import sys from collections import OrderedDict from typing import Optional from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, PicklePersistence logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO) logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger('apscheduler').setLevel(logging.WARNING) logger = logging.getLogger(__name__) class UserData: def __init__(self, user_data): if not "pings" in user_data: user_data["pings"] = OrderedDict() if not "tags" in user_data: user_data["tags"] = set() if not "started" in user_data: user_data["started"] = False if not "pending_pings" in user_data: user_data["pending_pings"] = {} if not "reply_all" in user_data: user_data["reply_all"] = None self.user_data = user_data @property def started(self): return self.user_data["started"] @started.setter def started(self, value): self.user_data["started"] = value @property def tags(self): return self.user_data["tags"] @property def pings(self): return self.user_data["pings"] @property def reply_all(self) -> Optional[tuple[int, list[str]]]: return self.user_data["reply_all"] @reply_all.setter def reply_all(self, value: Optional[tuple[int, list[str]]]): self.user_data["reply_all"] = value def add_tag(self, tag): self.user_data["tags"].add(tag) def delete_tag(self, tag): """Remove an item from the inventory.""" if tag in self.user_data["tags"]: self.user_data["tags"].remove(tag) def add_ping(self, chat_id, message_id, ping_id): self.user_data["pending_pings"][(chat_id, message_id)] = (ping_id, []) def get_ping_list(self, chat_id, message_id) -> Optional[list]: try: _, ping_list = self.user_data["pending_pings"][(chat_id, message_id)] return ping_list except KeyError: return None def submit_ping(self, chat_id, message_id): ping_id, ping_list = self.user_data["pending_pings"][(chat_id, message_id)] del self.user_data["pending_pings"][(chat_id, message_id)] self.user_data["pings"][ping_id] = ping_list def submit_pending_pings(self, tags: list[str]) -> list[tuple[int, int]]: res = [] for (chat_id, message_id), (ping_id, _) in self.user_data["pending_pings"].items(): self.user_data["pings"][ping_id] = tags res.append((chat_id, message_id)) for (chat_id, message_id) in res: del self.user_data["pending_pings"][(chat_id, message_id)] return res def get_pending_pings(self): return self.user_data["pending_pings"].values() def get_ping_reply_markup(self): keyboard = [] tags_per_row = 5 tags = list(sorted(self.tags)) for i in range(0, len(tags), tags_per_row): keyboard.append([]) for tag in tags[i:i + tags_per_row]: keyboard[-1].append(InlineKeyboardButton(tag, callback_data=tag)) keyboard.append([InlineKeyboardButton("🗑️ clear", callback_data="clear"), InlineKeyboardButton("🔄 reload", callback_data="reload"), InlineKeyboardButton("📤 submit", callback_data="submit"),]) reply_markup = InlineKeyboardMarkup(keyboard) return reply_markup def ping_is_due(self) -> Optional[int]: t = timer.Timer() prev_ping = t.prev(t.time_now()) pending_pings = [ping for ping, _ in self.get_pending_pings()] if prev_ping in self.pings or prev_ping in pending_pings: return None return prev_ping async def callback_minute(context: ContextTypes.DEFAULT_TYPE): assert context.job is not None for chat_id, user_id in context.bot_data['chats'].items(): user_data = UserData(context.application.user_data[user_id]) if (ping := user_data.ping_is_due()) is not None: reply_markup = user_data.get_ping_reply_markup() text = f"Ping! {timer.time_to_str(ping)}" message = await context.bot.send_message(chat_id=chat_id, text=text, reply_markup=reply_markup) user_data.add_ping(chat_id, message.id, ping) async def ping_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: assert context.user_data is not None assert update.message is not None user_data = UserData(context.user_data) reply_markup = user_data.get_ping_reply_markup() message = await update.message.reply_text("Ping! Select Tags.", reply_markup=reply_markup) user_data.add_ping(update.message.chat_id, message.id, timer.now()) async def list_pings_command(update: Update, context: ContextTypes.DEFAULT_TYPE): assert update.message is not None assert context.user_data is not None pings = sorted(UserData(context.user_data).pings.items()) text = "There aren't any pings, yet. Use /ping to get pinged now." n_pings = 20 if pings: text = f"Recent {n_pings} pings:\n" for ping, tags in pings[-n_pings:]: ping_time = timer.time_to_str(ping) text += f" {ping} {ping_time} {', '.join(sorted(tags))}\n" await context.bot.send_message(chat_id=update.message.chat.id, text=text) async def list_pending_pings_command(update: Update, context: ContextTypes.DEFAULT_TYPE): assert update.message is not None assert context.user_data is not None user_data = UserData(context.user_data) pings = user_data.get_pending_pings() text = "There are no pending pings." if pings: text = "Pending pings:\n" for ping, tags in pings: ping_time = timer.time_to_str(ping) text += f" {ping_time}: {', '.join(sorted(tags))}\n" await context.bot.send_message(chat_id=update.message.chat.id, text=text) async def reply_all_command(update: Update, context: ContextTypes.DEFAULT_TYPE): assert update.message is not None assert context.user_data is not None user_data = UserData(context.user_data) user_data.reply_all = None pings = user_data.get_pending_pings() if user_data.reply_all is not None: text = "It seems like there is a pending reply all command." reply_markup = None elif len(pings) == 0: text = "No pending pings to reply to." reply_markup = None else: text = f"Reply to {len(pings)} pending pings." reply_markup=user_data.get_ping_reply_markup() message = await context.bot.send_message(chat_id=update.message.chat.id, text=text, reply_markup=reply_markup) if reply_markup is not None and len(pings) > 0: user_data.reply_all = (message.id, []) async def callback_button(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Parses the CallbackQuery and updates the message text.""" assert update.callback_query is not None assert context.chat_data is not None assert update.callback_query.message is not None query = update.callback_query assert query.message is not None assert isinstance(query.data, str) # We have to always to do that because the PTB docs say so. await query.answer() user_data = UserData(context.user_data) tags, text, reply_all_id = None, None, None reply_markup = user_data.get_ping_reply_markup() if user_data.reply_all is not None and query.message.id == user_data.reply_all[0]: reply_all_id, tags = user_data.reply_all else: tags = user_data.get_ping_list(query.message.chat_id, query.message.id) if tags is None: text = "Invalid ping. Removed." reply_markup = None else: match query.data: case "submit" if len(tags) > 0: text = f"☑️ Submitted {', '.join(tags)}." reply_markup = None if reply_all_id is None: user_data.submit_ping(query.message.chat_id, query.message.id) else: for (chat_id, message_id) in user_data.submit_pending_pings(tags): await context.bot.edit_message_text(chat_id=chat_id, message_id=message_id, text=text) case "submit": text = "Select at least one tag before submitting." case "clear": tags.clear() case "reload": pass case tag if tag not in tags: tags.append(tag) case tag if tag in tags: tags.remove(tag) if text is None: selected = ", ".join(tags) if selected: text = f"Selected {selected}." else: text = "Select tags." if text is None: text = "Something went wrong. Sorry!" if query.message.text != text or query.message.reply_markup != reply_markup: await query.edit_message_text(text=text, reply_markup=reply_markup) async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: assert update.message is not None assert update.message.from_user is not None assert context.user_data is not None assert context.job_queue is not None logging.info(f"Start from {update.message.from_user}.") if not "chats" in context.bot_data: context.bot_data["chats"] = {} chat_id = update.message.chat_id # Note it seems like chat_id == user_id but that's okay. if not chat_id in context.bot_data["chats"]: context.bot_data["chats"][chat_id] = update.message.from_user.id user_data = UserData(context.user_data) if not user_data.started: user_data.started = True text = ("TagTime has been started. You will receive pings per the official schedule.\n" "\n" "Run /help to see all commands.") else: text = "TagTimeBot is active. Run /help to see more commands." await update.message.reply_text(text) async def help_command(update: Update, _: ContextTypes.DEFAULT_TYPE) -> None: """Displays info on how to use the bot.""" assert update.message is not None text = ("Commands:\n" " /start to start receiving pings\n" "\n" " /listtags to see see currently defined tags\n" " /addtags to add tags (provide tags separated by space)\n" " /deletetags to delete tags (will not affect past pings)\n" "\n" " /ping to receive a demo ping\n" " /listpings to list recently answered pings\n" " /pending to list pending pings\n" " /replyall to tag all pending pings\n" "\n" "🫂") await update.message.reply_text(text) async def add_tags_command(update: Update, context: ContextTypes.DEFAULT_TYPE): assert update.message is not None assert context.user_data is not None user_data = UserData(context.user_data) if context.args is None or len(context.args) == 0: text = "No tags were added because none were provided." else: tags = context.args if len(tags) == 1: text = f"Tag '{tags[0]}' added." else: t = "', '".join(tags) text = f"Tags '{t}' added." for tag in tags: user_data.add_tag(tag) await context.bot.send_message(chat_id=update.message.chat.id, text=text) async def list_tags_command(update: Update, context: ContextTypes.DEFAULT_TYPE): assert update.message is not None assert context.user_data is not None tags_set = UserData(context.user_data).tags tags = ', '.join(sorted(tags_set)) if tags != "": text = f"Tags: {tags}" else: text = "No tags defined. Add with /add_tags." await context.bot.send_message(chat_id=update.message.chat.id, text=text) async def delete_tags_command(update: Update, context: ContextTypes.DEFAULT_TYPE): assert update.message is not None assert context.user_data is not None user_data = UserData(context.user_data) if context.args is None or len(context.args) == 0: text = "No tags were deleted because none were provided." else: tags = context.args tags_deleted, tags_ignored = [], [] for tag in tags: if tag in user_data.tags: user_data.delete_tag(tag) tags_deleted.append(tag) else: tags_ignored.append(tag) if tags_deleted: tags_deleted = "', '".join(tags_deleted) text = f"Deleted '{tags_deleted}'." else: text = "No tags were deleted." if tags_ignored: tags_ignored = "', '".join(tags_ignored) text += f" Ignored non-existent '{tags_ignored}'." await context.bot.send_message(chat_id=update.message.chat.id, text=text) def main() -> None: """Run the bot.""" token = open(sys.argv[1], "r").read().strip() persistence = PicklePersistence(filepath='persistence.pkl') application = Application.builder().token(token).persistence(persistence).build() assert application.job_queue is not None application.add_handler(CommandHandler("start", start_command)) application.add_handler(CommandHandler("help", help_command)) application.add_handler(CommandHandler("ping", ping_command)) application.add_handler(CommandHandler("listpings", list_pings_command)) application.add_handler(CommandHandler("pending", list_pending_pings_command)) application.add_handler(CommandHandler("replyall", reply_all_command)) application.add_handler(CallbackQueryHandler(callback_button)) application.add_handler(CommandHandler("addtags", add_tags_command)) application.add_handler(CommandHandler("listtags", list_tags_command)) application.add_handler(CommandHandler("deletetags", delete_tags_command)) application.job_queue.run_repeating(callback_minute, interval=60, first=5) application.run_polling(allowed_updates=Update.ALL_TYPES) if __name__ == "__main__": main()