
|
#!/usr/bin/env ruby
$stdout.sync = true
require 'sinatra/base'
require 'webrick'
require 'jaeger/client'
require 'net/http'
require 'uri'
# Minimal Sinatra app exposing a single health-check route.
class HealthServer < Sinatra::Application
  # GET / — replies with HTTP 200 and no explicit body.
  get('/') { status 200 }
end
# Sinatra app exposing the tracing endpoints (/start_trace, /join_trace,
# /create_traces). Each endpoint reads a JSON request body, creates Jaeger
# spans and, for the two trace endpoints, replies with a JSON description of
# the server span plus the result of an optional downstream call.
class HttpServer < Sinatra::Application
  # Baggage key set by /start_trace and reported back by every observed span.
  BAGGAGE_KEY = 'crossdock-baggage-key'

  # Guards lazy construction of the shared tracer; WEBrick serves requests
  # from multiple threads.
  TRACER_LOCK = Mutex.new

  # Process-wide Jaeger tracer, built on first use.
  #
  # Sinatra handles every request on a fresh copy of the application
  # instance, so memoizing the tracer in an instance variable would build a
  # new Jaeger client on every request. Keeping it on the class builds it once.
  #
  # @return [Object] the tracer returned by Jaeger::Client.build
  def self.tracer
    TRACER_LOCK.synchronize do
      @tracer ||= Jaeger::Client.build(
        service_name: 'crossdock-ruby',
        host: 'jaeger-agent',
        port: 6831,
        flush_interval: 1,
        sampler: Jaeger::Samplers::Const.new(true)
      )
    end
  end

  # POST /start_trace — starts a server span, stores the requested baggage on
  # it and, when the request carries a 'sampled' key, sets the
  # sampling.priority tag accordingly.
  post '/start_trace' do
    puts "Got request to start trace: #{trace_request}"
    respond_with_trace('/start_trace') do |server_span|
      server_span.set_baggage_item(BAGGAGE_KEY, trace_request['baggage'])
      if trace_request.key?('sampled')
        server_span.set_tag('sampling.priority', trace_request['sampled'] ? 1 : 0)
      end
    end
  end

  # POST /join_trace — continues the trace extracted from the request headers.
  post '/join_trace' do
    puts 'Got request to join trace' \
         "\n Params: #{trace_request}" \
         "\n Headers: #{request_headers(request)}"
    respond_with_trace('/join_trace')
  end

  # POST /create_traces — emits 'count' spans named 'operation' carrying 'tags'.
  post '/create_traces' do
    puts "Got request to create traces: #{trace_request}"
    trace_request['count'].times do
      span = tracer.start_span(trace_request['operation'], tags: trace_request['tags'])
      span.finish
    end
    status 200
  end

  private

  # @return [Object] the shared tracer (see HttpServer.tracer)
  def tracer
    self.class.tracer
  end

  # Parsed JSON request body, memoized for the current request.
  #
  # @return [Object] whatever JSON.parse yields for the body
  # @raise [JSON::ParserError] when the body is not valid JSON
  def trace_request
    @trace_request ||= begin
      request.body.rewind
      JSON.parse(request.body.read)
    end
  end

  # Shared implementation of the two trace endpoints: starts a server span as
  # a child of the context extracted from the request, lets the caller
  # customise it, then writes the JSON response.
  #
  # @param operation_name [String] name given to the server span
  # @yieldparam server_span [Object] the freshly started span (block optional)
  # @return [String] the JSON body that was set on the response
  def respond_with_trace(operation_name)
    parent_context = tracer.extract(OpenTracing::FORMAT_RACK, request.env)
    server_span = tracer.start_span(operation_name, child_of: parent_context)
    begin
      yield server_span if block_given?
      # Named `payload` so Sinatra's own `response` accessor is not shadowed.
      payload = {
        span: observe_span(server_span),
        notImplementedError: ''
      }
      downstream = trace_request['downstream']
      payload[:downstream] = downstream_response(downstream, server_span) if downstream
      puts "Response: #{payload}"
    ensure
      # Finish the span even when the downstream call or JSON handling raises.
      server_span.finish
    end
    body JSON.dump(payload)
  end

  # Dispatches the downstream call according to the requested transport.
  #
  # @param downstream [Hash] the 'downstream' section of the request
  # @param server_span [Object] span to use as parent of the client span
  # @return [Object] the downstream result, or a notImplementedError hash
  def downstream_response(downstream, server_span)
    transport = downstream['transport']
    case transport
    when 'HTTP'
      call_downstream_http(downstream, server_span)
    when 'DUMMY'
      { notImplementedError: 'Dummy has not been implemented' }
    else
      { notImplementedError: "Unrecognized transport received: #{transport}" }
    end
  end

  # Describes a span for the JSON response.
  #
  # @param span [Object, nil] span to describe
  # @return [Hash] traceId, sampled and baggage; placeholder values when span is nil
  def observe_span(span)
    unless span
      return {
        traceId: 'no span found',
        sampled: false,
        baggage: 'no span found'
      }
    end

    {
      traceId: span.context.to_trace_id,
      sampled: span.context.sampled?,
      baggage: span.get_baggage_item(BAGGAGE_KEY)
    }
  end

  # Calls /join_trace on the downstream host inside a client span whose
  # context is injected into the outgoing headers.
  #
  # @param downstream [Hash] needs 'host' and 'port'; 'serverRole' and
  #   'downstream' are forwarded in the request body
  # @param server_span [Object] parent of the client span
  # @return [Object] parsed JSON body on a 2xx reply, otherwise { error: body }
  def call_downstream_http(downstream, server_span)
    downstream_url = "http://#{downstream['host']}:#{downstream['port']}/join_trace"
    request_body = {
      serverRole: downstream['serverRole'],
      downstream: downstream['downstream']
    }
    client_span = tracer.start_span('client-span', child_of: server_span)
    http_response =
      begin
        headers = { 'Content-Type' => 'application/json' }
        tracer.inject(client_span.context, OpenTracing::FORMAT_RACK, headers)
        Net::HTTP.post(URI(downstream_url), JSON.dump(request_body), headers)
      ensure
        # Finish the client span even when the HTTP call raises.
        client_span.finish
      end

    if http_response.is_a?(Net::HTTPSuccess)
      JSON.parse(http_response.body)
    else
      { error: http_response.body }
    end
  end

  # @param request [Object] the current request
  # @return [Hash] the env entries whose key starts with 'HTTP_'
  def request_headers(request)
    request.env.select { |key, _value| key.start_with?('HTTP_') }
  end
end
# Run both Sinatra apps under WEBrick, each in its own thread, then block
# until the server threads exit.
server_ports = { HealthServer => 8080, HttpServer => 8081 }
server_threads = server_ports.map do |app, port|
  Thread.new { Rack::Handler::WEBrick.run(app, Port: port, Host: '0.0.0.0') }
end
server_threads.each(&:join)
|