File: store_server.py

package info (click to toggle)
golang-github-grpc-ecosystem-grpc-opentracing 0.0~git20180507.8e809c8-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bookworm-proposed-updates, bullseye
  • size: 576 kB
  • sloc: python: 2,021; java: 1,077; makefile: 2
file content (122 lines) | stat: -rw-r--r-- 3,747 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# A OpenTraced server for a Python service that implements the store interface.
from __future__ import print_function

import time
import argparse
from collections import defaultdict

from six import iteritems

import grpc
from concurrent import futures
from jaeger_client import Config

from grpc_opentracing import open_tracing_server_interceptor, \
                             SpanDecorator
from grpc_opentracing.grpcext import intercept_server

import store_pb2

_ONE_DAY_IN_SECONDS = 60 * 60 * 24


class Store(store_pb2.StoreServicer):

    def __init__(self):
        self._inventory = defaultdict(int)

    def AddItem(self, request, context):
        self._inventory[request.name] += 1
        return store_pb2.Empty()

    def AddItems(self, request_iter, context):
        for request in request_iter:
            self._inventory[request.name] += 1
        return store_pb2.Empty()

    def RemoveItem(self, request, context):
        new_quantity = self._inventory[request.name] - 1
        if new_quantity < 0:
            return store_pb2.RemoveItemResponse(was_successful=False)
        self._inventory[request.name] = new_quantity
        return store_pb2.RemoveItemResponse(was_successful=True)

    def RemoveItems(self, request_iter, context):
        response = store_pb2.RemoveItemResponse(was_successful=True)
        for request in request_iter:
            response = self.RemoveItem(request, context)
            if not response.was_successful:
                break
        return response

    def ListInventory(self, request, context):
        for name, count in iteritems(self._inventory):
            if not count:
                continue
            else:
                yield store_pb2.QuantityResponse(name=name, count=count)

    def QueryQuantity(self, request, context):
        count = self._inventory[request.name]
        return store_pb2.QuantityResponse(name=request.name, count=count)

    def QueryQuantities(self, request_iter, context):
        for request in request_iter:
            count = self._inventory[request.name]
            yield store_pb2.QuantityResponse(name=request.name, count=count)


class StoreSpanDecorator(SpanDecorator):

    def __call__(self, span, rpc_info):
        span.set_tag('grpc.method', rpc_info.full_method)
        span.set_tag('grpc.headers', str(rpc_info.metadata))
        span.set_tag('grpc.deadline', str(rpc_info.timeout))


def serve():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--log_payloads',
        action='store_true',
        help='log request/response objects to open-tracing spans')
    parser.add_argument(
        '--include_grpc_tags',
        action='store_true',
        help='set gRPC-specific tags on spans')
    args = parser.parse_args()

    config = Config(
        config={
            'sampler': {
                'type': 'const',
                'param': 1,
            },
            'logging': True,
        },
        service_name='store-server')
    tracer = config.initialize_tracer()
    span_decorator = None
    if args.include_grpc_tags:
        span_decorator = StoreSpanDecorator()
    tracer_interceptor = open_tracing_server_interceptor(
        tracer, log_payloads=args.log_payloads, span_decorator=span_decorator)
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    server = intercept_server(server, tracer_interceptor)

    store_pb2.add_StoreServicer_to_server(Store(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)

    time.sleep(2)
    tracer.close()
    time.sleep(2)


if __name__ == '__main__':
    serve()