File: message_pump_epoll.cc

package info (click to toggle)
chromium 120.0.6099.224-1~deb11u1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 6,112,112 kB
  • sloc: cpp: 32,907,025; ansic: 8,148,123; javascript: 3,679,536; python: 2,031,248; asm: 959,718; java: 804,675; xml: 617,256; sh: 111,417; objc: 100,835; perl: 88,443; cs: 53,032; makefile: 29,579; fortran: 24,137; php: 21,162; tcl: 21,147; sql: 20,809; ruby: 17,735; pascal: 12,864; yacc: 8,045; lisp: 3,388; lex: 1,323; ada: 727; awk: 329; jsp: 267; csh: 117; exp: 43; sed: 37
file content (381 lines) | stat: -rw-r--r-- 13,238 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
// Copyright 2022 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/message_loop/message_pump_epoll.h"

#include <sys/epoll.h>
#include <sys/eventfd.h>

#include <cstddef>
#include <cstdint>
#include <utility>

#include "base/auto_reset.h"
#include "base/check_op.h"
#include "base/memory/ref_counted.h"
#include "base/posix/eintr_wrapper.h"
#include "base/ranges/algorithm.h"
#include "base/threading/thread_checker.h"
#include "base/trace_event/base_tracing.h"
#include "third_party/abseil-cpp/absl/types/optional.h"

namespace base {

MessagePumpEpoll::MessagePumpEpoll() {
  epoll_.reset(epoll_create(/*ignored_but_must_be_positive=*/1));
  PCHECK(epoll_.is_valid());

  wake_event_.reset(eventfd(0, EFD_NONBLOCK));
  PCHECK(wake_event_.is_valid());

  epoll_event wake{.events = EPOLLIN, .data = {.ptr = &wake_event_}};
  int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_ADD, wake_event_.get(), &wake);
  PCHECK(rv == 0);
}

MessagePumpEpoll::~MessagePumpEpoll() = default;

bool MessagePumpEpoll::WatchFileDescriptor(int fd,
                                           bool persistent,
                                           int mode,
                                           FdWatchController* controller,
                                           FdWatcher* watcher) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  TRACE_EVENT("base", "MessagePumpEpoll::WatchFileDescriptor", "fd", fd,
              "persistent", persistent, "watch_read", mode & WATCH_READ,
              "watch_write", mode & WATCH_WRITE);

  const InterestParams params{
      .fd = fd,
      .read = (mode == WATCH_READ || mode == WATCH_READ_WRITE),
      .write = (mode == WATCH_WRITE || mode == WATCH_READ_WRITE),
      .one_shot = !persistent,
  };

  auto [it, is_new_fd_entry] = entries_.emplace(fd, fd);
  EpollEventEntry& entry = it->second;
  scoped_refptr<Interest> existing_interest = controller->epoll_interest();
  if (existing_interest && existing_interest->params().IsEqual(params)) {
    // WatchFileDescriptor() has already been called for this controller at
    // least once before, and as in the most common cases, it is now being
    // called again with the same parameters.
    //
    // We don't need to allocate and register a new Interest in this case, but
    // we can instead reactivate the existing (presumably deactivated,
    // non-persistent) Interest.
    existing_interest->set_active(true);
  } else {
    entry.interests.push_back(controller->AssignEpollInterest(params));
    if (existing_interest) {
      UnregisterInterest(existing_interest);
    }
  }

  if (is_new_fd_entry) {
    AddEpollEvent(entry);
  } else {
    UpdateEpollEvent(entry);
  }

  controller->set_epoll_pump(weak_ptr_factory_.GetWeakPtr());
  controller->set_watcher(watcher);
  return true;
}

void MessagePumpEpoll::Run(Delegate* delegate) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  RunState run_state(delegate);
  AutoReset<RunState*> auto_reset_run_state(&run_state_, &run_state);
  for (;;) {
    // Do some work and see if the next task is ready right away.
    Delegate::NextWorkInfo next_work_info = delegate->DoWork();
    const bool immediate_work_available = next_work_info.is_immediate();
    if (run_state.should_quit) {
      break;
    }

    // Process any immediately ready IO event, but don't wait for more yet.
    const bool processed_events = WaitForEpollEvents(TimeDelta());
    if (run_state.should_quit) {
      break;
    }

    if (immediate_work_available || processed_events) {
      continue;
    }

    const bool did_idle_work = delegate->DoIdleWork();
    if (run_state.should_quit) {
      break;
    }
    if (did_idle_work) {
      continue;
    }

    TimeDelta timeout = TimeDelta::Max();
    DCHECK(!next_work_info.delayed_run_time.is_null());
    if (!next_work_info.delayed_run_time.is_max()) {
      timeout = next_work_info.remaining_delay();
    }
    delegate->BeforeWait();
    WaitForEpollEvents(timeout);
    if (run_state.should_quit) {
      break;
    }
  }
}

