File: inbound_stream_to_outbound_stream.go

package info (click to toggle)
gitlab-agent 16.1.3-2
  • links: PTS, VCS
  • area: contrib
  • in suites: forky, sid, trixie
  • size: 6,324 kB
  • sloc: makefile: 175; sh: 52; ruby: 3
file content (42 lines) | stat: -rw-r--r-- 1,907 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package grpctool

type InboundStreamToOutboundStream struct {
	PipeInboundToOutbound func() error
	PipeOutboundToInbound func() error
}

func (x *InboundStreamToOutboundStream) Pipe() error {
	// Cancellation
	//
	// If one of the streams breaks, the other one needs to be aborted too ASAP. Waiting for a timeout
	// is a waste of resources and a bad API with unpredictable latency.
	//
	// The outbound stream is automatically aborted if there is a problem with inbound stream because
	// it uses the inbound stream's context.
	// Unlike the above, if there is a problem with the outbound stream, reads from the inbound stream in
	// PipeInboundToOutbound() are unaffected so can stay blocked for an arbitrary amount of time.
	//
	// To make gRPC abort those method calls, gRPC stream handler (i.e. this method) should just return from the call.
	// See https://github.com/grpc/grpc-go/issues/465#issuecomment-179414474
	//
	// Same as above for the HTTP server, see:
	// - https://github.com/golang/go/blob/go1.18.1/src/net/http/server.go#L1829
	// - https://github.com/golang/go/blob/go1.18.1/src/net/http/server.go#L1884
	// - https://github.com/golang/go/blob/go1.18.1/src/net/http/server.go#L1968
	//
	// To implement the above strategy, we read from the inbound stream in a separate goroutine and return from this
	// handler whenever there is an error, aborting reads from the inbound stream.

	// Channel of size 1 to ensure that if we return early, the other goroutine has space for the value.
	// We don't care about that value if we already got a non-nil error.
	res := make(chan error, 1)
	go func() {
		res <- x.PipeInboundToOutbound()
	}()
	err := x.PipeOutboundToInbound()
	if err != nil {
		return err // unblocks reads from inbound stream in the other goroutine if it is stuck
	}
	// Wait for the other goroutine to return to cleanly finish reading from the inbound stream
	return <-res
}