File: sinks.go

package info (click to toggle)
docker.io 26.1.5%2Bdfsg1-9
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 68,576 kB
  • sloc: sh: 5,748; makefile: 912; ansic: 664; asm: 228; python: 162
file content (95 lines) | stat: -rw-r--r-- 2,702 bytes parent folder | download | duplicates (8)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package watch

import (
	"fmt"
	"time"

	events "github.com/docker/go-events"
)

// ErrSinkTimeout is returned from the Write method when a sink times out.
var ErrSinkTimeout = fmt.Errorf("timeout exceeded, tearing down sink")

// timeoutSink is a sink that wraps another sink with a timeout. If the
// embedded sink fails to complete a Write operation within the specified
// timeout, the Write operation of the timeoutSink fails.
type timeoutSink struct {
	timeout time.Duration
	sink    events.Sink
}

func (s timeoutSink) Write(event events.Event) error {
	errChan := make(chan error)
	go func(c chan<- error) {
		c <- s.sink.Write(event)
	}(errChan)

	timer := time.NewTimer(s.timeout)
	select {
	case err := <-errChan:
		timer.Stop()
		return err
	case <-timer.C:
		s.sink.Close()
		return ErrSinkTimeout
	}
}

func (s timeoutSink) Close() error {
	return s.sink.Close()
}

// dropErrClosed is a sink that suppresses ErrSinkClosed from Write, to avoid
// debug log messages that may be confusing. It is possible that the queue
// will try to write an event to its destination channel while the queue is
// being removed from the broadcaster. Since the channel is closed before the
// queue, there is a narrow window when this is possible. In some event-based
// dropping events when a sink is removed from a broadcaster is a problem, but
// for the usage in this watch package that's the expected behavior.
type dropErrClosed struct {
	sink events.Sink
}

func (s dropErrClosed) Write(event events.Event) error {
	err := s.sink.Write(event)
	if err == events.ErrSinkClosed {
		return nil
	}
	return err
}

func (s dropErrClosed) Close() error {
	return s.sink.Close()
}

// dropErrClosedChanGen is a ChannelSinkGenerator for dropErrClosed sinks wrapping
// unbuffered channels.
type dropErrClosedChanGen struct{}

func (s *dropErrClosedChanGen) NewChannelSink() (events.Sink, *events.Channel) {
	ch := events.NewChannel(0)
	return dropErrClosed{sink: ch}, ch
}

// TimeoutDropErrChanGen is a ChannelSinkGenerator that creates a channel,
// wrapped by the dropErrClosed sink and a timeout.
type TimeoutDropErrChanGen struct {
	timeout time.Duration
}

// NewChannelSink creates a new sink chain of timeoutSink->dropErrClosed->Channel
func (s *TimeoutDropErrChanGen) NewChannelSink() (events.Sink, *events.Channel) {
	ch := events.NewChannel(0)
	return timeoutSink{
		timeout: s.timeout,
		sink: dropErrClosed{
			sink: ch,
		},
	}, ch
}

// NewTimeoutDropErrSinkGen returns a generator of timeoutSinks wrapping dropErrClosed
// sinks, wrapping unbuffered channel sinks.
func NewTimeoutDropErrSinkGen(timeout time.Duration) ChannelSinkGenerator {
	return &TimeoutDropErrChanGen{timeout: timeout}
}