File: sharing_message_bridge_impl.cc

package info (click to toggle)
chromium 139.0.7258.127-1
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 6,122,068 kB
  • sloc: cpp: 35,100,771; ansic: 7,163,530; javascript: 4,103,002; python: 1,436,920; asm: 946,517; xml: 746,709; pascal: 187,653; perl: 88,691; sh: 88,436; objc: 79,953; sql: 51,488; cs: 44,583; fortran: 24,137; makefile: 22,147; tcl: 15,277; php: 13,980; yacc: 8,984; ruby: 7,485; awk: 3,720; lisp: 3,096; lex: 1,327; ada: 727; jsp: 228; sed: 36
file content (313 lines) | stat: -rw-r--r-- 12,168 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
// Copyright 2020 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "components/sharing_message/sharing_message_bridge_impl.h"

#include "base/metrics/histogram_functions.h"
#include "base/uuid.h"
#include "components/sharing_message/features.h"
#include "components/sync/model/empty_metadata_change_list.h"
#include "components/sync/model/metadata_batch.h"
#include "components/sync/model/mutable_data_batch.h"
#include "net/base/network_change_notifier.h"

namespace {

// Logs the error code carried by |commit_error| to the
// "Sync.SharingMessage.CommitResult" histogram and then runs |callback| with
// |commit_error|. The caller must have populated the error code.
void ReplyToCallback(SharingMessageBridge::CommitFinishedCallback callback,
                     const sync_pb::SharingMessageCommitError& commit_error) {
  DCHECK(commit_error.has_error_code());

  const int sample = commit_error.error_code();
  const int bucket_limit =
      sync_pb::SharingMessageCommitError::ErrorCode_ARRAYSIZE;
  base::UmaHistogramExactLinear("Sync.SharingMessage.CommitResult", sample,
                                bucket_limit);

  std::move(callback).Run(commit_error);
}

// Convenience overload taking a bare error code: wraps |commit_error_code|
// into a SharingMessageCommitError proto and forwards to the proto-based
// ReplyToCallback() overload.
void ReplyToCallback(
    SharingMessageBridge::CommitFinishedCallback callback,
    sync_pb::SharingMessageCommitError::ErrorCode commit_error_code) {
  sync_pb::SharingMessageCommitError commit_error;
  commit_error.set_error_code(commit_error_code);
  ReplyToCallback(std::move(callback), commit_error);
}

// Builds the SHARING_MESSAGE client tag hash from the unhashed |storage_key|.
syncer::ClientTagHash GetClientTagHashFromStorageKey(
    const std::string& storage_key) {
  const auto data_type = syncer::SHARING_MESSAGE;
  return syncer::ClientTagHash::FromUnhashed(data_type, storage_key);
}

// Wraps |specifics| into a freshly allocated EntityData. The message id is
// used both as the entity name and as the input for the client tag hash.
// Ownership of |specifics| is transferred to the returned entity.
std::unique_ptr<syncer::EntityData> MoveToEntityData(
    std::unique_ptr<sync_pb::SharingMessageSpecifics> specifics) {
  const std::string& message_id = specifics->message_id();

  auto result = std::make_unique<syncer::EntityData>();
  result->name = message_id;
  result->client_tag_hash = GetClientTagHashFromStorageKey(message_id);
  // The entity takes over the proto allocation released here.
  result->specifics.set_allocated_sharing_message(specifics.release());
  return result;
}

// Same as MoveToEntityData(), but operates on a heap-allocated copy of
// |specifics| so the caller's proto is left untouched.
std::unique_ptr<syncer::EntityData> CopyToEntityData(
    const sync_pb::SharingMessageSpecifics& specifics) {
  auto specifics_copy =
      std::make_unique<sync_pb::SharingMessageSpecifics>(specifics);
  return MoveToEntityData(std::move(specifics_copy));
}

}  // namespace

// Hands |change_processor| to the base bridge and immediately reports the
// model as ready to sync with an empty metadata batch.
SharingMessageBridgeImpl::SharingMessageBridgeImpl(
    std::unique_ptr<syncer::DataTypeLocalChangeProcessor> change_processor)
    : DataTypeSyncBridge(std::move(change_processor)) {
  // This data type has no persistent storage, hence there is nothing to load
  // before syncing can start. `this->` is required because the constructor
  // parameter shadows the accessor of the same name.
  auto empty_metadata = std::make_unique<syncer::MetadataBatch>();
  this->change_processor()->ModelReadyToSync(std::move(empty_metadata));
}

// Defaulted destructor: no explicit teardown is performed beyond destroying
// the members.
SharingMessageBridgeImpl::~SharingMessageBridgeImpl() = default;

