File: update_injector.go

package info (click to toggle)
golang-github-protonmail-gluon 0.17.0-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 16,020 kB
  • sloc: sh: 55; makefile: 5
file content (92 lines) | stat: -rw-r--r-- 2,156 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package backend

import (
	"context"
	"sync"

	"github.com/ProtonMail/gluon/async"
	"github.com/ProtonMail/gluon/connector"
	"github.com/ProtonMail/gluon/imap"
	"github.com/ProtonMail/gluon/logging"
)

// updateInjector allows anyone to publish custom imap updates alongside the updates that are generated from the
// Connector.
type updateInjector struct {
	// updatesCh is the channel that delivers API updates to the mailserver.
	updatesCh chan imap.Update

	// forwardWG is used to ensure we wait until the forward() goroutine has finished executing.
	forwardWG     sync.WaitGroup
	forwardQuitCh chan struct{}
}

// newUpdateInjector creates a new updateInjector.
//
// nolint:contextcheck
func newUpdateInjector(connector connector.Connector, userID string, panicHandler async.PanicHandler) *updateInjector {
	injector := &updateInjector{
		updatesCh:     make(chan imap.Update),
		forwardQuitCh: make(chan struct{}),
	}

	injector.forwardWG.Add(1)

	async.GoAnnotated(context.Background(), panicHandler, func(ctx context.Context) {
		injector.forward(ctx, connector.GetUpdates())
	}, logging.Labels{
		"Action": "Forwarding updates",
		"UserID": userID,
	})

	return injector
}

// GetUpdates returns a channel on which updates from the server are sent.
func (u *updateInjector) GetUpdates() <-chan imap.Update {
	return u.updatesCh
}

func (u *updateInjector) Close(ctx context.Context) error {
	close(u.forwardQuitCh)
	u.forwardWG.Wait()

	return nil
}

// forward pulls updates off the stream and forwards them to the outgoing update channel.
func (u *updateInjector) forward(ctx context.Context, updateCh <-chan imap.Update) {
	defer func() {
		close(u.updatesCh)
		u.forwardWG.Done()
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case update, ok := <-updateCh:
			if !ok {
				return
			}

			u.send(ctx, update)

		case <-u.forwardQuitCh:
			return
		}
	}
}

// send the update on the updates channel, optionally blocking until it has been processed.
func (u *updateInjector) send(ctx context.Context, update imap.Update) {
	select {
	case <-u.forwardQuitCh:
		return

	case u.updatesCh <- update:

	case <-ctx.Done():
		return
	}
}