File: eventsource.rb

package info (click to toggle)
ruby-faye-websocket 0.11.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye, forky, sid, trixie
  • size: 236 kB
  • sloc: ruby: 1,209; makefile: 3
file content (119 lines) | stat: -rw-r--r-- 3,022 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
require File.expand_path('../websocket', __FILE__) unless defined?(Faye::WebSocket)

module Faye
  class EventSource

    include WebSocket::API::EventTarget
    attr_reader :env, :url, :ready_state

    DEFAULT_RETRY = 5

    def self.eventsource?(env)
      return false unless env['REQUEST_METHOD'] == 'GET'
      accept = (env['HTTP_ACCEPT'] || '').split(/\s*,\s*/)
      accept.include?('text/event-stream')
    end

    def self.determine_url(env)
      WebSocket.determine_url(env, ['https', 'http'])
    end

    def initialize(env, options = {})
      WebSocket.ensure_reactor_running
      super()

      @env    = env
      @ping   = options[:ping]
      @retry  = (options[:retry] || DEFAULT_RETRY).to_f
      @url    = EventSource.determine_url(env)
      @stream = Stream.new(self)

      @ready_state = WebSocket::API::CONNECTING

      headers = ::WebSocket::Driver::Headers.new
      if options[:headers]
        options[:headers].each { |k,v| headers[k] = v }
      end

      if callback = @env['async.callback']
        callback.call([101, {}, @stream])
      end

      @stream.write("HTTP/1.1 200 OK\r\n" +
                    "Content-Type: text/event-stream\r\n" +
                    "Cache-Control: no-cache, no-store\r\n" +
                    "Connection: close\r\n" +
                    headers.to_s +
                    "\r\n" +
                    "retry: #{ (@retry * 1000).floor }\r\n\r\n")

      EventMachine.next_tick { open }

      if @ping
        @ping_timer = EventMachine.add_periodic_timer(@ping) { ping }
      end
    end

    def last_event_id
      @env['HTTP_LAST_EVENT_ID'] || ''
    end

    def rack_response
      [ -1, {}, [] ]
    end

  private

    def open
      return unless @ready_state == WebSocket::API::CONNECTING

      @ready_state = WebSocket::API::OPEN

      event = WebSocket::API::Event.create('open')
      event.init_event('open', false, false)
      dispatch_event(event)
    end

  public

    def send(message, options = {})
      return false if @ready_state > WebSocket::API::OPEN

      message = ::WebSocket::Driver.encode(message.to_s).
                gsub(/(\r\n|\r|\n)/, '\1data: ')

      frame  = ""
      frame << "event: #{ options[:event] }\r\n" if options[:event]
      frame << "id: #{ options[:id] }\r\n" if options[:id]
      frame << "data: #{ message }\r\n\r\n"

      @stream.write(frame)
      true
    end

    def ping(message = nil)
      return false if @ready_state > WebSocket::API::OPEN
      @stream.write(":\r\n\r\n")
      true
    end

    def close
      return if [WebSocket::API::CLOSING, WebSocket::API::CLOSED].include?(@ready_state)

      @ready_state = WebSocket::API::CLOSED
      EventMachine.cancel_timer(@ping_timer)
      @stream.close_connection_after_writing

      event = WebSocket::API::Event.create('close')
      event.init_event('close', false, false)
      dispatch_event(event)
    end

    class Stream < RackStream
      def fail
        @socket_object.close
      end
    end

  end
end