// Queues |specifics| for commit and arranges for |on_commit_callback| to be
// run with the outcome. The callback is invoked right away with
// SYNC_TURNED_OFF when the processor is not tracking metadata, or with
// SYNC_NETWORK_ERROR when there is no network connection. Otherwise the
// message gets a freshly generated id, is remembered in |pending_commits_|
// together with a timeout, and is handed to the change processor.
void SharingMessageBridgeImpl::SendSharingMessage(
    std::unique_ptr<sync_pb::SharingMessageSpecifics> specifics,
    CommitFinishedCallback on_commit_callback) {
  using CommitError = sync_pb::SharingMessageCommitError;

  if (!change_processor()->IsTrackingMetadata()) {
    ReplyToCallback(std::move(on_commit_callback),
                    CommitError::SYNC_TURNED_OFF);
    return;
  }

  const bool is_offline = net::NetworkChangeNotifier::GetConnectionType() ==
                          net::NetworkChangeNotifier::CONNECTION_NONE;
  if (is_offline) {
    ReplyToCallback(std::move(on_commit_callback),
                    CommitError::SYNC_NETWORK_ERROR);
    return;
  }

  std::unique_ptr<syncer::MetadataChangeList> metadata_change_list =
      CreateMetadataChangeList();

  // The internal message id is a newly generated UUID; it also serves as the
  // storage key of the entity.
  const std::string message_id =
      base::Uuid::GenerateRandomV4().AsLowercaseString();
  specifics->set_message_id(message_id);
  std::unique_ptr<syncer::EntityData> entity_data =
      MoveToEntityData(std::move(specifics));
  const syncer::ClientTagHash client_tag_hash =
      GetClientTagHashFromStorageKey(message_id);

  DCHECK(pending_commits_.find(client_tag_hash) == pending_commits_.end());

  // NOTE(review): base::Unretained(this) presumably relies on the timer being
  // owned (through |pending_commits_|) by this object - confirm.
  auto timed_callback = std::make_unique<TimedCallback>(
      std::move(on_commit_callback),
      base::BindOnce(&SharingMessageBridgeImpl::ProcessCommitTimeout,
                     base::Unretained(this), client_tag_hash));
  // Keep a copy of the specifics so they can be served from
  // GetDataForCommit() later on.
  pending_commits_.emplace(
      client_tag_hash,
      PendingCommit(std::move(timed_callback),
                    entity_data->specifics.sharing_message()));

  change_processor()->Put(message_id, std::move(entity_data),
                          metadata_change_list.get());
}

// Exposes the controller delegate owned by the change processor.
base::WeakPtr<syncer::DataTypeControllerDelegate>
SharingMessageBridgeImpl::GetControllerDelegate() {
  auto* const processor = change_processor();
  return processor->GetControllerDelegate();
}

std::unique_ptr<syncer::MetadataChangeList>
SharingMessageBridgeImpl::CreateMetadataChangeList() {
  // The data type intentionally doesn't persist the data on disk, so metadata
  // is just ignored.
  return std::make_unique<syncer::EmptyMetadataChangeList>();
}

// Initial merge hook. No remote entities are expected here (|entity_data| must
// be empty) and the processor must already be tracking metadata; nothing is
// merged and no error is ever reported.
std::optional<syncer::ModelError> SharingMessageBridgeImpl::MergeFullSyncData(
    std::unique_ptr<syncer::MetadataChangeList> metadata_change_list,
    syncer::EntityChangeList entity_data) {
  DCHECK(entity_data.empty());
  DCHECK(change_processor()->IsTrackingMetadata());
  return std::nullopt;
}

// Treats every incoming change as the confirmation of a committed message and
// completes the matching pending commit with the NONE error code. Always
// returns an empty optional (no model error).
std::optional<syncer::ModelError>
SharingMessageBridgeImpl::ApplyIncrementalSyncChanges(
    std::unique_ptr<syncer::MetadataChangeList> metadata_change_list,
    syncer::EntityChangeList entity_changes) {
  sync_pb::SharingMessageCommitError success;
  success.set_error_code(sync_pb::SharingMessageCommitError::NONE);

  for (const auto& change : entity_changes) {
    // Only |ACTION_DELETE| changes are expected for a commit-only data type.
    DCHECK_EQ(syncer::EntityChange::ACTION_DELETE, change->type());

    ProcessCommitResponse(
        GetClientTagHashFromStorageKey(change->storage_key()), success);
  }
  return std::nullopt;
}

// Builds a batch holding a copy of the pending specifics for each requested
// storage key. Keys without a pending commit are silently left out.
std::unique_ptr<syncer::DataBatch> SharingMessageBridgeImpl::GetDataForCommit(
    StorageKeyList storage_keys) {
  auto batch = std::make_unique<syncer::MutableDataBatch>();

  for (const std::string& storage_key : storage_keys) {
    const auto pending_it =
        pending_commits_.find(GetClientTagHashFromStorageKey(storage_key));
    if (pending_it != pending_commits_.end()) {
      batch->Put(storage_key, CopyToEntityData(pending_it->second.specifics));
    }
  }

  return batch;
}

