//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
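#include "test/cpp/interop/istio_echo_server_lib.h"

#include <chrono>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "absl/synchronization/blocking_counter.h"

#include <grpc/support/log.h>
#include <grpcpp/client_context.h>
#include <grpcpp/grpcpp.h>

#include "src/core/lib/gprpp/host_port.h"
#include "src/proto/grpc/testing/istio_echo.pb.h"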
using proto::EchoRequest;
using proto::EchoResponse;
using proto::EchoTestService;
using proto::ForwardEchoRequest;
using proto::ForwardEchoResponse;
namespace grpc {
namespace testing {
namespace {
const absl::string_view kRequestIdField = "x-request-id";
const absl::string_view kServiceVersionField = "ServiceVersion";
// const absl::string_view kServicePortField = "ServicePort";
const absl::string_view kStatusCodeField = "StatusCode";
// const absl::string_view kUrlField = "URL";
const absl::string_view kHostField = "Host";
const absl::string_view kHostnameField = "Hostname";
// const absl::string_view kMethodField = "Method";
const absl::string_view kRequestHeader = "RequestHeader";
// const absl::string_view kResponseHeader = "ResponseHeader";
// const absl::string_view kClusterField = "Cluster";
// const absl::string_view kIstioVersionField = "IstioVersion";
const absl::string_view kIpField = "IP"; // The Requester’s IP Address.
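// Views the bytes of a grpc::string_ref as an absl::string_view (no copy).
absl::string_view StringRefToStringView(const string_ref& r) {
  return absl::string_view(r.data(), r.size());
}

// Per-RPC state for a single forwarded Echo call.
struct EchoCall {
  grpc::ClientContext context;
  proto::EchoResponse response;
  Status status;
};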
} // namespace
EchoTestServiceImpl::EchoTestServiceImpl(std::string hostname,
                                         std::string service_version,
                                         std::string forwarding_address)
    : hostname_(std::move(hostname)),
      service_version_(std::move(service_version)),
      forwarding_address_(std::move(forwarding_address)) {
  forwarding_stub_ = EchoTestService::NewStub(
      CreateChannel(forwarding_address_, InsecureChannelCredentials()));
}
Status EchoTestServiceImpl::Echo(ServerContext* context,
                                 const EchoRequest* request,
                                 EchoResponse* response) {
  // The response body is a list of newline-terminated "Key=Value" fields.
  std::string s;
  absl::StrAppend(&s, kHostField, "=",
                  StringRefToStringView(context->ExperimentalGetAuthority()),
                  "\n");
  const std::multimap<string_ref, string_ref>& metadata =
      context->client_metadata();
  for (const auto& kv : metadata) {
    // Skip all binary headers.
    if (kv.first.ends_with("-bin")) {
      continue;
    }
    absl::StrAppend(&s, kRequestHeader, "=", StringRefToStringView(kv.first),
                    ":", StringRefToStringView(kv.second), "\n");
  }
  absl::string_view host;
  absl::string_view port;
  std::string peer = context->peer();
  grpc_core::SplitHostPort(peer, &host, &port);
  // This is not the complete list of fields; only those the tests currently
  // need are emitted. Add or remove fields here as the tests require.
  //
  //  absl::StrAppend(&s,kServicePortField,"=",this->port_,"\n");
  //  absl::StrAppend(&s,kClusterField,"=",this->cluster_,"\n");
  //  absl::StrAppend(&s,kIstioVersionField,"=",this->istio_version_,"\n");
  absl::StrAppend(&s, kServiceVersionField, "=", this->service_version_, "\n");
  absl::StrAppend(&s, kIpField, "=", host, "\n");
  absl::StrAppend(&s, kStatusCodeField, "=", std::to_string(200), "\n");
  absl::StrAppend(&s, kHostnameField, "=", this->hostname_, "\n");
  absl::StrAppend(&s, "Echo=", request->message(), "\n");
  response->set_message(s);
  gpr_log(GPR_INFO, "Echo response:\n%s", s.c_str());
  return Status::OK;
}
Status EchoTestServiceImpl::ForwardEcho(ServerContext* context,
                                        const ForwardEchoRequest* request,
                                        ForwardEchoResponse* response) {
  std::string raw_url = request->url();
  size_t colon = raw_url.find_first_of(':');
  std::string scheme;
  if (colon == std::string::npos) {
    return Status(
        StatusCode::INVALID_ARGUMENT,
        absl::StrFormat("No protocol configured for url %s", raw_url));
  }
  scheme = raw_url.substr(0, colon);
  std::shared_ptr<Channel> channel;
  if (scheme == "xds") {
    // We can optionally add support for TLS creds, but we are primarily
    // concerned with proxyless-grpc here.
    gpr_log(GPR_INFO, "Creating channel to %s using xDS Creds",
            raw_url.c_str());
    channel =
        CreateChannel(raw_url, XdsCredentials(InsecureChannelCredentials()));
  } else if (scheme == "grpc") {
    // We don't really want to test this but the istio test infrastructure needs
    // this to be supported. If we ever decide to add support for this properly,
    // we would need to add support for TLS creds here.
    absl::string_view address = absl::StripPrefix(raw_url, "grpc://");
    gpr_log(GPR_INFO, "Creating channel to %s", std::string(address).c_str());
    channel = CreateChannel(std::string(address), InsecureChannelCredentials());
  } else {
    // Any other scheme is delegated to the configured forwarding address.
    gpr_log(GPR_INFO, "Protocol %s not supported. Forwarding to %s",
            scheme.c_str(), forwarding_address_.c_str());
    ClientContext forwarding_ctx;
    forwarding_ctx.set_deadline(context->deadline());
    return forwarding_stub_->ForwardEcho(&forwarding_ctx, *request, response);
  }
  auto stub = EchoTestService::NewStub(channel);
  // Treat an unset or non-positive count as a single call, so that a negative
  // value is never used as a container size.
  auto count = request->count() <= 0 ? 1 : request->count();
  // Calculate the amount of time to sleep after each call.
  std::chrono::duration<double> duration_per_query =
      std::chrono::nanoseconds::zero();
  if (request->qps() > 0) {
    duration_per_query =
        std::chrono::nanoseconds(std::chrono::seconds(1)) / request->qps();
  }
  std::vector<EchoCall> calls(count);
  EchoRequest echo_request;
  echo_request.set_message(request->message());
  absl::BlockingCounter counter(count);
  for (int i = 0; i < count; ++i) {
    calls[i].context.AddMetadata(std::string(kRequestIdField),
                                 std::to_string(i));
    // Copy the requested headers onto the outgoing call, except "Host".
    for (const auto& header : request->headers()) {
      if (header.key() != kHostField) {
        calls[i].context.AddMetadata(header.key(), header.value());
      }
    }
    constexpr int kDefaultTimeout = 5 * 1000 * 1000 /* 5 seconds */;
    std::chrono::system_clock::time_point deadline =
        std::chrono::system_clock::now() +
        std::chrono::microseconds(request->timeout_micros() > 0
                                      ? request->timeout_micros()
                                      : kDefaultTimeout);
    calls[i].context.set_deadline(deadline);
    stub->async()->Echo(&calls[i].context, &echo_request, &calls[i].response,
                        [&, index = i](Status s) {
                          calls[index].status = s;
                          counter.DecrementCount();
                        });
    std::this_thread::sleep_for(duration_per_query);
  }
  // Wait for all calls to be done.
  counter.Wait();
  for (int i = 0; i < count; ++i) {
    if (calls[i].status.ok()) {
      std::string body;
      // The test infrastructure might expect the entire struct instead of
      // just the message.
      absl::StrAppend(&body, absl::StrFormat("[%d] grpcecho.Echo(%s)\n", i,
                                             request->message()));
      auto contents =
          absl::StrSplit(calls[i].response.message(), '\n', absl::SkipEmpty());
      for (const auto& line : contents) {
        absl::StrAppend(&body, absl::StrFormat("[%d body] %s\n", i, line));
      }
      response->add_output(body);
      gpr_log(GPR_INFO, "Forward Echo response:%d\n%s", i, body.c_str());
    } else {
      // Fail the whole request on the first failed RPC and discard any
      // output collected so far.
      gpr_log(GPR_ERROR, "RPC %d failed %d: %s", i,
              calls[i].status.error_code(),
              calls[i].status.error_message().c_str());
      response->clear_output();
      return calls[i].status;
    }
  }
  return Status::OK;
}
} // namespace testing
} // namespace grpc