1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
|
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"
_ "net/http/pprof"
"github.com/centrifugal/centrifuge"
)
// handleLog writes a Centrifuge log entry to the standard logger in the
// form "<message>: <fields>".
func handleLog(e centrifuge.LogEntry) {
	msg, fields := e.Message, e.Fields
	log.Printf("%s: %v", msg, fields)
}
func authMiddleware(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
newCtx := centrifuge.SetCredentials(ctx, ¢rifuge.Credentials{
UserID: "42",
Info: []byte(`{"name": "Alexander"}`),
})
r = r.WithContext(newCtx)
h.ServeHTTP(w, r)
})
}
// waitExitSignal blocks until the process receives SIGINT or SIGTERM, then
// shuts the node down and returns. A shutdown error is logged rather than
// discarded so that an unclean stop is visible to the operator.
func waitExitSignal(n *centrifuge.Node) {
	// Buffered so a signal delivered before the receive below is not lost.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	<-sigCh
	if err := n.Shutdown(context.Background()); err != nil {
		log.Printf("node shutdown: %v", err)
	}
}
// main configures and runs a Centrifuge node, serves it over WebSocket and
// SockJS on :8000 (plus static files from the working directory), and blocks
// until an exit signal arrives.
//
// Each connection is subscribed server-side to the channel "#<userID>" with
// presence enabled. On connect, the presence stats of that channel are used
// to detect a second connection of the same user; when found, the user's
// connections are disconnected with the newly connected client whitelisted.
func main() {
	cfg := centrifuge.DefaultConfig
	cfg.LogLevel = centrifuge.LogLevelInfo
	cfg.LogHandler = handleLog

	// A failed constructor must stop the program; continuing would
	// dereference a nil node below.
	node, err := centrifuge.New(cfg)
	if err != nil {
		log.Fatal(err)
	}

	node.OnConnecting(func(ctx context.Context, e centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) {
		cred, ok := centrifuge.GetCredentials(ctx)
		if !ok {
			// Without credentials there is no user ID to build the
			// personal channel name from, so refuse the connection.
			return centrifuge.ConnectReply{}, centrifuge.DisconnectServerError
		}
		return centrifuge.ConnectReply{
			// Subscribe the connection to the user's personal server-side channel.
			Subscriptions: map[string]centrifuge.SubscribeOptions{
				"#" + cred.UserID: {Presence: true},
			},
		}, nil
	})

	node.OnConnect(func(client *centrifuge.Client) {
		presenceStats, err := node.PresenceStats("#" + client.UserID())
		if err != nil {
			client.Disconnect(centrifuge.DisconnectServerError)
			return
		}
		if presenceStats.NumClients >= 2 {
			// More than one client is present for this user: disconnect the
			// user while whitelisting the client that has just connected.
			err = node.Disconnect(
				client.UserID(),
				centrifuge.WithDisconnect(centrifuge.DisconnectConnectionLimit),
				centrifuge.WithClientWhitelist([]string{client.ID()}),
			)
			if err != nil {
				client.Disconnect(centrifuge.DisconnectServerError)
				return
			}
		}

		transport := client.Transport()
		log.Printf("user %s connected via %s with protocol: %s", client.UserID(), transport.Name(), transport.Protocol())

		client.OnDisconnect(func(e centrifuge.DisconnectEvent) {
			log.Printf("user %s disconnected, disconnect: %s", client.UserID(), e.Disconnect)
		})
	})

	if err := node.Run(); err != nil {
		log.Fatal(err)
	}

	websocketHandler := centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{
		ReadBufferSize:     1024,
		UseWriteBufferPool: true,
	})
	http.Handle("/connection/websocket", authMiddleware(websocketHandler))

	sockjsHandler := centrifuge.NewSockjsHandler(node, centrifuge.SockjsConfig{
		URL:                      "https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js",
		HandlerPrefix:            "/connection/sockjs",
		WebsocketReadBufferSize:  1024,
		WebsocketWriteBufferSize: 1024,
	})
	http.Handle("/connection/sockjs/", authMiddleware(sockjsHandler))

	http.Handle("/", http.FileServer(http.Dir("./")))

	// Serve HTTP in the background so the main goroutine can wait for the
	// exit signal; a listener failure terminates the process.
	go func() {
		if err := http.ListenAndServe(":8000", nil); err != nil {
			log.Fatal(err)
		}
	}()

	waitExitSignal(node)
	log.Println("bye!")
}
|