File: report_queue_impl.h

package info (click to toggle)
chromium 138.0.7204.183-1~deb12u1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm-proposed-updates
  • size: 6,080,960 kB
  • sloc: cpp: 34,937,079; ansic: 7,176,967; javascript: 4,110,704; python: 1,419,954; asm: 946,768; xml: 739,971; pascal: 187,324; sh: 89,623; perl: 88,663; objc: 79,944; sql: 50,304; cs: 41,786; fortran: 24,137; makefile: 21,811; php: 13,980; tcl: 13,166; yacc: 8,925; ruby: 7,485; awk: 3,720; lisp: 3,096; lex: 1,327; ada: 727; jsp: 228; sed: 36
file content (168 lines) | stat: -rw-r--r-- 6,791 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
// Copyright 2020 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef COMPONENTS_REPORTING_CLIENT_REPORT_QUEUE_IMPL_H_
#define COMPONENTS_REPORTING_CLIENT_REPORT_QUEUE_IMPL_H_

#include <memory>
#include <optional>
#include <queue>
#include <string>
#include <utility>

#include "base/functional/callback.h"
#include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h"
#include "base/sequence_checker.h"
#include "base/task/sequenced_task_runner.h"
#include "components/reporting/client/report_queue.h"
#include "components/reporting/client/report_queue_configuration.h"
#include "components/reporting/proto/synced/record.pb.h"
#include "components/reporting/proto/synced/record_constants.pb.h"
#include "components/reporting/storage/storage_module_interface.h"
#include "components/reporting/util/rate_limiter_interface.h"
#include "components/reporting/util/status.h"
#include "components/reporting/util/statusor.h"
#include "components/reporting/util/wrapped_rate_limiter.h"

namespace reporting {

// A |ReportQueueImpl| is configured with a |ReportQueueConfiguration|.  A
// |ReportQueueImpl| allows a user to |Enqueue| a message for delivery to a
// handler specified by the |Destination| held by the provided
// |ReportQueueConfiguration|. |ReportQueueImpl| handles scheduling storage and
// delivery.
//
// ReportQueues are not meant to be created directly, instead use the
// reporting::ReportQueueProvider::CreateQueue(...) function. See the
// comments for reporting::ReportingClient for example usage.
//
// Enqueue can also be used with a |base::Value| or |std::string|.
class ReportQueueImpl : public ReportQueue {
 public:
  // Factory
  static void Create(
      std::unique_ptr<ReportQueueConfiguration> config,
      scoped_refptr<StorageModuleInterface> storage,
      base::OnceCallback<void(StatusOr<std::unique_ptr<ReportQueue>>)> cb);

  ReportQueueImpl(const ReportQueueImpl& other) = delete;
  ReportQueueImpl& operator=(const ReportQueueImpl& other) = delete;
  ~ReportQueueImpl() override;

  void Flush(Priority priority, FlushCallback callback) override;

  // Dummy implementation for a regular queue.
  [[nodiscard]] base::OnceCallback<void(StatusOr<std::unique_ptr<ReportQueue>>)>
  PrepareToAttachActualQueue() const override;

  // ReportQueue:
  Destination GetDestination() const override;

 protected:
  ReportQueueImpl(std::unique_ptr<ReportQueueConfiguration> config,
                  scoped_refptr<StorageModuleInterface> storage);

 private:
  void AddProducedRecord(RecordProducer record_producer,
                         Priority priority,
                         EnqueueCallback callback) const override;

  const std::unique_ptr<ReportQueueConfiguration> config_;
  const scoped_refptr<StorageModuleInterface> storage_;
};

class SpeculativeReportQueueImpl : public ReportQueue {
 public:
  // Factory method returns a smart pointer with on-thread deleter.
  static std::unique_ptr<SpeculativeReportQueueImpl, base::OnTaskRunnerDeleter>
  Create(const SpeculativeConfigSettings& config_settings);

  SpeculativeReportQueueImpl(const SpeculativeReportQueueImpl& other) = delete;
  SpeculativeReportQueueImpl& operator=(
      const SpeculativeReportQueueImpl& other) = delete;
  ~SpeculativeReportQueueImpl() override;

  // Forwards |Flush| to |ReportQueue|, if already created.
  // Returns with failure otherwise.
  void Flush(Priority priority, FlushCallback callback) override;

  // Provides a callback to attach initialized actual queue to the speculative
  // queue.
  [[nodiscard]] base::OnceCallback<void(StatusOr<std::unique_ptr<ReportQueue>>)>
  PrepareToAttachActualQueue() const override;

  // ReportQueue:
  Destination GetDestination() const override;

 private:
  // Moveable, non-copyable struct holding a pending record producer for the
  // |pending_record_producers_| queue below.
  struct PendingRecordProducer {
    PendingRecordProducer(RecordProducer producer,
                          EnqueueCallback callback,
                          Priority priority);
    PendingRecordProducer(PendingRecordProducer&& other);
    PendingRecordProducer& operator=(PendingRecordProducer&& other);
    ~PendingRecordProducer();

    RecordProducer record_producer;
    EnqueueCallback record_callback;
    Priority record_priority;
  };

  // Private constructor, used by the factory method  only.
  explicit SpeculativeReportQueueImpl(
      const SpeculativeConfigSettings& config_settings,
      scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner);

  // Forwards |AddProducedRecord| to |ReportQueue|, if already created.
  // Records the record internally otherwise.
  void AddProducedRecord(RecordProducer record_producer,
                         Priority priority,
                         EnqueueCallback callback) const override;

  // Substitutes actual queue to the speculative, when ready.
  // Initiates processesing of all pending records.
  void AttachActualQueue(
      StatusOr<std::unique_ptr<ReportQueue>> status_or_actual_queue);

  // Enqueues head of the |pending_record_producers_| and reapplies for the rest
  // of it.
  void EnqueuePendingRecordProducers() const;

  // Purges all |pending_record_producers_| with error.
  void PurgePendingProducers(Status status) const;

  // Optionally enqueues |record_producer| (owned) to actual queue, if ready.
  // Otherwise adds it to the end of |pending_record_producers_|.
  void MaybeEnqueueRecordProducer(Priority priority,
                                  EnqueueCallback callback,
                                  RecordProducer record_producer) const;

  // Task runner that protects |report_queue_| and |pending_record_producers_|
  // and allows to synchronize the initialization.
  const scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner_;
  SEQUENCE_CHECKER(sequence_checker_);

  // Actual |ReportQueue| once successfully created (immutable after that).
  std::optional<std::unique_ptr<ReportQueue>> actual_report_queue_
      GUARDED_BY_CONTEXT(sequence_checker_);

  // Queue of the pending record producers, collected before actual queue has
  // been created. Declared 'mutable', because it is accessed by 'const'
  // methods.
  mutable std::queue<PendingRecordProducer> pending_record_producers_
      GUARDED_BY_CONTEXT(sequence_checker_);

  // Report queue configuration settings that are supposed to be identical to
  // the one configured with the `actual_report_queue_`.
  const SpeculativeConfigSettings config_settings_;

  // Weak pointer factory.
  base::WeakPtrFactory<SpeculativeReportQueueImpl> weak_ptr_factory_{this};
};
}  // namespace reporting

#endif  // COMPONENTS_REPORTING_CLIENT_REPORT_QUEUE_IMPL_H_