File: __init__.py

package info (click to toggle)
python-thriftpy 0.3.9%2Bds1-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 560 kB
  • sloc: python: 3,287; ansic: 30; makefile: 7
file content (201 lines) | stat: -rw-r--r-- 6,282 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# -*- coding: utf-8 -*-

"""
Tracking support similar to twitter finagle-thrift.

Note: When using tracking, every client should have a corresponding
server processor.
"""

from __future__ import absolute_import

import os.path
import time

from ...thrift import TClient, TApplicationException, TMessageType, \
    TProcessor, TType
from ...parser import load

# Reserved api name used for the tracking handshake: the client sends a CALL
# message with this name and the processor treats such a message as an
# upgrade request instead of a normal service call.
track_method = "__thriftpy_tracing_method_name__v2"
# Thrift definitions loaded from the "tracking.thrift" file located in the
# same directory as this module.  The classes below use its UpgradeArgs,
# UpgradeReply and RequestHeader structs.
track_thrift = load(os.path.join(os.path.dirname(__file__), "tracking.thrift"))


# Public names of this package.  TrackerBase and ConsoleTracker are imported
# from .tracker at the bottom of this file.
__all__ = ["TTrackedClient", "TTrackedProcessor", "TrackerBase",
           "ConsoleTracker"]


class RequestInfo(object):
    """Plain record holding the details of one tracked call."""

    def __init__(self, request_id, api, seq, client, server, status, start,
                 end, annotation, meta):
        """Store every argument on the instance under the same name.

        :request_id: identifier of the request
        :api: api name
        :seq: sequence number
        :client: client name
        :server: server name
        :status: request status
        :start: start timestamp
        :end: end timestamp
        :annotation: application-level key-value data
        :meta: metadata associated with the request (stored as-is)
        """
        # What was called.
        self.request_id, self.api, self.seq = request_id, api, seq
        # Who took part in the call.
        self.client, self.server = client, server
        # How and when it went.
        self.status = status
        self.start, self.end = start, end
        # Extra payloads carried along with the call.
        self.annotation, self.meta = annotation, meta


class TTrackedClient(TClient):
    """Thrift client that reports each request to a tracker.

    On construction the client sends a handshake call named ``track_method``
    to the server.  If the server answers with an ``UNKNOWN_METHOD``
    application exception the client stays in plain (non-upgraded) mode and
    behaves like ``TClient``.  Otherwise a ``RequestHeader`` is written in
    front of every request and the outcome of every request is passed to
    ``tracker.record``.
    """

    def __init__(self, tracker_handler, *args, **kwargs):
        """Create the client and try to upgrade the connection.

        :tracker_handler: tracker object; this class calls its
            ``init_handshake_info``, ``gen_header`` and ``record`` methods
            and reads its ``client``, ``server`` and ``annotation``
            attributes
        :args, kwargs: forwarded unchanged to ``TClient.__init__``
        :raises TApplicationException: when the handshake fails with any
            type other than ``UNKNOWN_METHOD``
        """
        super(TTrackedClient, self).__init__(*args, **kwargs)

        self.tracker = tracker_handler
        self._upgraded = False
        # Both are filled in by `_send`; defining them here guarantees that
        # `_req` can always inspect them, even before the first request.
        self._header = None
        self.send_start = None

        try:
            self._negotiation()
        except TApplicationException as e:
            # A server without tracking support does not know the handshake
            # api; that is not an error, the client just stays non-upgraded.
            if e.type != TApplicationException.UNKNOWN_METHOD:
                raise
        else:
            self._upgraded = True

    def _negotiation(self):
        """Send the handshake call and consume the server's reply.

        :raises TApplicationException: when the server replies with an
            EXCEPTION message (the exception read from the wire is raised)
        """
        self._oprot.write_message_begin(track_method, TMessageType.CALL,
                                        self._seqid)
        args = track_thrift.UpgradeArgs()
        self.tracker.init_handshake_info(args)
        args.write(self._oprot)
        self._oprot.write_message_end()
        self._oprot.trans.flush()

        _, msg_type, _ = self._iprot.read_message_begin()
        if msg_type == TMessageType.EXCEPTION:
            x = TApplicationException()
            x.read(self._iprot)
            self._iprot.read_message_end()
            raise x

        # The reply content is not used here, but it still has to be read
        # off the wire so the input protocol is positioned after it.
        track_thrift.UpgradeReply().read(self._iprot)
        self._iprot.read_message_end()

    def _send(self, _api, **kwargs):
        """Write the tracking header (when upgraded) and send the request.

        :_api: api name, forwarded to ``TClient._send``
        :kwargs: call arguments, forwarded to ``TClient._send``
        """
        if self._upgraded:
            self._header = track_thrift.RequestHeader()
            self.tracker.gen_header(self._header)
            self._header.write(self._oprot)

        # Millisecond timestamp taken right before the request is sent.
        self.send_start = int(time.time() * 1000)
        super(TTrackedClient, self)._send(_api, **kwargs)

    def _req(self, _api, *args, **kwargs):
        """Perform a request and report its outcome to the tracker.

        When the connection is not upgraded this simply delegates to
        ``TClient._req``.  Otherwise the call is delegated as well, and a
        ``RequestInfo`` plus the raised exception (or ``None``) is passed to
        ``tracker.record`` afterwards, whether the call succeeded or not.

        :_api: api name
        :args, kwargs: call arguments, forwarded to ``TClient._req``
        :returns: whatever ``TClient._req`` returns
        :raises: any exception raised by the underlying request is re-raised
            unchanged after it has been recorded
        """
        if not self._upgraded:
            return super(TTrackedClient, self)._req(_api, *args, **kwargs)

        # Forget the header and timestamp of the previous request.  They are
        # only set again inside `_send`; if the request fails before that
        # point the `finally` clause below must neither crash with an
        # AttributeError (which would hide the real error) nor report the
        # previous request's ids and start time for this one.
        self._header = None
        self.send_start = None
        fallback_start = int(time.time() * 1000)

        exception = None
        status = False

        try:
            res = super(TTrackedClient, self)._req(_api, *args, **kwargs)
            status = True
            return res
        except BaseException as e:
            exception = e
            raise
        finally:
            header = self._header
            # No header means the request never reached `_send`, so there
            # is nothing meaningful to record for it.
            if header is not None:
                start = self.send_start
                if start is None:
                    # `_send` failed while producing the header.
                    start = fallback_start
                header_info = RequestInfo(
                    request_id=header.request_id,
                    seq=header.seq,
                    client=self.tracker.client,
                    server=self.tracker.server,
                    api=_api,
                    status=status,
                    start=start,
                    end=int(time.time() * 1000),
                    annotation=self.tracker.annotation,
                    meta=header.meta,
                )
                self.tracker.record(header_info, exception)


