File: aiohttp_fetch.py

package info (click to toggle)
qasync 0.27.1-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 240 kB
  • sloc: python: 1,576; makefile: 10
file content (82 lines) | stat: -rw-r--r-- 2,113 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import asyncio
import sys

import aiohttp

# from PyQt6.QtWidgets import (
from PySide6.QtWidgets import (
    QApplication,
    QLabel,
    QLineEdit,
    QPushButton,
    QTextEdit,
    QVBoxLayout,
    QWidget,
)
from qasync import QEventLoop, asyncClose, asyncSlot


class MainWindow(QWidget):
    """Demo window that fetches a URL with aiohttp and shows the response.

    The vertical layout holds, top to bottom: a status label, a line edit
    with the URL, a text area for the response body and a "Fetch" button.
    The HTTP session is not created in ``__init__``; :meth:`boot` creates it
    and :meth:`closeEvent` closes it.
    """

    _DEF_URL: str = "https://jsonplaceholder.typicode.com/todos/1"
    """Default URL."""

    def __init__(self):
        """Build the widgets and connect the button to the fetch handler."""
        super().__init__()

        self.setLayout(QVBoxLayout())

        self.lbl_status = QLabel("Idle", self)
        self.layout().addWidget(self.lbl_status)

        self.edit_url = QLineEdit(self._DEF_URL, self)
        self.layout().addWidget(self.edit_url)

        self.edit_response = QTextEdit("", self)
        self.layout().addWidget(self.edit_response)

        self.btn_fetch = QPushButton("Fetch", self)
        self.btn_fetch.clicked.connect(self.on_btn_fetch_clicked)
        self.layout().addWidget(self.btn_fetch)

        # Explicitly start as None so code that runs before ``boot`` sees a
        # defined attribute instead of raising AttributeError.
        self.session: "aiohttp.ClientSession | None" = None

    @asyncClose
    async def closeEvent(self, event):  # noqa:N802
        """Close the HTTP session, if one was created, when the window closes.

        Args:
            event: The close event passed in by Qt; it is not inspected.
        """
        session = self.session
        if session is not None:
            await session.close()

    async def boot(self):
        """Create the aiohttp session unless a usable one already exists.

        Calling this more than once is harmless: an open session is kept
        rather than being replaced (which would leave it unclosed).
        """
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()

    @asyncSlot()
    async def on_btn_fetch_clicked(self):
        """Fetch the URL from the line edit and display the response text.

        The button is disabled for the duration of the request and always
        re-enabled afterwards.  Any exception is reported in the status
        label instead of being propagated.
        """
        self.btn_fetch.setEnabled(False)
        self.lbl_status.setText("Fetching...")

        try:
            # Create the session on demand so a click that arrives before
            # ``boot`` has run still works.
            if self.session is None:
                await self.boot()
            async with self.session.get(self.edit_url.text()) as r:
                self.edit_response.setText(await r.text())
        except Exception as exc:
            # UI boundary: surface whatever went wrong to the user.
            self.lbl_status.setText(f"Error: {exc}")
        else:
            self.lbl_status.setText("Finished!")
        finally:
            self.btn_fetch.setEnabled(True)


if __name__ == "__main__":
    app = QApplication(sys.argv)

    # Make the qasync loop the current asyncio loop before the Event below
    # is created.
    event_loop = QEventLoop(app)
    asyncio.set_event_loop(event_loop)

    # Set when the application is about to quit; the loop runs until then.
    app_close_event = asyncio.Event()
    app.aboutToQuit.connect(app_close_event.set)

    main_window = MainWindow()
    main_window.show()

    try:
        # Keep a reference to the task, as the asyncio documentation
        # recommends, so it is not discarded while still pending.
        boot_task = event_loop.create_task(main_window.boot())
        event_loop.run_until_complete(app_close_event.wait())
    finally:
        # Close the loop even if running it raised.
        event_loop.close()