void MessagePumpEpoll::Quit() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(run_state_) << "Quit() called outside of Run()";
  run_state_->should_quit = true;
}

void MessagePumpEpoll::ScheduleWork() {
  const uint64_t value = 1;
  ssize_t n = HANDLE_EINTR(write(wake_event_.get(), &value, sizeof(value)));

  // EAGAIN here implies that the write() would overflow of the event counter,
  // which is a condition we can safely ignore. It implies that the event
  // counter is non-zero and therefore readable, which is enough to ensure that
  // any pending wait eventually wakes up.
  DPCHECK(n == sizeof(value) || errno == EAGAIN);
}

void MessagePumpEpoll::ScheduleDelayedWork(
    const Delegate::NextWorkInfo& next_work_info) {
  // Nothing to do. This can only be called from the same thread as Run(), so
  // the pump must be in between waits. The scheduled work therefore will be
  // seen in time for the next wait.
}

void MessagePumpEpoll::AddEpollEvent(EpollEventEntry& entry) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  const uint32_t events = entry.ComputeActiveEvents();
  epoll_event event{.events = events, .data = {.ptr = &entry}};
  int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_ADD, entry.fd, &event);
  DPCHECK(rv == 0);
  entry.registered_events = events;
}

void MessagePumpEpoll::UpdateEpollEvent(EpollEventEntry& entry) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  const uint32_t events = entry.ComputeActiveEvents();
  if (events == entry.registered_events && !(events & EPOLLONESHOT)) {
    // Persistent events don't need to be modified if no bits are changing.
    return;
  }
  epoll_event event{.events = events, .data = {.ptr = &entry}};
  int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_MOD, entry.fd, &event);
  DPCHECK(rv == 0);
  entry.registered_events = events;
}

void MessagePumpEpoll::UnregisterInterest(
    const scoped_refptr<Interest>& interest) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  const int fd = interest->params().fd;
  auto entry_it = entries_.find(fd);
  DCHECK(entry_it != entries_.end());

  EpollEventEntry& entry = entry_it->second;
  auto& interests = entry.interests;
  auto* it = ranges::find(interests, interest);
  DCHECK(it != interests.end());
  interests.erase(it);

  if (interests.empty()) {
    entries_.erase(entry_it);
    int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_DEL, fd, nullptr);
    DPCHECK(rv == 0);
  } else {
    UpdateEpollEvent(entry);
  }
}

bool MessagePumpEpoll::WaitForEpollEvents(TimeDelta timeout) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  // `timeout` has microsecond resolution, but timeouts accepted by epoll_wait()
  // are integral milliseconds. Round up to the next millisecond.
  // TODO(https://crbug.com/1382894): Consider higher-resolution timeouts.
  const int epoll_timeout =
      timeout.is_max() ? -1
                       : saturated_cast<int>(timeout.InMillisecondsRoundedUp());
  epoll_event events[16];
  const int epoll_result =
      epoll_wait(epoll_.get(), events, std::size(events), epoll_timeout);
  if (epoll_result < 0) {
    DPCHECK(errno == EINTR);
    return false;
  }

  if (epoll_result == 0) {
    return false;
  }

  const base::span<epoll_event> ready_events(events,
                                             static_cast<size_t>(epoll_result));
  for (auto& e : ready_events) {
    if (e.data.ptr == &wake_event_) {
      // Wake-up events are always safe to handle immediately. Unlike other
      // events used by MessagePumpEpoll they also don't point to an
      // EpollEventEntry, so we handle them separately here.
      HandleWakeUp();
      e.data.ptr = nullptr;
      continue;
    }

    // To guard against one of the ready events unregistering and thus
    // invalidating one of the others here, first link each entry to the
    // corresponding epoll_event returned by epoll_wait(). We do this before
    // dispatching any events, and the second pass below will only dispatch an
    // event if its epoll_event data is still valid.
    auto& entry = EpollEventEntry::FromEpollEvent(e);
    DCHECK(!entry.active_event);
    EpollEventEntry::FromEpollEvent(e).active_event = &e;
  }

  for (auto& e : ready_events) {
    if (e.data.ptr) {
      auto& entry = EpollEventEntry::FromEpollEvent(e);
      entry.active_event = nullptr;
      OnEpollEvent(entry, e.events);
    }
  }

  return true;
}

