File: stream.rb

package info (click to toggle)
ruby-pluggaloid 1.7.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 260 kB
  • sloc: ruby: 1,752; sh: 4; makefile: 2
file content (92 lines) | stat: -rw-r--r-- 2,096 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# frozen_string_literal: true

module Pluggaloid
  class Stream
    include Enumerable

    def initialize(enumerator)
      @enumerator = enumerator
    end

    def throttle(sec)
      throttling = 0
      @enumerator.select do |item|
        r0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        if throttling <= r0
          throttling = r0 + sec
        end
      end
    end

    def debounce(sec)
      throttling_promise = nil
      Stream.new(
        Enumerator.new do |yielder|
          @enumerator.each do |item|
            throttling_promise&.cancel
            throttling_promise = Delayer.new(delay: sec) do
              yielder << item
            end
          end
        end.lazy
      )
    end

    def buffer(sec)
      throttling_promise = nil
      buffer = []
      Stream.new(
        Enumerator.new do |yielder|
          @enumerator.each do |item|
            buffer << item
            throttling_promise ||= Delayer.new(delay: sec) do
              yielder << buffer.freeze
              buffer = []
              throttling_promise = nil
            end
          end
        end.lazy
      )
    end

    def merge(*streams)
      Stream.new(Merge.new(self, *streams).lazy)
    end

    (Enumerator.instance_methods - Enumerator.superclass.instance_methods).each do |method_name|
      define_method(method_name) do |*rest, **kwrest, &block|
        if kwrest.empty?
          r = @enumerator.__send__(method_name, *rest, &block)
        else
          r = @enumerator.__send__(method_name, *rest, **kwrest, &block)
        end
        if r.is_a?(Enumerator::Lazy)
          Pluggaloid::Stream.new(r)
        else
          r
        end
      end
    end

    class Merge
      include Enumerable

      def initialize(*sources)
        @sources = sources
      end

      def each(&block)
        fiber = Fiber.new do
          loop do
            block.call(Fiber.yield)
          end
        end
        fiber.resume
        @sources.each do |source|
          source.each(&fiber.method(:resume))
        end
        self
      end
    end
  end
end