File: AsyncHttpFetch.cc

package info (click to toggle)
trafficserver 9.2.5%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 53,008 kB
  • sloc: cpp: 345,484; ansic: 31,134; python: 24,200; sh: 7,271; makefile: 3,045; perl: 2,261; java: 277; pascal: 119; sql: 94; xml: 2
file content (232 lines) | stat: -rw-r--r-- 7,939 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
/**
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */

#include "tscpp/api/GlobalPlugin.h"
#include "tscpp/api/TransactionPlugin.h"
#include "tscpp/api/Logger.h"
#include "tscpp/api/Async.h"
#include "tscpp/api/AsyncHttpFetch.h"
#include "tscpp/api/AsyncTimer.h"
#include "tscpp/api/PluginInit.h"
#include <cstring>
#include <cassert>
#include <utility>

using namespace atscppapi;
using std::string;

// Debug tag passed to every TS_DEBUG/TS_ERROR call in this file.
// To view the debug messages, run: ./traffic_server -T "async_http_fetch_example.*"
#define TAG "async_http_fetch_example"

namespace
{
// File-local handle to the global plugin instance. It is assigned once in
// TSPluginInit() and is not deleted anywhere in this file.
GlobalPlugin *plugin = nullptr;
} // namespace

/**
 * Distinct provider type layered over AsyncHttpFetch.
 *
 * It adds no behavior of its own; it exists so that a receiver can implement a
 * separate AsyncReceiver<AsyncHttpFetch2>::handleAsyncComplete() overload.
 */
class AsyncHttpFetch2 : public AsyncHttpFetch
{
public:
  /// @param request URL string forwarded unchanged to the AsyncHttpFetch constructor.
  explicit AsyncHttpFetch2(const string &request) : AsyncHttpFetch(request) {}
};

/**
 * Distinct provider type layered over AsyncHttpFetch that also selects the HTTP method.
 *
 * It adds no behavior of its own; it exists so that a receiver can implement a
 * separate AsyncReceiver<AsyncHttpFetch3>::handleAsyncComplete() overload.
 */
class AsyncHttpFetch3 : public AsyncHttpFetch
{
public:
  /// @param request URL string forwarded unchanged to the AsyncHttpFetch constructor.
  /// @param method  HTTP method forwarded unchanged to the AsyncHttpFetch constructor.
  AsyncHttpFetch3(const string &request, HttpMethod method) : AsyncHttpFetch(request, method) {}
};

/**
 * An AsyncHttpFetch provider whose run() only arms a one-off AsyncTimer (1000 ms)
 * instead of doing any work immediately.
 *
 * The object is both a provider (via AsyncHttpFetch) and the receiver of its own
 * timer. When the timer fires it attempts to dispatch to its receiver and then
 * destroys itself. This example cancels the provider right after starting it, so
 * by the time the timer fires the receiver is expected to be unreachable.
 */
class DelayedAsyncHttpFetch : public AsyncHttpFetch, public AsyncReceiver<AsyncTimer>
{
public:
  /**
   * @param request URL string forwarded to the AsyncHttpFetch constructor.
   * @param method  HTTP method forwarded to the AsyncHttpFetch constructor.
   * @param mutex   Mutex handed to Async::execute() when the timer is armed in run().
   */
  DelayedAsyncHttpFetch(const string &request, HttpMethod method, std::shared_ptr<Mutex> mutex)
    : AsyncHttpFetch(request, method), mutex_(std::move(mutex)), timer_(nullptr)
  {
  }

  /// Arms the one-off timer with this object as the timer's receiver.
  void
  run() override
  {
    timer_ = new AsyncTimer(AsyncTimer::TYPE_ONE_OFF, 1000 /* 1s */);
    Async::execute(this, timer_, mutex_);
  }

  /**
   * Timer callback: tries to dispatch to this provider's receiver, then self-destructs.
   *
   * Because the provider is canceled before the timer fires, dispatch() is expected
   * to fail here. A successful dispatch would mean the cancellation did not take
   * effect, so that is the case reported as an error.
   */
  void
  handleAsyncComplete(AsyncTimer & /*timer ATS_UNUSED */) override
  {
    TS_DEBUG(TAG, "Receiver should not be reachable");
    if (getDispatchController()->dispatch()) {
      TS_ERROR(TAG, "dispatch() unexpectedly reached the receiver of a canceled fetch");
    }
    // Nothing else references this provider once its timer has fired; the destructor
    // releases the timer.
    delete this;
  }

