File: server.go

package info (click to toggle)
golang-github-r3labs-sse 2.10.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 244 kB
  • sloc: makefile: 32
file content (156 lines) | stat: -rw-r--r-- 4,086 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

package sse

import (
	"encoding/base64"
	"sync"
	"time"
)

// DefaultBufferSize size of the queue that holds the streams messages.
const DefaultBufferSize = 1024

// Server Is our main struct
type Server struct {
	// Extra headers adding to the HTTP response to each client
	Headers map[string]string
	// Sets a ttl that prevents old events from being transmitted
	EventTTL time.Duration
	// Specifies the size of the message buffer for each stream
	BufferSize int
	// Encodes all data as base64
	EncodeBase64 bool
	// Splits an events data into multiple data: entries
	SplitData bool
	// Enables creation of a stream when a client connects
	AutoStream bool
	// Enables automatic replay for each new subscriber that connects
	AutoReplay bool

	// Specifies the function to run when client subscribe or un-subscribe
	OnSubscribe   func(streamID string, sub *Subscriber)
	OnUnsubscribe func(streamID string, sub *Subscriber)

	streams   map[string]*Stream
	muStreams sync.RWMutex
}

// New will create a server and setup defaults
func New() *Server {
	return &Server{
		BufferSize: DefaultBufferSize,
		AutoStream: false,
		AutoReplay: true,
		streams:    make(map[string]*Stream),
		Headers:    map[string]string{},
	}
}

// NewWithCallback will create a server and setup defaults with callback function
func NewWithCallback(onSubscribe, onUnsubscribe func(streamID string, sub *Subscriber)) *Server {
	return &Server{
		BufferSize:    DefaultBufferSize,
		AutoStream:    false,
		AutoReplay:    true,
		streams:       make(map[string]*Stream),
		Headers:       map[string]string{},
		OnSubscribe:   onSubscribe,
		OnUnsubscribe: onUnsubscribe,
	}
}

// Close shuts down the server, closes all of the streams and connections
func (s *Server) Close() {
	s.muStreams.Lock()
	defer s.muStreams.Unlock()

	for id := range s.streams {
		s.streams[id].close()
		delete(s.streams, id)
	}
}

// CreateStream will create a new stream and register it
func (s *Server) CreateStream(id string) *Stream {
	s.muStreams.Lock()
	defer s.muStreams.Unlock()

	if s.streams[id] != nil {
		return s.streams[id]
	}

	str := newStream(id, s.BufferSize, s.AutoReplay, s.AutoStream, s.OnSubscribe, s.OnUnsubscribe)
	str.run()

	s.streams[id] = str

	return str
}

// RemoveStream will remove a stream
func (s *Server) RemoveStream(id string) {
	s.muStreams.Lock()
	defer s.muStreams.Unlock()

	if s.streams[id] != nil {
		s.streams[id].close()
		delete(s.streams, id)
	}
}

// StreamExists checks whether a stream by a given id exists
func (s *Server) StreamExists(id string) bool {
	return s.getStream(id) != nil
}

// Publish sends a mesage to every client in a streamID.
// If the stream's buffer is full, it blocks until the message is sent out to
// all subscribers (but not necessarily arrived the clients), or when the
// stream is closed.
func (s *Server) Publish(id string, event *Event) {
	stream := s.getStream(id)
	if stream == nil {
		return
	}

	select {
	case <-stream.quit:
	case stream.event <- s.process(event):
	}
}

// TryPublish is the same as Publish except that when the operation would cause
// the call to be blocked, it simply drops the message and returns false.
// Together with a small BufferSize, it can be useful when publishing the
// latest message ASAP is more important than reliable delivery.
func (s *Server) TryPublish(id string, event *Event) bool {
	stream := s.getStream(id)
	if stream == nil {
		return false
	}

	select {
	case stream.event <- s.process(event):
		return true
	default:
		return false
	}
}

func (s *Server) getStream(id string) *Stream {
	s.muStreams.RLock()
	defer s.muStreams.RUnlock()
	return s.streams[id]
}

func (s *Server) process(event *Event) *Event {
	if s.EncodeBase64 {
		output := make([]byte, base64.StdEncoding.EncodedLen(len(event.Data)))
		base64.StdEncoding.Encode(output, event.Data)
		event.Data = output
	}
	return event
}