File: decorators.py

package info (click to toggle)
python-asyncmy 0.2.10-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 676 kB
  • sloc: python: 3,528; makefile: 40
file content (31 lines) | stat: -rw-r--r-- 807 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import asyncio
import functools
import inspect
from asyncio import iscoroutine

from benchmark import conn_mysqlclient, data, sql


def cleanup(f):
    """Decorate *f* so ``test.asyncmy`` is truncated before each run.

    Both plain functions and ``async def`` functions are supported. A
    coroutine function is wrapped in another coroutine function, so calling
    the decorated name still returns a coroutine that the caller awaits or
    hands to an event loop; in that case the truncate runs when the
    coroutine is driven, immediately before the body of *f*.

    Args:
        f: The function (sync or async) to wrap.

    Returns:
        A wrapper carrying the metadata of *f* (via ``functools.wraps``)
        that returns whatever *f* returns.
    """

    def truncate():
        cur = conn_mysqlclient.cursor()
        try:
            cur.execute("truncate table test.asyncmy")
        finally:
            # Release the cursor even when the statement fails.
            cur.close()

    # Decide once, at decoration time, from the function itself:
    # ``iscoroutine`` on a function object is always False, so only
    # ``iscoroutinefunction`` can tell async callables apart here.
    if inspect.iscoroutinefunction(f):

        @functools.wraps(f)
        async def async_decorator(*args, **kwargs):
            truncate()
            return await f(*args, **kwargs)

        return async_decorator

    @functools.wraps(f)
    def decorator(*args, **kwargs):
        truncate()
        return f(*args, **kwargs)

    return decorator


def fill_data(f):
    """Decorate *f* so ``executemany(sql, data)`` runs before each run.

    ``sql`` and ``data`` come from the ``benchmark`` package (presumably the
    benchmark insert statement and its rows -- confirm there). Both plain
    functions and ``async def`` functions are supported. A coroutine function
    is wrapped in another coroutine function, so calling the decorated name
    still returns a coroutine that the caller awaits or hands to an event
    loop; in that case the insert runs when the coroutine is driven,
    immediately before the body of *f*.

    NOTE(review): no commit is issued after the insert; this assumes
    ``conn_mysqlclient`` is in autocommit mode -- confirm.

    Args:
        f: The function (sync or async) to wrap.

    Returns:
        A wrapper carrying the metadata of *f* (via ``functools.wraps``)
        that returns whatever *f* returns.
    """

    def insert_rows():
        cur = conn_mysqlclient.cursor()
        try:
            cur.executemany(sql, data)
        finally:
            # Release the cursor even when the statement fails.
            cur.close()

    # Decide once, at decoration time, from the function itself:
    # ``iscoroutine`` on a function object is always False, so only
    # ``iscoroutinefunction`` can tell async callables apart here.
    if inspect.iscoroutinefunction(f):

        @functools.wraps(f)
        async def async_decorator(*args, **kwargs):
            insert_rows()
            return await f(*args, **kwargs)

        return async_decorator

    @functools.wraps(f)
    def decorator(*args, **kwargs):
        insert_rows()
        return f(*args, **kwargs)

    return decorator