1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
// compile
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// This code failed on arm64 in the register allocator.
// See issue 33355.
package server
import (
"bytes"
"sync"
)
// client is the receiver type for sendMsgToGateways and gatewayInterest.
type client struct {
// junk is not referenced anywhere in this file.
junk [4]int
// mu is locked around the read of gw.cfg.replyPfx in sendMsgToGateways.
mu sync.Mutex
// srv is followed to srv.gateway.outo to find the outbound gateway clients.
srv *Server
// gw holds the reply-prefix config and the interest map for this client.
gw *gateway
// msgb is the backing array that sendMsgToGateways slices to build a header.
msgb [100]byte
}
// gateway groups the per-client gateway data read in this file.
type gateway struct {
// cfg supplies replyPfx, which is compared against the subject's prefix.
cfg *gatewayCfg
// outsim is looked up by account name in gatewayInterest; the loaded value
// is asserted to *outsie.
outsim *sync.Map
}
// gatewayCfg carries the reply prefix of a gateway.
type gatewayCfg struct {
// replyPfx is compared with bytes.Equal against the first 8 bytes of the
// subject in sendMsgToGateways.
replyPfx []byte
}
// Account identifies the account a message is being sent for.
type Account struct {
// Name is passed to gatewayInterest, where it is the key into outsim.
Name string
}
// Server is reached through client.srv.
type Server struct {
// gateway holds the list of outbound gateway clients (outo).
gateway *srvGateway
}
// srvGateway is the server-side gateway state used by this file.
type srvGateway struct {
// outo is copied element by element into a local slice at the start of
// sendMsgToGateways.
outo []*client
}
// subscription is the type stored in subPool and in SublistResult.
type subscription struct {
// queue is read from the first subscription of each qsubs group.
queue []byte
// client is assigned the current gateway client in sendMsgToGateways.
client *client
}
// outsie is the value type that gatewayInterest expects to find in outsim.
type outsie struct {
// ni is not referenced in this file.
ni map[string]struct{}
// sl is the sublist that gatewayInterest matches the subject against.
sl *Sublist
// qsubs is not referenced in this file.
qsubs int
}
// Sublist is an empty placeholder type; its only method here is Match.
type Sublist struct {
}
// SublistResult is the return type of Sublist.Match and gatewayInterest.
type SublistResult struct {
// psubs is not referenced in this file.
psubs []*subscription
// qsubs is iterated in sendMsgToGateways; only element 0 of each inner
// slice is read.
qsubs [][]*subscription
}
// subPool is the pool sendMsgToGateways takes a *subscription from and puts it
// back into. No New function is set, so Get returns nil while the pool is empty.
var subPool = &sync.Pool{}
// sendMsgToGateways walks every outbound gateway client of c.srv and, for each
// one that passes the prefix or interest check, builds a header in c.msgb and
// points a pooled subscription at that gateway.
//
// NOTE(review): this is a reduced reproducer for issue 33355 (see the file
// header); the statement order and set of live variables are presumably what
// triggered the register allocator failure, so keep the code shape as is.
// Several values (mh, queues) are computed but never consumed.
func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgroups [][]byte) {
// Copy the outbound gateway clients into a local slice.
var gws []*client
gw := c.srv.gateway
for i := 0; i < len(gw.outo); i++ {
gws = append(gws, gw.outo[i])
}
var (
subj = string(subject)
// queuesa is the fixed-size backing array for queues.
queuesa = [512]byte{}
queues = queuesa[:0]
mreply []byte
dstPfx []byte
// checkReply is true only until the first gateway gets past the checks.
checkReply = len(reply) > 0
)
// Unchecked assertion: panics if Get returns nil, which it does when the
// pool is empty because subPool has no New function.
sub := subPool.Get().(*subscription)
// When the subject carries the reply prefix, remember its first 8 bytes so
// only the gateway with a matching replyPfx is considered.
if subjectStartsWithGatewayReplyPrefix(subject) {
dstPfx = subject[:8]
}
for i := 0; i < len(gws); i++ {
gwc := gws[i]
if dstPfx != nil {
// Compare the prefix under the gateway client's lock; skip on mismatch.
gwc.mu.Lock()
ok := bytes.Equal(dstPfx, gwc.gw.cfg.replyPfx)
gwc.mu.Unlock()
if !ok {
continue
}
} else {
// qr is dereferenced without a nil check; Match in this file returns nil.
qr := gwc.gatewayInterest(acc.Name, subj)
queues = queuesa[:0]
// This inner i shadows the outer loop index.
for i := 0; i < len(qr.qsubs); i++ {
qsubs := qr.qsubs[i]
queue := qsubs[0].queue
// Add the queue name to qgroups only if it is not already present.
add := true
for _, qn := range qgroups {
if bytes.Equal(queue, qn) {
add = false
break
}
}
if add {
qgroups = append(qgroups, queue)
}
}
// Nothing appends to queues after the reset above, so this branch always
// continues to the next gateway.
if len(queues) == 0 {
continue
}
}
// Capture reply once, for the first gateway that reaches this point.
if checkReply {
checkReply = false
mreply = reply
}
// Build the header on top of the first 10 bytes of c.msgb; appends write
// into c.msgb's array while they fit in its 100-byte capacity.
mh := c.msgb[:10]
mh = append(mh, subject...)
if len(queues) > 0 {
mh = append(mh, mreply...)
mh = append(mh, queues...)
}
// mh is not read again after this point.
sub.client = gwc
}
subPool.Put(sub)
}
// subjectStartsWithGatewayReplyPrefix reports whether subj is longer than
// eight bytes and its first four bytes spell "foob".
func subjectStartsWithGatewayReplyPrefix(subj []byte) bool {
	if len(subj) <= 8 {
		return false
	}
	head := subj[:4]
	return string(head) == "foob"
}
// gatewayInterest loads the value stored under acc in c.gw.outsim, asserts it
// to *outsie, and returns the result of matching subj against that entry's
// sublist. The "found" flag from Load is discarded and the assertion is
// unchecked, so a missing key panics.
func (c *client) gatewayInterest(acc, subj string) *SublistResult {
	entry, _ := c.gw.outsim.Load(acc)
	return entry.(*outsie).sl.Match(subj)
}
// Match is a stub: it ignores subject and always returns a nil *SublistResult.
func (s *Sublist) Match(subject string) *SublistResult {
	var none *SublistResult
	return none
}
|