File: worker.rb

package info (click to toggle)
ruby-delayer-deferred 2.2.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye, forky, sid, trixie
  • size: 308 kB
  • sloc: ruby: 1,650; sh: 4; makefile: 2
file content (112 lines) | stat: -rw-r--r-- 3,468 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# -*- coding: utf-8 -*-
require "delayer/deferred/request"
require "delayer/deferred/response"

module Delayer::Deferred
=begin rdoc
Deferredを実行するためのWorker。Deferredチェインを実行するFiberを
管理する。

== pushに渡すオブジェクトについて
Worker#push に渡す引数は、activateメソッドを実装している必要がある。

=== activate(response)
==== Args
response :: Delayer::Deferred::Response::Base Deferredに渡す値
==== Returns
[Delayer::Deferred::Response::Base]
  これを返すと、値の自動変換が行われないため、意図的に失敗させたり、Deferredを次のブロックに伝搬させることができる。
[Delayer::Deferred::Chainable]
  戻り値のDeferredが終わるまでWorkerの処理を停止する。
  再開された時、結果は戻り値のDeferredの結果に置き換えられる。
[else]
  _Delayer::Deferred::Response::Ok.new_ の引数に渡され、その結果が利用される
=end
  class Worker
    def initialize(delayer:, initial:)
      @delayer, @initial = delayer, initial
    end

    def push(deferred)
      deferred.reserve_activate
      @delayer.new do
        next if deferred.spoiled?
        begin
          fiber.resume(deferred).accept_request(worker: self,
                                                deferred: deferred)
        rescue Delayer::Deferred::SequenceError => err
          err.deferred = deferred
          raise
        end
      end
      nil
    end

    # Awaitから復帰した時に呼ばれる。
    # ==== Args
    # [response] Awaitの結果(Delayer::Deferred::Response::Base)
    # [deferred] 現在実行中のDeferred
    def give_response(response, deferred)
      @delayer.new do
        next if deferred.spoiled?
        deferred.exit_await
        fiber.resume(response).accept_request(worker: self,
                                              deferred: deferred)
      end
      nil
    end

    # Tools#pass から復帰した時に呼ばれる。
    # ==== Args
    # [deferred] 現在実行中のDeferred
    def resume_pass(deferred)
      deferred.exit_pass
      @delayer.new do
        next if deferred.spoiled?
        fiber.resume(nil).accept_request(worker: self,
                                         deferred: deferred)
      end
    end

    private

    def fiber
      @fiber ||= Fiber.new{|response|
        loop do
          response = wait_and_activate(response)
          case response.value
          when Delayer::Deferred::SequenceError
            raise response.value
          end
        end
      }.tap{|f| f.resume(@initial); @initial = nil }
    end

    def wait_and_activate(argument)
      response = catch(:success) do
        failed = catch(:__deferredable_fail) do
          begin
            if argument.value.is_a? Deferredable::Awaitable
              throw :success, +argument.value
            else
              defer = Fiber.yield(Request::NEXT_WORKER)
              res = defer.activate(argument)
              if res.is_a? Delayer::Deferred::Deferredable::Awaitable
                defer.add_awaited(res)
              end
            end
            throw :success, res
          rescue Exception => err
            throw :__deferredable_fail, err
          end
        end
        Response::Ng.new(failed)
      end
      if response.is_a?(Response::Base)
        response
      else
        Response::Ok.new(response)
      end
    end
  end
end