File: WatchmanConnection.cpp

package info (click to toggle)
watchman 4.9.0-9
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 9,992 kB
  • sloc: cpp: 27,459; python: 6,538; java: 3,404; php: 3,257; ansic: 2,803; javascript: 1,116; makefile: 671; ruby: 364; sh: 124; xml: 102; lisp: 4
file content (395 lines) | stat: -rw-r--r-- 11,527 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
/* Copyright 2016-present Facebook, Inc.
 * Licensed under the Apache License, Version 2.0 */

#include "WatchmanConnection.h"

#include <folly/ExceptionWrapper.h>
#include <folly/SocketAddress.h>
#include <folly/Subprocess.h>
#include <folly/experimental/bser/Bser.h>
#include <folly/futures/InlineExecutor.h>

namespace watchman {

using namespace folly::bser;
using namespace folly;

// Ordered with the most likely kind first
static const std::vector<dynamic> kUnilateralLabels{"subscription", "log"};

static const dynamic kError("error");
static const dynamic kCapabilities("capabilities");

// We'll just dispatch bser decodes and callbacks inline unless they
// give us an alternative environment
static InlineExecutor inlineExecutor;

WatchmanConnection::WatchmanConnection(
    EventBase* eventBase,
    Optional<std::string>&& sockPath,
    Optional<WatchmanConnection::Callback>&& callback,
    Executor* cpuExecutor)
    : eventBase_(eventBase),
      sockPath_(std::move(sockPath)),
      callback_(std::move(callback)),
      cpuExecutor_(cpuExecutor ? cpuExecutor : &inlineExecutor),
      versionCmd_(nullptr),
      bufQ_(IOBufQueue::cacheChainLength()) {
  CHECK_NOTNULL(eventBase);
}

WatchmanConnection::~WatchmanConnection() {
  close();
}

folly::Future<std::string> WatchmanConnection::getSockPath() {
  // Take explicit configuration first
  if (sockPath_.hasValue()) {
    return makeFuture(sockPath_.value());
  }

  // Else use the environmental variable used by watchman to report
  // the active socket path
  auto var = getenv("WATCHMAN_SOCK");
  if (var && *var) {
    return makeFuture(std::string(var));
  }

  return via(cpuExecutor_, [] {
    // Else discover it from the CLI
    folly::Subprocess proc(
        {"watchman", "--output-encoding=bser", "get-sockname"},
        folly::Subprocess::Options().pipeStdout().pipeStderr().usePath());
    SCOPE_FAIL {
      // Always clean up to avoid Subprocess asserting on destruction
      proc.kill();
      proc.wait();
    };
    auto out_pair = proc.communicate();
    auto result = parseBser(out_pair.first);
    proc.waitChecked();
    return result["sockname"].asString();
  });
}

Future<dynamic> WatchmanConnection::connect(folly::dynamic versionArgs) {
  if (!versionArgs.isObject()) {
    throw WatchmanError("versionArgs must be object");
  }
  versionCmd_ = folly::dynamic::array("version", versionArgs);

  auto res = getSockPath().then(
    [shared_this=shared_from_this()] (std::string&& path) {
      shared_this->eventBase_->runInEventBaseThread([=] {
        folly::SocketAddress addr;
        addr.setFromPath(path);

        shared_this->sock_ =
          folly::AsyncSocket::newSocket(shared_this->eventBase_);
        shared_this->sock_->connect(shared_this.get(), addr);
      }
    );

    return shared_this->connectPromise_.getFuture();
  });
  return res;
}

void WatchmanConnection::close() {
  if (closing_) {
    return;
  }
  closing_ = true;
  if (sock_) {
    eventBase_->runImmediatelyOrRunInEventBaseThreadAndWait([this] {
      sock_->close();
      sock_.reset();
    });
  }
  failQueuedCommands(
      make_exception_wrapper<WatchmanError>(
          "WatchmanConnection::close() was called"));
}

// The convention for Watchman responses is that they represent
// an error if they contain the "error" key.  We want to report
// those as exceptions, but it is easier to do that via a Try
Try<dynamic> WatchmanConnection::watchmanResponseToTry(dynamic&& value) {
  auto error = value.get_ptr(kError);
  if (error) {
    return Try<dynamic>(make_exception_wrapper<WatchmanResponseError>(value));
  }
  return Try<dynamic>(std::move(value));
}

void WatchmanConnection::connectSuccess() noexcept {
  try {
    sock_->setReadCB(this);
    sock_->setCloseOnExec();

    run(versionCmd_).then(
      [shared_this=shared_from_this()] (dynamic&& result) {
        // If there is no "capabilities" key then the version of
        // watchman is too old; treat this as an error
        if (!result.get_ptr(kCapabilities)) {
          result["error"] =
              "This watchman server has no support for capabilities, "
              "please upgrade to the current stable version of watchman";
          shared_this->connectPromise_.setTry(
            shared_this->watchmanResponseToTry(std::move(result)));
          return;
        }
        shared_this->connectPromise_.setValue(std::move(result));
      }
    ).onError(
      [shared_this=shared_from_this()]
      (const folly::exception_wrapper& e) {
        shared_this->connectPromise_.setException(e);
      }
    );
  } catch(const std::exception& e) {
    connectPromise_.setException(
      folly::exception_wrapper(std::current_exception(), e));
  } catch(...) {
    connectPromise_.setException(
      folly::exception_wrapper(std::current_exception()));
  }
}

void WatchmanConnection::connectErr(
    const folly::AsyncSocketException& ex) noexcept {
  connectPromise_.setException(ex);
}

WatchmanConnection::QueuedCommand::QueuedCommand(const dynamic& command)
    : cmd(command) {}

Future<dynamic> WatchmanConnection::run(const dynamic& command) noexcept {
  auto cmd = std::make_shared<QueuedCommand>(command);
  if (broken_) {
    cmd->promise.setException(WatchmanError("The connection was broken"));
    return cmd->promise.getFuture();
  }
  if (!sock_) {
    cmd->promise.setException(WatchmanError(
        "No socket (did you call connect() and check result for exceptions?)"));
    return cmd->promise.getFuture();
  }

  bool shouldWrite;
  {
    std::lock_guard<std::mutex> g(mutex_);
    // We only need to call sendCommand if we don't have a command in
    // progress; the completion handler will trigger it once we receive
    // the response
    shouldWrite = commandQ_.empty();
    commandQ_.push_back(cmd);
  }

  if (shouldWrite) {
    eventBase_->runInEventBaseThread(
      [shared_this=shared_from_this()] {
        shared_this->sendCommand();
      }
    );
  }

  return cmd->promise.getFuture();
}

// Generate a failure for all queued commands
void WatchmanConnection::failQueuedCommands(
    const folly::exception_wrapper& ex) {
  std::lock_guard<std::mutex> g(mutex_);
  auto q = commandQ_;
  commandQ_.clear();

  broken_ = true;
  for (auto& cmd : q) {
    if (!cmd->promise.isFulfilled()) {
      cmd->promise.setException(ex);
    }
  }

  // If the user has explicitly closed the connection no need for callback
  if (callback_ && !closing_) {
    cpuExecutor_->add([shared_this=shared_from_this(), ex] {
      (*(shared_this->callback_))(folly::Try<folly::dynamic>(ex));
    });
  }
}

// Sends the next eligible command to the Watchman service
void WatchmanConnection::sendCommand(bool pop) {
  std::shared_ptr<QueuedCommand> cmd;

  {
    std::lock_guard<std::mutex> g(mutex_);

    if (pop) {
      // We finished processing this one, discard it and focus
      // on the next item, if any.
      commandQ_.pop_front();
    }
    if (commandQ_.empty()) {
      return;
    }
    cmd = commandQ_.front();
  }

  sock_->writeChain(this, toBserIOBuf(cmd->cmd, serialization_opts()));
}

void WatchmanConnection::popAndSendCommand() {
  sendCommand(/* pop = */ true);
}

// Called when AsyncSocket::writeChain completes
void WatchmanConnection::writeSuccess() noexcept {
  // Don't care particularly
}

// Called when AsyncSocket::writeChain fails
void WatchmanConnection::writeErr(
    size_t,
    const folly::AsyncSocketException& ex) noexcept {
  failQueuedCommands(ex);
}

// Called when AsyncSocket wants to give us data
void WatchmanConnection::getReadBuffer(void** bufReturn, size_t* lenReturn) {
  std::lock_guard<std::mutex> g(mutex_);
  const auto ret = bufQ_.preallocate(2048, 2048);
  *bufReturn = ret.first;
  *lenReturn = ret.second;
}

// Called when AsyncSocket gave us data
void WatchmanConnection::readDataAvailable(size_t len) noexcept {
  {
    std::lock_guard<std::mutex> g(mutex_);
    bufQ_.postallocate(len);
  }
  cpuExecutor_->add([shared_this=shared_from_this()] {
    shared_this->decodeNextResponse();
  });
}

std::unique_ptr<folly::IOBuf> WatchmanConnection::splitNextPdu() {
  std::lock_guard<std::mutex> g(mutex_);
  if (!bufQ_.front()) {
    return nullptr;
  }

  // Do we have enough data to decode the next item?
  size_t pdu_len = 0;
  try {
    pdu_len = decodePduLength(bufQ_.front());
  } catch (const std::out_of_range&) {
    // Don't have enough data yet
    return nullptr;
  }

  if (pdu_len > bufQ_.chainLength()) {
    // Don't have enough data yet
    return nullptr;
  }

  // Remove the PDU blob from the front of the chain
  return bufQ_.split(pdu_len);
}

// Try to peel off one or more PDU's from our buffer queue.
// Decode each complete PDU from BSER -> dynamic and dispatch
// either the associated QueuedCommand or to the callback_ for
// unilateral responses.
// This is executed via the cpuExecutor.  We only allow one
// thread to carry out the decoding at a time so that the callbacks
// are triggered in the order that they are received.  It is possible
// for us to receive a large PDU followed by a small one and for the
// small one to finish decoding before the large one, so we must
// serialize the dispatching.
void WatchmanConnection::decodeNextResponse() {
  {
    std::lock_guard<std::mutex> g(mutex_);
    if (decoding_) {
      return;
    }
    decoding_ = true;
  }

  SCOPE_EXIT {
    std::lock_guard<std::mutex> g(mutex_);
    decoding_ = false;
  };

  while (true) {
    auto pdu = splitNextPdu();
    if (!pdu) {
      return;
    }

    try {
      auto decoded = parseBser(pdu.get());

      bool is_unilateral = false;
      // Check for a unilateral response
      for (const auto& k : kUnilateralLabels) {
        if (decoded.get_ptr(k)) {
          // This is a unilateral response
          if (callback_.hasValue()) {
            callback_.value()(watchmanResponseToTry(std::move(decoded)));
            is_unilateral = true;
            break;
          }
          // No callback; usage error :-/
          failQueuedCommands(
              std::runtime_error("No unilateral callback has been installed"));
          return;
        }
      }
      if (is_unilateral) {
        continue;
      }

      // It's actually a command response; get the cmd so that we
      // can fulfil its promise
      std::shared_ptr<QueuedCommand> cmd;
      {
        std::lock_guard<std::mutex> g(mutex_);
        if (commandQ_.empty()) {
          failQueuedCommands(
              std::runtime_error("No commands have been queued"));
          return;
        }
        cmd = commandQ_.front();
      }

      // Dispatch outside of the lock in case it tries to send another
      // command
      cmd->promise.setTry(watchmanResponseToTry(std::move(decoded)));

      // Now we're in a position to send the next queued command.
      // We remove it after dispatching the try above in case that
      // queued up more commands; we want to be the one thing that
      // is responsible for sending the next queued command here
      popAndSendCommand();
    } catch (const std::exception& ex) {
      failQueuedCommands(ex);
      return;
    }
  }
}

// Called when AsyncSocket hits EOF
void WatchmanConnection::readEOF() noexcept {
  failQueuedCommands(
      std::system_error(ENOTCONN, std::system_category(), "connection closed"));
}

// Called when AsyncSocket has a read error
void WatchmanConnection::readErr(
    const folly::AsyncSocketException& ex) noexcept {
  failQueuedCommands(ex);
}
} // namespace watchman