File: stream_cleanup_test.go

Package: golang-google-grpc 1.27.1-1

/*
 *
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package test

import (
	"context"
	"io"
	"testing"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	testpb "google.golang.org/grpc/test/grpc_testing"
)

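// TestStreamCleanup verifies that a stream that fails on the client side
// (the response exceeds MaxCallRecvMsgSize) is still cleaned up on the
// server, so that a follow-up RPC is not blocked by MaxConcurrentStreams(1).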
func (s) TestStreamCleanup(t *testing.T) {
	const initialWindowSize uint = 70 * 1024 // Must be higher than default 64K, ignored otherwise
	const bodySize = 2 * initialWindowSize   // Something that is not going to fit in a single window
	const callRecvMsgSize uint = 1           // The maximum message size the client can receive

	ss := &stubServer{
		unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
			return &testpb.SimpleResponse{Payload: &testpb.Payload{
				Body: make([]byte, bodySize),
			}}, nil
		},
		emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
			return &testpb.Empty{}, nil
		},
	}
	if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)}, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(callRecvMsgSize))), grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil {
		t.Fatalf("Error starting endpoint server: %v", err)
	}
	defer ss.Stop()

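	// The UnaryCall response body is larger than the client's
	// MaxCallRecvMsgSize, so the RPC must fail with ResourceExhausted. With
	// MaxConcurrentStreams(1) on the server, the follow-up EmptyCall only
	// succeeds if the failed stream was cleaned up properly.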
	if _, err := ss.client.UnaryCall(context.Background(), &testpb.SimpleRequest{}); status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("should fail with ResourceExhausted, message's body size: %v, maximum message size the client can receive: %v", bodySize, callRecvMsgSize)
	}
	if _, err := ss.client.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
		t.Fatalf("should succeed, err: %v", err)
	}
}

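// TestStreamCleanupAfterSendStatus verifies that the stream is not removed
// from the server transport's activeStreams list before its status is
// actually written out by loopyWriter, so GracefulStop does not close the
// connection while an RPC is still in flight.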
func (s) TestStreamCleanupAfterSendStatus(t *testing.T) {
	const initialWindowSize uint = 70 * 1024 // Must be higher than default 64K, ignored otherwise
	const bodySize = 2 * initialWindowSize   // Something that is not going to fit in a single window

	serverReturnedStatus := make(chan struct{})

	ss := &stubServer{
		fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
			defer func() {
				close(serverReturnedStatus)
			}()
			return stream.Send(&testpb.StreamingOutputCallResponse{
				Payload: &testpb.Payload{
					Body: make([]byte, bodySize),
				},
			})
		},
	}
	if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)}, grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil {
		t.Fatalf("Error starting endpoint server: %v", err)
	}
	defer ss.Stop()

	// This test makes sure we don't delete the stream from the server
	// transport's activeStreams list too aggressively.

	// 1. Make a long-lived streaming RPC, so the server's activeStreams list
	// is not empty.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	stream, err := ss.client.FullDuplexCall(ctx)
	if err != nil {
		t.Fatalf("FullDuplexCall() = _, %v; want _, <nil>", err)
	}

	// 2. Wait for service handler to return status.
	//
	// This triggers the stream cleanup code, which will eventually remove
	// this stream from activeStreams.
	//
	// But the removal won't happen yet, because it's supposed to be done
	// after the status is sent by loopyWriter, and that status send is
	// blocked by flow control.
	<-serverReturnedStatus

	// 3. GracefulStop (besides sending a GOAWAY) checks the number of
	// activeStreams.
	//
	// It closes the connection if there are no active streams. That won't
	// happen here because of the pending stream. But if a bug in stream
	// cleanup caused the stream to be removed too aggressively, the
	// connection would be closed and the stream would be broken.
	gracefulStopDone := make(chan struct{})
	go func() {
		defer close(gracefulStopDone)
		ss.s.GracefulStop()
	}()

	// 4. Make sure the stream is not broken.
	if _, err := stream.Recv(); err != nil {
		t.Fatalf("stream.Recv() = _, %v, want _, <nil>", err)
	}
	if _, err := stream.Recv(); err != io.EOF {
		t.Fatalf("stream.Recv() = _, %v, want _, io.EOF", err)
	}

	timer := time.NewTimer(time.Second)
	select {
	case <-gracefulStopDone:
		timer.Stop()
	case <-timer.C:
		t.Fatalf("s.GracefulStop() didn't finish within 1 second after the last RPC")
	}
}