File: remote_controlled.rb

package info (click to toggle)
ruby-jaeger-client 1.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 624 kB
  • sloc: ruby: 3,381; makefile: 6; sh: 4
file content (119 lines) | stat: -rw-r--r-- 4,102 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# frozen_string_literal: true

require_relative 'remote_controlled/instructions_fetcher'

module Jaeger
  module Samplers
    class RemoteControlled
      DEFAULT_REFRESH_INTERVAL = 60
      DEFAULT_SAMPLING_HOST = 'localhost'
      DEFAULT_SAMPLING_PORT = 5778

      attr_reader :sampler

      def initialize(opts = {})
        @sampler = opts.fetch(:sampler, Probabilistic.new)
        @logger = opts.fetch(:logger, Logger.new($stdout))

        @poll_executor = opts[:poll_executor] || begin
          refresh_interval = opts.fetch(:refresh_interval, DEFAULT_REFRESH_INTERVAL)
          RecurringExecutor.new(interval: refresh_interval)
        end

        @instructions_fetcher = opts[:instructions_fetcher] || begin
          service_name = opts.fetch(:service_name)
          host = opts.fetch(:host, DEFAULT_SAMPLING_HOST)
          port = opts.fetch(:port, DEFAULT_SAMPLING_PORT)
          InstructionsFetcher.new(host: host, port: port, service_name: service_name)
        end
      end

      def sample(*args)
        @poll_executor.start(&method(:poll)) unless @poll_executor.running?

        @sampler.sample(*args)
      end

      def poll
        @logger.debug 'Fetching sampling strategy'

        instructions = @instructions_fetcher.fetch
        handle_instructions(instructions)
      rescue InstructionsFetcher::FetchFailed => e
        @logger.warn "Fetching sampling strategy failed: #{e.message}"
      end

      private

      def handle_instructions(instructions)
        if instructions['operationSampling']
          update_per_operation_sampler(instructions['operationSampling'])
        else
          update_rate_limiting_or_probabilistic_sampler(instructions['strategyType'], instructions)
        end
      end

      def update_per_operation_sampler(instructions)
        strategies = normalize(instructions)

        if @sampler.is_a?(PerOperation)
          @sampler.update(strategies: strategies)
        else
          @sampler = PerOperation.new(strategies: strategies, max_operations: 2000)
        end
      end

      def normalize(instructions)
        {
          default_sampling_probability: instructions['defaultSamplingProbability'],
          default_lower_bound_traces_per_second: instructions['defaultLowerBoundTracesPerSecond'],
          per_operation_strategies: instructions['perOperationStrategies'].map do |strategy|
            {
              operation: strategy['operation'],
              probabilistic_sampling: {
                sampling_rate: strategy['probabilisticSampling']['samplingRate']
              }
            }
          end
        }
      end

      def update_rate_limiting_or_probabilistic_sampler(strategy, instructions)
        case strategy
        when 'PROBABILISTIC'
          update_probabilistic_strategy(instructions['probabilisticSampling'])
        when 'RATE_LIMITING'
          update_rate_limiting_strategy(instructions['rateLimitingSampling'])
        else
          @logger.warn "Unknown sampling strategy #{strategy}"
        end
      end

      def update_probabilistic_strategy(instructions)
        rate = instructions['samplingRate']
        return unless rate

        if @sampler.is_a?(Probabilistic)
          @sampler.update(rate: rate)
          @logger.info "Updated Probabilistic sampler (rate=#{rate})"
        else
          @sampler = Probabilistic.new(rate: rate)
          @logger.info "Updated sampler to Probabilistic (rate=#{rate})"
        end
      end

      def update_rate_limiting_strategy(instructions)
        max_traces_per_second = instructions['maxTracesPerSecond']
        return unless max_traces_per_second

        if @sampler.is_a?(RateLimiting)
          @sampler.update(max_traces_per_second: max_traces_per_second)
          @logger.info "Updated Ratelimiting sampler (max_traces_per_second=#{max_traces_per_second})"
        else
          @sampler = RateLimiting.new(max_traces_per_second: max_traces_per_second)
          @logger.info "Updated sampler to Ratelimiting (max_traces_per_second=#{max_traces_per_second})"
        end
      end
    end
  end
end