1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
|
# An OpenTraced client for a Python service that implements the store interface.
from __future__ import print_function
import time
import argparse
from builtins import input, range
import grpc
from jaeger_client import Config
from grpc_opentracing import open_tracing_client_interceptor, \
SpanDecorator
from grpc_opentracing.grpcext import intercept_channel
import store_pb2
class CommandExecuter(object):
    """Translate store commands into RPCs issued through a stub.

    Every ``do_<command>`` method receives the invocation style (``via``),
    the deadline to pass along (``timeout``) and the list of positional
    ``arguments`` for the command.  When the arguments are not acceptable
    the method prints a message and returns without issuing an RPC.
    """

    def __init__(self, stub):
        # RPC entry points are looked up on this object by method name.
        self._stub = stub

    def _execute_rpc(self, method, via, timeout, request_or_iterator):
        """Invoke ``method`` on the stub in the style selected by ``via``.

        ``'future'`` starts the call with ``.future(...)`` and waits on its
        ``result()``; ``'with_call'`` uses ``.with_call(...)`` and returns
        the first element of what it yields; any other value calls the
        stub attribute directly.
        """
        rpc = getattr(self._stub, method)
        if via == 'future':
            pending = rpc.future(request_or_iterator, timeout)
            return pending.result()
        if via == 'with_call':
            outcome = rpc.with_call(request_or_iterator, timeout)
            return outcome[0]
        return rpc(request_or_iterator, timeout)

    @staticmethod
    def _print_quantity(entry):
        """Print the name and count of one quantity entry, tab separated."""
        print(entry.name, '\t', entry.count)

    def do_stock_item(self, via, timeout, arguments):
        """Stock exactly one item through the ``AddItem`` RPC."""
        if len(arguments) != 1:
            print('must input a single item')
            return
        item_request = store_pb2.AddItemRequest(name=arguments[0])
        self._execute_rpc('AddItem', via, timeout, item_request)

    def do_stock_items(self, via, timeout, arguments):
        """Stock one or more items through the ``AddItems`` RPC."""
        if not arguments:
            print('must input at least one item')
            return
        # All requests are built up front; the RPC receives an iterator.
        item_requests = [
            store_pb2.AddItemRequest(name=item_name)
            for item_name in arguments
        ]
        self._execute_rpc('AddItems', via, timeout, iter(item_requests))

    def do_sell_item(self, via, timeout, arguments):
        """Sell exactly one item through the ``RemoveItem`` RPC."""
        if len(arguments) != 1:
            print('must input a single item')
            return
        item_request = store_pb2.RemoveItemRequest(name=arguments[0])
        reply = self._execute_rpc('RemoveItem', via, timeout, item_request)
        if not reply.was_successful:
            print('unable to sell')

    def do_sell_items(self, via, timeout, arguments):
        """Sell one or more items through the ``RemoveItems`` RPC."""
        if not arguments:
            print('must input at least one item')
            return
        item_requests = [
            store_pb2.RemoveItemRequest(name=item_name)
            for item_name in arguments
        ]
        reply = self._execute_rpc('RemoveItems', via, timeout,
                                  iter(item_requests))
        if not reply.was_successful:
            print('unable to sell')

    def do_inventory(self, via, timeout, arguments):
        """Print every entry returned by the ``ListInventory`` RPC.

        Takes no arguments and is only available via ``'functor'``.
        """
        if arguments:
            print('inventory does not take any arguments')
            return
        if via != 'functor':
            print('inventory can only be called via functor')
            return
        entries = self._execute_rpc('ListInventory', via, timeout,
                                    store_pb2.Empty())
        for entry in entries:
            self._print_quantity(entry)

    def do_query_item(self, via, timeout, arguments):
        """Print the quantity of exactly one item (``QueryQuantity`` RPC)."""
        if len(arguments) != 1:
            print('must input a single item')
            return
        item_request = store_pb2.QueryItemRequest(name=arguments[0])
        entry = self._execute_rpc('QueryQuantity', via, timeout, item_request)
        self._print_quantity(entry)

    def do_query_items(self, via, timeout, arguments):
        """Print the quantity of one or more items (``QueryQuantities`` RPC).

        Only available via ``'functor'``.
        """
        if not arguments:
            print('must input at least one item')
            return
        if via != 'functor':
            print('query_items can only be called via functor')
            return
        item_requests = [
            store_pb2.QueryItemRequest(name=item_name)
            for item_name in arguments
        ]
        entries = self._execute_rpc('QueryQuantities', via, timeout,
                                    iter(item_requests))
        for entry in entries:
            self._print_quantity(entry)