  /// @return whatever the dispatch controller's isEnabled() reports.
  bool
  isAlive()
  {
    return getDispatchController()->isEnabled();
  }

  /// Releases the timer allocated in run() (no-op if run() was never called).
  ~DelayedAsyncHttpFetch() override { delete timer_; }

private:
  std::shared_ptr<Mutex> mutex_; ///< Mutex passed to Async::execute() for the timer.
  AsyncTimer *timer_;            ///< Owned; allocated in run(), deleted in the destructor.
};

/**
 * Per-transaction plugin that demonstrates the asynchronous fetch API.
 *
 * On HOOK_SEND_REQUEST_HEADERS it starts three fetches (a plain one, one with a
 * second string argument, and one with custom request headers) and a delayed fetch
 * that is canceled immediately. The transaction is resumed only once all three
 * counted fetches have completed. The destructor starts one more fetch whose
 * completion callback is expected never to arrive.
 */
class TransactionHookPlugin : public TransactionPlugin,
                              public AsyncReceiver<AsyncHttpFetch>,
                              public AsyncReceiver<AsyncHttpFetch2>,
                              public AsyncReceiver<AsyncHttpFetch3>,
                              public AsyncReceiver<DelayedAsyncHttpFetch>
{
public:
  /// @param transaction Transaction this plugin is attached to; a reference is kept
  ///                    so the transaction can be resumed once the fetches finish.
  explicit TransactionHookPlugin(Transaction &transaction)
    : TransactionPlugin(transaction), transaction_(transaction), num_fetches_pending_(0)
  {
    TS_DEBUG(TAG, "Constructed TransactionHookPlugin, saved a reference to this transaction.");
    registerHook(HOOK_SEND_REQUEST_HEADERS);
  }

  /**
   * Starts the example fetches. The transaction is intentionally not resumed here;
   * handleAnyAsyncComplete() resumes it when the pending counter reaches zero.
   */
  void
  handleSendRequestHeaders(Transaction & /*transaction ATS_UNUSED */) override
  {
    // NOTE(review): the providers created with `new` below are presumably owned by
    // the Async framework once handed to Async::execute() - confirm against Async.h.
    Async::execute<AsyncHttpFetch>(this, new AsyncHttpFetch("http://127.0.0.1/"), getMutex());
    ++num_fetches_pending_;

    Async::execute<AsyncHttpFetch>(this, new AsyncHttpFetch("http://127.0.0.1/post", "data"), getMutex());
    ++num_fetches_pending_;

    // we'll add some custom headers for this request
    AsyncHttpFetch2 *provider2 = new AsyncHttpFetch2("http://127.0.0.1/");
    Headers &request_headers   = provider2->getRequestHeaders();
    request_headers.set("Header1", "Value1");
    request_headers.set("Header2", "Value2");
    Async::execute<AsyncHttpFetch2>(this, provider2, getMutex());
    ++num_fetches_pending_;

    // The delayed fetch is not counted in num_fetches_pending_ because it is
    // canceled below and must never complete.
    DelayedAsyncHttpFetch *delayed_provider = new DelayedAsyncHttpFetch("url", HTTP_METHOD_GET, getMutex());
    Async::execute<DelayedAsyncHttpFetch>(this, delayed_provider, getMutex());

    // canceling right after starting in this case, but cancel() can be called any time
    TS_DEBUG(TAG, "Will cancel delayed fetch");
    if (!delayed_provider->isAlive()) {
      TS_ERROR(TAG, "provider is NOT alive!");
    }

    delayed_provider->cancel();
    if (delayed_provider->isAlive()) {
      TS_ERROR(TAG, "provider is alive!");
    }
  }

  /// Completion callback for plain AsyncHttpFetch providers.
  void
  handleAsyncComplete(AsyncHttpFetch &async_http_fetch) override
  {
    // This will be called when our async event is complete.
    TS_DEBUG(TAG, "AsyncHttpFetch completed");
    handleAnyAsyncComplete(async_http_fetch);
  }

  /// Completion callback for AsyncHttpFetch2 providers.
  void
  handleAsyncComplete(AsyncHttpFetch2 &async_http_fetch) override
  {
    // This will be called when our async event is complete.
    TS_DEBUG(TAG, "AsyncHttpFetch2 completed");
    handleAnyAsyncComplete(async_http_fetch);
  }

  /// Starts a final fetch during destruction to show that no callback arrives
  /// once the receiver is gone.
  ~TransactionHookPlugin() override
  {
    TS_DEBUG(TAG, "Destroyed TransactionHookPlugin!");
    // since we die right away, we should not receive the callback for this (using POST request this time)
    Async::execute<AsyncHttpFetch3>(this, new AsyncHttpFetch3("http://127.0.0.1/", HTTP_METHOD_POST), getMutex());
  }

  /// Must never run: the AsyncHttpFetch3 is started from the destructor.
  void
  handleAsyncComplete(AsyncHttpFetch3 & /* async_http_fetch ATS_UNUSED */) override
  {
    assert(!"AsyncHttpFetch3 shouldn't have completed!");
  }

  /// Must never run: the delayed fetch is canceled right after it is started.
  void
  handleAsyncComplete(DelayedAsyncHttpFetch & /*async_http_fetch ATS_UNUSED */) override
  {
    assert(!"Should've been canceled!");
  }

private:
  Transaction &transaction_; ///< Transaction to resume once all counted fetches finish.
  int num_fetches_pending_;  ///< Number of started fetches whose completion is still awaited.

  /**
   * Shared completion handling: logs the outcome of the fetch and resumes the
   * transaction when the last pending fetch has reported in.
   *
   * @param async_http_fetch The completed fetch (any AsyncHttpFetch-derived provider).
   */
  void
  handleAnyAsyncComplete(AsyncHttpFetch &async_http_fetch)
  {
    // This will be called when our async event is complete.
    TS_DEBUG(TAG, "Fetch completed for URL [%s]", async_http_fetch.getRequestUrl().getUrlString().c_str());
    const Response &response = async_http_fetch.getResponse();
    if (async_http_fetch.getResult() == AsyncHttpFetch::RESULT_SUCCESS) {
      TS_DEBUG(TAG, "Response version is [%s], status code %d, reason phrase [%s]",
               HTTP_VERSION_STRINGS[response.getVersion()].c_str(), response.getStatusCode(), response.getReasonPhrase().c_str());

      TS_DEBUG(TAG, "Response Headers: \n%s\n", response.getHeaders().str().c_str());

      const void *body;
      size_t body_size;
      async_http_fetch.getResponseBody(body, body_size);
      // The body is printed with an explicit length (%.*s) rather than as a
      // NUL-terminated string.
      TS_DEBUG(TAG, "Response body is %zu bytes long and is [%.*s]", body_size, static_cast<int>(body_size),
               static_cast<const char *>(body));
    } else {
      TS_ERROR(TAG, "Fetch did not complete successfully; Result %d", static_cast<int>(async_http_fetch.getResult()));
    }
    if (--num_fetches_pending_ == 0) {
      TS_DEBUG(TAG, "Reenabling transaction");
      transaction_.resume();
    }
  }
};

