File: postgres.py

package info (click to toggle)
python-greenio 0.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 144 kB
  • ctags: 200
  • sloc: python: 988; makefile: 32
file content (133 lines) | stat: -rw-r--r-- 3,405 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
##
# Copyright (c) 2008-2012 Sprymix Inc.
# License: Apache 2.0
##


"""A non-blocking adapter for py-postgresql library.
WARNING: This is just an experiment; use in production is not recommended.
"""


import postgresql
from postgresql.python import socket as pg_socket
from postgresql.driver import pq3

from greenio import socket as greensocket


class SocketConnector:
    """Mixin that makes a connector build SocketFactory objects carrying
    an "async" flag.

    ``async`` is a reserved word since Python 3.7, so the flag is spelled
    ``async_`` in this class.  The legacy spelling is still accepted as a
    keyword argument, e.g. ``SocketConnector(**{'async': True})``.
    """

    def __init__(self, async_=False, **legacy):
        """Remember whether greenio-backed sockets were requested.

        :param async_: truthy to request non-blocking (greenio) sockets.
        :param legacy: only the key ``'async'`` is accepted; when present
                       it overrides ``async_``.
        :raises TypeError: on any other unexpected keyword argument.
        """
        if 'async' in legacy:
            async_ = legacy.pop('async')
        if legacy:
            raise TypeError(
                '__init__() got an unexpected keyword argument {!r}'.format(
                    next(iter(legacy))))
        self.async_ = async_

    def create_socket_factory(self, **params):
        """Return a SocketFactory built from ``params``.

        The flag is passed inside ``socket_extra`` under the string key
        ``'async'``, which is the key SocketFactory looks up.
        """
        params['socket_extra'] = {'async': self.async_}
        return SocketFactory(**params)


class IPConnector(pq3.IPConnector, SocketConnector):
    """pq3.IPConnector combined with the SocketConnector async flag."""

    def __init__(self, *args, async_=False, **kw):
        """Initialise both base classes.

        :param args: positional arguments for pq3.IPConnector.
        :param async_: keyword-only flag handed to SocketConnector.
        :param kw: keyword arguments for pq3.IPConnector.  A legacy
                   ``'async'`` key is removed first and, when present,
                   overrides ``async_``.
        """
        # ``async`` is reserved since Python 3.7, so the legacy name can
        # only arrive through **kw (e.g. from Driver.fit(**params)).
        async_ = kw.pop('async', async_)
        pq3.IPConnector.__init__(self, *args, **kw)
        # Forward under the string key so no reserved word appears in code.
        SocketConnector.__init__(self, **{'async': async_})


class IP4(IPConnector, pq3.IP4):
    """Connector combining this module's IPConnector with pq3.IP4."""


class IP6(IPConnector, pq3.IP6):
    """Connector combining this module's IPConnector with pq3.IP6."""


class Host(SocketConnector, pq3.Host):
    """pq3.Host combined with the SocketConnector async flag."""

    def __init__(self, *args, async_=False, **kw):
        """Initialise both base classes.

        :param args: positional arguments for pq3.Host.
        :param async_: keyword-only flag handed to SocketConnector.
        :param kw: keyword arguments for pq3.Host.  A legacy ``'async'``
                   key is removed first and, when present, overrides
                   ``async_``.
        """
        # ``async`` is reserved since Python 3.7, so the legacy name can
        # only arrive through **kw (e.g. from Driver.fit(**params)).
        async_ = kw.pop('async', async_)
        pq3.Host.__init__(self, *args, **kw)
        # Forward under the string key so no reserved word appears in code.
        SocketConnector.__init__(self, **{'async': async_})


class Unix(SocketConnector, pq3.Unix):
    """pq3.Unix combined with the SocketConnector async flag."""

    def __init__(self, unix=None, async_=False, **kw):
        """Initialise both base classes.

        :param unix: passed to pq3.Unix as ``unix=``.
        :param async_: flag handed to SocketConnector.
        :param kw: keyword arguments for pq3.Unix.  A legacy ``'async'``
                   key is removed first and, when present, overrides
                   ``async_``.
        """
        # ``async`` is reserved since Python 3.7, so the legacy name can
        # only arrive through **kw (e.g. from Driver.fit(**params)).
        async_ = kw.pop('async', async_)
        pq3.Unix.__init__(self, unix=unix, **kw)
        # Forward under the string key so no reserved word appears in code.
        SocketConnector.__init__(self, **{'async': async_})


