File: client.go

package info (click to toggle)
golang-github-grpc-ecosystem-go-grpc-middleware 2.3.2-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 1,564 kB
  • sloc: makefile: 100; sh: 9
file content (79 lines) | stat: -rw-r--r-- 2,569 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// Copyright (c) The go-grpc-middleware Authors.
// Licensed under the Apache License 2.0.

// Go gRPC Middleware monitoring interceptors for client-side gRPC.

package interceptors

import (
	"context"
	"io"
	"time"

	"google.golang.org/grpc"
)

// UnaryClientInterceptor returns a grpc.UnaryClientInterceptor that reports the
// progress of each unary RPC to a reporter obtained from reportable.
//
// For every call the reporter receives, in this order:
//   - PostMsgSend with the request and a nil error, issued before the RPC is invoked;
//   - PostMsgReceive with the reply and the error returned by the invoker;
//   - PostCall with that same error.
//
// Every duration is measured from the startTime of the report created for the
// call. The RPC is invoked with the context returned by ClientReporter, and the
// invoker's error is returned to the caller unchanged.
func UnaryClientInterceptor(reportable ClientReportable) grpc.UnaryClientInterceptor {
	intercept := func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		rep := newReport(NewClientCallMeta(method, nil, req))
		rp, callCtx := reportable.ClientReporter(ctx, rep.callMeta)
		began := rep.startTime

		// The request counts as "sent" before the invoker runs.
		sentAfter := time.Since(began)
		rp.PostMsgSend(req, nil, sentAfter)

		callErr := invoker(callCtx, method, req, reply, cc, opts...)

		receivedAfter := time.Since(began)
		rp.PostMsgReceive(reply, callErr, receivedAfter)

		finishedAfter := time.Since(began)
		rp.PostCall(callErr, finishedAfter)

		return callErr
	}
	return intercept
}

// StreamClientInterceptor returns a grpc.StreamClientInterceptor that reports the
// progress of each streaming RPC to a reporter obtained from reportable.
//
// The stream is opened with the context returned by ClientReporter. If opening
// fails, PostCall is invoked immediately with the error and a nil stream is
// returned alongside it. Otherwise the returned stream is a monitoredClientStream
// wrapping the real one; it carries the report's startTime, the descriptor's
// ServerStreams flag and the reporter, so that its SendMsg/RecvMsg methods can
// do the per-message and end-of-call reporting.
func StreamClientInterceptor(reportable ClientReportable) grpc.StreamClientInterceptor {
	intercept := func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		rep := newReport(NewClientCallMeta(method, desc, nil))
		rp, callCtx := reportable.ClientReporter(ctx, rep.callMeta)

		inner, openErr := streamer(callCtx, desc, cc, method, opts...)
		if openErr == nil {
			wrapped := &monitoredClientStream{
				ClientStream:    inner,
				startTime:       rep.startTime,
				hasServerStream: desc.ServerStreams,
				reporter:        rp,
			}
			return wrapped, nil
		}

		// The stream never came into existence, so the call ends here.
		rp.PostCall(openErr, time.Since(rep.startTime))
		return nil, openErr
	}
	return intercept
}

// monitoredClientStream wraps grpc.ClientStream so that each SendMsg/RecvMsg of
// a message is reported to the reporter.
//
// Fields:
//   - startTime: instant from which RecvMsg measures the duration it passes to
//     PostCall; StreamClientInterceptor sets it to the report's startTime.
//   - hasServerStream: copied from desc.ServerStreams by StreamClientInterceptor;
//     when true, RecvMsg does not call PostCall until the wrapped RecvMsg
//     returns a non-nil error.
//   - reporter: receives the PostMsgSend, PostMsgReceive and PostCall callbacks.
type monitoredClientStream struct {
	grpc.ClientStream

	startTime       time.Time
	hasServerStream bool
	reporter        Reporter
}

// SendMsg forwards m to the wrapped stream and then reports the message, the
// resulting error and the time the send took via PostMsgSend. The wrapped
// stream's error is returned unchanged.
func (s *monitoredClientStream) SendMsg(m any) error {
	began := time.Now()
	sendErr := s.ClientStream.SendMsg(m)
	took := time.Since(began)

	s.reporter.PostMsgSend(m, sendErr, took)
	return sendErr
}

// RecvMsg receives into m from the wrapped stream, reports the message, the
// resulting error and the time the receive took via PostMsgReceive, and decides
// whether the call as a whole is finished.
//
// When hasServerStream is set:
//   - a nil error means more messages may follow, so PostCall is not invoked;
//   - io.EOF is reported to PostCall as a nil error;
//   - any other error is reported to PostCall as is.
//
// When hasServerStream is not set, PostCall is invoked after every receive with
// whatever error the wrapped stream returned.
//
// The PostCall duration is measured from s.startTime. The wrapped stream's
// error (including io.EOF) is always returned to the caller unchanged.
func (s *monitoredClientStream) RecvMsg(m any) error {
	began := time.Now()
	recvErr := s.ClientStream.RecvMsg(m)
	s.reporter.PostMsgReceive(m, recvErr, time.Since(began))

	// A successful receive on a server-streaming call does not end the call.
	if s.hasServerStream && recvErr == nil {
		return nil
	}

	callErr := recvErr
	if s.hasServerStream && recvErr == io.EOF {
		// io.EOF on a server stream is passed to PostCall as nil, not as an error.
		callErr = nil
	}
	s.reporter.PostCall(callErr, time.Since(s.startTime))
	return recvErr
}