File: future.rb

package info (click to toggle)
ruby-celluloid 0.18.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 848 kB
  • sloc: ruby: 7,579; makefile: 10
file content (144 lines) | stat: -rw-r--r-- 3,383 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
module Celluloid
  # Celluloid::Future objects allow methods and blocks to run in the
  # background, with their values requested later.
  #
  # A future starts out unresolved and is resolved once through #signal
  # (or #cancel). #value returns the resolved value; when the future is not
  # ready yet it registers Celluloid.mailbox as a forward target and waits
  # for the matching Future::Result via Celluloid.receive.
  class Future
    # Build a future.
    #
    # Without a block this behaves like the regular constructor. With a
    # block, an unresolved future is returned right away and the block is
    # dispatched (receiving +args+) through a Call::Sync inside an
    # Internals::ThreadHandle created with the :future role.
    def self.new(*args, &block)
      return super unless block

      future = new
      # task = Thread.current[:celluloid_task]
      # actor = Thread.current[:celluloid_actor]
      Internals::ThreadHandle.new(Celluloid.actor_system, :future) do
        begin
          # Thread.current[:celluloid_task] = task
          # Thread.current[:celluloid_actor] = actor
          call = Call::Sync.new(future, :call, args)
          call.dispatch(block)
        rescue
          # Exceptions in blocks will get raised when the value is retrieved
        end
      end
      future
    end

    # Identifier assigned from Celluloid.uuid when the future is created
    attr_reader :address

    # Set up an unresolved future. When a block is handed straight to the
    # constructor it is dispatched immediately through a Call::Sync.
    def initialize(&block)
      @address = Celluloid.uuid
      @mutex = Mutex.new
      @ready = false
      @result = nil
      @forwards = nil
      @cancelled = false
      @call = nil

      return unless block

      # This constructor takes no positional arguments, so the block is
      # dispatched with an empty argument list. (The previous code referenced
      # an undefined local `args` here and always raised NameError.)
      # NOTE(review): Future.new never forwards a block to this method and
      # runs blocks through Internals::ThreadHandle instead -- confirm that
      # Celluloid.internal_pool is still the intended executor for this path.
      @call = Call::Sync.new(self, :call, [])
      Celluloid.internal_pool.get do
        begin
          @call.dispatch(block)
        rescue
          # Exceptions in blocks will get raised when the value is retrieved
        end
      end
    end

    # Execute the given method in future context.
    #
    # Wraps the invocation in a Call::Sync that responds to this future and
    # pushes it onto +receiver+ with <<. Raises RuntimeError ("already
    # calling") when a call has already been attached to this future.
    def execute(receiver, method, args, block)
      @mutex.synchronize do
        raise "already calling" if @call
        @call = Call::Sync.new(self, method, args, block)
      end

      receiver << @call
    end

    # Check if this future has a value yet
    def ready?
      @ready
    end

    # Obtain the value for this Future.
    #
    # If the future is already resolved the stored result is used. Otherwise
    # the current Celluloid.mailbox is recorded as a forward target and the
    # method waits (up to +timeout+) for the Future::Result belonging to this
    # future. Raises TimedOut when no result was obtained.
    def value(timeout = nil)
      ready = false
      result = nil

      # Mutex#synchronize only unlocks a lock this call actually acquired.
      # The earlier lock/ensure-unlock form ran `unlock` even when `lock`
      # itself raised, which could release a lock held by an outer caller or
      # mask the original exception with a second ThreadError.
      @mutex.synchronize do
        if @ready
          ready = true
          result = @result
        else
          mailbox = Celluloid.mailbox
          # @forwards is nil (no waiter), a single mailbox, or an Array of
          # mailboxes once more than one waiter has registered.
          case @forwards
          when Array
            @forwards << mailbox
          when nil
            @forwards = mailbox
          else
            @forwards = [@forwards, mailbox]
          end
        end
      end

      unless ready
        result = Celluloid.receive(timeout) do |msg|
          msg.is_a?(Future::Result) && msg.future == self
        end
      end

      raise TimedOut, "Timed out" unless result

      result.respond_to?(:value) ? result.value : result
    end
    alias call value

    # Signal this future with the given result value.
    #
    # Does nothing once the future has been cancelled. Otherwise wraps
    # +value+ in a Future::Result, pushes it to every registered forward
    # target and marks the future ready. Raises RuntimeError when the future
    # was already resolved.
    def signal(value)
      return if @cancelled

      result = Result.new(value, self)

      @mutex.synchronize do
        raise "the future has already happened!" if @ready

        if @forwards.is_a?(Array)
          @forwards.each { |target| target << result }
        elsif @forwards
          @forwards << result
        end

        @result = result
        @ready = true
      end
    end
    alias << signal

    # Resolve this future with an Internals::Response::Error built from
    # +error+ and the attached call, then flag it as cancelled so that later
    # #signal calls are ignored.
    def cancel(error)
      response = Internals::Response::Error.new(@call, error)
      signal response
      @mutex.synchronize do
        @cancelled = true
      end
    end

    # Inspect this Celluloid::Future
    alias inspect to_s

    # Wrapper for result values to distinguish them in mailboxes
    class Result
      # The future this result belongs to
      attr_reader :future

      # +result+ must respond to #value; +future+ is the owning future.
      def initialize(result, future)
        @result = result
        @future = future
      end

      # Delegates to the wrapped response's #value
      def value
        @result.value
      end
    end
  end
end