import logging
import timer
import sys
from collections import OrderedDict
from typing import Optional
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, PicklePersistence
logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)
class UserData:
def __init__(self, user_data):
if not "pings" in user_data:
user_data["pings"] = OrderedDict()
if not "tags" in user_data:
user_data["tags"] = set()
if not "started" in user_data:
user_data["started"] = False
if not "pending_pings" in user_data:
user_data["pending_pings"] = {}
if not "reply_all" in user_data:
user_data["reply_all"] = None
self.user_data = user_data
def started(self):
return self.user_data["started"]
def started(self, value):
self.user_data["started"] = value
def tags(self):
return self.user_data["tags"]
def pings(self):
return self.user_data["pings"]
def reply_all(self) -> Optional[tuple[int, list[str]]]:
return self.user_data["reply_all"]
def reply_all(self, value: Optional[tuple[int, list[str]]]):
self.user_data["reply_all"] = value
def add_tag(self, tag):
def delete_tag(self, tag):
"""Remove an item from the inventory."""
if tag in self.user_data["tags"]:
def add_ping(self, chat_id, message_id, ping_id):
self.user_data["pending_pings"][(chat_id, message_id)] = (ping_id, [])
def get_ping_list(self, chat_id, message_id) -> Optional[list]:
_, ping_list = self.user_data["pending_pings"][(chat_id, message_id)]
return ping_list
except KeyError:
return None
def submit_ping(self, chat_id, message_id):
ping_id, ping_list = self.user_data["pending_pings"][(chat_id, message_id)]
del self.user_data["pending_pings"][(chat_id, message_id)]
self.user_data["pings"][ping_id] = ping_list
def submit_pending_pings(self, tags: list[str]) -> list[tuple[int, int]]:
res = []
for (chat_id, message_id), (ping_id, _) in self.user_data["pending_pings"].items():
self.user_data["pings"][ping_id] = tags
res.append((chat_id, message_id))
for (chat_id, message_id) in res:
del self.user_data["pending_pings"][(chat_id, message_id)]
return res
def get_pending_pings(self):
return self.user_data["pending_pings"].values()
def get_ping_reply_markup(self):
keyboard = []
keyboard.append([InlineKeyboardButton("🎯 on-target", callback_data="on-target"),
InlineKeyboardButton("❌ off-target", callback_data="off-target"),
InlineKeyboardButton("⭕ no-target", callback_data="no-target"),])
tags_per_row = 5
tags = list(sorted(self.tags))
for i in range(0, len(tags), tags_per_row):
for tag in tags[i:i + tags_per_row]:
keyboard[-1].append(InlineKeyboardButton(tag, callback_data=tag))
keyboard.append([InlineKeyboardButton("🗑️ clear", callback_data="clear"),
InlineKeyboardButton("🔄 reload", callback_data="reload"),
InlineKeyboardButton("📤 submit", callback_data="submit"),])
reply_markup = InlineKeyboardMarkup(keyboard)
return reply_markup
def ping_is_due(self) -> Optional[int]:
t = timer.Timer()
prev_ping = t.prev(t.time_now())
pending_pings = [ping for ping, _ in self.get_pending_pings()]
if prev_ping in self.pings or prev_ping in pending_pings:
return None
return prev_ping
async def callback_minute(context: ContextTypes.DEFAULT_TYPE):
assert context.job is not None
for chat_id, user_id in context.bot_data['chats'].items():
user_data = UserData(context.application.user_data[user_id])
if (ping := user_data.ping_is_due()) is not None:
reply_markup = user_data.get_ping_reply_markup()
text = f"Ping! {timer.time_to_str(ping)}"
message = await context.bot.send_message(chat_id=chat_id, text=text, reply_markup=reply_markup)
user_data.add_ping(chat_id, message.id, ping)
async def ping_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
assert context.user_data is not None
assert update.message is not None
user_data = UserData(context.user_data)
reply_markup = user_data.get_ping_reply_markup()
message = await update.message.reply_text("Ping! Select Tags.", reply_markup=reply_markup)
user_data.add_ping(update.message.chat_id, message.id, timer.now())
async def list_pings_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
assert update.message is not None
assert context.user_data is not None
pings = sorted(UserData(context.user_data).pings.items())
text = "There aren't any pings, yet. Use /ping to get pinged now."
n_pings = 20
if pings:
text = f"Recent {n_pings} pings:\n"
for ping, tags in pings[-n_pings:]:
ping_time = timer.time_to_str(ping)
text += f" {ping} {ping_time} {', '.join(sorted(tags))}\n"
await context.bot.send_message(chat_id=update.message.chat.id, text=text)
async def list_pending_pings_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
assert update.message is not None
assert context.user_data is not None
user_data = UserData(context.user_data)
pings = user_data.get_pending_pings()
text = "There are no pending pings."
if pings:
text = "Pending pings:\n"
for ping, tags in pings:
ping_time = timer.time_to_str(ping)
text += f" {ping_time}: {', '.join(sorted(tags))}\n"
await context.bot.send_message(chat_id=update.message.chat.id, text=text)
async def reply_all_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
assert update.message is not None
assert context.user_data is not None
user_data = UserData(context.user_data)
user_data.reply_all = None
pings = user_data.get_pending_pings()
if user_data.reply_all is not None:
text = "It seems like there is a pending reply all command."
reply_markup = None
elif len(pings) == 0:
text = "No pending pings to reply to."
reply_markup = None
text = f"Reply to {len(pings)} pending pings."
message = await context.bot.send_message(chat_id=update.message.chat.id, text=text, reply_markup=reply_markup)
if reply_markup is not None and len(pings) > 0:
user_data.reply_all = (message.id, [])
async def callback_button(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Parses the CallbackQuery and updates the message text."""
assert update.callback_query is not None
assert context.chat_data is not None
assert update.callback_query.message is not None
query = update.callback_query
assert query.message is not None
assert isinstance(query.data, str)
# We have to always to do that because the PTB docs say so.
await query.answer()
user_data = UserData(context.user_data)
tags, text, reply_all_id = None, None, None
reply_markup = user_data.get_ping_reply_markup()
if user_data.reply_all is not None and query.message.id == user_data.reply_all[0]:
reply_all_id, tags = user_data.reply_all
tags = user_data.get_ping_list(query.message.chat_id, query.message.id)
if tags is None:
text = "Invalid ping. Removed."
reply_markup = None
match query.data:
case "submit" if len(tags) > 0:
text = f"☑️ Submitted {', '.join(tags)}."
reply_markup = None
if reply_all_id is None:
user_data.submit_ping(query.message.chat_id, query.message.id)
for (chat_id, message_id) in user_data.submit_pending_pings(tags):
await context.bot.edit_message_text(chat_id=chat_id, message_id=message_id, text=text)
case "submit":
text = "Select at least one tag before submitting."
case "clear":
case "reload":
case tag if tag not in tags:
case tag if tag in tags:
if text is None:
selected = ", ".join(tags)
if selected:
text = f"Selected {selected}."
text = "Select tags."
if text is None:
text = "Something went wrong. Sorry!"
if query.message.text != text or query.message.reply_markup != reply_markup:
await query.edit_message_text(text=text, reply_markup=reply_markup)
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
assert update.message is not None
assert update.message.from_user is not None
assert context.user_data is not None
assert context.job_queue is not None
logging.info(f"Start from {update.message.from_user}.")
if not "chats" in context.bot_data:
context.bot_data["chats"] = {}
chat_id = update.message.chat_id
# Note it seems like chat_id == user_id but that's okay.
if not chat_id in context.bot_data["chats"]:
context.bot_data["chats"][chat_id] = update.message.from_user.id
user_data = UserData(context.user_data)
if not user_data.started:
user_data.started = True
text = ("TagTime has been started. You will receive pings per the official schedule.\n"
"Run /help to see all commands.")
text = "TagTimeBot is active. Run /help to see more commands."
await update.message.reply_text(text)
async def help_command(update: Update, _: ContextTypes.DEFAULT_TYPE) -> None:
"""Displays info on how to use the bot."""
assert update.message is not None
text = ("Commands:\n"
" /start to start receiving pings\n"
" /listtags to see see currently defined tags\n"
" /addtags to add tags (provide tags separated by space)\n"
" /deletetags to delete tags (will not affect past pings)\n"
" /ping to receive a demo ping\n"
" /listpings to list recently answered pings\n"
" /pending to list pending pings\n"
" /replyall to tag all pending pings\n"
await update.message.reply_text(text)
async def add_tags_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
assert update.message is not None
assert context.user_data is not None
user_data = UserData(context.user_data)
if context.args is None or len(context.args) == 0:
text = "No tags were added because none were provided."
tags = context.args
if len(tags) == 1:
text = f"Tag '{tags[0]}' added."
t = "', '".join(tags)
text = f"Tags '{t}' added."
for tag in tags:
await context.bot.send_message(chat_id=update.message.chat.id, text=text)
async def list_tags_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
assert update.message is not None
assert context.user_data is not None
tags_set = UserData(context.user_data).tags
tags = ', '.join(sorted(tags_set))
if tags != "":
text = f"Tags: {tags}"
text = "No tags defined. Add with /add_tags."
await context.bot.send_message(chat_id=update.message.chat.id, text=text)
async def delete_tags_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
assert update.message is not None
assert context.user_data is not None
user_data = UserData(context.user_data)
if context.args is None or len(context.args) == 0:
text = "No tags were deleted because none were provided."
tags = context.args
tags_deleted, tags_ignored = [], []
for tag in tags:
if tag in user_data.tags:
if tags_deleted:
tags_deleted = "', '".join(tags_deleted)
text = f"Deleted '{tags_deleted}'."
text = "No tags were deleted."
if tags_ignored:
tags_ignored = "', '".join(tags_ignored)
text += f" Ignored non-existent '{tags_ignored}'."
await context.bot.send_message(chat_id=update.message.chat.id, text=text)
def main() -> None:
"""Run the bot."""
token = open(sys.argv[1], "r").read().strip()
persistence = PicklePersistence(filepath='persistence.pkl')
application = Application.builder().token(token).persistence(persistence).build()
assert application.job_queue is not None
application.add_handler(CommandHandler("start", start_command))
application.add_handler(CommandHandler("help", help_command))
application.add_handler(CommandHandler("ping", ping_command))
application.add_handler(CommandHandler("listpings", list_pings_command))
application.add_handler(CommandHandler("pending", list_pending_pings_command))
application.add_handler(CommandHandler("replyall", reply_all_command))
application.add_handler(CommandHandler("addtags", add_tags_command))
application.add_handler(CommandHandler("listtags", list_tags_command))
application.add_handler(CommandHandler("deletetags", delete_tags_command))
application.job_queue.run_repeating(callback_minute, interval=60, first=5)
if __name__ == "__main__":