1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
|
package daemon
import (
"errors"
"net"
"sync"
"src.elv.sh/pkg/daemon/daemondefs"
"src.elv.sh/pkg/daemon/internal/api"
"src.elv.sh/pkg/rpc"
"src.elv.sh/pkg/store/storedefs"
)
const retriesOnShutdown = 3
var (
// ErrDaemonUnreachable is returned when the daemon cannot be reached after
// several retries.
ErrDaemonUnreachable = errors.New("daemon offline")
)
// Implementation of the Client interface.
type client struct {
sockPath string
rpcClient *rpc.Client
waits sync.WaitGroup
}
// NewClient creates a new Client instance that talks to the socket. Connection
// creation is deferred to the first request.
func NewClient(sockPath string) daemondefs.Client {
return &client{sockPath, nil, sync.WaitGroup{}}
}
// SockPath returns the socket path that the Client talks to. If the client is
// nil, it returns an empty string.
func (c *client) SockPath() string {
return c.sockPath
}
// ResetConn resets the current connection. A new connection will be established
// the next time a request is made. If the client is nil, it does nothing.
func (c *client) ResetConn() error {
if c.rpcClient == nil {
return nil
}
rc := c.rpcClient
c.rpcClient = nil
return rc.Close()
}
// Close waits for all outstanding requests to finish and close the connection.
// If the client is nil, it does nothing and returns nil.
func (c *client) Close() error {
c.waits.Wait()
return c.ResetConn()
}
func (c *client) call(f string, req, res any) error {
c.waits.Add(1)
defer c.waits.Done()
for attempt := 0; attempt < retriesOnShutdown; attempt++ {
if c.rpcClient == nil {
conn, err := net.Dial("unix", c.sockPath)
if err != nil {
return err
}
c.rpcClient = rpc.NewClient(conn)
}
err := c.rpcClient.Call(api.ServiceName+"."+f, req, res)
if err == rpc.ErrShutdown {
// Clear rpcClient so as to reconnect next time
c.rpcClient = nil
continue
} else {
return err
}
}
return ErrDaemonUnreachable
}
// Convenience methods for RPC methods. These are quite repetitive; when the
// number of RPC calls grow above some threshold, a code generator should be
// written to generate them.
func (c *client) Version() (int, error) {
req := &api.VersionRequest{}
res := &api.VersionResponse{}
err := c.call("Version", req, res)
return res.Version, err
}
func (c *client) Pid() (int, error) {
req := &api.PidRequest{}
res := &api.PidResponse{}
err := c.call("Pid", req, res)
return res.Pid, err
}
func (c *client) NextCmdSeq() (int, error) {
req := &api.NextCmdRequest{}
res := &api.NextCmdSeqResponse{}
err := c.call("NextCmdSeq", req, res)
return res.Seq, err
}
func (c *client) AddCmd(text string) (int, error) {
req := &api.AddCmdRequest{Text: text}
res := &api.AddCmdResponse{}
err := c.call("AddCmd", req, res)
return res.Seq, err
}
func (c *client) DelCmd(seq int) error {
req := &api.DelCmdRequest{Seq: seq}
res := &api.DelCmdResponse{}
err := c.call("DelCmd", req, res)
return err
}
func (c *client) Cmd(seq int) (string, error) {
req := &api.CmdRequest{Seq: seq}
res := &api.CmdResponse{}
err := c.call("Cmd", req, res)
return res.Text, err
}
func (c *client) CmdsWithSeq(from, upto int) ([]storedefs.Cmd, error) {
req := &api.CmdsWithSeqRequest{From: from, Upto: upto}
res := &api.CmdsWithSeqResponse{}
err := c.call("CmdsWithSeq", req, res)
return res.Cmds, err
}
func (c *client) NextCmd(from int, prefix string) (storedefs.Cmd, error) {
req := &api.NextCmdRequest{From: from, Prefix: prefix}
res := &api.NextCmdResponse{}
err := c.call("NextCmd", req, res)
return storedefs.Cmd{Text: res.Text, Seq: res.Seq}, err
}
func (c *client) PrevCmd(upto int, prefix string) (storedefs.Cmd, error) {
req := &api.PrevCmdRequest{Upto: upto, Prefix: prefix}
res := &api.PrevCmdResponse{}
err := c.call("PrevCmd", req, res)
return storedefs.Cmd{Text: res.Text, Seq: res.Seq}, err
}
func (c *client) AddDir(dir string, incFactor float64) error {
req := &api.AddDirRequest{Dir: dir, IncFactor: incFactor}
res := &api.AddDirResponse{}
err := c.call("AddDir", req, res)
return err
}
func (c *client) DelDir(dir string) error {
req := &api.DelDirRequest{Dir: dir}
res := &api.DelDirResponse{}
err := c.call("DelDir", req, res)
return err
}
func (c *client) Dirs(blacklist map[string]struct{}) ([]storedefs.Dir, error) {
req := &api.DirsRequest{Blacklist: blacklist}
res := &api.DirsResponse{}
err := c.call("Dirs", req, res)
return res.Dirs, err
}
|