File: streaming.rb

package info (click to toggle)
mikutter 3.8.6%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 10,544 kB
  • sloc: ruby: 20,548; sh: 99; makefile: 19
file content (91 lines) | stat: -rw-r--r-- 2,901 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# -*- coding: utf-8 -*-

require 'set'

Plugin.create :streaming do
  thread = nil
  @fail_count = @wait_time = 0
  reconnect_request_flag = false

  on_filter_stream_force_retry do
    if UserConfig[:filter_realtime_rewind]
      thread.kill rescue nil if thread
      thread = start end end

  on_filter_stream_reconnect_request do
    if not reconnect_request_flag
      reconnect_request_flag = true
      Reserver.new(30, thread: Delayer) {
        reconnect_request_flag = false
        Plugin.call(:filter_stream_force_retry) } end end

  def start
    twitter = Enumerator.new{|y|
      Plugin.filtering(:worlds, y)
    }.find{|world|
      world.class.slug == :twitter
    }
    return unless twitter
    @success_flag = false
    @fail = MikuTwitter::StreamingFailedActions.new("Filter Stream", self)
    Thread.new{
      loop{
        begin
          follow = Plugin.filtering(:filter_stream_follow, Set.new).first || Set.new
          track = Plugin.filtering(:filter_stream_track, "").first || ""
          if follow.empty? && track.empty?
            sleep(60)
          else
            param = {}
            param[:follow] = follow.to_a[0, 5000].map(&:id).join(',') if not follow.empty?
            param[:track] = track if not track.empty?
            r = twitter.streaming(:filter_stream, param){ |json|
              json.strip!
              case json
              when /\A\{.*\}\Z/
                if @success_flag
                  @fail.success
                  @success_flag = true end
                parsed = JSON.parse(json).symbolize
                if not parsed[:retweeted_status]
                  MikuTwitter::ApiCallSupport::Request::Parser.streaming_message(parsed) rescue nil end
              end }
            raise r if r.is_a? Exception
            notice "filter stream: disconnected #{r}"
            streamerror r
          end
        rescue Net::HTTPError => exception
          warn "filter stream: disconnected: #{exception.code} #{exception.body}"
          streamerror exception
          warn exception
        rescue Net::ReadTimeout => exception
          streamerror exception
        rescue Exception => exception
          warn "filter stream: disconnected: exception #{exception}"
          streamerror exception
          warn exception end
        notice "retry wait #{@fail.wait_time}, fail_count #{@fail.fail_count}"
        sleep @fail.wait_time } }
  end

  def streamerror(exception)
    @success_flag = false
    @fail.notify(exception) end

  on_userconfig_modify do |key, new_val|
    next if key != :filter_realtime_rewind
    if new_val
      notice 'filter stream: enable'
      thread = start unless thread.is_a? Thread
    else
      notice 'filter stream: disable'
      thread.kill if thread.is_a? Thread
      thread = nil
    end
  end

  Delayer.new do
    thread = start if UserConfig[:filter_realtime_rewind]
  end

end