1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382
|
# Copyright (c) 2023. Lena "Teekeks" During <info@teawork.de>
"""
EventSub Webhook
----------------
.. note:: EventSub Webhook is targeted at programs which have to subscribe to topics for multiple broadcasters.\n
Should you only need to target a single broadcaster or are building a client side project, look at :doc:`/modules/twitchAPI.eventsub.websocket`
EventSub lets you listen for events that happen on Twitch.
The EventSub client runs in its own thread, calling the given callback function whenever an event happens.
************
Requirements
************
.. note:: Please note that your endpoint URL has to be HTTPS, has to run on port 443 and requires a valid, non-self-signed certificate.
          This most likely means that you need a reverse proxy like nginx. You can also hand in a valid SSL context to be used in the constructor.
In the case that you don't hand in a valid ssl context to the constructor, you can specify any port you want in the constructor and handle the
bridge between this program and your public URL on port 443 via reverse proxy.\n
You can check whether or not your webhook is publicly reachable by navigating to the URL set in `callback_url`.
You should get a 200 response with the text :code:`pyTwitchAPI EventSub`.
*******************
Listening to topics
*******************
After you started your EventSub client, you can use the :code:`listen_` prefixed functions to listen to the topics you are interested in.
Look at :ref:`eventsub-available-topics` to find the topics you are interested in.
The function you hand in as callback will be called whenever that event happens with the event data as a parameter,
the type of that parameter is also listed in the link above.
************
Code Example
************
.. code-block:: python

    from twitchAPI.twitch import Twitch
    from twitchAPI.helper import first
    from twitchAPI.eventsub.webhook import EventSubWebhook
    from twitchAPI.object.eventsub import ChannelFollowEvent
    from twitchAPI.oauth import UserAuthenticator
    from twitchAPI.type import AuthScope
    import asyncio

    TARGET_USERNAME = 'target_username_here'
    EVENTSUB_URL = 'https://url.to.your.webhook.com'
    APP_ID = 'your_app_id'
    APP_SECRET = 'your_app_secret'
    TARGET_SCOPES = [AuthScope.MODERATOR_READ_FOLLOWERS]


    async def on_follow(data: ChannelFollowEvent):
        # our event happened, lets do things with the data we got!
        print(f'{data.event.user_name} now follows {data.event.broadcaster_user_name}!')


    async def eventsub_webhook_example():
        # create the api instance and get the ID of the target user
        twitch = await Twitch(APP_ID, APP_SECRET)
        user = await first(twitch.get_users(logins=TARGET_USERNAME))

        # the user has to authenticate once using the bot with our intended scope.
        # since we do not need the resulting token after this authentication, we just discard the result we get from authenticate()
        # Please read up the UserAuthenticator documentation to get a full view of how this process works
        auth = UserAuthenticator(twitch, TARGET_SCOPES)
        await auth.authenticate()

        # basic setup, will run on port 8080 and a reverse proxy takes care of the https and certificate
        eventsub = EventSubWebhook(EVENTSUB_URL, 8080, twitch)

        # unsubscribe from all old events that might still be there
        # this will ensure we have a clean slate
        await eventsub.unsubscribe_all()
        # start the eventsub client
        eventsub.start()
        # subscribing to the desired eventsub hook for our user
        # the given function (in this example on_follow) will be called every time this event is triggered
        # the broadcaster is a moderator in their own channel by default so specifying both as the same works in this example
        await eventsub.listen_channel_follow_v2(user.id, user.id, on_follow)

        # eventsub will run in its own thread
        # so lets just wait for user input before shutting it all down again
        try:
            input('press Enter to shut down...')
        finally:
            # stopping both eventsub as well as gracefully closing the connection to the API
            await eventsub.stop()
            await twitch.close()
        print('done')


    # lets run our example
    asyncio.run(eventsub_webhook_example())"""
import asyncio
import datetime
import hashlib
import hmac
import secrets
import threading
from collections import deque
from functools import partial
from json import JSONDecodeError
from random import choice
from ssl import SSLContext
from string import ascii_lowercase
from time import sleep
from typing import Optional, Union, Callable, Awaitable

from aiohttp import web, ClientSession

