File: stream_generator.rb

package info (click to toggle)
ruby-pluggaloid 1.7.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 260 kB
  • sloc: ruby: 1,752; sh: 4; makefile: 2
file content (88 lines) | stat: -rw-r--r-- 1,984 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# -*- coding: utf-8 -*-
require 'securerandom'
require 'set'

class Pluggaloid::StreamGenerator < Pluggaloid::Handler
  attr_reader :accepted_hash

  def initialize(event, *specs, plugin:, **kwrest, &callback)
    raise Pluggaloid::UndefinedStreamIndexError, 'To call generate(%{event}), it must define prototype arguments include `Pluggaloid::STREAM\'.' % {event: event.name} unless event.stream_index
    super(event, **kwrest)
    @callback = callback
    @specs = specs.freeze
    @accepted_hash = @event.argument_hash(specs, nil)
    @last_subscribe_state = @event.subscribe?(*@specs)
    @plugin = plugin
    subscribe_start if @last_subscribe_state
    @event.register_stream_generator(self)
  end

  def on_subscribed
    if !@last_subscribe_state
      @last_subscribe_state = true
      subscribe_start
    end
  end

  def on_unsubscribed
    subscribe_state = @event.subscribe_hash?(@accepted_hash)
    if @last_subscribe_state && !subscribe_state
      @last_subscribe_state = false
      subscribe_stop
    end
  end

  # このリスナを削除する
  # ==== Return
  # self
  def detach
    @event.delete_stream_generator(self)
    @yielder&.die
    @yielder = nil
    self
  end

  private

  def subscribe_start
    @tag = @plugin.handler_tag do
      @yielder = Yielder.new(@event, args: @specs)
      @callback.call(@yielder)
    end
  end

  def subscribe_stop
    @plugin.detach(@tag)
    @yielder.die
    @yielder = nil
  end

  class Yielder
    def initialize(event, args:)
      @event = event
      @args = args.freeze
      @alive = true
    end

    def bulk_add(lst)
      raise Pluggaloid::NoReceiverError, "All event listener of #{self.class} already detached." if die?
      args = @args.dup
      args.insert(@event.stream_index, lst)
      @event.call(*args)
    end

    def add(value)
      bulk_add([value])
    end
    alias_method :<<, :add

    def die?
      !@alive
    end

    def die
      @alive = false
      freeze
    end
  end
end