class SocketFactory(pg_socket.SocketFactory):
    """Socket factory that can return greenio sockets instead of the
    sockets produced by the base class.
    """

    def __init__(self, *args, socket_extra=None, **kw):
        """Initialise the base factory and read the async flag.

        :param args: positional arguments for pg_socket.SocketFactory.
        :param socket_extra: optional mapping; its ``'async'`` entry
                             selects greenio sockets.  A missing or empty
                             mapping means "not async".
        :param kw: keyword arguments for pg_socket.SocketFactory.
        """
        super().__init__(*args, **kw)
        # Stored as ``async_``: ``async`` is reserved since Python 3.7.
        self.async_ = (socket_extra or {}).get('async', False)

    def __call__(self, timeout=None):
        """Create and connect a socket.

        :param timeout: forwarded to the base class on the regular path;
                        it is not applied on the greenio path.
        :returns: a connected socket object.
        """
        if not self.async_:
            return super().__call__(timeout)

        # socket_create / socket_connect are not set in this class;
        # presumably provided by pg_socket.SocketFactory -- TODO confirm.
        sock = greensocket.socket(*self.socket_create)
        try:
            sock.connect(self.socket_connect)
        except Exception:
            # Do not leak the descriptor when the connection attempt fails.
            sock.close()
            raise
        return sock


class Driver(pq3.Driver):
    """pq3.Driver whose connector builders return this module's classes."""

    def ip4(self, **kw):
        """Create an IP4 connector attached to this driver."""
        connector = IP4(driver=self, **kw)
        return connector

    def ip6(self, **kw):
        """Create an IP6 connector attached to this driver."""
        connector = IP6(driver=self, **kw)
        return connector

    def host(self, **kw):
        """Create a Host connector attached to this driver."""
        connector = Host(driver=self, **kw)
        return connector

    def unix(self, **kw):
        """Create a Unix connector attached to this driver."""
        connector = Unix(driver=self, **kw)
        return connector


# Module-level Driver instance; connector_factory() calls its fit() method.
driver = Driver()


def connector_factory(iri, async_=False, **legacy):
    """Build a connector for ``iri`` using the module-level driver.

    ``async`` is a reserved word since Python 3.7, so the flag is spelled
    ``async_``.  The legacy spelling is still accepted as a keyword, e.g.
    ``connector_factory(iri, **{'async': True})``.

    :param iri: connection IRI, parsed with ``postgresql.iri.parse``.
    :param async_: truthy to request greenio-backed sockets.
    :param legacy: only the key ``'async'`` is accepted; when present it
                   overrides ``async_``.
    :returns: the result of ``driver.fit(**params)``.
    :raises TypeError: on any other unexpected keyword argument.
    """
    if 'async' in legacy:
        async_ = legacy.pop('async')
    if legacy:
        raise TypeError(
            'connector_factory() got an unexpected keyword argument '
            '{!r}'.format(next(iter(legacy))))

    params = postgresql.iri.parse(iri)
    # Always force this setting on, keeping any other settings from the IRI.
    params.setdefault('settings', {})['standard_conforming_strings'] = 'on'
    # The connector classes look the flag up under the string key 'async'.
    params['async'] = async_
    return driver.fit(**params)


if __name__ == '__main__':
    # Demo: run a slow query through the greenio-backed connection while a
    # second coroutine keeps printing, to show the event loop is not blocked.
    import greenio
    import time
    import asyncio

    @asyncio.coroutine
    def sleeper():
        # show that we're not blocked
        while True:
            yield from asyncio.sleep(0.4)
            print('.')

    @greenio.task
    def db():
        # ``async`` is a reserved word since Python 3.7, so the flag is
        # passed through dict unpacking instead of as a literal keyword.
        connection = connector_factory(
            'pq://postgres@localhost:5432', **{'async': True})()
        connection.connect()

        try:
            print('>> sleeping')
            st = time.monotonic()
            connection.execute('SELECT pg_sleep(2)')
            en = time.monotonic() - st
            assert en >= 2
            print('<< sleeping {:.3f}s'.format(en))

            ps = connection.prepare('SELECT 42')
            print('"SELECT 42" -> {!r}'.format(ps()))
        finally:
            connection.close()

    @asyncio.coroutine
    def run():
        # sleeper() never finishes, so stop as soon as db() completes.
        yield from asyncio.wait(
            [db(), sleeper()], return_when=asyncio.FIRST_COMPLETED)

    asyncio.set_event_loop_policy(greenio.GreenEventLoopPolicy())
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    loop.close()