|
// Copyright 2024 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/browser/ash/chromebox_for_meetings/artemis/local_data_source.h"
#include "base/hash/hash.h"
#include "base/i18n/time_formatting.h"
#include "base/process/launch.h"
#include "base/strings/string_number_conversions.h"
#include "base/strings/string_split.h"
#include "base/strings/string_util.h"
#include "base/time/time.h"
namespace ash::cfm {
namespace {

// Local convenience aliases for the mojom watchdog filter types.
using mojom::DataFilter::FilterType::CHANGE;
using mojom::DataFilter::FilterType::REGEX;

// Regex used to separate timestamps and severity from rest of data.
// Capture group 1: optional UTC timestamp (e.g. "2024-01-02T03:04:05.6Z").
// Capture group 2: optional severity token (syslog-style names, VERBOSE1-4).
// Capture group 3: the remainder of the line; "(?s)" lets '.' span newlines.
// Note that the timestamp and severity fields are both optional fields.
// In the event that a data source produces data that doesn't have one
// or the other, defaults will be provided.
constexpr LazyRE2 kFullLogLineRegex = {
    "^(?:([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9:\\.]+Z) )?"
    "(?:(EMERG|ALERT|CRIT|SEVERE|ERR|ERROR|WARNING|INFO|NOTICE"
    "|DEBUG|VERBOSE[1-4]) )?((?s).*)"};

// Number of characters to ingest per log line to create unique hash.
// Used by GetUniqueInsertId() below.
constexpr size_t kLogMsgHashSize = 50;

// Dangerous internal buffer size in bytes. In practice, we should never hit
// this limit due to previous memory-related mitigations, but let's add safety
// tracking just in case.
constexpr size_t kDangerousBufferSize = 2 * 1000 * 1000;  // 2 MB

}  // namespace
// `poll_rate`: interval at which the poll timer invokes FillDataBuffer()
//     once StartCollectingData() is called.
// `data_needs_redacting`: if true, each fetched batch is passed through
//     RedactDataBuffer() before being serialized and buffered.
// `is_incremental`: true for sources that only produce new data each poll
//     (e.g. logs); false for sources that re-report full state, which are
//     de-duplicated against the previous poll (see FillDataBuffer()).
LocalDataSource::LocalDataSource(base::TimeDelta poll_rate,
                                 bool data_needs_redacting,
                                 bool is_incremental)
    : poll_rate_(poll_rate),
      data_needs_redacting_(data_needs_redacting),
      is_incremental_(is_incremental),
      redactor_(nullptr) {}

inline LocalDataSource::~LocalDataSource() = default;
// Hands the entire buffered (serialized) payload to the caller and resets
// the internal buffer and its size accounting.
void LocalDataSource::Fetch(FetchCallback callback) {
  if (data_buffer_.empty()) {
    std::move(callback).Run({});
    return;
  }

  // data_buffer_ is a deque; build the vector the callback expects by
  // moving each element out of it.
  std::vector<std::string> return_data(
      std::make_move_iterator(data_buffer_.begin()),
      std::make_move_iterator(data_buffer_.end()));
  data_buffer_.clear();
  data_buffer_size_ = 0;
  std::move(callback).Run(std::move(return_data));
}
// Registers a new watchdog observer after validating its filter. CHANGE
// watchdogs are notified immediately with the last known data; REGEX
// watchdogs get their pattern compiled (once) and cached.
void LocalDataSource::AddWatchDog(
    mojom::DataFilterPtr filter,
    mojo::PendingRemote<mojom::DataWatchDog> pending_watch_dog,
    AddWatchDogCallback callback) {
  if (!IsWatchDogFilterValid(filter)) {
    std::move(callback).Run(false /* success */);
    return;
  }

  mojo::Remote<mojom::DataWatchDog> remote(std::move(pending_watch_dog));
  std::string watchdog_name;

  if (filter->filter_type == CHANGE) {
    // Trigger CHANGE watchdogs immediately with the last known data.
    remote->OnNotify(base::JoinString(last_unique_data_, "\n"));
    change_based_watchdogs_.Add(std::move(remote));
    watchdog_name = "CHANGE";
  } else {
    // IsWatchDogFilterValid() guarantees the pattern is populated for
    // REGEX filters; compile and cache it on first use.
    const std::string& pattern = filter->pattern.value();
    auto cache_it = regex_cache_.find(pattern);
    if (cache_it == regex_cache_.end()) {
      regex_cache_.emplace(pattern, std::make_unique<RE2>(pattern));
    }
    regex_based_watchdogs_[pattern].Add(std::move(remote));
    watchdog_name = pattern;
  }

  VLOG(1) << "Watchdog added to '" << GetDisplayName() << "'; will match on "
          << watchdog_name;
  std::move(callback).Run(true /* success */);
}
void LocalDataSource::Flush() {
  // Intentionally a no-op in the base class. Subclasses can use this hook
  // to perform any cleanup task needed after a successful processing step.
}
// Starts the repeating poll timer: FillDataBuffer() runs every |poll_rate_|.
// The weak pointer prevents the timer from invoking a destroyed object.
void LocalDataSource::StartCollectingData() {
  poll_timer_.Start(FROM_HERE, poll_rate_,
                    base::BindRepeating(&LocalDataSource::FillDataBuffer,
                                        weak_ptr_factory_.GetWeakPtr()));
}
// Stores the device identifier that is mixed into the unique insert IDs
// (see GetUniqueInsertId()).
void LocalDataSource::AssignDeviceID(const std::string& id) {
  device_id_ = id;
}
// Poll-timer callback. Pipeline: fetch next batch -> de-dup / fire CHANGE
// watchdogs (non-incremental only) -> fire REGEX watchdogs per line ->
// redact (if configured) -> serialize to LogEntry protos -> append to
// the internal buffer and update size accounting.
void LocalDataSource::FillDataBuffer() {
  // If we've reached our max limit, halt buffer fills temporarily.
  // This indicates that data is not being consumed by the aggregator,
  // so there must be some kind of mojom hang-up. We'll resume when
  // the problem is corrected.
  if (IsDataBufferOverMaxLimit()) {
    if (data_buffer_size_ >= kDangerousBufferSize) {
      LOG(WARNING) << GetDisplayName() << " has reached a dangerously high "
                   << "buffer allocation of " << data_buffer_size_
                   << " bytes.";
    }
    return;
  }

  std::vector<std::string> next_data = GetNextData();
  if (next_data.empty()) {
    return;
  }

  if (!is_incremental_) {
    // If non-incremental sources (e.g. commands) return the same
    // data as the previous invocation, return early. This will
    // skip any watchdog checks and will prevent the duplicate
    // data from eventually being enqueued to the cloud logger.
    // Note that incremental sources (e.g. logs) will always have
    // new data, so we don't need to check for duplicates.
    if (next_data == last_unique_data_) {
      return;
    }

    // Update our last known unique data.
    last_unique_data_ = next_data;

    // Fire any CHANGE watchdogs. Note that these are not supported
    // for incremental sources as they will always be changing.
    if (!change_based_watchdogs_.empty()) {
      // The OnNotify() callbacks expect a single string of data, so join
      // the vector first.
      const std::string data_joined = base::JoinString(next_data, "\n");
      FireChangeWatchdogCallbacks(data_joined);
    }
  }

  // Regex watchdogs are checked per line, before any redaction occurs.
  for (const auto& line : next_data) {
    CheckRegexWatchdogsAndFireCallbacks(line);
  }

  if (data_needs_redacting_) {
    RedactDataBuffer(next_data);
  }

  // Replaces each raw line with its serialized LogEntry proto in place.
  SerializeDataBuffer(next_data);

  // Calculate the size of our serialized data so we can
  // update our internal metrics.
  size_t next_data_size = 0;
  for (const auto& line : next_data) {
    next_data_size += line.size();
  }

  std::move(next_data.begin(), next_data.end(),
            std::back_inserter(data_buffer_));
  data_buffer_size_ += next_data_size;

  VLOG(3) << GetDisplayName() << " has a current buffer allocation of "
          << data_buffer_size_ << " bytes.";
}
// True once the buffered (serialized) data exceeds the maximum internal
// allocation; FillDataBuffer() pauses collection while this holds.
bool LocalDataSource::IsDataBufferOverMaxLimit() {
  return data_buffer_size_ > kMaxInternalBufferSize;
}
// Scrubs each line of |buffer| in place via the redaction tool.
void LocalDataSource::RedactDataBuffer(std::vector<std::string>& buffer) {
  for (auto& line : buffer) {
    line = redactor_.Redact(line);
  }
}
// Replaces every raw line in |buffer| with its serialized LogEntry proto.
void LocalDataSource::SerializeDataBuffer(std::vector<std::string>& buffer) {
  if (buffer.empty()) {
    return;
  }

  // Defaults for lines that carry no timestamp or severity. The current
  // wall-clock time (microseconds since the Unix epoch) stands in for a
  // missing timestamp.
  const auto default_timestamp =
      (base::Time::Now() - base::Time::UnixEpoch()).InMicroseconds();
  const proto::LogSeverity default_severity = proto::LOG_SEVERITY_DEFAULT;

  // Build each LogEntry object, then swap the serialized bytes into the
  // slot that held the raw line.
  for (auto& line : buffer) {
    proto::LogEntry entry;
    BuildLogEntryFromLogLine(entry, line, default_timestamp,
                             default_severity);
    std::string serialized_entry;
    entry.SerializeToString(&serialized_entry);
    line = std::move(serialized_entry);
  }
}
// Populates |entry| from one raw log line. The line is split into an
// optional timestamp, an optional severity token, and the message body via
// GetLogLineRegex(); missing pieces fall back to |default_timestamp| /
// |default_severity| or, for timestamps on incremental sources, to the
// last recorded timestamp plus 1 microsecond.
//
// Fix: the second PartialMatch out-parameter had been corrupted to the
// non-compiling token "×tamp" (an HTML-entity mangling of "&timestamp");
// restored to "&timestamp".
void LocalDataSource::BuildLogEntryFromLogLine(
    proto::LogEntry& entry,
    const std::string& line,
    const uint64_t default_timestamp,
    const proto::LogSeverity& default_severity) {
  std::string timestamp;
  std::string severity;
  std::string log_msg;
  RE2& regex = GetLogLineRegex();

  if (!RE2::PartialMatch(line, regex, &timestamp, &severity, &log_msg)) {
    // Unparseable line: keep it verbatim with default metadata so no data
    // is dropped.
    LOG(ERROR) << "Unable to parse log line properly: " << line;
    entry.set_timestamp_micros(default_timestamp);
    entry.set_severity(default_severity);
    entry.set_text_payload(line);
  } else {
    uint64_t time_since_epoch;

    // There are three cases to consider here:
    // 1. The timestamp is populated and was parsed properly. Pass
    //    to TimestampStringToUnixTime() to convert to Unix epoch.
    // 2. The source explicitly does not expect timestamps to be
    //    present. Apply the passed-in default. Note that most (if
    //    not all) non-incremental sources fall under this category.
    // 3. The timestamp is not populated, and this is a data source
    //    that expects timestamps to be present, so this line is
    //    likely a new line from a previous log. Carry forward the
    //    last timestamp that was recorded, plus 1 microsecond.
    if (!timestamp.empty()) {
      time_since_epoch = TimestampStringToUnixTime(timestamp);
      // A return of 0 means the parse failed; don't poison the
      // carried-forward timestamp with it.
      if (time_since_epoch != 0) {
        last_recorded_timestamp_ = time_since_epoch;
      }
    } else if (!AreTimestampsExpected()) {
      time_since_epoch = default_timestamp;
    } else {
      time_since_epoch = ++last_recorded_timestamp_;
    }

    // Use the log source and timestamp to create a unique ID that can be
    // used to identify this entry. This will aid in de-duplication on the
    // server side.
    if (is_incremental_ && time_since_epoch != 0) {
      entry.set_insert_id(GetUniqueInsertId(log_msg));
    }

    // Even if the match succeeded, the timestamps and severity are optional
    // matches, so supply a default if they aren't populated.
    entry.set_timestamp_micros(time_since_epoch);
    entry.set_severity(severity.empty() ? default_severity
                                        : SeverityStringToEnum(severity));
    entry.set_text_payload(log_msg);
  }
}
// Builds a stable, compact identifier for a log entry by hashing the
// device ID, the source's display name, and a prefix of the message.
// Used for server-side de-duplication.
const std::string LocalDataSource::GetUniqueInsertId(
    const std::string& log_msg) {
  std::string hash_input = device_id_;
  hash_input += ':';
  hash_input += GetDisplayName();
  hash_input += ':';
  // append() clamps the count to the message length, so short messages
  // are handled the same way substr() would.
  hash_input.append(log_msg, 0, kLogMsgHashSize);
  return base::NumberToString(base::FastHash(hash_input));
}
// Returns the compiled regex used to split raw lines into timestamp,
// severity, and message body (see kFullLogLineRegex above).
RE2& LocalDataSource::GetLogLineRegex() {
  // Default regex. Data sources can override this if necessary.
  // See notes in local_data_source.h for restrictions.
  return *kFullLogLineRegex;
}
uint64_t LocalDataSource::TimestampStringToUnixTime(
const std::string& timestamp) {
base::Time time;
if (!base::Time::FromString(timestamp.c_str(), &time)) {
LOG(ERROR) << "Unable to parse timestamp: " << timestamp;
return 0;
}
return (time - base::Time::UnixEpoch()).InMicroseconds();
}
// Whether lines from this source should carry their own timestamps; used
// by BuildLogEntryFromLogLine() to choose a fallback timestamp strategy.
bool LocalDataSource::AreTimestampsExpected() const {
  // By default, assume that non-incremental sources will not supply
  // timestamps, and assume the opposite for incremental sources. Data
  // sources that do not follow this rule should override this function.
  return is_incremental_;
}
// Maps a severity token (as captured by the log-line regex) onto the
// proto::LogSeverity enum. Unknown tokens are logged and mapped to the
// default severity.
//
// Fix: kFullLogLineRegex explicitly matches "SEVERE", but this mapper had
// no case for it, so SEVERE lines always hit the error path and were
// demoted to LOG_SEVERITY_DEFAULT. "SEVERE" (java.util.logging's highest
// level) now maps to the error severity alongside ERROR/ERR.
proto::LogSeverity LocalDataSource::SeverityStringToEnum(
    const std::string& severity) {
  if (severity == "EMERGENCY" || severity == "EMERG") {
    return proto::LOG_SEVERITY_EMERGENCY;
  } else if (severity == "ALERT") {
    return proto::LOG_SEVERITY_ALERT;
  } else if (severity == "CRITICAL" || severity == "CRIT") {
    return proto::LOG_SEVERITY_CRITICAL;
  } else if (severity == "ERROR" || severity == "ERR" ||
             severity == "SEVERE") {
    return proto::LOG_SEVERITY_ERROR;
  } else if (severity == "WARNING") {
    return proto::LOG_SEVERITY_WARNING;
  } else if (severity == "NOTICE") {
    return proto::LOG_SEVERITY_NOTICE;
  } else if (severity == "INFO") {
    return proto::LOG_SEVERITY_INFO;
  } else if (severity == "DEBUG" || severity == "VERBOSE1" ||
             severity == "VERBOSE2" || severity == "VERBOSE3" ||
             severity == "VERBOSE4") {
    return proto::LOG_SEVERITY_DEBUG;
  } else {
    LOG(ERROR) << "Unable to parse severity: " << severity;
    return proto::LOG_SEVERITY_DEFAULT;
  }
}
// Validates a watchdog filter before registration. Rejects unknown filter
// types, CHANGE filters on incremental sources or with a pattern attached,
// and REGEX filters whose pattern is missing, the overly-loose "*", or
// fails to compile.
bool LocalDataSource::IsWatchDogFilterValid(mojom::DataFilterPtr& filter) {
  if (filter->filter_type != CHANGE && filter->filter_type != REGEX) {
    LOG(ERROR) << "Somehow received a DataFilter of unknown type "
               << filter->filter_type;
    return false;
  }

  if (filter->filter_type == CHANGE) {
    if (is_incremental_) {
      LOG(ERROR) << "Incremental sources do not support change-based watchdogs";
      return false;
    }
    if (filter->pattern.has_value()) {
      LOG(ERROR) << "CHANGE filter requested with pattern";
      return false;
    }
    return true;
  }

  // REGEX filter: the pattern must be present, specific, and compilable.
  if (filter->pattern.value_or("").empty()) {
    LOG(ERROR) << "Regex watchdog was requested, but the pattern is empty";
    return false;
  }
  const std::string& pattern = filter->pattern.value();
  if (pattern == "*") {
    LOG(ERROR) << "Pattern '*' is too loose. Use CHANGE watchdog instead.";
    return false;
  }
  RE2 test_regex(pattern);
  if (!test_regex.ok()) {
    LOG(ERROR) << "Regex '" << pattern
               << "' is invalid (err: " << test_regex.error() << ")";
    return false;
  }
  return true;
}
// Notifies every registered CHANGE watchdog with the newline-joined batch
// of new data (see FillDataBuffer()).
void LocalDataSource::FireChangeWatchdogCallbacks(const std::string& data) {
  VLOG(2) << "'" << GetDisplayName()
          << "' matched on 'CHANGE' watchdog. Notifying observers.";
  for (const auto& remote : change_based_watchdogs_) {
    remote->OnNotify(data);
  }
}
// Runs one line of data against every registered regex watchdog and
// notifies the observers whose cached, compiled pattern matches it.
void LocalDataSource::CheckRegexWatchdogsAndFireCallbacks(
    const std::string& data) {
  for (const auto& [pattern, remotes] : regex_based_watchdogs_) {
    if (!RE2::PartialMatch(data, *regex_cache_[pattern])) {
      continue;
    }
    VLOG(2) << "'" << GetDisplayName() << "' matched on '" << pattern
            << "' watchdog. Notifying observers.";
    for (auto& remote : remotes) {
      remote->OnNotify(data);
    }
  }
}
} // namespace ash::cfm