1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
|
require File.expand_path('../websocket', __FILE__) unless defined?(Faye::WebSocket)
module Faye
# Server-side endpoint for one `text/event-stream` (EventSource) connection,
# built from a Rack environment. Events are dispatched through the
# WebSocket::API::EventTarget mixin.
class EventSource
include WebSocket::API::EventTarget
# env         - the Rack environment handed to #initialize.
# url         - the value returned by EventSource.determine_url for that env.
# ready_state - one of the WebSocket::API state constants (CONNECTING, OPEN, ...).
attr_reader :env, :url, :ready_state
# Fallback for the :retry option. #initialize multiplies the value by 1000
# before writing it as the stream's `retry:` field.
DEFAULT_RETRY = 5
# Reports whether a Rack request is asking for an EventSource stream.
#
# The request must be a GET whose Accept header lists the
# `text/event-stream` media type. Each comma-separated media range is
# compared case-insensitively, ignoring surrounding whitespace and any
# parameters (for example `;q=0.9` or `;charset=utf-8`). Quality values are
# not interpreted.
#
# @param env [Hash] the Rack environment
# @return [Boolean] true when the request accepts `text/event-stream`
def self.eventsource?(env)
  return false unless env['REQUEST_METHOD'] == 'GET'

  env['HTTP_ACCEPT'].to_s.split(',').any? do |media_range|
    # Drop media-type parameters: only the type/subtype decides the match.
    media_type = media_range.split(';', 2).first.to_s
    media_type.strip.downcase == 'text/event-stream'
  end
end
# Resolves the URL of the request by delegating to WebSocket.determine_url,
# passing 'https' and 'http' (in that order) as the scheme list.
#
# @param env [Hash] the Rack environment
# @return whatever WebSocket.determine_url returns for this env
def self.determine_url(env)
  schemes = %w[https http]
  WebSocket.determine_url(env, schemes)
end
# Sets up the event stream for a request and starts the response.
#
# @param env [Hash] the Rack environment of the request
# @param options [Hash] optional settings:
#   :ping    - interval handed to EventMachine.add_periodic_timer; each tick
#              calls #ping. No timer is created when the option is absent.
#   :retry   - converted with to_f, multiplied by 1000 and floored, then sent
#              as the stream's `retry:` field (presumably seconds converted
#              to milliseconds - confirm). Defaults to DEFAULT_RETRY.
#   :headers - key/value pairs copied into the extra response headers.
def initialize(env, options = {})
WebSocket.ensure_reactor_running
# Explicit empty argument list so env/options are not forwarded to the
# inherited initializer.
super()
@env = env
@ping = options[:ping]
@retry = (options[:retry] || DEFAULT_RETRY).to_f
@url = EventSource.determine_url(env)
@stream = Stream.new(self)
@ready_state = WebSocket::API::CONNECTING
headers = ::WebSocket::Driver::Headers.new
if options[:headers]
options[:headers].each { |k,v| headers[k] = v }
end
# When the server exposes an async callback, give it a response triple whose
# body is the stream.
# NOTE(review): the 101 status looks like a placeholder, since the real
# status line is written by hand just below - confirm.
if callback = @env['async.callback']
callback.call([101, {}, @stream])
end
# The HTTP status line, fixed headers, caller-supplied headers and the
# initial `retry:` field are written directly onto the stream.
@stream.write("HTTP/1.1 200 OK\r\n" +
"Content-Type: text/event-stream\r\n" +
"Cache-Control: no-cache, no-store\r\n" +
"Connection: close\r\n" +
headers.to_s +
"\r\n" +
"retry: #{ (@retry * 1000).floor }\r\n\r\n")
# The transition to OPEN is deferred to the next reactor tick (presumably so
# the caller can attach event handlers first - confirm).
EventMachine.next_tick { open }
if @ping
@ping_timer = EventMachine.add_periodic_timer(@ping) { ping }
end
end
# Returns the value stored under 'HTTP_LAST_EVENT_ID' in the Rack
# environment, or an empty string when that entry is missing.
#
# @return [String]
def last_event_id
  header = @env['HTTP_LAST_EVENT_ID']
  header || ''
end
# Returns the Rack response triple to hand back from the application:
# status -1 with empty headers and an empty body.
# NOTE(review): -1 presumably tells the server the response is handled
# asynchronously - confirm against the server in use.
#
# @return [Array] `[-1, {}, []]`, freshly built on every call
def rack_response
  status  = -1
  headers = {}
  body    = []
  [status, headers, body]
end
private
# Moves the connection from CONNECTING to OPEN and dispatches an 'open'
# event. Does nothing (and returns nil) in any other state.
#
# @return the result of dispatch_event, or nil when not CONNECTING
def open
  api = WebSocket::API
  return if @ready_state != api::CONNECTING

  @ready_state = api::OPEN

  opened = api::Event.create('open')
  opened.init_event('open', false, false)
  dispatch_event(opened)
end
public
# Writes one event to the stream.
#
# The message is converted with to_s and passed through
# ::WebSocket::Driver.encode. Every line break inside it is followed by a
# fresh `data: ` prefix, so a multi-line payload stays within one event.
#
# @param message [#to_s] the event payload
# @param options [Hash] :event and :id, each emitted as its own field line
#   when truthy
# @return [Boolean] false (nothing written) when ready_state is greater than
#   OPEN, true otherwise
def send(message, options = {})
  return false if @ready_state > WebSocket::API::OPEN

  payload = ::WebSocket::Driver.encode(message.to_s)
  payload = payload.gsub(/(\r\n|\r|\n)/, '\1data: ')

  event_name = options[:event]
  event_id   = options[:id]

  frame = ""
  frame << "event: #{ event_name }\r\n" if event_name
  frame << "id: #{ event_id }\r\n" if event_id
  frame << "data: #{ payload }\r\n\r\n"

  @stream.write(frame)
  true
end
# Writes a `:` line followed by a blank line to the stream. The `message`
# argument is accepted but not used.
#
# @param message ignored
# @return [Boolean] false (nothing written) when ready_state is greater than
#   OPEN, true otherwise
def ping(message = nil)
  if @ready_state > WebSocket::API::OPEN
    false
  else
    @stream.write(":\r\n\r\n")
    true
  end
end
# Closes the connection: marks it CLOSED, cancels the ping timer, asks the
# stream to close once pending writes are out, and dispatches a 'close'
# event. Does nothing (and returns nil) when already CLOSING or CLOSED.
#
# @return the result of dispatch_event, or nil when already closing/closed
def close
  api = WebSocket::API
  return if @ready_state == api::CLOSING || @ready_state == api::CLOSED

  @ready_state = api::CLOSED
  EventMachine.cancel_timer(@ping_timer)
  @stream.close_connection_after_writing

  closed = api::Event.create('close')
  closed.init_event('close', false, false)
  dispatch_event(closed)
end
# RackStream subclass used as the response body of an EventSource.
class Stream < RackStream
  # Closes the object held in @socket_object.
  # NOTE(review): presumably this is the EventSource passed to Stream.new and
  # RackStream calls this hook when the connection fails - confirm.
  def fail
    socket = @socket_object
    socket.close
  end
end
end
end
|