File: server.go

package info (click to toggle)
golang-github-asaskevich-eventbus 0.0~git20200907.49d4230-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 100 kB
  • sloc: makefile: 2
file content (153 lines) | stat: -rw-r--r-- 4,033 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package EventBus

import (
	"errors"
	"fmt"
	"net"
	"net/http"
	"net/rpc"
	"sync"
)

// SubscribeType - how the client intends to subscribe.
// It is carried in SubscribeArg and selects which subscription call
// ServerService.Register makes on the wrapped event bus.
type SubscribeType int

const (
	// Subscribe - subscribe to all events.
	// Register maps this value to Bus.Subscribe.
	Subscribe SubscribeType = iota
	// SubscribeOnce - subscribe to only one event.
	// Register maps this value to Bus.SubscribeOnce.
	SubscribeOnce
)

const (
	// RegisterService - Server subscribe service method.
	// The value has the "Type.Method" form and names ServerService.Register;
	// presumably remote clients pass it as the service method of their RPC
	// call (the call site is not in this file).
	RegisterService = "ServerService.Register"
)

// SubscribeArg - object to hold subscribe arguments from remote event handlers.
// Two values are treated as the same subscription when all fields are equal
// (see Server.HasClientSubscribed, which compares the structs by value).
type SubscribeArg struct {
	// ClientAddr is the TCP address dialed to reach the subscriber.
	ClientAddr    string
	// ClientPath is the HTTP path of the subscriber's RPC endpoint.
	ClientPath    string
	// ServiceMethod is the RPC method invoked on the subscriber when an event fires.
	ServiceMethod string
	// SubscribeType selects a persistent or a one-shot subscription.
	SubscribeType SubscribeType
	// Topic is the event bus topic being subscribed to.
	Topic         string
}

// Server - object capable of being subscribed to by remote handlers.
// NOTE(review): no lock guards these fields; Register is invoked through
// net/rpc, so confirm whether concurrent registrations can occur.
type Server struct {
	// eventBus is the local bus whose events are forwarded to remote subscribers.
	eventBus    Bus
	// address is the TCP address Start listens on.
	address     string
	// path is the HTTP path under which the RPC handler is registered.
	path        string
	// subscribers records accepted subscriptions, keyed by topic.
	subscribers map[string][]*SubscribeArg
	// service is the RPC receiver exposed to remote clients.
	service     *ServerService
}

// NewServer - builds a Server bound to the given address and path that
// forwards events published on eventBus. The returned server has an empty
// subscriber table and a not-yet-started service; call Start to begin serving.
func NewServer(address, path string, eventBus Bus) *Server {
	srv := &Server{
		eventBus:    eventBus,
		address:     address,
		path:        path,
		subscribers: make(map[string][]*SubscribeArg),
	}
	// The service keeps a back-pointer to the server it belongs to.
	srv.service = &ServerService{
		server:  srv,
		wg:      &sync.WaitGroup{},
		started: false,
	}
	return srv
}

// EventBus - exposes the Bus this server was constructed with
func (server *Server) EventBus() Bus { return server.eventBus }

// rpcCallback - builds the handler that is subscribed on the local event bus
// on behalf of a remote client. Each time the handler fires it dials the
// client described by subscribeArg, forwards the topic and the published
// arguments through subscribeArg.ServiceMethod, and closes the connection.
//
// Delivery is best-effort: the handler signature has no error return, so a
// client that cannot be reached or that rejects the call is skipped.
func (server *Server) rpcCallback(subscribeArg *SubscribeArg) func(args ...interface{}) {
	return func(args ...interface{}) {
		client, connErr := rpc.DialHTTPPath("tcp", subscribeArg.ClientAddr, subscribeArg.ClientPath)
		if connErr != nil {
			// client is nil when the dial fails; bail out before touching it.
			return
		}
		// Register Close only once the connection is known to be valid.
		defer client.Close()

		clientArg := new(ClientArg)
		clientArg.Topic = subscribeArg.Topic
		clientArg.Args = args
		var reply bool
		// The call error cannot be propagated from here (see the doc comment).
		_ = client.Call(subscribeArg.ServiceMethod, clientArg, &reply)
	}
}

// HasClientSubscribed - reports whether an identical subscription (every
// field of arg equal) is already recorded under arg.Topic on this server
func (server *Server) HasClientSubscribed(arg *SubscribeArg) bool {
	// Indexing a missing topic yields a nil slice, so the loop simply does not run.
	for _, existing := range server.subscribers[arg.Topic] {
		if *existing == *arg {
			return true
		}
	}
	return false
}

// Start - starts a service for remote clients to subscribe to events.
//
// It listens on the server's TCP address, exposes the ServerService over
// net/rpc at the server's path (with the debug handler at "/debug"+path) and
// serves HTTP in a background goroutine.
//
// It returns an error if the service is already started, if the address
// cannot be listened on, or if the RPC service cannot be registered. On any
// error the service is left in the not-started state.
func (server *Server) Start() error {
	service := server.service
	if service.started {
		return errors.New("Server bus already started")
	}

	// Listen first so that a failed listen leaves no HTTP handlers behind;
	// HandleHTTP registers on the default mux and cannot be undone.
	l, err := net.Listen("tcp", server.address)
	if err != nil {
		// Returned unwrapped so callers still see the original error value.
		return err
	}

	rpcServer := rpc.NewServer()
	if err := rpcServer.Register(service); err != nil {
		_ = l.Close()
		return fmt.Errorf("registering rpc service: %w", err)
	}
	rpcServer.HandleHTTP(server.path, "/debug"+server.path)

	service.started = true
	service.wg.Add(1)
	// A nil handler makes http.Serve use the default mux populated above.
	go http.Serve(l, nil)
	return nil
}

// Stop - marks the service as stopped. When the service is running it
// balances the WaitGroup increment made by Start and clears the started
// flag; otherwise it does nothing.
func (server *Server) Stop() {
	svc := server.service
	if !svc.started {
		return
	}
	svc.wg.Done()
	svc.started = false
}

// ServerService - service object to listen to remote subscriptions.
// It is the receiver registered with net/rpc by Server.Start.
type ServerService struct {
	// server is the owning Server whose bus and subscriber table Register updates.
	server  *Server
	// wg is incremented by Server.Start and decremented by Server.Stop.
	wg      *sync.WaitGroup
	// started is true between a successful Start and the next Stop.
	started bool
}

// Register - Registers a remote handler to this event bus.
// For a remote subscribe, a given client address only needs to subscribe
// once; the event will be republished to it from the local event bus.
//
// If an identical subscription (all SubscribeArg fields equal) is already
// recorded, nothing changes. Otherwise a forwarding callback is subscribed
// on the local bus according to arg.SubscribeType and arg is appended to the
// list kept for arg.Topic. *success is set to true and the returned error is
// always nil.
func (service *ServerService) Register(arg *SubscribeArg, success *bool) error {
	srv := service.server
	if !srv.HasClientSubscribed(arg) {
		rpcCallback := srv.rpcCallback(arg)
		switch arg.SubscribeType {
		case Subscribe:
			srv.eventBus.Subscribe(arg.Topic, rpcCallback)
		case SubscribeOnce:
			srv.eventBus.SubscribeOnce(arg.Topic, rpcCallback)
		}
		// Append so earlier subscribers of the topic stay recorded; append on
		// the nil slice of a new topic starts a fresh list.
		srv.subscribers[arg.Topic] = append(srv.subscribers[arg.Topic], arg)
	}
	*success = true
	return nil
}