File: concurrent_worker.rb

package info (click to toggle)
ruby-redis-cluster-client 0.13.6-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 224 kB
  • sloc: ruby: 2,498; makefile: 4
file content (84 lines) | stat: -rw-r--r-- 2,284 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# frozen_string_literal: true

require 'redis_client/cluster/concurrent_worker/on_demand'
require 'redis_client/cluster/concurrent_worker/pooled'
require 'redis_client/cluster/concurrent_worker/none'

class RedisClient
  class Cluster
    module ConcurrentWorker
      # Raised by Group#push when the group already holds its declared number
      # of tasks, and by Group#each when the number of pushed tasks differs
      # from that declared size.
      InvalidNumberOfTasks = Class.new(StandardError)

      # A fixed-size batch of tasks. Every pushed task is handed to the worker
      # and, once executed, places itself on the group's queue, from which
      # #each pops it and yields its id and result.
      class Group
        # A single unit of work.
        #
        # Members:
        #   id     - caller-supplied identifier, yielded back by Group#each
        #   queue  - queue the task pushes itself onto once finished (may be nil)
        #   args   - positional arguments passed to the block
        #   kwargs - keyword arguments passed to the block
        #   block  - the callable to run (may be nil)
        #   result - the block's return value, or the StandardError it raised
        Task = Struct.new(
          'RedisClusterClientConcurrentWorkerTask',
          :id, :queue, :args, :kwargs, :block, :result,
          keyword_init: true
        ) do
          # Runs the block with the stored arguments and records the outcome in
          # +result+. A StandardError raised by the block is stored as the
          # result instead of propagating. The task always reports completion
          # through #done, whichever way the block ended.
          def exec
            callable = block
            self.result = callable.nil? ? nil : callable.call(*args, **kwargs)
          rescue StandardError => error
            self.result = error
          ensure
            done
          end

          # Pushes this task onto its queue, if it has one.
          def done
            queue.push(self) unless queue.nil?
          rescue ClosedQueueError
            # The queue was closed in the meantime, so there is nowhere to
            # deliver the finished task; deliberately ignored.
          end
        end

        # worker - object responding to #push(task)
        # queue  - queue on which finished tasks are collected
        # size   - exact number of tasks this group must receive
        def initialize(worker:, queue:, size:)
          @worker = worker
          @queue = queue
          @size = size
          @count = 0
        end

        # Wraps the block and its arguments in a Task tagged with +id+ and
        # hands it to the worker. Always returns nil.
        #
        # Raises InvalidNumberOfTasks when the group is already full.
        def push(id, *args, **kwargs, &block)
          if @count == @size
            raise InvalidNumberOfTasks, "max size reached: #{@count}"
          end

          @worker.push(Task.new(id: id, queue: @queue, args: args, kwargs: kwargs, block: block))
          @count += 1
          nil
        end

        # Pops one finished task per expected task from the queue and yields
        # its id and result, in the order the tasks show up on the queue.
        # Always returns nil.
        #
        # Raises InvalidNumberOfTasks unless exactly +size+ tasks were pushed.
        def each
          unless @count == @size
            raise InvalidNumberOfTasks, "expected: #{@size}, actual: #{@count}"
          end

          @size.times do
            finished = @queue.pop
            yield finished.id, finished.result
          end

          nil
        end

        # Empties the queue, closes it when it supports closing, and resets the
        # pushed-task counter. Always returns nil.
        def close
          collector = @queue
          collector.clear
          collector.close if collector.respond_to?(:close)
          @count = 0
          nil
        end

        # Short description showing the pushed count, the capacity and the
        # worker's class name.
        def inspect
          "#<#{self.class.name} size: #{@count}, max: #{@size}, worker: #{@worker.class.name}>"
        end
      end

      module_function

      # Builds the worker implementation selected by +model+.
      #
      # model - :none, :on_demand or :pooled (defaults to :none)
      # size  - passed to the :on_demand and :pooled workers as +size:+;
      #         not used for :none (defaults to 5)
      #
      # Raises ArgumentError for any other model.
      def create(model: :none, size: 5)
        case model
        when :none
          ::RedisClient::Cluster::ConcurrentWorker::None.new
        when :on_demand
          ::RedisClient::Cluster::ConcurrentWorker::OnDemand.new(size: size)
        when :pooled
          ::RedisClient::Cluster::ConcurrentWorker::Pooled.new(size: size)
        else
          raise ArgumentError, "unknown model: #{model}"
        end
      end
    end
  end
end