File: pool.go

package info (click to toggle)
gitlab-agent 16.1.3-2
  • links: PTS, VCS
  • area: contrib
  • in suites: forky, sid, trixie
  • size: 6,324 kB
  • sloc: makefile: 175; sh: 52; ruby: 3
file content (156 lines) | stat: -rw-r--r-- 3,685 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package grpctool

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/url"
	"sync"
	"time"

	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/errz"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/logz"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/insecure"
	"k8s.io/utils/clock"
)

// evictIdleConnAfter is how long a pooled connection may stay without any
// users before the pool's garbage collection closes and removes it.
const evictIdleConnAfter = time.Hour

// PoolConn is a gRPC client connection handed out by a pool.
//
// Callers issue RPCs through the embedded grpc.ClientConnInterface and call
// Done when they no longer need the connection.
type PoolConn interface {
	grpc.ClientConnInterface
	// Done tells the pool that the caller has finished using the connection.
	// The poolConn implementation in this file panics if Done is called more
	// than once on the same value.
	Done()
}

// PoolInterface is the abstraction over a connection pool: it can hand out
// connections for a target URL and can be closed. Pool in this file has the
// matching Dial and Close methods.
type PoolInterface interface {
	// Dial returns a connection for targetUrl or an error.
	Dial(ctx context.Context, targetUrl string) (PoolConn, error)
	io.Closer
}

// Pool caches gRPC client connections keyed by the target URL string passed
// to Dial, counts their current users and closes connections that have been
// idle for longer than evictIdleConnAfter.
type Pool struct {
	// mu is held by Dial, Close and connDone while they touch conns and the
	// per-connection bookkeeping (numUsers, lastUsed).
	mu       sync.Mutex
	// log is the base logger; close events are logged with the connection URL attached.
	log      *zap.Logger
	// errRep receives errors returned when closing connections.
	errRep   errz.ErrReporter
	// tlsCreds are the transport credentials used for "grpcs" URLs.
	tlsCreds credentials.TransportCredentials
	// dialOpts are appended after the transport credentials option on every new dial.
	dialOpts []grpc.DialOption
	conns    map[string]*connHolder // target -> conn
	// clk supplies the current time for lastUsed stamps and the idle cut-off.
	clk      clock.PassiveClock
}

// NewPool builds an empty Pool.
//
// log and errRep are used to log and report connection close outcomes,
// tlsCreds are the credentials applied to "grpcs" targets, and dialOpts are
// passed to every dial after the transport credentials option. The pool
// reads time from the real clock.
func NewPool(log *zap.Logger, errRep errz.ErrReporter, tlsCreds credentials.TransportCredentials, dialOpts ...grpc.DialOption) *Pool {
	pool := new(Pool)
	pool.log = log
	pool.errRep = errRep
	pool.tlsCreds = tlsCreds
	pool.dialOpts = dialOpts
	pool.conns = make(map[string]*connHolder)
	pool.clk = clock.RealClock{}
	return pool
}

// Dial returns a pooled connection for targetUrl, creating it on first use.
//
// Connections are cached by the exact targetUrl string. When none is cached,
// targetUrl is parsed and its host is dialed: the "grpc" scheme uses insecure
// transport credentials, "grpcs" uses the pool's TLS credentials, and any
// other scheme is rejected with an error. The pool's extra dial options are
// applied after the transport credentials option.
//
// Every successful call increments the connection's user count; the caller
// releases it by calling Done on the returned PoolConn. The pool mutex is held
// for the whole call, including the dial itself.
func (p *Pool) Dial(ctx context.Context, targetUrl string) (PoolConn, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	holder := p.conns[targetUrl]
	if holder == nil {
		parsed, err := url.Parse(targetUrl)
		if err != nil {
			return nil, err
		}

		// The scheme only selects the transport security; the dialed
		// address is the URL's host for both supported schemes.
		var transportCreds credentials.TransportCredentials
		switch parsed.Scheme {
		case "grpc":
			transportCreds = insecure.NewCredentials()
		case "grpcs":
			transportCreds = p.tlsCreds
		default:
			return nil, fmt.Errorf("unsupported URL scheme in %s", targetUrl)
		}

		// Build a fresh slice so that p.dialOpts is never modified.
		allOpts := append(
			[]grpc.DialOption{grpc.WithTransportCredentials(transportCreds)},
			p.dialOpts...,
		)
		clientConn, err := grpc.DialContext(ctx, parsed.Host, allOpts...)
		if err != nil {
			return nil, fmt.Errorf("pool gRPC dial: %w", err)
		}

		holder = &connHolder{ClientConn: clientConn}
		p.conns[targetUrl] = holder
	}

	holder.numUsers++
	handle := &poolConn{
		connHolder: holder,
		done:       p.connDone,
	}
	return handle, nil
}

