# Qt window that fetches a URL with aiohttp, driven by a qasync event loop.
import asyncio
import sys
import aiohttp
# from PyQt6.QtWidgets import (
from PySide6.QtWidgets import (
QApplication,
QLabel,
QLineEdit,
QPushButton,
QTextEdit,
QVBoxLayout,
QWidget,
)
from qasync import QEventLoop, asyncClose, asyncSlot
class MainWindow(QWidget):
    """Main window: status label, URL field, response viewer, fetch button.

    HTTP requests go through one ``aiohttp.ClientSession`` that is created
    by :meth:`boot` and closed when the window is closed.
    """

    # Default URL shown in the URL field.
    _DEF_URL: str = "https://jsonplaceholder.typicode.com/todos/1"

    def __init__(self):
        """Create the widgets and stack them in a vertical layout."""
        super().__init__()

        self.setLayout(QVBoxLayout())

        self.lbl_status = QLabel("Idle", self)
        self.layout().addWidget(self.lbl_status)

        self.edit_url = QLineEdit(self._DEF_URL, self)
        self.layout().addWidget(self.edit_url)

        self.edit_response = QTextEdit("", self)
        self.layout().addWidget(self.edit_response)

        self.btn_fetch = QPushButton("Fetch", self)
        self.btn_fetch.clicked.connect(self.on_btn_fetch_clicked)
        self.layout().addWidget(self.btn_fetch)

        # Stays None until boot() runs. A bare annotation would leave the
        # attribute undefined, so closing the window (or clicking "Fetch")
        # before boot() would raise AttributeError.
        self.session: "aiohttp.ClientSession | None" = None

    @asyncClose
    async def closeEvent(self, event):  # noqa:N802
        """Close the HTTP session, if one was created, when the window closes."""
        if self.session is not None:
            await self.session.close()

    async def boot(self):
        """Create the HTTP session used by the fetch button.

        Does nothing when an open session already exists, so calling this
        more than once does not leak the earlier session.
        """
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()

    @asyncSlot()
    async def on_btn_fetch_clicked(self):
        """GET the URL from the URL field and show the response text.

        The button is disabled while the request is running; the status
        label reports progress, success, or the error that occurred.
        """
        if self.session is None:
            # boot() has not run yet; report it instead of failing on None.
            self.lbl_status.setText("Error: session is not ready")
            return

        self.btn_fetch.setEnabled(False)
        self.lbl_status.setText("Fetching...")

        try:
            async with self.session.get(self.edit_url.text()) as r:
                self.edit_response.setText(await r.text())
        except Exception as exc:
            # Deliberately broad: any failure is shown in the status label.
            self.lbl_status.setText(f"Error: {exc}")
        else:
            self.lbl_status.setText("Finished!")
        finally:
            self.btn_fetch.setEnabled(True)
if __name__ == "__main__":
    # Qt application with a qasync loop installed as the current asyncio loop.
    app = QApplication(sys.argv)
    loop = QEventLoop(app)
    asyncio.set_event_loop(loop)

    # Build and show the window.
    window = MainWindow()
    window.show()

    # Set by Qt's aboutToQuit signal; the loop below runs until then.
    quit_requested = asyncio.Event()
    app.aboutToQuit.connect(quit_requested.set)

    # Schedule session creation, then run until the application quits.
    loop.create_task(window.boot())
    loop.run_until_complete(quit_requested.wait())
    loop.close()
|