class TTrackedProcessor(TProcessor):
    """Server-side processor that understands the tracking handshake.

    Until a handshake call named ``track_method`` has been seen, incoming
    messages are handled as plain calls.  Afterwards every message is
    expected to be preceded by a ``RequestHeader``, which is handed to
    ``tracker.handle`` before the call itself is processed.  The upgraded
    flag is stored on the processor instance.
    """

    def __init__(self, tracker_handler, *args, **kwargs):
        """Create the processor in non-upgraded state.

        :tracker_handler: tracker object; this class calls its ``handle``
            and ``handle_handshake_info`` methods
        :args, kwargs: forwarded unchanged to ``TProcessor.__init__``
        """
        super(TTrackedProcessor, self).__init__(*args, **kwargs)

        self.tracker = tracker_handler
        self._upgraded = False

    def process(self, iprot, oprot):
        """Read one incoming message from ``iprot`` and answer on ``oprot``."""
        if self._upgraded:
            # Upgraded peers prefix each request with a tracking header.
            header = track_thrift.RequestHeader()
            header.read(iprot)
            self.tracker.handle(header)
            parsed = super(TTrackedProcessor, self).process_in(iprot)
        else:
            parsed = self._try_upgrade(iprot)

        self._do_process(iprot, oprot, *parsed)

    def _try_upgrade(self, iprot):
        """Read a message, treating a handshake call as an upgrade request.

        :returns: ``(api, seqid, result, call)`` where ``call`` is a
            zero-argument callable producing the value for
            ``result.success``
        """
        api, msg_type, seqid = iprot.read_message_begin()

        is_handshake = msg_type == TMessageType.CALL and api == track_method
        if not is_handshake:
            # Ordinary request from a peer that has not upgraded.
            result, call = self._process_in(api, iprot)
            return api, seqid, result, call

        self._upgraded = True

        handshake = track_thrift.UpgradeArgs()
        handshake.read(iprot)
        self.tracker.handle_handshake_info(handshake)

        reply = track_thrift.UpgradeReply()
        # Mark the reply as non-oneway so `_do_process` sends it back.
        reply.oneway = False
        iprot.read_message_end()

        def noop():
            pass

        return api, seqid, reply, noop

    def _process_in(self, api, iprot):
        """Read the arguments of ``api`` and prepare the handler call.

        :returns: ``(result, call)``; for an api that is not part of the
            service the pair is ``(TApplicationException, None)``
        """
        service = self._service

        if api not in service.thrift_services:
            # Discard the unknown call's argument struct.
            iprot.skip(TType.STRUCT)
            iprot.read_message_end()
            unknown = TApplicationException(
                TApplicationException.UNKNOWN_METHOD)
            return unknown, None

        args = getattr(service, api + "_args")()
        args.read(iprot)
        iprot.read_message_end()
        result = getattr(service, api + "_result")()

        # Attribute names of the arguments, ordered by their thrift_spec key
        # (presumably the field id), so they can be passed positionally.
        arg_names = [args.thrift_spec[key][1]
                     for key in sorted(args.thrift_spec)]

        def call():
            handler_method = getattr(self._handler, api)
            return handler_method(*[args.__dict__[name] for name in arg_names])

        return result, call

    def _do_process(self, iprot, oprot, api, seqid, result, call):
        """Run ``call`` and write the outcome to ``oprot``.

        An exception ``result`` is sent back directly via
        ``send_exception``.  Otherwise the value returned by ``call`` is
        stored on ``result.success`` and the result is sent unless it is
        marked oneway.
        """
        if isinstance(result, TApplicationException):
            return self.send_exception(oprot, api, result, seqid)

        try:
            result.success = call()
        except Exception as exc:
            # handle_exception is not defined in this file; per the original
            # note it raises when the api does not declare a matching throws.
            self.handle_exception(exc, result)

        if result.oneway:
            return
        self.send_result(oprot, api, result, seqid)


from .tracker import TrackerBase, ConsoleTracker  # noqa