// Close closes every connection currently held by the pool and removes it
// from the pool.
//
// A warning is logged for each connection that still has users. Errors from
// closing individual connections are handed to the error reporter rather than
// returned, so Close always returns nil.
func (p *Pool) Close() error {
	p.mu.Lock()
	defer p.mu.Unlock()

	for target, holder := range p.conns {
		delete(p.conns, target)
		connLog := p.log.With(logz.PoolConnectionUrl(target))
		if users := holder.numUsers; users > 0 {
			connLog.Sugar().Warnf("Closing pool connection that is being used by %d callers", users)
		}
		if err := holder.Close(); err != nil {
			p.errRep.HandleProcessingError(context.Background(), connLog, "Error closing pool connection", err)
			continue
		}
		connLog.Debug("Closed pool connection")
	}
	return nil
}

// connDone records that one user has finished with holder: under the pool
// mutex it stamps the last-used time, decrements the user count and then
// runs idle-connection garbage collection over the whole pool.
func (p *Pool) connDone(holder *connHolder) {
	p.mu.Lock()
	defer p.mu.Unlock()

	holder.lastUsed = p.clk.Now()
	holder.numUsers--
	p.runGcLocked()
}

// runGcLocked closes and removes every pooled connection that has no users
// and was last used more than evictIdleConnAfter ago. Close errors are passed
// to the error reporter. As the name indicates, it is called with p.mu held
// (see connDone).
func (p *Pool) runGcLocked() {
	cutoff := p.clk.Now().Add(-evictIdleConnAfter)
	for target, holder := range p.conns {
		// Keep connections that are in use or were used recently enough.
		if holder.numUsers != 0 || !holder.lastUsed.Before(cutoff) {
			continue
		}
		delete(p.conns, target)
		if err := holder.Close(); err != nil {
			p.errRep.HandleProcessingError(context.Background(), p.log.With(logz.PoolConnectionUrl(target)), "Error closing idle pool connection", err)
			continue
		}
		p.log.Debug("Closed idle pool connection", logz.PoolConnectionUrl(target))
	}
}

// connHolder is the pool's bookkeeping record for one cached gRPC connection.
// It embeds the connection itself and is shared by all poolConn handles that
// Dial returns for the same target URL.
type connHolder struct {
	*grpc.ClientConn
	// lastUsed is set by Pool.connDone each time a user releases the
	// connection; runGcLocked compares it against the idle cut-off.
	lastUsed time.Time
	numUsers int32 // protected by mutex
}

// poolConn is the per-caller handle returned by Pool.Dial. It exposes the
// shared connection through the embedded connHolder and carries the callback
// used to release it.
type poolConn struct {
	*connHolder
	// done is invoked by Done with the embedded connHolder and is then set to
	// nil, which is how a second Done call is detected.
	done func(conn *connHolder)
}

// Done releases this handle by invoking its release callback with the
// underlying connHolder. The callback is cleared before it is invoked, so
// calling Done a second time on the same handle panics.
func (c *poolConn) Done() {
	release := c.done
	if release == nil {
		panic(errors.New("pool connection Done() called more than once"))
	}
	c.done = nil
	release(c.connHolder)
}