def execute_command(command_executer, command, arguments):
    """Parse leading options and dispatch ``command`` to its handler.

    ``arguments`` may start with any number of ``--via <style>`` and
    ``--timeout <seconds>`` pairs; everything after them is handed to the
    handler as positional arguments.

    Args:
        command_executer: object providing ``do_<command>`` methods that
            accept ``(via, timeout, arguments)``.
        command: command name typed by the user.
        arguments: list of tokens following the command name.

    Prints a message and returns without calling the handler when the
    ``--via`` value is not one of ``functor``/``with_call``/``future``,
    when the ``--timeout`` value is not a number, or when no
    ``do_<command>`` handler exists.
    """
    via = 'functor'
    timeout = None
    position = 0
    # Options are "--flag value" pairs at the front of the argument list;
    # a trailing flag without a value is treated as a positional argument.
    while position + 1 < len(arguments):
        flag = arguments[position]
        value = arguments[position + 1]
        if flag == '--via':
            # Validate the value being supplied, not the previous setting.
            if value not in ('functor', 'with_call', 'future'):
                print('invalid --via option')
                return
            via = value
        elif flag == '--timeout':
            try:
                timeout = float(value)
            except ValueError:
                print('invalid --timeout option')
                return
        else:
            break
        position += 2
    # Drop the consumed option pairs, including when nothing follows them.
    arguments = arguments[position:]

    # Look the handler up separately from calling it, so an AttributeError
    # raised inside a command is not misreported as an unknown command.
    handler = getattr(command_executer, 'do_' + command, None)
    if handler is None:
        print('unknown command: \"%s\"' % command)
        return
    handler(via, timeout, arguments)
# Help text printed once by read_and_execute before the prompt loop starts:
# lists the available commands and the optional --via / --timeout arguments.
INSTRUCTIONS = \
"""Enter commands to interact with the store service:
stock_item Stock a single item.
stock_items Stock one or more items.
sell_item Sell a single item.
sell_items Sell one or more items.
inventory List the store's inventory.
query_item Query the inventory for a single item.
query_items Query the inventory for one or more items.
You can also optionally provide a --via argument to instruct the RPC to be
initiated via either the functor, with_call, or future method; or provide a
--timeout argument to set a deadline for the RPC to be completed.
Example:
> stock_item apple
> stock_items --via future apple milk
> inventory
apple 2
milk 1
"""
def read_and_execute(command_executer):
    """Print the instructions, then run commands typed at a ``> `` prompt.

    Each line is split on whitespace: the first token is the command and
    the rest are its arguments.  Blank lines are ignored.  The loop ends
    when an ``EOFError`` is raised (for example at end of input).
    """
    print(INSTRUCTIONS)
    while True:
        try:
            tokens = input('> ').split()
            if tokens:
                execute_command(command_executer, tokens[0], tokens[1:])
        except EOFError:
            return
class StoreSpanDecorator(SpanDecorator):
    """Span decorator that copies gRPC call details onto the span as tags."""

    def __call__(self, span, rpc_info):
        """Tag ``span`` with the method, metadata and timeout of ``rpc_info``.

        Metadata and timeout are stored in their ``str()`` form.
        """
        method_name = rpc_info.full_method
        span.set_tag('grpc.method', method_name)
        headers_text = str(rpc_info.metadata)
        span.set_tag('grpc.headers', headers_text)
        deadline_text = str(rpc_info.timeout)
        span.set_tag('grpc.deadline', deadline_text)
def run():
    """Start the interactive store client with OpenTracing enabled.

    Command-line flags:
        --log_payloads: log request/response objects to open-tracing spans.
        --include_grpc_tags: set gRPC-specific tags on spans.

    Initializes a tracer for the ``store-client`` service with a constant
    sampler, wraps an insecure channel to ``localhost:50051`` with the
    tracing interceptor, and runs the prompt loop.  The tracer is closed
    on the way out even if the loop raises, so the shutdown sequence is
    not skipped on an unexpected error or interrupt.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--log_payloads',
        action='store_true',
        help='log request/response objects to open-tracing spans')
    parser.add_argument(
        '--include_grpc_tags',
        action='store_true',
        help='set gRPC-specific tags on spans')
    args = parser.parse_args()

    config = Config(
        config={
            'sampler': {
                'type': 'const',
                'param': 1,
            },
            'logging': True,
        },
        service_name='store-client')
    tracer = config.initialize_tracer()

    try:
        span_decorator = None
        if args.include_grpc_tags:
            span_decorator = StoreSpanDecorator()
        tracer_interceptor = open_tracing_client_interceptor(
            tracer,
            log_payloads=args.log_payloads,
            span_decorator=span_decorator)
        channel = grpc.insecure_channel('localhost:50051')
        channel = intercept_channel(channel, tracer_interceptor)
        stub = store_pb2.StoreStub(channel)

        read_and_execute(CommandExecuter(stub))
    finally:
        # NOTE(review): the pauses around close() presumably give the
        # tracer time to flush buffered spans before the process exits;
        # confirm against the jaeger-client documentation.
        time.sleep(2)
        tracer.close()
        time.sleep(2)
# Start the interactive client only when this file is executed as a script.
if __name__ == '__main__':
    run()
|