1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
// Copyright 2024 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROME_BROWSER_ASH_CHROMEBOX_FOR_MEETINGS_ARTEMIS_LOCAL_DATA_SOURCE_H_
#define CHROME_BROWSER_ASH_CHROMEBOX_FOR_MEETINGS_ARTEMIS_LOCAL_DATA_SOURCE_H_
#include <deque>
#include "base/memory/weak_ptr.h"
#include "chromeos/services/chromebox_for_meetings/public/mojom/meet_devices_data_aggregator.mojom.h"
#include "chromeos/services/chromebox_for_meetings/public/proto/logs_payload.pb.h"
#include "components/feedback/redaction_tool/redaction_tool.h"
#include "mojo/public/cpp/bindings/remote.h"
#include "mojo/public/cpp/bindings/remote_set.h"
#include "third_party/re2/src/re2/re2.h"
namespace ash::cfm {
// Maximum bytes that can be in the internal buffer before we halt
// collecting data temporarily. In the working case, we should never
// hit this limit, but we may reach it if we're unable to enqueue logs
// via Fetch() for whatever reason (eg a network outage).
inline constexpr int kMaxInternalBufferSize = 500000; // 500Kb
class LocalDataSource : public mojom::DataSource {
public:
LocalDataSource(base::TimeDelta poll_rate,
bool data_needs_redacting,
bool is_incremental);
LocalDataSource(const LocalDataSource&) = delete;
LocalDataSource& operator=(const LocalDataSource&) = delete;
~LocalDataSource() override;
// mojom::DataSource implementation
void Fetch(FetchCallback callback) override;
void AddWatchDog(mojom::DataFilterPtr filter,
mojo::PendingRemote<mojom::DataWatchDog> pending_watch_dog,
AddWatchDogCallback callback) override;
void Flush() override;
void StartCollectingData();
void AssignDeviceID(const std::string& id);
protected:
void FillDataBuffer();
// Make this virtual so unittests can override it
virtual void SerializeDataBuffer(std::vector<std::string>& buffer);
// Returns a unique identifier for logging purposes only.
virtual const std::string& GetDisplayName() = 0;
// Returns an array of the "next" data. For commands or other stately
// data sources, this will likely just be the current state. For sources
// that are incremental, like log files, this might be the next batch of
// lines in the file (or otherwise). This data will be added to the internal
// buffer for temporary storage until the next call to Fetch().
virtual std::vector<std::string> GetNextData() = 0;
void BuildLogEntryFromLogLine(proto::LogEntry& entry,
const std::string& line,
const uint64_t default_timestamp,
const proto::LogSeverity& default_severity);
// Returns the regex that should be used to parse every line of data.
// Note: this regex is expected to contain three groups, marked with
// parentheses, in the following order: a timestamp, a severity level,
// and the rest of the message. Failure to provide three groups will
// render the regex invalid, and defaults for time/severity will
// be used.
virtual RE2& GetLogLineRegex();
// Converts a parsed timestamp string to microseconds since the UNIX epoch.
virtual uint64_t TimestampStringToUnixTime(const std::string& timestamp);
// Returns true if this data source is expected to contain timestamps,
// and returns false otherwise.
virtual bool AreTimestampsExpected() const;
private:
bool IsDataBufferOverMaxLimit();
void RedactDataBuffer(std::vector<std::string>& buffer);
const std::string GetUniqueInsertId(const std::string& log_msg);
proto::LogSeverity SeverityStringToEnum(const std::string& severity);
bool IsWatchDogFilterValid(mojom::DataFilterPtr& filter);
void FireChangeWatchdogCallbacks(const std::string& data);
void CheckRegexWatchdogsAndFireCallbacks(const std::string& data);
base::RepeatingTimer poll_timer_;
base::TimeDelta poll_rate_;
// The unique ID associated with the CfM device.
std::string device_id_;
// True if we should pass the data through the redactor tool
// before uploading, False otherwise.
bool data_needs_redacting_;
// Set to True when the data source yields incremental data, like a
// log file, and False when the source simply yields current state.
const bool is_incremental_;
// Contains a chain of the most recent data. Will be returned
// at the next call to Fetch().
std::deque<std::string> data_buffer_;
// Current size of data buffer in bytes.
size_t data_buffer_size_ = 0;
// Redaction tool for PII redaction
redaction::RedactionTool redactor_;
// Contains the most recent unique data from GetNextData(). Only used
// for non-incremental sources to avoid spamming the same data.
std::vector<std::string> last_unique_data_;
// Contains a copy of the most recently seen timestamp. This timestamp
// be "applied forward" to subsequent logs that don't have a timestamp.
// This is needed to support timestamped logs that contain newlines.
uint64_t last_recorded_timestamp_ = 0;
// Contains a set of watchdogs to be fired when the output contains
// any change since the previous output.
mojo::RemoteSet<mojom::DataWatchDog> change_based_watchdogs_;
// Contains a collection of watchdogs to be fired when the output matches
// one or more regex patterns. The patterns are the keys, which map to a
// set of remotes. Supports multiple watchdogs per pattern.
std::map<const std::string, mojo::RemoteSet<mojom::DataWatchDog>>
regex_based_watchdogs_;
// Regex objects are expensive to create, so cache them here and
// reuse for repeated usages and duplicate watchdogs.
std::map<const std::string, std::unique_ptr<RE2>> regex_cache_;
// Must be the last class member.
base::WeakPtrFactory<LocalDataSource> weak_ptr_factory_{this};
};
} // namespace ash::cfm
#endif // CHROME_BROWSER_ASH_CHROMEBOX_FOR_MEETINGS_ARTEMIS_LOCAL_DATA_SOURCE_H_
|