1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
|
package EventBus
import (
"errors"
"fmt"
"net"
"net/http"
"net/rpc"
"sync"
)
// SubscribeType - how the client intends to subscribe
type SubscribeType int
const (
// Subscribe - subscribe to all events
Subscribe SubscribeType = iota
// SubscribeOnce - subscribe to only one event
SubscribeOnce
)
const (
// RegisterService - Server subscribe service method
RegisterService = "ServerService.Register"
)
// SubscribeArg - object to hold subscribe arguments from remote event handlers
type SubscribeArg struct {
ClientAddr string
ClientPath string
ServiceMethod string
SubscribeType SubscribeType
Topic string
}
// Server - object capable of being subscribed to by remote handlers
type Server struct {
eventBus Bus
address string
path string
subscribers map[string][]*SubscribeArg
service *ServerService
}
// NewServer - create a new Server at the address and path
func NewServer(address, path string, eventBus Bus) *Server {
server := new(Server)
server.eventBus = eventBus
server.address = address
server.path = path
server.subscribers = make(map[string][]*SubscribeArg)
server.service = &ServerService{server, &sync.WaitGroup{}, false}
return server
}
// EventBus - returns wrapped event bus
func (server *Server) EventBus() Bus {
return server.eventBus
}
func (server *Server) rpcCallback(subscribeArg *SubscribeArg) func(args ...interface{}) {
return func(args ...interface{}) {
client, connErr := rpc.DialHTTPPath("tcp", subscribeArg.ClientAddr, subscribeArg.ClientPath)
defer client.Close()
if connErr != nil {
fmt.Errorf("dialing: %v", connErr)
}
clientArg := new(ClientArg)
clientArg.Topic = subscribeArg.Topic
clientArg.Args = args
var reply bool
err := client.Call(subscribeArg.ServiceMethod, clientArg, &reply)
if err != nil {
fmt.Errorf("dialing: %v", err)
}
}
}
// HasClientSubscribed - True if a client subscribed to this server with the same topic
func (server *Server) HasClientSubscribed(arg *SubscribeArg) bool {
if topicSubscribers, ok := server.subscribers[arg.Topic]; ok {
for _, topicSubscriber := range topicSubscribers {
if *topicSubscriber == *arg {
return true
}
}
}
return false
}
// Start - starts a service for remote clients to subscribe to events
func (server *Server) Start() error {
var err error
service := server.service
if !service.started {
rpcServer := rpc.NewServer()
rpcServer.Register(service)
rpcServer.HandleHTTP(server.path, "/debug"+server.path)
l, e := net.Listen("tcp", server.address)
if e != nil {
err = e
fmt.Errorf("listen error: %v", e)
}
service.started = true
service.wg.Add(1)
go http.Serve(l, nil)
} else {
err = errors.New("Server bus already started")
}
return err
}
// Stop - signal for the service to stop serving
func (server *Server) Stop() {
service := server.service
if service.started {
service.wg.Done()
service.started = false
}
}
// ServerService - service object to listen to remote subscriptions
type ServerService struct {
server *Server
wg *sync.WaitGroup
started bool
}
// Register - Registers a remote handler to this event bus
// for a remote subscribe - a given client address only needs to subscribe once
// event will be republished in local event bus
func (service *ServerService) Register(arg *SubscribeArg, success *bool) error {
subscribers := service.server.subscribers
if !service.server.HasClientSubscribed(arg) {
rpcCallback := service.server.rpcCallback(arg)
switch arg.SubscribeType {
case Subscribe:
service.server.eventBus.Subscribe(arg.Topic, rpcCallback)
case SubscribeOnce:
service.server.eventBus.SubscribeOnce(arg.Topic, rpcCallback)
}
var topicSubscribers []*SubscribeArg
if _, ok := subscribers[arg.Topic]; ok {
topicSubscribers = []*SubscribeArg{arg}
} else {
topicSubscribers = subscribers[arg.Topic]
topicSubscribers = append(topicSubscribers, arg)
}
subscribers[arg.Topic] = topicSubscribers
}
*success = true
return nil
}
|