381 lines
14 KiB
Python
381 lines
14 KiB
Python
import logging
|
|
import timer
|
|
import sys
|
|
from collections import OrderedDict
|
|
from typing import Optional
|
|
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
|
|
from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, PicklePersistence
|
|
|
|
logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO)
|
|
logging.getLogger("httpx").setLevel(logging.WARNING)
|
|
logging.getLogger('apscheduler').setLevel(logging.WARNING)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class UserData:
|
|
def __init__(self, user_data):
|
|
if not "pings" in user_data:
|
|
user_data["pings"] = OrderedDict()
|
|
|
|
if not "tags" in user_data:
|
|
user_data["tags"] = set()
|
|
|
|
if not "started" in user_data:
|
|
user_data["started"] = False
|
|
|
|
if not "pending_pings" in user_data:
|
|
user_data["pending_pings"] = {}
|
|
|
|
if not "reply_all" in user_data:
|
|
user_data["reply_all"] = None
|
|
|
|
self.user_data = user_data
|
|
|
|
@property
|
|
def started(self):
|
|
return self.user_data["started"]
|
|
|
|
@started.setter
|
|
def started(self, value):
|
|
self.user_data["started"] = value
|
|
|
|
@property
|
|
def tags(self):
|
|
return self.user_data["tags"]
|
|
|
|
@property
|
|
def pings(self):
|
|
return self.user_data["pings"]
|
|
|
|
@property
|
|
def reply_all(self) -> Optional[tuple[int, list[str]]]:
|
|
return self.user_data["reply_all"]
|
|
|
|
@reply_all.setter
|
|
def reply_all(self, value: Optional[tuple[int, list[str]]]):
|
|
self.user_data["reply_all"] = value
|
|
|
|
def add_tag(self, tag):
|
|
self.user_data["tags"].add(tag)
|
|
|
|
def delete_tag(self, tag):
|
|
"""Remove an item from the inventory."""
|
|
if tag in self.user_data["tags"]:
|
|
self.user_data["tags"].remove(tag)
|
|
|
|
def add_ping(self, chat_id, message_id, ping_id):
|
|
self.user_data["pending_pings"][(chat_id, message_id)] = (ping_id, [])
|
|
|
|
def get_ping_list(self, chat_id, message_id) -> Optional[list]:
|
|
try:
|
|
_, ping_list = self.user_data["pending_pings"][(chat_id, message_id)]
|
|
return ping_list
|
|
except KeyError:
|
|
return None
|
|
|
|
def submit_ping(self, chat_id, message_id):
|
|
ping_id, ping_list = self.user_data["pending_pings"][(chat_id, message_id)]
|
|
del self.user_data["pending_pings"][(chat_id, message_id)]
|
|
self.user_data["pings"][ping_id] = ping_list
|
|
|
|
def submit_pending_pings(self, tags: list[str]) -> list[tuple[int, int]]:
|
|
res = []
|
|
for (chat_id, message_id), (ping_id, _) in self.user_data["pending_pings"].items():
|
|
self.user_data["pings"][ping_id] = tags
|
|
res.append((chat_id, message_id))
|
|
for (chat_id, message_id) in res:
|
|
del self.user_data["pending_pings"][(chat_id, message_id)]
|
|
return res
|
|
|
|
def get_pending_pings(self):
|
|
return self.user_data["pending_pings"].values()
|
|
|
|
def get_ping_reply_markup(self):
|
|
keyboard = []
|
|
keyboard.append([InlineKeyboardButton("🎯 on-target", callback_data="on-target"),
|
|
InlineKeyboardButton("❌ off-target", callback_data="off-target"),
|
|
InlineKeyboardButton("⭕ no-target", callback_data="no-target"),])
|
|
tags_per_row = 5
|
|
tags = list(sorted(self.tags))
|
|
for i in range(0, len(tags), tags_per_row):
|
|
keyboard.append([])
|
|
for tag in tags[i:i + tags_per_row]:
|
|
keyboard[-1].append(InlineKeyboardButton(tag, callback_data=tag))
|
|
keyboard.append([InlineKeyboardButton("🗑️ clear", callback_data="clear"),
|
|
InlineKeyboardButton("🔄 reload", callback_data="reload"),
|
|
InlineKeyboardButton("📤 submit", callback_data="submit"),])
|
|
reply_markup = InlineKeyboardMarkup(keyboard)
|
|
return reply_markup
|
|
|
|
def ping_is_due(self) -> Optional[int]:
|
|
t = timer.Timer()
|
|
prev_ping = t.prev(t.time_now())
|
|
pending_pings = [ping for ping, _ in self.get_pending_pings()]
|
|
if prev_ping in self.pings or prev_ping in pending_pings:
|
|
return None
|
|
return prev_ping
|
|
|
|
|
|
async def callback_minute(context: ContextTypes.DEFAULT_TYPE):
|
|
assert context.job is not None
|
|
for chat_id, user_id in context.bot_data['chats'].items():
|
|
user_data = UserData(context.application.user_data[user_id])
|
|
if (ping := user_data.ping_is_due()) is not None:
|
|
reply_markup = user_data.get_ping_reply_markup()
|
|
text = f"Ping! {timer.time_to_str(ping)}"
|
|
message = await context.bot.send_message(chat_id=chat_id, text=text, reply_markup=reply_markup)
|
|
user_data.add_ping(chat_id, message.id, ping)
|
|
|
|
|
|
async def ping_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
|
assert context.user_data is not None
|
|
assert update.message is not None
|
|
|
|
user_data = UserData(context.user_data)
|
|
reply_markup = user_data.get_ping_reply_markup()
|
|
message = await update.message.reply_text("Ping! Select Tags.", reply_markup=reply_markup)
|
|
user_data.add_ping(update.message.chat_id, message.id, timer.now())
|
|
|
|
|
|
async def list_pings_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
assert update.message is not None
|
|
assert context.user_data is not None
|
|
|
|
pings = sorted(UserData(context.user_data).pings.items())
|
|
text = "There aren't any pings, yet. Use /ping to get pinged now."
|
|
n_pings = 20
|
|
if pings:
|
|
text = f"Recent {n_pings} pings:\n"
|
|
for ping, tags in pings[-n_pings:]:
|
|
ping_time = timer.time_to_str(ping)
|
|
text += f" {ping} {ping_time} {', '.join(sorted(tags))}\n"
|
|
await context.bot.send_message(chat_id=update.message.chat.id, text=text)
|
|
|
|
|
|
async def list_pending_pings_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
assert update.message is not None
|
|
assert context.user_data is not None
|
|
|
|
user_data = UserData(context.user_data)
|
|
pings = user_data.get_pending_pings()
|
|
text = "There are no pending pings."
|
|
if pings:
|
|
text = "Pending pings:\n"
|
|
for ping, tags in pings:
|
|
ping_time = timer.time_to_str(ping)
|
|
text += f" {ping_time}: {', '.join(sorted(tags))}\n"
|
|
await context.bot.send_message(chat_id=update.message.chat.id, text=text)
|
|
|
|
|
|
async def reply_all_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
assert update.message is not None
|
|
assert context.user_data is not None
|
|
|
|
user_data = UserData(context.user_data)
|
|
user_data.reply_all = None
|
|
pings = user_data.get_pending_pings()
|
|
if user_data.reply_all is not None:
|
|
text = "It seems like there is a pending reply all command."
|
|
reply_markup = None
|
|
elif len(pings) == 0:
|
|
text = "No pending pings to reply to."
|
|
reply_markup = None
|
|
else:
|
|
text = f"Reply to {len(pings)} pending pings."
|
|
reply_markup=user_data.get_ping_reply_markup()
|
|
message = await context.bot.send_message(chat_id=update.message.chat.id, text=text, reply_markup=reply_markup)
|
|
if reply_markup is not None and len(pings) > 0:
|
|
user_data.reply_all = (message.id, [])
|
|
|
|
|
|
async def callback_button(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
|
"""Parses the CallbackQuery and updates the message text."""
|
|
assert update.callback_query is not None
|
|
assert context.chat_data is not None
|
|
assert update.callback_query.message is not None
|
|
query = update.callback_query
|
|
assert query.message is not None
|
|
assert isinstance(query.data, str)
|
|
|
|
# We have to always to do that because the PTB docs say so.
|
|
await query.answer()
|
|
|
|
user_data = UserData(context.user_data)
|
|
tags, text, reply_all_id = None, None, None
|
|
reply_markup = user_data.get_ping_reply_markup()
|
|
|
|
if user_data.reply_all is not None and query.message.id == user_data.reply_all[0]:
|
|
reply_all_id, tags = user_data.reply_all
|
|
else:
|
|
tags = user_data.get_ping_list(query.message.chat_id, query.message.id)
|
|
|
|
if tags is None:
|
|
text = "Invalid ping. Removed."
|
|
reply_markup = None
|
|
else:
|
|
match query.data:
|
|
case "submit" if len(tags) > 0:
|
|
text = f"☑️ Submitted {', '.join(tags)}."
|
|
reply_markup = None
|
|
if reply_all_id is None:
|
|
user_data.submit_ping(query.message.chat_id, query.message.id)
|
|
else:
|
|
for (chat_id, message_id) in user_data.submit_pending_pings(tags):
|
|
await context.bot.edit_message_text(chat_id=chat_id, message_id=message_id, text=text)
|
|
case "submit":
|
|
text = "Select at least one tag before submitting."
|
|
case "clear":
|
|
tags.clear()
|
|
case "reload":
|
|
pass
|
|
case tag if tag not in tags:
|
|
tags.append(tag)
|
|
case tag if tag in tags:
|
|
tags.remove(tag)
|
|
|
|
if text is None:
|
|
selected = ", ".join(tags)
|
|
if selected:
|
|
text = f"Selected {selected}."
|
|
else:
|
|
text = "Select tags."
|
|
|
|
if text is None:
|
|
text = "Something went wrong. Sorry!"
|
|
|
|
if query.message.text != text or query.message.reply_markup != reply_markup:
|
|
await query.edit_message_text(text=text, reply_markup=reply_markup)
|
|
|
|
|
|
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
|
assert update.message is not None
|
|
assert update.message.from_user is not None
|
|
assert context.user_data is not None
|
|
assert context.job_queue is not None
|
|
logging.info(f"Start from {update.message.from_user}.")
|
|
|
|
if not "chats" in context.bot_data:
|
|
context.bot_data["chats"] = {}
|
|
chat_id = update.message.chat_id
|
|
|
|
# Note it seems like chat_id == user_id but that's okay.
|
|
if not chat_id in context.bot_data["chats"]:
|
|
context.bot_data["chats"][chat_id] = update.message.from_user.id
|
|
|
|
user_data = UserData(context.user_data)
|
|
if not user_data.started:
|
|
user_data.started = True
|
|
text = ("TagTime has been started. You will receive pings per the official schedule.\n"
|
|
"\n"
|
|
"Run /help to see all commands.")
|
|
else:
|
|
text = "TagTimeBot is active. Run /help to see more commands."
|
|
|
|
await update.message.reply_text(text)
|
|
|
|
|
|
async def help_command(update: Update, _: ContextTypes.DEFAULT_TYPE) -> None:
|
|
"""Displays info on how to use the bot."""
|
|
assert update.message is not None
|
|
text = ("Commands:\n"
|
|
" /start to start receiving pings\n"
|
|
"\n"
|
|
" /listtags to see see currently defined tags\n"
|
|
" /addtags to add tags (provide tags separated by space)\n"
|
|
" /deletetags to delete tags (will not affect past pings)\n"
|
|
"\n"
|
|
" /ping to receive a demo ping\n"
|
|
" /listpings to list recently answered pings\n"
|
|
" /pending to list pending pings\n"
|
|
" /replyall to tag all pending pings\n"
|
|
"\n"
|
|
"🫂")
|
|
await update.message.reply_text(text)
|
|
|
|
|
|
async def add_tags_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
assert update.message is not None
|
|
assert context.user_data is not None
|
|
|
|
user_data = UserData(context.user_data)
|
|
if context.args is None or len(context.args) == 0:
|
|
text = "No tags were added because none were provided."
|
|
else:
|
|
tags = context.args
|
|
if len(tags) == 1:
|
|
text = f"Tag '{tags[0]}' added."
|
|
else:
|
|
t = "', '".join(tags)
|
|
text = f"Tags '{t}' added."
|
|
for tag in tags:
|
|
user_data.add_tag(tag)
|
|
await context.bot.send_message(chat_id=update.message.chat.id, text=text)
|
|
|
|
|
|
async def list_tags_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
assert update.message is not None
|
|
assert context.user_data is not None
|
|
|
|
tags_set = UserData(context.user_data).tags
|
|
tags = ', '.join(sorted(tags_set))
|
|
if tags != "":
|
|
text = f"Tags: {tags}"
|
|
else:
|
|
text = "No tags defined. Add with /add_tags."
|
|
await context.bot.send_message(chat_id=update.message.chat.id, text=text)
|
|
|
|
|
|
async def delete_tags_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
assert update.message is not None
|
|
assert context.user_data is not None
|
|
|
|
user_data = UserData(context.user_data)
|
|
if context.args is None or len(context.args) == 0:
|
|
text = "No tags were deleted because none were provided."
|
|
else:
|
|
tags = context.args
|
|
tags_deleted, tags_ignored = [], []
|
|
for tag in tags:
|
|
if tag in user_data.tags:
|
|
user_data.delete_tag(tag)
|
|
tags_deleted.append(tag)
|
|
else:
|
|
tags_ignored.append(tag)
|
|
if tags_deleted:
|
|
tags_deleted = "', '".join(tags_deleted)
|
|
text = f"Deleted '{tags_deleted}'."
|
|
else:
|
|
text = "No tags were deleted."
|
|
|
|
if tags_ignored:
|
|
tags_ignored = "', '".join(tags_ignored)
|
|
text += f" Ignored non-existent '{tags_ignored}'."
|
|
await context.bot.send_message(chat_id=update.message.chat.id, text=text)
|
|
|
|
|
|
def main() -> None:
|
|
"""Run the bot."""
|
|
token = open(sys.argv[1], "r").read().strip()
|
|
persistence = PicklePersistence(filepath='persistence.pkl')
|
|
application = Application.builder().token(token).persistence(persistence).build()
|
|
assert application.job_queue is not None
|
|
|
|
application.add_handler(CommandHandler("start", start_command))
|
|
application.add_handler(CommandHandler("help", help_command))
|
|
application.add_handler(CommandHandler("ping", ping_command))
|
|
application.add_handler(CommandHandler("listpings", list_pings_command))
|
|
application.add_handler(CommandHandler("pending", list_pending_pings_command))
|
|
application.add_handler(CommandHandler("replyall", reply_all_command))
|
|
application.add_handler(CallbackQueryHandler(callback_button))
|
|
|
|
application.add_handler(CommandHandler("addtags", add_tags_command))
|
|
application.add_handler(CommandHandler("listtags", list_tags_command))
|
|
application.add_handler(CommandHandler("deletetags", delete_tags_command))
|
|
|
|
application.job_queue.run_repeating(callback_minute, interval=60, first=5)
|
|
application.run_polling(allowed_updates=Update.ALL_TYPES)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|