Browse Source
Refactoring
Refactoring
Convert whole project to runnable package Extract command handlers into separate modules Add setuptools build script Replace requirements.txt use with "editable" install of project package in Dockerfilefeature-fuel-prices
19 changed files with 300 additions and 226 deletions
-
1.dockerignore
-
5Dockerfile
-
220bot.py
-
0devpotato_bot/__init__.py
-
2devpotato_bot/__main__.py
-
11devpotato_bot/command_handlers/__init__.py
-
22devpotato_bot/command_handlers/fortune.py
-
25devpotato_bot/command_handlers/help.py
-
18devpotato_bot/command_handlers/me.py
-
12devpotato_bot/command_handlers/ping.py
-
21devpotato_bot/command_handlers/produce_error.py
-
59devpotato_bot/command_handlers/roll.py
-
2devpotato_bot/dice_parser.py
-
54devpotato_bot/error_handler.py
-
55devpotato_bot/runner.py
-
2entrypoint.sh
-
15setup.py
-
0tests/__init__.py
-
2tests/test_dice_parser.py
@ -1,220 +0,0 @@ |
|||
#!/usr/bin/env python3 |
|||
# |
|||
"""Simple Telegram bot for cryptopotato chat. |
|||
""" |
|||
|
|||
import logging |
|||
import os |
|||
import subprocess |
|||
import sys |
|||
import traceback |
|||
|
|||
import telegram |
|||
from telegram import Update, Message, ParseMode |
|||
from telegram.ext import Updater, CommandHandler, CallbackContext, Filters, run_async |
|||
|
|||
from dice_parser import Dice, ParseError, ValueRangeError |
|||
|
|||
# Enable logging |
|||
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', |
|||
level=logging.INFO) |
|||
|
|||
logger = logging.getLogger(__name__) |
|||
|
|||
|
|||
__NOTATION_DESCRIPTION = ( |
|||
'simplified [dice notation](https://en.wikipedia.org/wiki/Dice_notation): `AdB+M`\n' |
|||
'- `A` is a number of rolls (can be omitted if 1)\n' |
|||
'- `B` specifies number of sides\n' |
|||
'- `M` is a modifier that is added to the roll result, "+" or "-" between `B` and `M` ' |
|||
"defines modifier's sign\n" |
|||
'Both `A` and `B` are positive integer numbers, `M` is an integer number, ' |
|||
f'maximum number of rolls is *{Dice.ROLL_LIMIT}*, the biggest dice has ' |
|||
f'*{Dice.BIGGEST_DICE}* sides' |
|||
) |
|||
|
|||
|
|||
def show_help(update: Update, context: CallbackContext): |
|||
"""Send a message when the command /help is issued.""" |
|||
update.message.reply_markdown( |
|||
'*Available commands:*\n\n' |
|||
|
|||
'*/me* - announce your actions to the chat\n' |
|||
'\n' |
|||
'*/ping* - check if bot is currently active\n' |
|||
'\n' |
|||
f'*/roll* or */r* - make a dice roll in {__NOTATION_DESCRIPTION}\n' |
|||
'\n' |
|||
'*/fortune* - get a random epigram', |
|||
disable_web_page_preview=True |
|||
) |
|||
|
|||
|
|||
def me_command(update: Update, context: CallbackContext): |
|||
"""Announce sender's actions to the chat.""" |
|||
message: Message = update.message |
|||
status = message.text_html.split(None, 1)[1:] |
|||
status = status[0] if status else 'completely failed to describe his own actions' |
|||
name = '<b>***</b>{}'.format(update.effective_user.mention_html()) |
|||
text = '{} {}'.format(name, status) |
|||
message.reply_html(text, quote=False, disable_web_page_preview=True) |
|||
context.bot.delete_message(message.chat_id, message.message_id) |
|||
|
|||
|
|||
def ping_command(update: Update, context: CallbackContext): |
|||
"""Confirm bot's presence in the chat.""" |
|||
update.message.reply_text('Pong!') |
|||
|
|||
|
|||
@run_async |
|||
def fortune_command(update: Update, context: CallbackContext): |
|||
"""Get random epigram from `fortune`.""" |
|||
try: |
|||
result = subprocess.run(['fortune', '-a'], capture_output=True, text=True, timeout=2) |
|||
update.message.reply_text(result.stdout, quote=False, disable_web_page_preview=True) |
|||
except (OSError, TimeoutError) as error: |
|||
logger.warning('Failed to call fortune executable: %s', error) |
|||
|
|||
|
|||
def roll_command(update: Update, context: CallbackContext): |
|||
"""Perform dice roll specified in dice notation.""" |
|||
message: Message = update.message |
|||
if context.args: |
|||
roll_str = context.args[0] |
|||
try: |
|||
dice = Dice.parse(roll_str) |
|||
except ParseError: |
|||
update.message.reply_markdown( |
|||
"Oops, couldn't decide what kind of roll you want to make.\n\n" |
|||
f"This command accepts only {__NOTATION_DESCRIPTION}", |
|||
disable_web_page_preview=True |
|||
) |
|||
return |
|||
except ValueRangeError as e: |
|||
update.message.reply_markdown(e.formatted_message) |
|||
return |
|||
label = message.text_markdown.split(None, 2)[2:] |
|||
label = label[0] if label else '' |
|||
else: |
|||
roll_str = '1d6' |
|||
dice = Dice(1, 6) |
|||
label = '' |
|||
lines = ['{} rolls *{}*'.format(update.effective_user.mention_markdown(), roll_str)] |
|||
if label: |
|||
lines.extend((' for:\n', label, '\n')) |
|||
lines.append('\n') |
|||
roll_total, single_rolls, was_limited = dice.get_result(item_limit=10) |
|||
lines.extend(( |
|||
'(', |
|||
' + '.join(str(r) for r in single_rolls) |
|||
)) |
|||
if was_limited: |
|||
lines.append(' ... ') |
|||
lines.extend((') = ', str(roll_total))) |
|||
text = ''.join(lines) |
|||
message.reply_markdown(text, quote=False, disable_web_page_preview=True) |
|||
|
|||
|
|||
__developer_ids = set() |
|||
|
|||
|
|||
def produce_error_command(update: Update, context: CallbackContext): |
|||
"""Generate error to be handled in error_handler""" |
|||
user_id = update.effective_user.id |
|||
if user_id in __developer_ids: |
|||
assert not 'a banana' |
|||
# ignore everyone else |
|||
|
|||
|
|||
def error_handler(update: Update, context: CallbackContext): |
|||
"""Log Errors caused by Updates.""" |
|||
message_parts = [f'<code>{context.error!r}</code> was triggered'] |
|||
|
|||
user = update.effective_user |
|||
if user: |
|||
message_parts.append(f' by user {user.mention_html()}') |
|||
|
|||
chat = update.effective_chat |
|||
if chat: |
|||
if chat.type == 'private': |
|||
message_parts.append(' in private chat') |
|||
else: |
|||
message_parts.append(f' in {chat.type} <i>{update.effective_chat.title}</i>') |
|||
if update.effective_chat.username: |
|||
message_parts.append(f' (@{update.effective_chat.username})') |
|||
|
|||
if update.poll: |
|||
message_parts.append(f' with poll id {update.poll.id}') |
|||
|
|||
trace = ''.join(traceback.format_tb(sys.exc_info()[2])) |
|||
message_parts.append(f'. Full traceback:\n\n<code>{trace}</code>') |
|||
message_text = ''.join(message_parts) |
|||
delivery_failed = set() |
|||
for dev_id in __developer_ids: |
|||
try: |
|||
context.bot.send_message(dev_id, message_text, parse_mode=ParseMode.HTML) |
|||
except (telegram.error.Unauthorized, telegram.error.BadRequest): |
|||
# User blocked the bot or didn't initiate conversation with it |
|||
delivery_failed.add(dev_id) |
|||
logger.warning('Update "%s" caused error "%s"', update, context.error) |
|||
|
|||
if delivery_failed: |
|||
failed_ids_str = ' '.join(str(i) for i in delivery_failed) |
|||
text = f'DM error reports delivery failed for users: {failed_ids_str}' |
|||
for dev_id in (__developer_ids - delivery_failed): |
|||
try: |
|||
context.bot.send_message(dev_id, text) |
|||
except (telegram.error.Unauthorized, telegram.error.BadRequest): |
|||
pass # just ignore it |
|||
logger.warning(text) |
|||
|
|||
|
|||
def main(): |
|||
"""Start the bot.""" |
|||
updater = Updater(token=os.getenv('BOT_TOKEN'), use_context=True) |
|||
developer_ids_str = os.getenv('DEVELOPER_IDS') |
|||
if developer_ids_str: |
|||
try: |
|||
global __developer_ids |
|||
__developer_ids = set(map(int, developer_ids_str.split(','))) |
|||
except ValueError: |
|||
logger.error( |
|||
'DEVELOPER_IDS value must be a comma-separated integers list, not "%s"!', |
|||
developer_ids_str |
|||
) |
|||
|
|||
dispatcher = updater.dispatcher |
|||
|
|||
dispatcher.add_handler(CommandHandler("help", show_help)) |
|||
|
|||
produce_error = CommandHandler( |
|||
"produce_error", |
|||
produce_error_command, |
|||
filters=~Filters.update.edited_message |
|||
) |
|||
dispatcher.add_handler(produce_error) |
|||
|
|||
me = CommandHandler("me", me_command, filters=~Filters.update.edited_message) |
|||
dispatcher.add_handler(me) |
|||
|
|||
ping = CommandHandler("ping", ping_command, filters=~Filters.update.edited_message) |
|||
dispatcher.add_handler(ping) |
|||
|
|||
fortune = CommandHandler("fortune", fortune_command, filters=~Filters.update.edited_message) |
|||
dispatcher.add_handler(fortune) |
|||
|
|||
roll = CommandHandler(['roll', 'r'], roll_command, filters=~Filters.update.edited_message) |
|||
dispatcher.add_handler(roll) |
|||
|
|||
dispatcher.add_error_handler(error_handler) |
|||
|
|||
updater.start_polling() |
|||
|
|||
# Run the bot until you press Ctrl-C or the process receives SIGINT, |
|||
# SIGTERM or SIGABRT. This should be used most of the time, since |
|||
# start_polling() is non-blocking and will stop the bot gracefully. |
|||
updater.idle() |
|||
|
|||
|
|||
if __name__ == '__main__': |
|||
main() |
@ -0,0 +1,2 @@ |
|||
from .runner import main |
|||
main() |
@ -0,0 +1,11 @@ |
|||
from . import fortune |
|||
from . import help |
|||
from . import me |
|||
from . import ping |
|||
from . import produce_error |
|||
from . import roll |
|||
|
|||
handler_getters = [ |
|||
m.get_handler for m in |
|||
(fortune, help, me, ping, produce_error, roll) |
|||
] |
@ -0,0 +1,22 @@ |
|||
import logging |
|||
import subprocess |
|||
|
|||
from telegram import Update |
|||
from telegram.ext import CallbackContext, run_async |
|||
|
|||
_logger = logging.getLogger(__name__) |
|||
|
|||
|
|||
@run_async |
|||
def _fortune_callback(update: Update, context: CallbackContext): |
|||
"""Get random epigram from `fortune`.""" |
|||
try: |
|||
result = subprocess.run(['fortune', '-a'], capture_output=True, text=True, timeout=2) |
|||
update.message.reply_text(result.stdout, quote=False, disable_web_page_preview=True) |
|||
except (OSError, TimeoutError) as error: |
|||
_logger.warning('Failed to call fortune executable: %s', error) |
|||
|
|||
|
|||
def get_handler(**kwargs): |
|||
from telegram.ext import Filters, CommandHandler |
|||
return CommandHandler("fortune", _fortune_callback, filters=~Filters.update.edited_message) |
@ -0,0 +1,25 @@ |
|||
from telegram import Update |
|||
from telegram.ext import CallbackContext |
|||
|
|||
from . import roll |
|||
|
|||
|
|||
def _help_callback(update: Update, context: CallbackContext): |
|||
"""Send a message when the command /help is issued.""" |
|||
update.message.reply_markdown( |
|||
'*Available commands:*\n\n' |
|||
|
|||
'*/me* - announce your actions to the chat\n' |
|||
'\n' |
|||
'*/ping* - check if bot is currently active\n' |
|||
'\n' |
|||
f'*/roll* or */r* - make a dice roll in {roll.NOTATION_DESCRIPTION}\n' |
|||
'\n' |
|||
'*/fortune* - get a random epigram', |
|||
disable_web_page_preview=True |
|||
) |
|||
|
|||
|
|||
def get_handler(**kwargs): |
|||
from telegram.ext import CommandHandler |
|||
return CommandHandler("help", _help_callback) |
@ -0,0 +1,18 @@ |
|||
from telegram import Update, Message |
|||
from telegram.ext import CallbackContext |
|||
|
|||
|
|||
def _me_callback(update: Update, context: CallbackContext): |
|||
"""Announce sender's actions to the chat.""" |
|||
message: Message = update.message |
|||
status = message.text_html.split(None, 1)[1:] |
|||
status = status[0] if status else 'completely failed to describe his own actions' |
|||
name = '<b>***</b>{}'.format(update.effective_user.mention_html()) |
|||
text = '{} {}'.format(name, status) |
|||
message.reply_html(text, quote=False, disable_web_page_preview=True) |
|||
context.bot.delete_message(message.chat_id, message.message_id) |
|||
|
|||
|
|||
def get_handler(**kwargs): |
|||
from telegram.ext import Filters, CommandHandler |
|||
return CommandHandler("me", _me_callback, filters=~Filters.update.edited_message) |
@ -0,0 +1,12 @@ |
|||
from telegram import Update |
|||
from telegram.ext import CallbackContext |
|||
|
|||
|
|||
def _ping_callback(update: Update, context: CallbackContext): |
|||
"""Confirm bot's presence in the chat.""" |
|||
update.message.reply_text('Pong!') |
|||
|
|||
|
|||
def get_handler(**kwargs): |
|||
from telegram.ext import Filters, CommandHandler |
|||
return CommandHandler("ping", _ping_callback, filters=~Filters.update.edited_message) |
@ -0,0 +1,21 @@ |
|||
from telegram import Update |
|||
from telegram.ext import CallbackContext |
|||
|
|||
|
|||
def _build_callback(developer_ids): |
|||
def _callback(update: Update, context: CallbackContext): |
|||
"""Generate error to be handled in error_handler""" |
|||
user_id = update.effective_user.id |
|||
if user_id in developer_ids: |
|||
assert not 'a banana' |
|||
# ignore everyone else |
|||
return _callback |
|||
|
|||
|
|||
def get_handler(*, developer_ids, **kwargs): |
|||
from telegram.ext import Filters, CommandHandler |
|||
return CommandHandler( |
|||
"produce_error", |
|||
_build_callback(developer_ids), |
|||
filters=~Filters.update.edited_message |
|||
) |
@ -0,0 +1,59 @@ |
|||
from telegram import Update, Message |
|||
from telegram.ext import CallbackContext |
|||
|
|||
from ..dice_parser import Dice, ParseError, ValueRangeError |
|||
|
|||
NOTATION_DESCRIPTION = ( |
|||
'simplified [dice notation](https://en.wikipedia.org/wiki/Dice_notation): `AdB+M`\n' |
|||
'- `A` is a number of rolls (can be omitted if 1)\n' |
|||
'- `B` specifies number of sides\n' |
|||
'- `M` is a modifier that is added to the roll result, "+" or "-" between `B` and `M` ' |
|||
"defines modifier's sign\n" |
|||
'Both `A` and `B` are positive integer numbers, `M` is an integer number, ' |
|||
f'maximum number of rolls is *{Dice.ROLL_LIMIT}*, the biggest dice has ' |
|||
f'*{Dice.BIGGEST_DICE}* sides' |
|||
) |
|||
|
|||
|
|||
def _roll_callback(update: Update, context: CallbackContext): |
|||
"""Perform dice roll specified in dice notation.""" |
|||
message: Message = update.message |
|||
if context.args: |
|||
roll_str = context.args[0] |
|||
try: |
|||
dice = Dice.parse(roll_str) |
|||
except ParseError: |
|||
update.message.reply_markdown( |
|||
"Oops, couldn't decide what kind of roll you want to make.\n\n" |
|||
f"This command accepts only {NOTATION_DESCRIPTION}", |
|||
disable_web_page_preview=True |
|||
) |
|||
return |
|||
except ValueRangeError as e: |
|||
update.message.reply_markdown(e.formatted_message) |
|||
return |
|||
label = message.text_markdown.split(None, 2)[2:] |
|||
label = label[0] if label else '' |
|||
else: |
|||
roll_str = '1d6' |
|||
dice = Dice(1, 6) |
|||
label = '' |
|||
lines = ['{} rolls *{}*'.format(update.effective_user.mention_markdown(), roll_str)] |
|||
if label: |
|||
lines.extend((' for:\n', label, '\n')) |
|||
lines.append('\n') |
|||
roll_total, single_rolls, was_limited = dice.get_result(item_limit=10) |
|||
lines.extend(( |
|||
'(', |
|||
' + '.join(str(r) for r in single_rolls) |
|||
)) |
|||
if was_limited: |
|||
lines.append(' ... ') |
|||
lines.extend((') = ', str(roll_total))) |
|||
text = ''.join(lines) |
|||
message.reply_markdown(text, quote=False, disable_web_page_preview=True) |
|||
|
|||
|
|||
def get_handler(**kwargs): |
|||
from telegram.ext import Filters, CommandHandler |
|||
return CommandHandler(['roll', 'r'], _roll_callback, filters=~Filters.update.edited_message) |
@ -0,0 +1,54 @@ |
|||
import logging |
|||
import sys |
|||
import traceback |
|||
|
|||
import telegram |
|||
from telegram import Update, ParseMode |
|||
from telegram.ext import CallbackContext |
|||
|
|||
_logger = logging.getLogger(__name__) |
|||
|
|||
|
|||
def create_callback(developer_ids): |
|||
def callback(update: Update, context: CallbackContext): |
|||
"""Log Errors caused by Updates.""" |
|||
message_parts = [f'<code>{context.error!r}</code> was triggered'] |
|||
|
|||
user = update.effective_user |
|||
if user: |
|||
message_parts.append(f' by user {user.mention_html()}') |
|||
|
|||
chat = update.effective_chat |
|||
if chat: |
|||
if chat.type == 'private': |
|||
message_parts.append(' in private chat') |
|||
else: |
|||
message_parts.append(f' in {chat.type} <i>{update.effective_chat.title}</i>') |
|||
if update.effective_chat.username: |
|||
message_parts.append(f' (@{update.effective_chat.username})') |
|||
|
|||
if update.poll: |
|||
message_parts.append(f' with poll id {update.poll.id}') |
|||
|
|||
trace = ''.join(traceback.format_tb(sys.exc_info()[2])) |
|||
message_parts.append(f'. Full traceback:\n\n<code>{trace}</code>') |
|||
message_text = ''.join(message_parts) |
|||
delivery_failed = set() |
|||
for dev_id in developer_ids: |
|||
try: |
|||
context.bot.send_message(dev_id, message_text, parse_mode=ParseMode.HTML) |
|||
except (telegram.error.Unauthorized, telegram.error.BadRequest): |
|||
# User blocked the bot or didn't initiate conversation with it |
|||
delivery_failed.add(dev_id) |
|||
_logger.warning('Update "%s" caused error "%s"', update, context.error) |
|||
|
|||
if delivery_failed: |
|||
failed_ids_str = ' '.join(str(i) for i in delivery_failed) |
|||
text = f'DM error reports delivery failed for users: {failed_ids_str}' |
|||
for dev_id in (developer_ids - delivery_failed): |
|||
try: |
|||
context.bot.send_message(dev_id, text) |
|||
except (telegram.error.Unauthorized, telegram.error.BadRequest): |
|||
pass # just ignore it |
|||
_logger.warning(text) |
|||
return callback |
@ -0,0 +1,55 @@ |
|||
import logging |
|||
import os |
|||
|
|||
import telegram |
|||
from telegram.ext import Updater |
|||
|
|||
|
|||
class Runner: |
|||
def __init__(self, token, developer_ids=None): |
|||
self.logger = logging.getLogger(__name__) |
|||
self.updater = Updater(token=token, use_context=True) |
|||
self.developer_ids = set() if developer_ids is None else developer_ids |
|||
|
|||
dispatcher = self.updater.dispatcher |
|||
|
|||
from .command_handlers import handler_getters |
|||
for handler_getter in handler_getters: |
|||
dispatcher.add_handler(handler_getter(developer_ids=self.developer_ids)) |
|||
|
|||
from devpotato_bot.error_handler import create_callback |
|||
dispatcher.add_error_handler(create_callback(self.developer_ids)) |
|||
|
|||
def run(self): |
|||
self.updater.start_polling() |
|||
|
|||
# Run the bot until you press Ctrl-C or the process receives SIGINT, |
|||
# SIGTERM or SIGABRT. This should be used most of the time, since |
|||
# start_polling() is non-blocking and will stop the bot gracefully. |
|||
self.updater.idle() |
|||
|
|||
|
|||
def main(): |
|||
"""Start the bot.""" |
|||
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', |
|||
level=logging.INFO) |
|||
logger = logging.getLogger(__name__) |
|||
logger.info('Cryptopotato bot started') |
|||
|
|||
bot_token = os.getenv('BOT_TOKEN') |
|||
developer_ids_str = os.getenv('DEVELOPER_IDS') |
|||
developer_ids = None |
|||
if developer_ids_str: |
|||
try: |
|||
developer_ids = set(map(int, developer_ids_str.split(','))) |
|||
except ValueError: |
|||
logger.error( |
|||
'DEVELOPER_IDS value must be a comma-separated integers list, not "%s"!', |
|||
developer_ids_str |
|||
) |
|||
try: |
|||
Runner(bot_token, developer_ids).run() |
|||
except telegram.error.InvalidToken as e: |
|||
logger.error('Bot token "%s" is not valid', bot_token, exc_info=e) |
|||
except Exception as e: |
|||
logger.error('Unhandled exception', exc_info=e) |
@ -0,0 +1,15 @@ |
|||
from setuptools import setup |
|||
|
|||
setup( |
|||
name = 'devpotato-bot', |
|||
version = '0.3.2', |
|||
description='Telegram bot for cryptopotato chat', |
|||
packages = ['devpotato_bot'], |
|||
python_requires='>=3.5', |
|||
install_requires=['python-telegram-bot>=12.3.0'], |
|||
author='Vladislav Glinsky', |
|||
author_email='cl0ne@mithril.org.ua', |
|||
url="https://code.nix.org.ua/cl0ne/cryptopotato-bot", |
|||
license='MIT', |
|||
license_file='LICENSE' |
|||
) |
@ -1,7 +1,7 @@ |
|||
import unittest |
|||
from unittest import mock |
|||
|
|||
from dice_parser import Dice, ParseError, ValueRangeError |
|||
from devpotato_bot.dice_parser import Dice, ParseError, ValueRangeError |
|||
|
|||
|
|||
class DiceParserTest(unittest.TestCase): |
Write
Preview
Loading…
Cancel
Save
Reference in new issue