File: main.py

package info (click to toggle)
freenub 0.1.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,168 kB
  • sloc: python: 10,664; makefile: 7; sh: 6
file content (39 lines) | stat: -rw-r--r-- 1,066 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import logging

from fastapi import BackgroundTasks, FastAPI

import pubnub as pn
from pubnub.pnconfiguration import PNConfiguration
from pubnub.pubnub_asyncio import PubNubAsyncio

# ASGI application object; the route and shutdown handlers in this module
# register themselves on it via decorators.
app = FastAPI()

# PubNub client settings.
# NOTE(review): "demo" looks like PubNub's public sandbox key pair — confirm,
# and replace with real keys before any non-demo use.
pnconfig = PNConfiguration()
pnconfig.publish_key = "demo"
pnconfig.subscribe_key = "demo"
pnconfig.uuid = "UUID-PUB"
# Channel that write_notification publishes to.
CHANNEL = "the_guide"


# Single module-level asyncio PubNub client built from the settings above.
pubnub = PubNubAsyncio(pnconfig)
# Configure the "pubnub" stream logger at DEBUG level.
pn.set_stream_logger("pubnub", logging.DEBUG)


async def write_notification(email: str, message: str = "") -> None:
    """Record a notification in the log file, then publish it over PubNub.

    Args:
        email: Recipient address; written into the log line and sent as the
            PubNub message payload.
        message: Notification text; only written into the log line.

    Raises:
        OSError: If ``/tmp/log.txt`` cannot be opened or written; in that
            case nothing is published.
    """
    content = f"notification for {email}: {message}"
    # Mode "w" truncates, so the file only ever holds the latest notification.
    # The encoding is pinned because open()'s default is locale-dependent and
    # a non-ASCII address could otherwise fail or be written inconsistently.
    with open("/tmp/log.txt", mode="w", encoding="utf-8") as email_file:
        email_file.write(content)

    # NOTE(review): the published payload is `email`, not `message` — confirm
    # that this is intended.
    await pubnub.publish().channel(CHANNEL).message(email).future()


@app.get("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    """Queue a notification for ``email`` and reply immediately.

    The actual work (``write_notification``) is handed to ``background_tasks``
    rather than awaited here; this handler only registers the task and
    returns a fixed acknowledgement body.
    """
    # Register the task with its fixed notification text.
    background_tasks.add_task(
        write_notification,
        email,
        message="some notification",
    )
    acknowledgement = {"message": "Notification sent in the background"}
    return acknowledgement


@app.on_event("shutdown")
async def stop_pubnub():
    """Shutdown hook: announce the shutdown on stdout, then stop the PubNub client."""
    farewell = "Closing Application"
    print(farewell)
    # Await the client's stop() so it completes before this handler returns.
    await pubnub.stop()