File: local_data_source.cc

package info (click to toggle)
chromium 138.0.7204.183-1~deb12u1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm-proposed-updates
  • size: 6,080,960 kB
  • sloc: cpp: 34,937,079; ansic: 7,176,967; javascript: 4,110,704; python: 1,419,954; asm: 946,768; xml: 739,971; pascal: 187,324; sh: 89,623; perl: 88,663; objc: 79,944; sql: 50,304; cs: 41,786; fortran: 24,137; makefile: 21,811; php: 13,980; tcl: 13,166; yacc: 8,925; ruby: 7,485; awk: 3,720; lisp: 3,096; lex: 1,327; ada: 727; jsp: 228; sed: 36
file content (404 lines) | stat: -rw-r--r-- 13,741 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
// Copyright 2024 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "chrome/browser/ash/chromebox_for_meetings/artemis/local_data_source.h"

#include "base/hash/hash.h"
#include "base/i18n/time_formatting.h"
#include "base/process/launch.h"
#include "base/strings/string_number_conversions.h"
#include "base/strings/string_split.h"
#include "base/strings/string_util.h"
#include "base/time/time.h"

namespace ash::cfm {

namespace {

// Local convenience aliases
using mojom::DataFilter::FilterType::CHANGE;
using mojom::DataFilter::FilterType::REGEX;

// Regex used to separate timestamps and severity from rest of data.
// Note that the timestamp and severity fields are both optional fields.
// In the event that a data source produces data that doesn't have one
// or the other, defaults will be provided.
// Capture groups: (1) ISO-8601-style UTC timestamp, (2) severity token,
// (3) the remainder of the line; "(?s)" lets '.' match embedded newlines.
constexpr LazyRE2 kFullLogLineRegex = {
    "^(?:([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9:\\.]+Z) )?"
    "(?:(EMERG|ALERT|CRIT|SEVERE|ERR|ERROR|WARNING|INFO|NOTICE"
    "|DEBUG|VERBOSE[1-4]) )?((?s).*)"};

// Number of characters to ingest per log line to create unique hash.
// Used by GetUniqueInsertId() to bound hashing cost per line.
constexpr size_t kLogMsgHashSize = 50;

// Dangerous internal buffer size in bytes. In practice, we should never hit
// this limit due to previous memory-related mitigations, but let's add safety
// tracking just in case.
constexpr size_t kDangerousBufferSize = 2 * 1000 * 1000;  // 2 MB

}  // namespace

// Constructs a data source that polls for new data every |poll_rate|.
// |data_needs_redacting| enables redaction of collected lines before
// buffering; |is_incremental| marks sources (e.g. logs) whose output is
// always new, as opposed to command-style sources that may repeat output.
LocalDataSource::LocalDataSource(base::TimeDelta poll_rate,
                                 bool data_needs_redacting,
                                 bool is_incremental)
    : poll_rate_(poll_rate),
      data_needs_redacting_(data_needs_redacting),
      is_incremental_(is_incremental),
      // NOTE(review): nullptr appears to be the redactor's optional
      // first-party-extension-ID provider — confirm against the
      // RedactionTool constructor.
      redactor_(nullptr) {}

// Defaulted out-of-line so the (implicitly virtual-capable) destructor has
// a single home in this translation unit. The previous definition was
// marked `inline`, which is an ODR hazard for an out-of-line definition in
// a .cc file: any other translation unit that destroys a LocalDataSource
// would reference a definition it cannot see, which can fail to link.
LocalDataSource::~LocalDataSource() = default;

// Hands all currently buffered (serialized) entries to |callback| and
// resets the internal buffer along with its byte-size counter.
void LocalDataSource::Fetch(FetchCallback callback) {
  if (data_buffer_.empty()) {
    std::move(callback).Run({});
    return;
  }

  // The internal buffer is a deque, but the callback expects a vector;
  // move every element across before resetting the buffer state.
  std::vector<std::string> buffered_lines(
      std::make_move_iterator(data_buffer_.begin()),
      std::make_move_iterator(data_buffer_.end()));
  data_buffer_.clear();
  data_buffer_size_ = 0;

  std::move(callback).Run(std::move(buffered_lines));
}

// Registers |pending_watch_dog| to be notified when this source's data
// matches |filter|. CHANGE watchdogs fire whenever a non-incremental
// source's output changes; REGEX watchdogs fire per line matching the
// filter's pattern. Runs |callback| with false if the filter is invalid.
void LocalDataSource::AddWatchDog(
    mojom::DataFilterPtr filter,
    mojo::PendingRemote<mojom::DataWatchDog> pending_watch_dog,
    AddWatchDogCallback callback) {
  if (!IsWatchDogFilterValid(filter)) {
    std::move(callback).Run(false /* success */);
    return;
  }

  mojo::Remote<mojom::DataWatchDog> remote(std::move(pending_watch_dog));
  std::string watchdog_name;

  if (filter->filter_type == CHANGE) {
    // Trigger CHANGE watchdogs immediately with the last known data.
    const std::string data_joined = base::JoinString(last_unique_data_, "\n");
    remote->OnNotify(data_joined);

    change_based_watchdogs_.Add(std::move(remote));
    watchdog_name = "CHANGE";
  } else {
    // Pattern is guaranteed to be populated based on the
    // results of IsWatchDogFilterValid().
    const std::string& pattern = filter->pattern.value();
    // Compile each unique pattern once and cache it; the cached RE2 is
    // reused by CheckRegexWatchdogsAndFireCallbacks() on every poll.
    if (regex_cache_.count(pattern) == 0) {
      regex_cache_[pattern] = std::make_unique<RE2>(pattern);
    }

    regex_based_watchdogs_[pattern].Add(std::move(remote));
    watchdog_name = pattern;
  }

  VLOG(1) << "Watchdog added to '" << GetDisplayName() << "'; will match on "
          << watchdog_name;
  std::move(callback).Run(true /* success */);
}

// Intentionally a no-op in this base class. Subclasses override this to
// perform any cleanup needed after a successful processing step.
void LocalDataSource::Flush() {}

// Begins periodic collection: a repeating timer invokes FillDataBuffer()
// every |poll_rate_|. The weak pointer ensures pending timer callbacks are
// dropped safely if this object is destroyed first.
void LocalDataSource::StartCollectingData() {
  poll_timer_.Start(FROM_HERE, poll_rate_,
                    base::BindRepeating(&LocalDataSource::FillDataBuffer,
                                        weak_ptr_factory_.GetWeakPtr()));
}

// Stores the device identifier that GetUniqueInsertId() folds into each
// entry's de-duplication hash.
void LocalDataSource::AssignDeviceID(const std::string& id) {
  device_id_.assign(id);
}

// Timer callback: pulls the next batch of lines from the source, runs
// duplicate suppression and watchdog checks, optionally redacts the data,
// serializes each line into a LogEntry proto, and appends the results to
// the internal buffer consumed by Fetch().
void LocalDataSource::FillDataBuffer() {
  // If we've reached our max limit, halt buffer fills temporarily.
  // This indicates that data is not being consumed by the aggregator,
  // so there must be some kind of mojom hang-up. We'll resume when
  // the problem is corrected.
  if (IsDataBufferOverMaxLimit()) {
    if (data_buffer_size_ >= kDangerousBufferSize) {
      LOG(WARNING) << GetDisplayName() << " has reached a dangerously high "
                   << "buffer allocation of " << data_buffer_size_ << " bytes.";
    }
    return;
  }

  std::vector<std::string> next_data = GetNextData();
  if (next_data.empty()) {
    return;
  }

  if (!is_incremental_) {
    // If non-incremental sources (e.g. commands) return the same
    // data as the previous invocation, return early. This will
    // skip any watchdog checks and will prevent the duplicate
    // data from eventually being enqueued to the cloud logger.
    // Note that incremental sources (e.g. logs) will always have
    // new data, so we don't need to check for duplicates.
    if (next_data == last_unique_data_) {
      return;
    }

    // Update our last known unique data.
    last_unique_data_ = next_data;

    // Fire any CHANGE watchdogs. Note that these are not supported
    // for incremental sources as they will always be changing.
    if (!change_based_watchdogs_.empty()) {
      // The OnNotify() callbacks expect a single string of data, so join
      // the vector first.
      const std::string data_joined = base::JoinString(next_data, "\n");
      FireChangeWatchdogCallbacks(data_joined);
    }
  }

  // Regex watchdogs are evaluated per line, before redaction, so patterns
  // match on the raw source data.
  for (const auto& line : next_data) {
    CheckRegexWatchdogsAndFireCallbacks(line);
  }

  if (data_needs_redacting_) {
    RedactDataBuffer(next_data);
  }

  // Replaces each line in-place with its serialized LogEntry proto.
  SerializeDataBuffer(next_data);

  // Calculate the size of our serialized data so we can
  // update our internal metrics.
  size_t next_data_size = 0;
  for (const auto& line : next_data) {
    next_data_size += line.size();
  }

  std::move(next_data.begin(), next_data.end(),
            std::back_inserter(data_buffer_));

  data_buffer_size_ += next_data_size;
  VLOG(3) << GetDisplayName() << " has a current buffer allocation of "
          << data_buffer_size_ << " bytes.";
}

// Returns true once the serialized buffer exceeds the soft cap, at which
// point FillDataBuffer() temporarily stops adding data.
bool LocalDataSource::IsDataBufferOverMaxLimit() {
  const bool over_limit = data_buffer_size_ > kMaxInternalBufferSize;
  return over_limit;
}

// Runs the redactor over every line of |buffer| in place.
void LocalDataSource::RedactDataBuffer(std::vector<std::string>& buffer) {
  for (auto& line : buffer) {
    line = redactor_.Redact(line);
  }
}

void LocalDataSource::SerializeDataBuffer(std::vector<std::string>& buffer) {
  if (buffer.empty()) {
    return;
  }

  // Set defaults for data that doesn't have timestamps or severity
  // levels. Use the current time for the timestamp.
  auto default_timestamp =
      (base::Time::Now() - base::Time::UnixEpoch()).InMicroseconds();
  const proto::LogSeverity default_severity = proto::LOG_SEVERITY_DEFAULT;

  // Build each LogEntry object, then replace the buffer data with
  // the serialized entry.
  for (size_t i = 0; i < buffer.size(); i++) {
    proto::LogEntry entry;
    BuildLogEntryFromLogLine(entry, buffer[i], default_timestamp,
                             default_severity);

    std::string serialized_data;
    entry.SerializeToString(&serialized_data);
    buffer[i] = std::move(serialized_data);
  }
}

// Parses |line| with the log-line regex and populates |entry| with a
// timestamp (microseconds since the Unix epoch), severity, and the
// message payload. |default_timestamp| and |default_severity| are used
// when the corresponding field is absent from the line.
void LocalDataSource::BuildLogEntryFromLogLine(
    proto::LogEntry& entry,
    const std::string& line,
    const uint64_t default_timestamp,
    const proto::LogSeverity& default_severity) {
  std::string timestamp;
  std::string severity;
  std::string log_msg;

  RE2& regex = GetLogLineRegex();

  if (!RE2::PartialMatch(line, regex, &timestamp, &severity, &log_msg)) {
    // Regex failed entirely; store the raw line with default metadata.
    LOG(ERROR) << "Unable to parse log line properly: " << line;
    entry.set_timestamp_micros(default_timestamp);
    entry.set_severity(default_severity);
    entry.set_text_payload(line);
  } else {
    uint64_t time_since_epoch;

    // There are three cases to consider here:
    // 1. The timestamp is populated and was parsed properly. Pass
    //    to TimestampStringToUnixTime() to convert to Unix epoch.
    // 2. The source explicitly does not expect timestamps to be
    //    present. Apply the passed-in default. Note that most (if
    //    not all) non-incremental sources fall under this category.
    // 3. The timestamp is not populated, and this is a data source
    //    that expects timestamps to be present, so this line is
    //    likely a new line from a previous log. Carry forward the
    //    last timestamp that was recorded, plus 1 microsecond.
    if (!timestamp.empty()) {
      // NOTE(review): if conversion fails it returns 0, and the entry is
      // stored with timestamp 0 rather than falling back to a default —
      // confirm this is intended.
      time_since_epoch = TimestampStringToUnixTime(timestamp);
      if (time_since_epoch != 0) {
        last_recorded_timestamp_ = time_since_epoch;
      }
    } else if (!AreTimestampsExpected()) {
      time_since_epoch = default_timestamp;
    } else {
      time_since_epoch = ++last_recorded_timestamp_;
    }

    // Use the log source and timestamp to create a unique ID that can be
    // used to identify this entry. This will aid in de-duplication on the
    // server side.
    // NOTE(review): the hash input is device ID + display name + message
    // prefix (see GetUniqueInsertId()); the timestamp itself is not
    // hashed, despite the comment above — confirm which is intended.
    if (is_incremental_ && time_since_epoch != 0) {
      entry.set_insert_id(GetUniqueInsertId(log_msg));
    }

    // Even if the match succeeded, the timestamps and severity are optional
    // matches, so supply a default if they aren't populated.
    entry.set_timestamp_micros(time_since_epoch);
    entry.set_severity(severity.empty() ? default_severity
                                        : SeverityStringToEnum(severity));
    entry.set_text_payload(log_msg);
  }
}

// Derives a stable insert ID from the device ID, this source's display
// name, and a bounded prefix of |log_msg|. The resulting hash string aids
// server-side de-duplication of entries.
const std::string LocalDataSource::GetUniqueInsertId(
    const std::string& log_msg) {
  std::string hash_input = device_id_;
  hash_input += ":";
  hash_input += GetDisplayName();
  hash_input += ":";
  hash_input += log_msg.substr(0, kLogMsgHashSize);
  return base::NumberToString(base::FastHash(hash_input));
}

// Returns the regex used to split a raw line into timestamp, severity,
// and message (see BuildLogEntryFromLogLine()).
RE2& LocalDataSource::GetLogLineRegex() {
  // Default regex. Data sources can override this if necessary.
  // See notes in local_data_source.h for restrictions.
  return *kFullLogLineRegex;
}

uint64_t LocalDataSource::TimestampStringToUnixTime(
    const std::string& timestamp) {
  base::Time time;
  if (!base::Time::FromString(timestamp.c_str(), &time)) {
    LOG(ERROR) << "Unable to parse timestamp: " << timestamp;
    return 0;
  }
  return (time - base::Time::UnixEpoch()).InMicroseconds();
}

// Whether lines from this source should carry their own timestamps; used
// by BuildLogEntryFromLogLine() to choose a fallback strategy.
bool LocalDataSource::AreTimestampsExpected() const {
  // By default, assume that non-incremental sources will not supply
  // timestamps, and assume the opposite for incremental sources. Data
  // sources that do not follow this rule should override this function.
  return is_incremental_;
}

// Maps a severity token (as matched by the log-line regex) to the proto
// severity enum. Unknown tokens are logged and mapped to the default.
proto::LogSeverity LocalDataSource::SeverityStringToEnum(
    const std::string& severity) {
  if (severity == "EMERGENCY" || severity == "EMERG") {
    return proto::LOG_SEVERITY_EMERGENCY;
  }
  if (severity == "ALERT") {
    return proto::LOG_SEVERITY_ALERT;
  }
  if (severity == "CRITICAL" || severity == "CRIT") {
    return proto::LOG_SEVERITY_CRITICAL;
  }
  if (severity == "ERROR" || severity == "ERR") {
    return proto::LOG_SEVERITY_ERROR;
  }
  if (severity == "WARNING") {
    return proto::LOG_SEVERITY_WARNING;
  }
  if (severity == "NOTICE") {
    return proto::LOG_SEVERITY_NOTICE;
  }
  if (severity == "INFO") {
    return proto::LOG_SEVERITY_INFO;
  }
  if (severity == "DEBUG" || severity == "VERBOSE1" ||
      severity == "VERBOSE2" || severity == "VERBOSE3" ||
      severity == "VERBOSE4") {
    return proto::LOG_SEVERITY_DEBUG;
  }
  LOG(ERROR) << "Unable to parse severity: " << severity;
  return proto::LOG_SEVERITY_DEFAULT;
}

// Validates |filter| before a watchdog is registered. Rejects unknown
// filter types, CHANGE filters on incremental sources (their data always
// changes), REGEX filters with empty/overly-broad/invalid patterns, and
// CHANGE filters that carry a pattern. Returns true if usable.
bool LocalDataSource::IsWatchDogFilterValid(mojom::DataFilterPtr& filter) {
  if (filter->filter_type != CHANGE && filter->filter_type != REGEX) {
    LOG(ERROR) << "Somehow received a DataFilter of unknown type "
               << filter->filter_type;
    return false;
  }

  if (filter->filter_type == CHANGE && is_incremental_) {
    LOG(ERROR) << "Incremental sources do not support change-based watchdogs";
    return false;
  }

  if (filter->filter_type == REGEX) {
    if (filter->pattern.value_or("").empty()) {
      LOG(ERROR) << "Regex watchdog was requested, but the pattern is empty";
      return false;
    }

    const std::string& pattern = filter->pattern.value();

    if (pattern == "*") {
      LOG(ERROR) << "Pattern '*' is too loose. Use CHANGE watchdog instead.";
      return false;
    }

    // Compile once here purely to validate; the real compiled regex is
    // cached later by AddWatchDog().
    RE2 test_regex(pattern);

    if (!test_regex.ok()) {
      LOG(ERROR) << "Regex '" << pattern
                 << "' is invalid (err: " << test_regex.error() << ")";
      return false;
    }

  } else if (filter->filter_type == CHANGE) {
    if (filter->pattern.has_value()) {
      LOG(ERROR) << "CHANGE filter requested with pattern";
      return false;
    }
  }

  return true;
}

// Notifies every registered CHANGE watchdog with the joined data blob.
void LocalDataSource::FireChangeWatchdogCallbacks(const std::string& data) {
  VLOG(2) << "'" << GetDisplayName()
          << "' matched on 'CHANGE' watchdog. Notifying observers.";
  for (const auto& watchdog : change_based_watchdogs_) {
    watchdog->OnNotify(data);
  }
}

// Checks one line of |data| against every registered regex watchdog and
// notifies the observers of each pattern that matches.
void LocalDataSource::CheckRegexWatchdogsAndFireCallbacks(
    const std::string& data) {
  for (const auto& [pattern, remotes] : regex_based_watchdogs_) {
    // Look up the compiled regex with find() rather than operator[]: the
    // latter would silently default-insert a null unique_ptr (and then
    // null-deref on dereference) if the cache and the watchdog map ever
    // fell out of sync. AddWatchDog() populates the cache, so the entry
    // should always exist; skip defensively if it does not.
    auto cached = regex_cache_.find(pattern);
    if (cached == regex_cache_.end() ||
        !RE2::PartialMatch(data, *cached->second)) {
      continue;
    }

    VLOG(2) << "'" << GetDisplayName() << "' matched on '" << pattern
            << "' watchdog. Notifying observers.";

    for (auto& remote : remotes) {
      remote->OnNotify(data);
    }
  }
}

}  // namespace ash::cfm