// Returns a batch with a copy of every message currently held in
// |pending_commits_|, keyed by its storage key.
std::unique_ptr<syncer::DataBatch>
SharingMessageBridgeImpl::GetAllDataForDebugging() {
  auto batch = std::make_unique<syncer::MutableDataBatch>();

  for (const auto& pending_entry : pending_commits_) {
    auto entity_data = CopyToEntityData(pending_entry.second.specifics);
    // The key has to be read before |entity_data| is moved into the batch.
    const std::string storage_key = GetStorageKey(*entity_data);
    batch->Put(storage_key, std::move(entity_data));
  }

  return batch;
}

// The client tag is the same string as the storage key, so this simply
// delegates to GetStorageKey().
std::string SharingMessageBridgeImpl::GetClientTag(
    const syncer::EntityData& entity_data) const {
  std::string client_tag = GetStorageKey(entity_data);
  return client_tag;
}

// Returns the message id stored in the sharing message specifics of
// |entity_data|, which must carry such specifics.
std::string SharingMessageBridgeImpl::GetStorageKey(
    const syncer::EntityData& entity_data) const {
  DCHECK(entity_data.specifics.has_sharing_message());
  const auto& sharing_message = entity_data.specifics.sharing_message();
  return sharing_message.message_id();
}

// Entity validation hook. It is not expected to run for this bridge; reaching
// it hits NOTREACHED().
bool SharingMessageBridgeImpl::IsEntityDataValid(
    const syncer::EntityData& entity_data) const {
  // SHARING_MESSAGE is a commit-only data type, so this method is not called.
  NOTREACHED();
}

// Handles per-entity commit failures: each failed entity is untracked and its
// pending commit, if any, is completed with the sharing-message-specific
// error taken from the response.
void SharingMessageBridgeImpl::OnCommitAttemptErrors(
    const syncer::FailedCommitResponseDataList& error_response_list) {
  for (const auto& failed_response : error_response_list) {
    const auto& client_tag_hash = failed_response.client_tag_hash;

    // The commit must not be retried, so the processor has to forget about
    // the failed item.
    change_processor()->UntrackEntityForClientTagHash(client_tag_hash);

    const auto& sharing_error =
        failed_response.datatype_specific_error.sharing_message_error();
    ProcessCommitResponse(client_tag_hash, sharing_error);
  }
}

// Called when a whole commit attempt failed. Auth errors are treated as
// possibly transient and the commit is retried on the next cycle. For every
// other error all pending commits are untracked and their callbacks are run
// with the corresponding sharing message error code; no retry is requested.
syncer::DataTypeSyncBridge::CommitAttemptFailedBehavior
SharingMessageBridgeImpl::OnCommitAttemptFailed(
    syncer::SyncCommitError commit_error) {
  // The error code is written straight into the message, so there is no local
  // that could be left uninitialized. Every enumerator is handled below.
  sync_pb::SharingMessageCommitError sync_error_message;
  switch (commit_error) {
    case syncer::SyncCommitError::kNetworkError:
      sync_error_message.set_error_code(
          sync_pb::SharingMessageCommitError::SYNC_NETWORK_ERROR);
      break;
    case syncer::SyncCommitError::kAuthError:
      // Ignore the auth error because it may be a temporary error and the
      // message will be sent on the second attempt.
      return CommitAttemptFailedBehavior::kShouldRetryOnNextCycle;
    case syncer::SyncCommitError::kServerError:
    case syncer::SyncCommitError::kBadServerResponse:
      sync_error_message.set_error_code(
          sync_pb::SharingMessageCommitError::SYNC_SERVER_ERROR);
      break;
  }

  // Full commit failed means we need to drop all entities and report an error
  // using callback. The callbacks are arbitrary client code and run
  // synchronously, so detach the pending commits from |pending_commits_|
  // first: a callback that re-enters the bridge (e.g. sends another message)
  // then neither mutates the container being iterated nor gets its freshly
  // registered commit wiped without a reply.
  decltype(pending_commits_) failed_commits;
  failed_commits.swap(pending_commits_);
  for (auto& [client_tag_hash, pending_commit] : failed_commits) {
    change_processor()->UntrackEntityForClientTagHash(client_tag_hash);
    pending_commit.timed_callback->Run(sync_error_message);
  }
  return CommitAttemptFailedBehavior::kDontRetryOnNextCycle;
}

