mirea_check_bot/main.py
2024-08-01 10:53:40 +03:00

106 lines
3.9 KiB
Python

import os
from dotenv import load_dotenv
from health_ping import HealthPing
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, constants
from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes
import requests
# Load settings from the .env file located in the same directory as this script.
dotenv_path = os.path.join(os.path.dirname(__file__), ".env")
load_dotenv(dotenv_path)
# Telegram bot token; os.getenv yields None when BOT_TOKEN is not defined.
TOKEN = os.getenv("BOT_TOKEN")
# CHAT = os.getenv("CHAT_ID")
# Entrant-list endpoints of priem.mirea.ru; each URL selects one competition by id.
# check_command queries api_1; api_2 is not referenced by the code visible in this file.
api_1 = "https://priem.mirea.ru/competitions_api/entrants?competitions[]=1793877758295678262"
api_2 = "https://priem.mirea.ru/competitions_api/entrants?competitions[]=1793877924783332662"
# Optional monitoring: a HealthPing instance is created and started only when
# HEALTHCHECKS_ENDPOINT is configured (non-empty).
# NOTE(review): presumably `schedule` is a cron expression and `retries` are
# delays in seconds -- confirm against the health_ping package documentation.
_healthchecks_url = os.getenv("HEALTHCHECKS_ENDPOINT")
if _healthchecks_url:
    HealthPing(url=_healthchecks_url, schedule="1 * * * *", retries=[60, 300, 720]).start()
# Module-level state shared with check_command through its `global` statement.
# hpo: number of other entrants whose record carries an "iHPO" key (updated by check_command).
hpo = 0
# snils: identifier compared with each entrant's "spn" field to find the applicant of interest.
snils = "143-471-137-40"
# place: number of other entrants with status "Активный" or "Сданы ВИ" (updated by check_command).
place = 0
# Define a few command handlers. These usually take the two arguments update and
# context.
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Greet the user in response to the /start command.

    Sends a single plain-text reply; no keyboard or buttons are attached.

    Args:
        update: Incoming Telegram update that carries the command.
        context: Handler context supplied by python-telegram-bot (unused).
    """
    # CommandHandler also fires for *edited* messages, in which case
    # update.message is None; effective_message covers both cases.
    await update.effective_message.reply_text("Вы хуй, можете проверить списки!")
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with a short help text when the /help command is issued.

    Args:
        update: Incoming Telegram update that carries the command.
        context: Handler context supplied by python-telegram-bot (unused).
    """
    # CommandHandler also fires for *edited* messages, in which case
    # update.message is None; effective_message covers both cases.
    await update.effective_message.reply_text("Help!")
