1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
|
#!/usr/bin/env ruby
$stdout.sync = true
require 'sinatra/base'
require 'webrick'
require 'jaeger/client'
require 'json'
require 'net/http'
require 'uri'
# Health-check application: answers GET / with HTTP status 200.
class HealthServer < Sinatra::Application
  get('/') { status 200 }
end
# Crossdock HTTP endpoints that start, join, and bulk-create Jaeger traces.
#
# Request bodies are parsed as JSON. The keys read here are 'baggage',
# 'sampled', 'downstream', 'count', 'operation' and 'tags'.
class HttpServer < Sinatra::Application
  # Guards lazy construction of the shared tracer across server threads.
  TRACER_LOCK = Mutex.new

  class << self
    # Returns the one tracer shared by every request, building it on first use.
    #
    # Sinatra handles each request on a copy of the application instance, so
    # memoizing the tracer in an instance variable would build a brand-new
    # Jaeger client per request. Holding it on the class builds it once.
    def tracer
      TRACER_LOCK.synchronize do
        @tracer ||= Jaeger::Client.build(
          service_name: 'crossdock-ruby',
          host: 'jaeger-agent',
          port: 6831,
          flush_interval: 1,
          sampler: Jaeger::Samplers::Const.new(true)
        )
      end
    end
  end

  # Starts a server span (continuing any trace context found in the request
  # headers), applies the requested baggage and sampling priority, and
  # responds with the observed span plus any downstream result.
  post '/start_trace' do
    puts "Got request to start trace: #{trace_request}"

    with_server_span('/start_trace') do |server_span|
      server_span.set_baggage_item('crossdock-baggage-key', trace_request['baggage'])
      # Only touch the sampling priority when the caller explicitly sent one.
      if trace_request.key?('sampled')
        server_span.set_tag('sampling.priority', trace_request['sampled'] ? 1 : 0)
      end

      body trace_response(server_span)
    end
  end

  # Joins the trace carried in the request headers and responds with the
  # observed span plus any downstream result.
  post '/join_trace' do
    puts 'Got request to join trace' \
         "\n Params: #{trace_request}" \
         "\n Headers: #{request_headers(request)}"

    with_server_span('/join_trace') do |server_span|
      body trace_response(server_span)
    end
  end

  # Creates and immediately finishes 'count' spans named 'operation' with the
  # supplied 'tags'.
  post '/create_traces' do
    puts "Got request to create traces: #{trace_request}"

    trace_request['count'].times do
      span = tracer.start_span(trace_request['operation'], tags: trace_request['tags'])
      span.finish
    end

    status 200
  end

  private

  # Instance-level accessor for the class-wide tracer.
  def tracer
    self.class.tracer
  end

  # Parsed JSON request body, memoized on the current application instance.
  def trace_request
    @trace_request ||= begin
      request.body.rewind
      JSON.parse(request.body.read)
    end
  end

  # Opens a server span named +operation_name+ as a child of the context
  # extracted from the incoming request, yields it, and always finishes it,
  # even when the block raises (for example on a failed downstream call).
  #
  # Returns the block's value.
  def with_server_span(operation_name)
    parent_context = tracer.extract(OpenTracing::FORMAT_RACK, request.env)
    span = tracer.start_span(operation_name, child_of: parent_context)
    yield span
  ensure
    # span is nil when extraction or span creation itself failed.
    span&.finish
  end

  # Builds, logs, and serializes the response shared by /start_trace and
  # /join_trace. The :downstream key is present only when the request asked
  # for a downstream call.
  #
  # Returns the response encoded as a JSON string.
  def trace_response(server_span)
    result = {
      span: observe_span(server_span),
      notImplementedError: ''
    }

    downstream = trace_request['downstream']
    result[:downstream] = call_downstream(downstream, server_span) if downstream

    puts "Response: #{result}"
    JSON.dump(result)
  end

  # Dispatches the downstream call on its 'transport'. Only 'HTTP' is
  # implemented; every other transport yields a notImplementedError hash.
  def call_downstream(downstream, server_span)
    transport = downstream['transport']

    case transport
    when 'HTTP'
      call_downstream_http(downstream, server_span)
    when 'DUMMY'
      { notImplementedError: 'Dummy has not been implemented' }
    else
      { notImplementedError: "Unrecognized transport received: #{transport}" }
    end
  end

  # Summarizes a span as a hash with :traceId, :sampled and :baggage, or
  # returns placeholder values when +span+ is nil/false.
  def observe_span(span)
    if span
      {
        traceId: span.context.to_trace_id,
        sampled: span.context.sampled?,
        baggage: span.get_baggage_item('crossdock-baggage-key')
      }
    else
      {
        traceId: 'no span found',
        sampled: false,
        baggage: 'no span found'
      }
    end
  end

  # POSTs a /join_trace request to the downstream host inside a client span
  # whose context is injected into the outgoing headers.
  #
  # Returns the parsed JSON body on a successful response, otherwise
  # { error: <response body> }. Exceptions from the HTTP call propagate, but
  # the client span is finished first.
  def call_downstream_http(downstream, server_span)
    downstream_url = "http://#{downstream['host']}:#{downstream['port']}/join_trace"
    client_span = tracer.start_span('client-span', child_of: server_span)

    response =
      begin
        headers = { 'Content-Type' => 'application/json' }
        tracer.inject(client_span.context, OpenTracing::FORMAT_RACK, headers)

        Net::HTTP.post(
          URI(downstream_url),
          JSON.dump(
            serverRole: downstream['serverRole'],
            downstream: downstream['downstream']
          ),
          headers
        )
      ensure
        client_span.finish
      end

    if response.is_a?(Net::HTTPSuccess)
      JSON.parse(response.body)
    else
      { error: response.body }
    end
  end

  # Returns the subset of the Rack environment whose keys start with 'HTTP_'.
  def request_headers(request)
    request.env.select do |key, _value|
      key.start_with?('HTTP_')
    end
  end
end
# Run each application under WEBrick on its own port in a dedicated thread,
# then block the main thread on those server threads.
listeners = { HealthServer => 8080, HttpServer => 8081 }

server_threads = listeners.map do |app, port|
  Thread.new { Rack::Handler::WEBrick.run(app, Port: port, Host: '0.0.0.0') }
end

server_threads.each(&:join)
|