File: promises.rb

Package: ruby-concurrent 1.1.6+dfsg-5
# TODO try a work-stealing pool, where each thread has its own queue

require 'concurrent/promises'

module Concurrent
  module Promises

    class Future < AbstractEventFuture

      # @!macro warn.edge
      module ActorIntegration
        # Asks the actor with this future's value once it is fulfilled.
        # @return [Future] new future with the response from the actor
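        # @example A hedged usage sketch (assumes concurrent-ruby-edge actors are loaded)
        #   # Illustrative only: an AdHoc actor that squares the messages it receives.
        #   actor = Concurrent::Actor::Utils::AdHoc.spawn(:square) { -> v { v * v } }
        #   Concurrent::Promises.future { 3 }.then_ask(actor).value! # => 9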
        def then_ask(actor)
          self.then(actor) { |v, a| a.ask_op(v) }.flat
        end
      end

      include ActorIntegration

      # @!macro warn.edge
      module FlatShortcuts

        # @return [Future]
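        # @example A hedged usage sketch (values are illustrative)
        #   # The block returns another future; then_flat_future unwraps it.
        #   Concurrent::Promises.future { 1 }.
        #     then_flat_future { |v| Concurrent::Promises.future { v + 1 } }.
        #     value! # => 2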
        def then_flat_future(*args, &block)
          self.then(*args, &block).flat_future
        end

        alias_method :then_flat, :then_flat_future

        # @return [Future]
        def then_flat_future_on(executor, *args, &block)
          self.then_on(executor, *args, &block).flat_future
        end

        alias_method :then_flat_on, :then_flat_future_on

        # @return [Event]
        def then_flat_event(*args, &block)
          self.then(*args, &block).flat_event
        end

        # @return [Event]
        def then_flat_event_on(executor, *args, &block)
          self.then_on(executor, *args, &block).flat_event
        end
      end

      include FlatShortcuts
    end

    class Future < AbstractEventFuture
      # @!macro warn.edge
      module NewChannelIntegration

        # @param [Channel] channel to push to.
        # @return [Future] a future which is fulfilled after the message is pushed to the channel.
        #   May take a moment if the channel is full.
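        # @example A hedged usage sketch (assumes the edge Promises::Channel API)
        #   # Illustrative only: push a computed value into a channel of capacity 1.
        #   ch = Concurrent::Promises::Channel.new 1
        #   Concurrent::Promises.future { :msg }.then_channel_push(ch).wait
        #   ch.pop # => :msg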
        def then_channel_push(channel)
          self.then(channel) { |value, ch| ch.push_op value }.flat_future
        end

      end

      include NewChannelIntegration
    end

    module FactoryMethods
      # @!macro promises.shortcut.on
      # @return [Future]
      # @!macro warn.edge
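      # @example A hedged usage sketch, mirroring the zip_futures_over_on example below
      #   zip_futures_over([1, 2], &:succ).value! # => [2, 3]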
      def zip_futures_over(enumerable, &future_factory)
        zip_futures_over_on default_executor, enumerable, &future_factory
      end

      # Creates a new future which is resolved after all the futures created by future_factory
      # from the enumerable's elements are resolved. Simplified, it does:
      # `zip(*enumerable.map { |e| future e, &future_factory })`
      # @example
      #   # `#succ` calls are executed in parallel
      #   zip_futures_over_on(:io, [1, 2], &:succ).value! # => [2, 3]
      #
      # @!macro promises.param.default_executor
      # @param [Enumerable] enumerable
      # @yield a task to be executed in future
      # @yieldparam [Object] element from enumerable
      # @yieldreturn [Object] a value of the future
      # @return [Future]
      # @!macro warn.edge
      def zip_futures_over_on(default_executor, enumerable, &future_factory)
        # ZipFuturesPromise.new_blocked_by(futures_and_or_events, default_executor).future
        zip_futures_on(default_executor, *enumerable.map { |e| future e, &future_factory })
      end
    end

    module Resolvable
      include InternalStates

      # Reserves the event or future; while it is reserved, others are prevented from resolving it.
      # This is an advanced feature.
      # Be careful about the order of reservations to avoid deadlocks;
      # if the future or event is already reserved, the method blocks
      # until it is released or resolved.
      #
      # @example
      #   f = Concurrent::Promises.resolvable_future
      #   reserved = f.reserve
      #   Thread.new { f.resolve true, :val, nil } # fails
      #   f.resolve true, :val, nil, true if reserved # must be called only if reserved
      # @return [true, false] on successful reservation
      def reserve
        while true
          return true if compare_and_set_internal_state(PENDING, RESERVED)
          return false if resolved?
          # FIXME (pitr-ch 17-Jan-2019): sleep until given up or resolved instead of busy wait
          Thread.pass
        end
      end

      # @return [true, false] on successful release of the reservation
      def release
        compare_and_set_internal_state(RESERVED, PENDING)
      end

      # @return [Comparable] an item by which to sort the resolvable events or futures
      #   to obtain a consistent global locking order
      # @see .atomic_resolution
      def self.locking_order_by(resolvable)
        resolvable.object_id
      end

      # Resolves all the passed events and futures to the given resolutions
      # if possible (when all of them are still unresolved), otherwise resolves none of them.
      #
      # @param [Hash{Resolvable=>resolve_arguments}, Array<Array(Resolvable, resolve_arguments)>] resolvable_map
      #   collection of resolvable events and futures which should be resolved all at once
      #   and what should they be resolved to, examples:
      #   ```ruby
      #   { a_resolvable_future1 => [true, :val, nil],
      #     a_resolvable_future2 => [false, nil, :err],
      #     a_resolvable_event => [] }
      #   ```
      #    or
      #   ```ruby
      #   [[a_resolvable_future1, [true, :val, nil]],
      #    [a_resolvable_future2, [false, nil, :err]],
      #    [a_resolvable_event, []]]
      #   ```
      # @return [true, false] whether all the events and futures were resolved
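      # @example A hedged usage sketch (resolution arguments follow the formats above)
      #   f1 = Concurrent::Promises.resolvable_future
      #   f2 = Concurrent::Promises.resolvable_future
      #   Concurrent::Promises::Resolvable.atomic_resolution(
      #     f1 => [true, :val, nil],
      #     f2 => [false, nil, :err]) # => true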
      def self.atomic_resolution(resolvable_map)
        # atomic_resolution event => [], future => [true, :v, nil]
        sorted = resolvable_map.to_a.sort_by { |resolvable, _| locking_order_by resolvable }

        reserved = 0
        while reserved < sorted.size && sorted[reserved].first.reserve
          reserved += 1
        end

        if reserved == sorted.size
          sorted.each { |resolvable, args| resolvable.resolve(*args, true, true) }
          true
        else
          while reserved > 0
            reserved -= 1
            raise 'has to be reserved' unless sorted[reserved].first.release
          end
          false
        end
      end
    end


  end
end