async def check_lists(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Acknowledge an inline-button press and show the chosen option.

    The message that carried the button is edited in place so that it
    displays the callback data of the pressed button.
    """
    pressed = update.callback_query
    # Every callback query has to be answered, even when nothing needs to be
    # shown to the user; some clients misbehave otherwise.
    # See https://core.telegram.org/bots/api#callbackquery
    await pressed.answer()
    chosen = pressed.data
    await pressed.edit_message_text(text=f"Selected option: {chosen}")
async def echo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Send the text of the incoming message straight back to its sender."""
    incoming = update.message
    await incoming.reply_text(incoming.text)
def _rank_entrant(entrants, target_snils):
    """Find the entrant with *target_snils* and count the competitors listed before them.

    Args:
        entrants: Sequence of entrant records (dicts) in list order. Each
            record is read through its "spn" and "s" keys and checked for
            the presence of an "iHPO" key.
        target_snils: Value compared against each record's "spn" field.

    Returns:
        ``(ahead_total, ahead_hpo, entrant)`` where ``ahead_total`` is the
        number of preceding records whose status is "Активный" or
        "Сданы ВИ", ``ahead_hpo`` is the number of preceding records that
        carry an "iHPO" key, and ``entrant`` is the matching record.
        ``None`` when no record matches.
    """
    ahead_total = 0
    ahead_hpo = 0
    for entrant in entrants:
        if entrant["spn"] == target_snils:
            # Stop at the applicant: records after them must not be counted.
            return ahead_total, ahead_hpo, entrant
        if "iHPO" in entrant:
            ahead_hpo += 1
        if entrant["s"] in ("Активный", "Сданы ВИ"):
            ahead_total += 1
    return None


async def check_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /check: report the configured applicant's position in the api_1 list.

    Downloads the entrant list from ``api_1``, finds the record whose "spn"
    equals the module-level ``snils`` and replies with the position in the
    general competition, the position among "iHPO" records and the "fm"
    value of the record. When the service cannot be reached, answers with a
    non-200 status, returns an unexpected payload or an empty list, the user
    is told to try again later. When the applicant is not in the list no
    reply is sent (as before).

    The module-level ``place`` and ``hpo`` are overwritten with the counts
    computed by this call, so repeated calls no longer accumulate.

    Args:
        update: Incoming Telegram update that carries the command.
        context: Handler context supplied by python-telegram-bot (unused).
    """
    global hpo, place
    import asyncio

    from telegram.helpers import escape_markdown

    # CommandHandler also fires for edited messages, where update.message is None.
    message = update.effective_message
    unavailable = "Проверьте позже, в текущий момент сервис недоступен!"

    # requests is blocking: run it in the default executor so the event loop
    # keeps serving other updates, and bound the wait with a timeout.
    loop = asyncio.get_running_loop()
    try:
        response = await loop.run_in_executor(
            None, lambda: requests.get(api_1, timeout=10)
        )
    except requests.RequestException as exc:
        print(f'Ошибка: {exc}')
        await message.reply_text(unavailable)
        return

    if response.status_code != 200:
        print(f'Ошибка: {response.status_code}')
        await message.reply_text(unavailable)
        return

    # Extract the entrant list from the JSON payload.
    try:
        entrants = response.json()["data"][0]["entrants"]
    except (ValueError, KeyError, IndexError, TypeError) as exc:
        print(f'Ошибка: {exc!r}')
        await message.reply_text(unavailable)
        return

    if not entrants:
        await message.reply_text(unavailable)
        return

    found = _rank_entrant(entrants, snils)
    if found is None:
        # The applicant is absent from the list; nothing is reported.
        return

    # Publish fresh per-call counts instead of adding to the previous totals.
    place, hpo, me = found
    # MarkdownV2 rejects unescaped reserved characters (e.g. "." or "-"),
    # so the externally supplied score is escaped before interpolation.
    score = escape_markdown(str(me['fm']), version=2)
    await message.reply_text(
        f"*Конкурс: Проектирование и обслуживание высоконагруженных информационных систем*\nВы на {place+1} месте по общему конкурсу, на {hpo+1} месте по высшему приоритеу, колво баллов {score}",
        parse_mode=constants.ParseMode.MARKDOWN_V2,
    )
def main():
    """Build the Telegram application, register its handlers and start polling.

    Registers the /start, /help and /check command handlers plus the
    callback-query handler, then blocks in ``run_polling`` until the process
    is interrupted.

    Raises:
        RuntimeError: If the BOT_TOKEN setting is missing or empty.
    """
    # Fail early with an actionable message instead of a TypeError from
    # slicing None when the token was never configured.
    if not TOKEN:
        raise RuntimeError(
            "BOT_TOKEN is not set; define it in the environment or in the .env file"
        )
    # Only a short prefix is printed so the full token never reaches the output.
    print(TOKEN[:10])
    application = Application.builder().token(TOKEN).build()
    # on different commands - answer in Telegram
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CallbackQueryHandler(check_lists))
    application.add_handler(CommandHandler("help", help_command))
    application.add_handler(CommandHandler("check", check_command))
    # Run the bot until the user presses Ctrl-C
    application.run_polling(allowed_updates=Update.ALL_TYPES)
# Script entry point: launch the bot only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()