File: istio_echo_server_lib.cc

package info (click to toggle)
grpc 1.51.1-8
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 78,336 kB
  • sloc: cpp: 361,873; python: 72,206; ansic: 37,787; objc: 12,434; ruby: 11,521; sh: 7,652; php: 7,615; makefile: 3,481; xml: 3,246; cs: 1,836; javascript: 1,614; java: 465; pascal: 227; awk: 132
file content (213 lines) | stat: -rw-r--r-- 8,615 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include "test/cpp/interop/istio_echo_server_lib.h"

#include <thread>

#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_split.h"
#include "absl/synchronization/blocking_counter.h"

#include <grpcpp/client_context.h>
#include <grpcpp/grpcpp.h>

#include "src/core/lib/gprpp/host_port.h"
#include "src/proto/grpc/testing/istio_echo.pb.h"

using proto::EchoRequest;
using proto::EchoResponse;
using proto::EchoTestService;
using proto::ForwardEchoRequest;
using proto::ForwardEchoResponse;

namespace grpc {
namespace testing {
namespace {

const absl::string_view kRequestIdField = "x-request-id";
const absl::string_view kServiceVersionField = "ServiceVersion";
// const absl::string_view kServicePortField = "ServicePort";
const absl::string_view kStatusCodeField = "StatusCode";
// const absl::string_view kUrlField = "URL";
const absl::string_view kHostField = "Host";
const absl::string_view kHostnameField = "Hostname";
// const absl::string_view kMethodField = "Method";
const absl::string_view kRequestHeader = "RequestHeader";
// const absl::string_view kResponseHeader = "ResponseHeader";
// const absl::string_view kClusterField = "Cluster";
// const absl::string_view kIstioVersionField = "IstioVersion";
const absl::string_view kIpField = "IP";  // The Requester’s IP Address.

absl::string_view StringRefToStringView(const string_ref& r) {
  return absl::string_view(r.data(), r.size());
}

struct EchoCall {
  grpc::ClientContext context;
  proto::EchoResponse response;
  Status status;
};

}  // namespace

EchoTestServiceImpl::EchoTestServiceImpl(std::string hostname,
                                         std::string service_version,
                                         std::string forwarding_address)
    : hostname_(std::move(hostname)),
      service_version_(std::move(service_version)),
      forwarding_address_(std::move(forwarding_address)) {
  forwarding_stub_ = EchoTestService::NewStub(
      CreateChannel(forwarding_address_, InsecureChannelCredentials()));
}

Status EchoTestServiceImpl::Echo(ServerContext* context,
                                 const EchoRequest* request,
                                 EchoResponse* response) {
  std::string s;
  absl::StrAppend(&s, kHostField, "=",
                  StringRefToStringView(context->ExperimentalGetAuthority()),
                  "\n");
  const std::multimap<string_ref, string_ref> metadata =
      context->client_metadata();
  for (const auto& kv : metadata) {
    // Skip all binary headers.
    if (kv.first.ends_with("-bin")) {
      continue;
    }
    absl::StrAppend(&s, kRequestHeader, "=", StringRefToStringView(kv.first),
                    ":", StringRefToStringView(kv.second), "\n");
  }
  absl::string_view host;
  absl::string_view port;
  std::string peer = context->peer();
  grpc_core::SplitHostPort(peer, &host, &port);
  // This is not a complete list, but also not all fields are used. May
  //  need to add/remove fields later, if required by tests. Only keep the
  //  fields needed for now.
  //
  //  absl::StrAppend(&s,kServicePortField,"=",this->port_,"\n");
  //  absl::StrAppend(&s,kClusterField,"=",this->cluster_,"\n");
  //  absl::StrAppend(&s,kIstioVersionField,"=",this->istio_version_,"\n");
  absl::StrAppend(&s, kServiceVersionField, "=", this->service_version_, "\n");
  absl::StrAppend(&s, kIpField, "=", host, "\n");
  absl::StrAppend(&s, kStatusCodeField, "=", std::to_string(200), "\n");
  absl::StrAppend(&s, kHostnameField, "=", this->hostname_, "\n");
  absl::StrAppend(&s, "Echo=", request->message(), "\n");
  response->set_message(s);
  gpr_log(GPR_INFO, "Echo response:\n%s", s.c_str());
  return Status::OK;
}

Status EchoTestServiceImpl::ForwardEcho(ServerContext* context,
                                        const ForwardEchoRequest* request,
                                        ForwardEchoResponse* response) {
  std::string raw_url = request->url();
  size_t colon = raw_url.find_first_of(':');
  std::string scheme;
  if (colon == std::string::npos) {
    return Status(
        StatusCode::INVALID_ARGUMENT,
        absl::StrFormat("No protocol configured for url %s", raw_url));
  }
  scheme = raw_url.substr(0, colon);
  std::shared_ptr<Channel> channel;
  if (scheme == "xds") {
    // We can optionally add support for TLS creds, but we are primarily
    // concerned with proxyless-grpc here.
    gpr_log(GPR_INFO, "Creating channel to %s using xDS Creds",
            raw_url.c_str());
    channel =
        CreateChannel(raw_url, XdsCredentials(InsecureChannelCredentials()));
  } else if (scheme == "grpc") {
    // We don't really want to test this but the istio test infrastructure needs
    // this to be supported. If we ever decide to add support for this properly,
    // we would need to add support for TLS creds here.
    absl::string_view address = absl::StripPrefix(raw_url, "grpc://");
    gpr_log(GPR_INFO, "Creating channel to %s", std::string(address).c_str());
    channel = CreateChannel(std::string(address), InsecureChannelCredentials());
  } else {
    gpr_log(GPR_INFO, "Protocol %s not supported. Forwarding to %s",
            scheme.c_str(), forwarding_address_.c_str());
    ClientContext forwarding_ctx;
    forwarding_ctx.set_deadline(context->deadline());
    return forwarding_stub_->ForwardEcho(&forwarding_ctx, *request, response);
  }
  auto stub = EchoTestService::NewStub(channel);
  auto count = request->count() == 0 ? 1 : request->count();
  // Calculate the amount of time to sleep after each call.
  std::chrono::duration<double> duration_per_query =
      std::chrono::nanoseconds::zero();
  if (request->qps() > 0) {
    duration_per_query =
        std::chrono::nanoseconds(std::chrono::seconds(1)) / request->qps();
  }
  std::vector<EchoCall> calls(count);
  EchoRequest echo_request;
  echo_request.set_message(request->message());
  absl::BlockingCounter counter(count);
  for (int i = 0; i < count; ++i) {
    calls[i].context.AddMetadata(std::string(kRequestIdField),
                                 std::to_string(i));
    for (const auto& header : request->headers()) {
      if (header.key() != kHostField) {
        calls[i].context.AddMetadata(header.key(), header.value());
      }
    }
    constexpr int kDefaultTimeout = 5 * 1000 * 1000 /* 5 seconds */;
    std::chrono::system_clock::time_point deadline =
        std::chrono::system_clock::now() +
        std::chrono::microseconds(request->timeout_micros() > 0
                                      ? request->timeout_micros()
                                      : kDefaultTimeout);
    calls[i].context.set_deadline(deadline);
    stub->async()->Echo(&calls[i].context, &echo_request, &calls[i].response,
                        [&, index = i](Status s) {
                          calls[index].status = s;
                          counter.DecrementCount();
                        });
    std::this_thread::sleep_for(duration_per_query);
  }
  // Wait for all calls to be done.
  counter.Wait();
  for (int i = 0; i < count; ++i) {
    if (calls[i].status.ok()) {
      std::string body;
      // The test infrastructure might expect the entire struct instead of
      // just the message.
      absl::StrAppend(&body, absl::StrFormat("[%d] grpcecho.Echo(%s)\n", i,
                                             request->message()));
      auto contents =
          absl::StrSplit(calls[i].response.message(), '\n', absl::SkipEmpty());
      for (const auto& line : contents) {
        absl::StrAppend(&body, absl::StrFormat("[%d body] %s\n", i, line));
      }
      response->add_output(body);
      gpr_log(GPR_INFO, "Forward Echo response:%d\n%s", i, body.c_str());
    } else {
      gpr_log(GPR_ERROR, "RPC %d failed %d: %s", i,
              calls[i].status.error_code(),
              calls[i].status.error_message().c_str());
      response->clear_output();
      return calls[i].status;
    }
  }
  return Status::OK;
}

}  // namespace testing
}  // namespace grpc