1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
|
package tntengine
import (
"context"
"fmt"
"time"
"github.com/FZambia/tarantool"
)
// Default timeouts used when building tarantool.Opts in this file.
const (
// defaultConnectTimeout is passed as Opts.ConnectTimeout by NewShard and pubSubConn.
defaultConnectTimeout = time.Second
// defaultRequestTimeout is passed as Opts.RequestTimeout by NewShard.
// pubSubConn does not use it; it sets its own 5 second request timeout.
defaultRequestTimeout = time.Second
// defaultReadTimeout is passed as Opts.ReadTimeout by NewShard and pubSubConn.
defaultReadTimeout = 5 * time.Second
// defaultWriteTimeout is passed as Opts.WriteTimeout by NewShard and pubSubConn.
defaultWriteTimeout = time.Second
)
// Shard represents a single Tarantool shard: the set of addresses from its
// ShardConfig reached through one MultiConnection. Create it with NewShard.
type Shard struct {
// config is the configuration NewShard was called with; pubSubConn reads
// the credentials from it when opening additional connections.
config ShardConfig
// subCh carries subscribe/unsubscribe requests. NewShard creates it
// unbuffered; its consumer is not part of this section of the file.
subCh chan subRequest
// mc is the connection set returned by Connect in NewShard. Exec and
// ExecTyped obtain the leader connection from it.
mc *MultiConnection
}
// ShardConfig holds the options used to connect a Shard to Tarantool.
type ShardConfig struct {
// Addresses of the Tarantool instances; NewShard passes them to Connect.
Addresses []string
// User for auth, set as tarantool.Opts.User on every connection opened
// by NewShard and pubSubConn.
User string
// Password for auth, set as tarantool.Opts.Password alongside User.
Password string
// ConnectionMode for the shard, forwarded to Connect via MultiOpts.
ConnectionMode ConnectionMode
}
// NewShard connects to the Tarantool addresses listed in c and returns a
// Shard wrapping the resulting MultiConnection.
//
// The connection is opened with the package default timeouts, a 50ms
// reconnect delay, the credentials from c and SkipSchema enabled. When
// Connect fails, its error is wrapped together with the address list and a
// nil Shard is returned.
func NewShard(c ShardConfig) (*Shard, error) {
	connOpts := tarantool.Opts{
		ConnectTimeout: defaultConnectTimeout,
		RequestTimeout: defaultRequestTimeout,
		ReadTimeout:    defaultReadTimeout,
		WriteTimeout:   defaultWriteTimeout,
		ReconnectDelay: 50 * time.Millisecond,
		User:           c.User,
		Password:       c.Password,
		SkipSchema:     true,
	}
	multiOpts := MultiOpts{ConnectionMode: c.ConnectionMode}

	multiConn, err := Connect(c.Addresses, connOpts, multiOpts)
	if err != nil {
		return nil, fmt.Errorf("error creating req connection to %#v: %w", c.Addresses, err)
	}

	// The Shard is assembled only once the connection exists, so a failed
	// Connect never leaves a half-initialised value behind.
	return &Shard{
		config: c,
		subCh:  make(chan subRequest),
		mc:     multiConn,
	}, nil
}
// Exec sends request over the shard's current leader connection and returns
// the response. If no leader connection can be obtained from the
// MultiConnection, that error is returned and the request is not sent.
func (s *Shard) Exec(request *tarantool.Request) (*tarantool.Response, error) {
	leader, leaderErr := s.mc.LeaderConn()
	if leaderErr != nil {
		return nil, leaderErr
	}

	return leader.Exec(request)
}
// ExecTyped sends request over the shard's current leader connection and
// hands result to the connection's ExecTyped so the response can be decoded
// into it. If no leader connection can be obtained from the MultiConnection,
// that error is returned and the request is not sent.
func (s *Shard) ExecTyped(request *tarantool.Request, result interface{}) error {
	leader, leaderErr := s.mc.LeaderConn()
	if leaderErr != nil {
		return leaderErr
	}

	return leader.ExecTyped(request, result)
}
// pubSubConn opens a new, dedicated connection to the current leader and
// starts a background watchdog for it.
//
// The connection is created with a 5 second request timeout and
// ReconnectDelay set to 0 (presumably disabling automatic reconnects —
// confirm against the tarantool client docs).
//
// The watchdog asks the MultiConnection once per second whether conn still
// points at the leader. When the check fails or reports that the instance is
// no longer the leader, it notifies the MultiConnection via LeaderChanged,
// closes conn and exits. It returns after closing because the connection is
// finished at that point: continuing to poll would only repeat the
// LeaderChanged notification and the Close call every second until the
// caller cancels.
//
// It returns the connection, a function that stops the watchdog (it does not
// close the connection), and an error if the connection could not be opened.
func (s *Shard) pubSubConn() (*tarantool.Connection, func(), error) {
	conn, err := s.mc.NewLeaderConn(tarantool.Opts{
		ConnectTimeout: defaultConnectTimeout,
		RequestTimeout: 5 * time.Second,
		ReadTimeout:    defaultReadTimeout,
		WriteTimeout:   defaultWriteTimeout,
		ReconnectDelay: 0,
		User:           s.config.User,
		Password:       s.config.Password,
		SkipSchema:     true,
	})
	if err != nil {
		return nil, nil, err
	}

	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		// One ticker for the lifetime of the watchdog instead of a fresh
		// timer from time.After on every loop iteration.
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				ok, err := s.mc.IsLeader(conn)
				if err != nil || !ok {
					s.mc.LeaderChanged()
					// Best-effort close: the connection is being abandoned,
					// so there is nothing useful to do with a Close error.
					_ = conn.Close()
					return
				}
			}
		}
	}()
	return conn, cancel, nil
}
// subRequest is an internal request to subscribe to, or unsubscribe from,
// one or more channels.
type subRequest struct {
// channels lists the channel IDs the request applies to.
channels []string
// subscribe is the flag given to newSubRequest: true for a subscribe
// request, false for an unsubscribe request.
subscribe bool
// err delivers the outcome of the request: done sends on it and result
// receives from it. newSubRequest creates it with a buffer of one.
err chan error
}
// newSubRequest creates a new request to subscribe to (subscribe == true) or
// unsubscribe from (subscribe == false) the channels in chIDs.
func newSubRequest(chIDs []string, subscribe bool) subRequest {
	// A buffer of one lets a single done call complete without blocking,
	// even if nobody is waiting in result yet.
	outcome := make(chan error, 1)

	return subRequest{channels: chIDs, subscribe: subscribe, err: outcome}
}
// done reports the outcome of the request by sending err (which may be nil)
// on the request's error channel. It should only be called once per
// subRequest.
func (sr *subRequest) done(err error) {
	outcome := sr.err
	outcome <- err
}
// result blocks until an outcome has been sent on the request's error
// channel (see done) and returns it; a nil value is returned as nil.
func (sr *subRequest) result() error {
	outcome := <-sr.err
	return outcome
}
|