/**
 * Global plugin that attaches a TransactionHookPlugin to every non-internal
 * transaction at HOOK_READ_REQUEST_HEADERS_POST_REMAP.
 */
class GlobalHookPlugin : public GlobalPlugin
{
public:
  GlobalHookPlugin()
  {
    TS_DEBUG(TAG, "Registering a global hook HOOK_READ_REQUEST_HEADERS_POST_REMAP");
    registerHook(HOOK_READ_REQUEST_HEADERS_POST_REMAP);
  }

  /**
   * Attaches the per-transaction plugin unless the request is internal, then
   * resumes the transaction in either case.
   *
   * @param transaction The transaction that reached the post-remap hook.
   */
  void
  handleReadRequestHeadersPostRemap(Transaction &transaction) override
  {
    TS_DEBUG(TAG, "Received a request in handleReadRequestHeadersPostRemap.");

    // Internal requests must be skipped: the fetches issued by the transaction
    // plugin would otherwise trigger this hook again, looping forever.
    if (transaction.isInternalRequest()) {
      TS_DEBUG(TAG, "Ignoring internal transaction");
    } else {
      transaction.addPlugin(new TransactionHookPlugin(transaction));
    }

    transaction.resume();
  }
};

/**
 * Plugin entry point: registers the plugin and, if registration succeeds, creates
 * the global hook plugin instance. Both arguments are unused.
 */
void
TSPluginInit(int argc ATSCPPAPI_UNUSED, const char *argv[] ATSCPPAPI_UNUSED)
{
  TS_DEBUG(TAG, "Loaded async_http_fetch_example plugin");

  // Only install the global hook when registration went through.
  if (RegisterGlobalPlugin("CPP_Example_AsyncHttpFetch", "apache", "dev@trafficserver.apache.org")) {
    plugin = new GlobalHookPlugin();
  }
}