from twitchAPI.eventsub.base import EventSubBase
from ..twitch import Twitch
from ..helper import done_task_callback
from ..type import TwitchBackendException, EventSubSubscriptionConflict, EventSubSubscriptionError, EventSubSubscriptionTimeout, \
    TwitchAuthorizationException, AuthType
# Names exported by `from <this module> import *`; only the webhook client class is public.
__all__ = ['EventSubWebhook']
class EventSubWebhook(EventSubBase):
    """EventSub client which receives Twitch messages through a local aiohttp webhook server.

    :meth:`start` launches the server in its own thread. It serves ``POST /callback`` (challenges, notifications and
    revocations) and ``GET /`` (a plain text reachability check).
    """

    def __init__(self,
                 callback_url: str,
                 port: int,
                 twitch: Twitch,
                 ssl_context: Optional[SSLContext] = None,
                 host_binding: str = '0.0.0.0',
                 subscription_url: Optional[str] = None,
                 callback_loop: Optional[asyncio.AbstractEventLoop] = None,
                 revocation_handler: Optional[Callable[[dict], Awaitable[None]]] = None,
                 message_deduplication_history_length: int = 50):
        """
        :param callback_url: The full URL of the webhook.
        :param port: the port on which this webhook should run
        :param twitch: a app authenticated instance of :const:`~twitchAPI.twitch.Twitch`
        :param ssl_context: optional ssl context to be used |default| :code:`None`
        :param host_binding: the host to bind the internal server to |default| :code:`0.0.0.0`
        :param subscription_url: Alternative subscription URL, useful for development with the twitch-cli
        :param callback_loop: The asyncio eventloop to be used for callbacks. \n
            Set this if you or a library you use cares about which asyncio event loop is running the callbacks.
            Defaults to the one used by EventSub Webhook.
        :param revocation_handler: Optional handler for when subscriptions get revoked. |default| :code:`None`
        :param message_deduplication_history_length: The amount of messages being considered for the duplicate message deduplication. |default| :code:`50`
        :raises RuntimeError: if `callback_url` does not start with ``https``
        """
        super().__init__(twitch, 'twitchAPI.eventsub.webhook')
        # endswith() instead of indexing [-1] so that an empty string does not raise IndexError
        self.callback_url: str = callback_url[:-1] if callback_url.endswith('/') else callback_url
        """The full URL of the webhook."""
        # this secret keys the HMAC that authenticates incoming messages, so it comes from a CSPRNG
        self.secret: str = ''.join(secrets.choice(ascii_lowercase) for _ in range(20))
        """A random secret string. Set this for added security. |default| :code:`A random 20 character long string`"""
        self.wait_for_subscription_confirm: bool = True
        """Set this to false if you don't want to wait for a subscription confirm. |default| :code:`True`"""
        self.wait_for_subscription_confirm_timeout: int = 30
        """Max time in seconds to wait for a subscription confirmation. Only used if ``wait_for_subscription_confirm`` is set to True.
        |default| :code:`30`"""
        self._port: int = port
        self.subscription_url: Optional[str] = subscription_url
        """Alternative subscription URL, useful for development with the twitch-cli"""
        if self.subscription_url is not None and not self.subscription_url.endswith('/'):
            self.subscription_url += '/'
        self._callback_loop = callback_loop
        self._host: str = host_binding
        self.__running = False
        self.revokation_handler: Optional[Callable[[dict], Awaitable[None]]] = revocation_handler
        """Optional handler for when subscriptions get revoked."""
        self._startup_complete = False
        self.unsubscribe_on_stop: bool = True
        """Unsubscribe all currently active Webhooks on calling :const:`~twitchAPI.eventsub.webhook.EventSubWebhook.stop()` |default| :code:`True`"""
        self._closing = False
        self.__ssl_context: Optional[SSLContext] = ssl_context
        self.__active_webhooks = {}
        self.__hook_thread: Union['threading.Thread', None] = None
        self.__hook_loop: Union['asyncio.AbstractEventLoop', None] = None
        self.__hook_runner: Union['web.AppRunner', None] = None
        self._task_callback = partial(done_task_callback, self.logger)
        if not self.callback_url.startswith('https'):
            raise RuntimeError('HTTPS is required for authenticated webhook.\n'
                               + 'Either use non authenticated webhook or use a HTTPS proxy!')
        # bounded history of recently seen message IDs, used to drop duplicate deliveries
        self._msg_id_history: deque = deque(maxlen=message_deduplication_history_length)

    async def _unsubscribe_hook(self, topic_id: str) -> bool:
        """Hook invoked for `topic_id`; this transport has nothing to do and always returns True."""
        return True

    def __build_runner(self):
        """Create the aiohttp application with the callback and reachability routes and wrap it in an AppRunner."""
        hook_app = web.Application()
        hook_app.add_routes([web.post('/callback', self.__handle_callback),
                             web.get('/', self.__handle_default)])
        return web.AppRunner(hook_app)

    def __run_hook(self, runner: 'web.AppRunner'):
        """Thread target: create a fresh event loop, start the web server on it and keep the loop running until stop.

        :param runner: the not yet set up aiohttp runner to serve
        """
        self.__hook_runner = runner
        self.__hook_loop = asyncio.new_event_loop()
        # callbacks run on the server loop unless the user supplied a dedicated loop
        if self._callback_loop is None:
            self._callback_loop = self.__hook_loop
        asyncio.set_event_loop(self.__hook_loop)
        self.__hook_loop.run_until_complete(runner.setup())
        site = web.TCPSite(runner, str(self._host), self._port, ssl_context=self.__ssl_context)
        self.__hook_loop.run_until_complete(site.start())
        self.logger.info('started twitch API event sub on port ' + str(self._port))
        # signals start() that it may return
        self._startup_complete = True
        self.__hook_loop.run_until_complete(self._keep_loop_alive())

    async def _keep_loop_alive(self):
        """Keep the server event loop busy until `_closing` is set."""
        while not self._closing:
            await asyncio.sleep(0.1)

    def start(self):
        """Starts the EventSub client

        Blocks until the webhook server thread reports that startup is complete.

        :rtype: None
        :raises RuntimeError: if EventSub is already running or the webhook server thread terminated during startup
        """
        if self.__running:
            raise RuntimeError('already started')
        self.__hook_thread = threading.Thread(target=self.__run_hook, args=(self.__build_runner(),))
        self.__running = True
        self._startup_complete = False
        self._closing = False
        self.__hook_thread.start()
        while not self._startup_complete:
            # if the server thread died before finishing startup, waiting any longer would block forever
            if not self.__hook_thread.is_alive() and not self._startup_complete:
                self.__running = False
                raise RuntimeError('EventSub webhook server failed to start')
            sleep(0.1)

    async def stop(self):
        """Stops the EventSub client

        This also unsubscribes from all known subscriptions if unsubscribe_on_stop is True

        :rtype: None
        :raises RuntimeError: if EventSub is not running
        """
        if not self.__running:
            raise RuntimeError('EventSubWebhook is not running')
        self.logger.debug('shutting down eventsub')
        if self.__hook_runner is not None and self.unsubscribe_on_stop:
            await self.unsubscribe_all_known()
        # ensure all client sessions are closed
        await asyncio.sleep(0.25)
        # lets _keep_loop_alive() return so the server loop can finish
        self._closing = True
        # cleanly shut down the runner
        if self.__hook_runner is not None:
            await self.__hook_runner.shutdown()
            await self.__hook_runner.cleanup()
            self.__hook_runner = None
        self.__running = False
        self.logger.debug('eventsub shut down')

    def _get_transport(self) -> dict:
        """Return the ``transport`` section of a subscription request for this webhook."""
        return {
            'method': 'webhook',
            'callback': f'{self.callback_url}/callback',
            'secret': self.secret
        }

    async def _build_request_header(self) -> dict:
        """Build the HTTP headers for subscription requests using the refreshed app token.

        :raises ~twitchAPI.type.TwitchAuthorizationException: if no app token is available
        """
        token = await self._twitch.get_refreshed_app_token()
        if token is None:
            raise TwitchAuthorizationException('no Authorization set!')
        return {
            'Client-ID': self._twitch.app_id,
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {token}'
        }

    async def _subscribe(self, sub_type: str, sub_version: str, condition: dict, callback, event, is_batching_enabled: Optional[bool] = None) -> str:
        """Subscribe to Twitch Topic

        :param sub_type: the subscription type
        :param sub_version: the version of the subscription type
        :param condition: the condition of the subscription
        :param callback: async function which is called with the parsed event
        :param event: callable used to build the event object from the received payload
        :param is_batching_enabled: only sent to Twitch when not None
        :returns: the ID of the created subscription
        :raises ValueError: if `callback` is not a coroutine function
        :raises ~twitchAPI.type.TwitchBackendException: if the subscription request returned status 500
        :raises ~twitchAPI.type.EventSubSubscriptionConflict: if the response reports a conflict
        :raises ~twitchAPI.type.EventSubSubscriptionError: if the response reports any other error
        :raises ~twitchAPI.type.EventSubSubscriptionTimeout: if the subscription was not confirmed in time
        """
        if not asyncio.iscoroutinefunction(callback):
            raise ValueError('callback needs to be a async function which takes one parameter')
        self.logger.debug(f'subscribe to {sub_type} version {sub_version} with condition {condition}')
        data = {
            'type': sub_type,
            'version': sub_version,
            'condition': condition,
            'transport': self._get_transport()
        }
        if is_batching_enabled is not None:
            data['is_batching_enabled'] = is_batching_enabled

        async with ClientSession(timeout=self._twitch.session_timeout) as session:
            sub_base = self.subscription_url if self.subscription_url is not None else self._twitch.base_url
            r_data = await self._api_post_request(session, sub_base + 'eventsub/subscriptions', data=data)
            result = await r_data.json()
        error = result.get('error')
        if r_data.status == 500:
            raise TwitchBackendException(error)
        if error is not None:
            if error.lower() == 'conflict':
                raise EventSubSubscriptionConflict(result.get('message', ''))
            raise EventSubSubscriptionError(result.get('message'))
        sub_id = result['data'][0]['id']
        self.logger.debug(f'subscription for {sub_type} version {sub_version} with condition {condition} has id {sub_id}')
        self._add_callback(sub_id, callback, event)
        if self.wait_for_subscription_confirm:
            # the monotonic loop clock is immune to wall clock adjustments while waiting
            loop = asyncio.get_running_loop()
            deadline = loop.time() + self.wait_for_subscription_confirm_timeout
            while loop.time() <= deadline:
                # the entry may have been removed in the meantime (e.g. by a revocation), treat that as not active
                entry = self._callbacks.get(sub_id)
                if entry is not None and entry['active']:
                    return sub_id
                await asyncio.sleep(0.01)
            self._callbacks.pop(sub_id, None)
            raise EventSubSubscriptionTimeout()
        return sub_id

    def _target_token(self) -> AuthType:
        """Return the auth type used by this transport, which is app authentication."""
        return AuthType.APP

    async def _verify_signature(self, request: 'web.Request') -> bool:
        """Check the HMAC-SHA256 signature of an incoming request against :attr:`secret`.

        The signed message is the message ID, the message timestamp and the raw request body, concatenated.

        :param request: the incoming request
        :returns: True if the signature matches, False if it does not match or a required header is missing
        """
        headers = request.headers
        expected = headers.get('Twitch-Eventsub-Message-Signature')
        msg_id = headers.get('Twitch-Eventsub-Message-Id')
        timestamp = headers.get('Twitch-Eventsub-Message-Timestamp')
        if expected is None or msg_id is None or timestamp is None:
            # an unsigned or incomplete request can never be valid, reject it instead of raising KeyError
            return False
        # surrogateescape keeps arbitrary header bytes encodable so hostile input cannot raise here
        hmac_message = msg_id.encode('utf-8', 'surrogateescape') \
            + timestamp.encode('utf-8', 'surrogateescape') \
            + await request.read()
        sig = 'sha256=' + hmac.new(self.secret.encode('utf-8'),
                                   msg=hmac_message,
                                   digestmod=hashlib.sha256).hexdigest()
        # constant time comparison to avoid leaking how many leading characters matched
        return hmac.compare_digest(sig.encode('utf-8'), expected.encode('utf-8', 'surrogateescape'))

    # noinspection PyUnusedLocal
    @staticmethod
    async def __handle_default(request: 'web.Request'):
        """Answer ``GET /`` with a fixed text so the reachability of the webhook can be checked."""
        return web.Response(text="pyTwitchAPI EventSub")

    async def __handle_challenge(self, request: 'web.Request', data: dict):
        """Answer a subscription challenge and mark the subscription as active.

        :param request: the incoming request, used for signature verification
        :param data: the parsed JSON body of the request
        :returns: 403 if the signature does not match, otherwise a response echoing the challenge
        """
        self.logger.debug(f'received challenge for subscription {data.get("subscription", {}).get("id")}')
        if not await self._verify_signature(request):
            self.logger.warning('message signature is not matching! Discarding message')
            return web.Response(status=403)
        await self._activate_callback(data.get('subscription', {}).get('id'))
        return web.Response(text=data.get('challenge'))

    async def _handle_revokation(self, data):
        """Forget a revoked subscription and schedule the revocation handler, if one is set.

        :param data: the parsed JSON body of the revocation message
        """
        subscription = data.get('subscription', {})
        sub_id: str = subscription.get('id')
        self.logger.debug(f'got revocation of subscription {sub_id} for reason {subscription.get("status")}')
        if sub_id not in self._callbacks.keys():
            self.logger.warning(f'unknown subscription {sub_id} got revoked. ignore')
            return
        self._callbacks.pop(sub_id)
        if self.revokation_handler is not None and self._callback_loop is not None:
            t = self._callback_loop.create_task(self.revokation_handler(data))  #type: ignore
            t.add_done_callback(self._task_callback)

    async def __handle_callback(self, request: 'web.Request'):
        """Handle ``POST /callback``: challenges, revocations and notifications.

        :param request: the incoming request
        :returns: 400 for a malformed body, 403 for a bad signature, otherwise 200 (also for unknown subscriptions
            and duplicate messages, which are only logged)
        """
        try:
            data: dict = await request.json()
        except JSONDecodeError:
            self.logger.error('got request with malformed body! Discarding message')
            return web.Response(status=400)
        if not isinstance(data, dict):
            # valid JSON which is not an object (e.g. a list) cannot be an EventSub message
            self.logger.error('got request with malformed body! Discarding message')
            return web.Response(status=400)
        if data.get('challenge') is not None:
            return await self.__handle_challenge(request, data)
        sub_id = data.get('subscription', {}).get('id')
        callback = self._callbacks.get(sub_id)
        if callback is None:
            self.logger.error(f'received event for unknown subscription with ID {sub_id}')
            return web.Response(status=200)
        if not await self._verify_signature(request):
            self.logger.warning('message signature is not matching! Discarding message')
            return web.Response(status=403)
        msg_type = request.headers['Twitch-Eventsub-Message-Type']
        if msg_type.lower() == 'revocation':
            await self._handle_revokation(data)
            return web.Response(status=200)
        msg_id = request.headers.get('Twitch-Eventsub-Message-Id')
        if msg_id is not None:
            if msg_id in self._msg_id_history:
                self.logger.warning(f'got message with duplicate id {msg_id}! Discarding message')
                return web.Response(status=200)
            # only real IDs are remembered so that None never occupies a slot of the bounded history
            self._msg_id_history.append(msg_id)
        data['metadata'] = {
            'message_id': msg_id,
            'message_type': msg_type,
            'message_timestamp': request.headers['Twitch-Eventsub-Message-Timestamp'],
            'subscription_type': request.headers['Twitch-Eventsub-Subscription-Type'],
            'subscription_version': request.headers['Twitch-Eventsub-Subscription-Version'],
        }
        dat = callback['event'](**data)
        if self._callback_loop is not None:
            # NOTE(review): create_task() is called from the server loop; if a user supplied callback_loop runs in a
            # different thread this may need asyncio.run_coroutine_threadsafe() instead - confirm
            t = self._callback_loop.create_task(callback['callback'](dat))
            t.add_done_callback(self._task_callback)
        return web.Response(status=200)
|