1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
|
import base64
import os
from quart import Quart, render_template_string, request
from telethon import TelegramClient, utils
from telethon.errors import SessionPasswordNeededError
def get_env(name, message):
if name in os.environ:
return os.environ[name]
return input(message)
BASE_TEMPLATE = '''
<!DOCTYPE html>
<html>
<head>
<meta charset='UTF-8'>
<title>Telethon + Quart</title>
</head>
<body>{{ content | safe }}</body>
</html>
'''
PHONE_FORM = '''
<form action='/' method='post'>
Phone (international format): <input name='phone' type='text' placeholder='+34600000000'>
<input type='submit'>
</form>
'''
CODE_FORM = '''
<form action='/' method='post'>
Telegram code: <input name='code' type='text' placeholder='70707'>
<input type='submit'>
</form>
'''
PASSWORD_FORM = '''
<form action='/' method='post'>
Telegram password: <input name='password' type='text' placeholder='your password'>
<input type='submit'>
</form>
'''
# Session name, API ID and hash to use; loaded from environmental variables
SESSION = os.environ.get('TG_SESSION', 'quart')
API_ID = int(get_env('TG_API_ID', 'Enter your API ID: '))
API_HASH = get_env('TG_API_HASH', 'Enter your API hash: ')
# Telethon client
client = TelegramClient(SESSION, API_ID, API_HASH)
client.parse_mode = 'html' # <- Render things nicely
phone = None
# Quart app
app = Quart(__name__)
app.secret_key = 'CHANGE THIS TO SOMETHING SECRET'
# Helper method to format messages nicely
async def format_message(message):
if message.photo:
content = '<img src="data:image/png;base64,{}" alt="{}" />'.format(
base64.b64encode(await message.download_media(bytes)).decode(),
message.raw_text
)
else:
# client.parse_mode = 'html', so bold etc. will work!
content = (message.text or '(action message)').replace('\n', '<br>')
return '<p><strong>{}</strong>: {}<sub>{}</sub></p>'.format(
utils.get_display_name(message.sender),
content,
message.date
)
# Connect the client before we start serving with Quart
@app.before_serving
async def startup():
# After connecting, the client will create additional asyncio tasks that run until it's disconnected again.
# Be careful to not mix different asyncio loops during a client's lifetime, or things won't work properly!
await client.connect()
# After we're done serving (near shutdown), clean up the client
@app.after_serving
async def cleanup():
await client.disconnect()
@app.route('/', methods=['GET', 'POST'])
async def root():
# We want to update the global phone variable to remember it
global phone
# Check form parameters (phone/code)
form = await request.form
if 'phone' in form:
phone = form['phone']
await client.send_code_request(phone)
if 'code' in form:
try:
await client.sign_in(code=form['code'])
except SessionPasswordNeededError:
return await render_template_string(BASE_TEMPLATE, content=PASSWORD_FORM)
if 'password' in form:
await client.sign_in(password=form['password'])
# If we're logged in, show them some messages from their first dialog
if await client.is_user_authorized():
# They are logged in, show them some messages from their first dialog
dialog = (await client.get_dialogs())[0]
result = '<h1>{}</h1>'.format(dialog.title)
async for m in client.iter_messages(dialog, 10):
result += await(format_message(m))
return await render_template_string(BASE_TEMPLATE, content=result)
# Ask for the phone if we don't know it yet
if phone is None:
return await render_template_string(BASE_TEMPLATE, content=PHONE_FORM)
# We have the phone, but we're not logged in, so ask for the code
return await render_template_string(BASE_TEMPLATE, content=CODE_FORM)
# By default, `Quart.run` uses `asyncio.run()`, which creates a new asyncio
# event loop. If we had connected the `TelegramClient` before, `telethon` will
# use `asyncio.get_running_loop()` to create some additional tasks. If these
# loops are different, it won't work.
#
# To keep things simple, be sure to not create multiple asyncio loops!
if __name__ == '__main__':
app.run()
|