// Called when sync gets disabled: every pending commit is untracked and its
// callback is run with SYNC_TURNED_OFF.
void SharingMessageBridgeImpl::ApplyDisableSyncChanges(
    std::unique_ptr<syncer::MetadataChangeList> metadata_change_list) {
  sync_pb::SharingMessageCommitError sync_disabled_error_message;
  sync_disabled_error_message.set_error_code(
      sync_pb::SharingMessageCommitError::SYNC_TURNED_OFF);

  // The callbacks are arbitrary client code and run synchronously. Detach the
  // pending commits from |pending_commits_| before running them so that a
  // callback re-entering the bridge cannot mutate the container while it is
  // being iterated, and so that an entry it registers is not erased without
  // ever being answered.
  decltype(pending_commits_) dropped_commits;
  dropped_commits.swap(pending_commits_);
  for (auto& [client_tag_hash, pending_commit] : dropped_commits) {
    change_processor()->UntrackEntityForClientTagHash(client_tag_hash);
    pending_commit.timed_callback->Run(sync_disabled_error_message);
  }
}

// Sync-paused notification. Performs no work; it only enforces (via CHECK)
// that metadata is not being tracked and that no commits are pending.
void SharingMessageBridgeImpl::OnSyncPaused() {
  // The controller always clears metadata so this is only reachable for the
  // case where the initial download is interrupted before MergeFullSyncData()
  // is invoked, which means there are no outgoing messages.
  CHECK(!change_processor()->IsTrackingMetadata());
  CHECK(pending_commits_.empty());
}

// Timeout handler bound in SendSharingMessage(): untracks the entity and
// completes its pending commit with SYNC_TIMEOUT.
void SharingMessageBridgeImpl::ProcessCommitTimeout(
    const syncer::ClientTagHash& client_tag_hash) {
  sync_pb::SharingMessageCommitError timeout_error;
  timeout_error.set_error_code(
      sync_pb::SharingMessageCommitError::SYNC_TIMEOUT);

  change_processor()->UntrackEntityForClientTagHash(client_tag_hash);
  ProcessCommitResponse(client_tag_hash, timeout_error);
}

// Completes the pending commit identified by |client_tag_hash|: runs its
// callback with |commit_error_message| and then forgets the entry. Does
// nothing when no such commit is pending.
void SharingMessageBridgeImpl::ProcessCommitResponse(
    const syncer::ClientTagHash& client_tag_hash,
    const sync_pb::SharingMessageCommitError& commit_error_message) {
  const auto pending_it = pending_commits_.find(client_tag_hash);
  // A missing entry may happen if tasks from OnUpdateReceived and
  // OneShotTimer were added at one time; the commit was then already handled.
  if (pending_it != pending_commits_.end()) {
    pending_it->second.timed_callback->Run(commit_error_message);
    pending_commits_.erase(pending_it);
  }
}

// Stores |commit_callback| and starts |timer_| so that |timeout_callback| is
// run once kCommitTimeout has elapsed.
SharingMessageBridgeImpl::TimedCallback::TimedCallback(
    CommitFinishedCallback commit_callback,
    base::OnceClosure timeout_callback)
    : commit_callback_(std::move(commit_callback)) {
  const auto timeout = SharingMessageBridgeImpl::kCommitTimeout;
  timer_.Start(FROM_HERE, timeout, std::move(timeout_callback));
}

// Defaulted destructor: nothing is done beyond destroying the members.
SharingMessageBridgeImpl::TimedCallback::~TimedCallback() = default;

// Reports |commit_error| through the stored commit callback (which is
// consumed by this call) and then cancels the timeout timer if it is still
// running.
void SharingMessageBridgeImpl::TimedCallback::Run(
    const sync_pb::SharingMessageCommitError& commit_error) {
  DCHECK(commit_callback_);

  ReplyToCallback(std::move(commit_callback_), commit_error);

  // When Run is reached from ProcessCommitTimeout, |timer_| may already be
  // stopped and there is nothing left to cancel.
  if (!timer_.IsRunning()) {
    return;
  }
  timer_.Stop();
}

SharingMessageBridgeImpl::PendingCommit::PendingCommit(
    std::unique_ptr<TimedCallback> timed_callback,
    sync_pb::SharingMessageSpecifics specifics)
    : timed_callback(std::move(timed_callback)),
      specifics(std::move(specifics)) {}

// Defaulted destructor.
SharingMessageBridgeImpl::PendingCommit::~PendingCommit() = default;

// Defaulted move construction and move assignment; the struct holds a
// std::unique_ptr member (see the constructor above).
SharingMessageBridgeImpl::PendingCommit::PendingCommit(PendingCommit&&) =
    default;
SharingMessageBridgeImpl::PendingCommit&
SharingMessageBridgeImpl::PendingCommit::operator=(PendingCommit&&) = default;