void MessagePumpEpoll::OnEpollEvent(EpollEventEntry& entry, uint32_t events) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  const bool readable = (events & EPOLLIN) != 0;
  const bool writable = (events & EPOLLOUT) != 0;

  // Under different circumstances, peer closure may raise both/either EPOLLHUP
  // and/or EPOLLERR. Treat them as equivalent.
  const bool disconnected = (events & (EPOLLHUP | EPOLLERR)) != 0;

  // Copy the set of Interests, since interests may be added to or removed from
  // `entry` during the loop below. This copy is inexpensive in practice
  // because the size of this vector is expected to be very small (<= 2).
  auto interests = entry.interests;

  // Any of these interests' event handlers may destroy any of the others'
  // controllers. Start all of them watching for destruction before we actually
  // dispatch any events.
  for (const auto& interest : interests) {
    interest->WatchForControllerDestruction();
  }

  for (const auto& interest : interests) {
    if (!interest->active()) {
      continue;
    }

    const bool can_read = (readable || disconnected) && interest->params().read;
    const bool can_write = writable && interest->params().write;
    if (!can_read && !can_write) {
      // If this Interest is active but not watching for whichever event was
      // raised here, there's nothing to do. This can occur if a descriptor has
      // multiple active interests, since only one interest needs to be
      // satisfied in order for us to process an epoll event.
      continue;
    }

    if (interest->params().one_shot) {
      // This is a one-shot event watch which is about to be triggered. We
      // deactivate the interest and update epoll immediately. The event handler
      // may reactivate it.
      interest->set_active(false);
      UpdateEpollEvent(entry);
    }

    if (!interest->was_controller_destroyed()) {
      HandleEvent(entry.fd, can_read, can_write, interest->controller());
    }
  }

  for (const auto& interest : interests) {
    interest->StopWatchingForControllerDestruction();
  }
}

void MessagePumpEpoll::HandleEvent(int fd,
                                   bool can_read,
                                   bool can_write,
                                   FdWatchController* controller) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  // Make the MessagePumpDelegate aware of this other form of "DoWork". Skip if
  // HandleNotification() is called outside of Run() (e.g. in unit tests).
  Delegate::ScopedDoWorkItem scoped_do_work_item;
  if (run_state_) {
    scoped_do_work_item = run_state_->delegate->BeginWorkItem();
  }

  // Trace events must begin after the above BeginWorkItem() so that the
  // ensuing "ThreadController active" outscopes all the events under it.
  TRACE_EVENT("toplevel", "EpollEvent", "controller_created_from",
              controller->created_from_location(), "fd", fd, "can_read",
              can_read, "can_write", can_write, "context",
              static_cast<void*>(controller));
  TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION heap_profiler_scope(
      controller->created_from_location().file_name());
  if (can_read && can_write) {
    bool controller_was_destroyed = false;
    bool* previous_was_destroyed_flag =
        std::exchange(controller->was_destroyed_, &controller_was_destroyed);

    controller->OnFdWritable();
    if (!controller_was_destroyed) {
      controller->OnFdReadable();
    }
    if (!controller_was_destroyed) {
      controller->was_destroyed_ = previous_was_destroyed_flag;
    } else if (previous_was_destroyed_flag) {
      *previous_was_destroyed_flag = true;
    }
  } else if (can_write) {
    controller->OnFdWritable();
  } else if (can_read) {
    controller->OnFdReadable();
  }
}

void MessagePumpEpoll::HandleWakeUp() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  uint64_t value;
  ssize_t n = HANDLE_EINTR(read(wake_event_.get(), &value, sizeof(value)));
  DPCHECK(n == sizeof(value));
}

MessagePumpEpoll::EpollEventEntry::EpollEventEntry(int fd) : fd(fd) {}

MessagePumpEpoll::EpollEventEntry::~EpollEventEntry() {
  if (active_event) {
    DCHECK_EQ(this, active_event->data.ptr);
    active_event->data.ptr = nullptr;
  }
}

uint32_t MessagePumpEpoll::EpollEventEntry::ComputeActiveEvents() {
  uint32_t events = 0;
  bool one_shot = true;
  for (const auto& interest : interests) {
    if (!interest->active()) {
      continue;
    }
    const InterestParams& params = interest->params();
    events |= (params.read ? EPOLLIN : 0) | (params.write ? EPOLLOUT : 0);
    one_shot &= params.one_shot;
  }
  if (events != 0 && one_shot) {
    return events | EPOLLONESHOT;
  }
  return events;
}

}  // namespace base