// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/sync/engine_impl/directory_update_handler.h"
#include <stdint.h>
#include <vector>
#include "base/memory/ptr_util.h"
#include "components/sync/base/data_type_histogram.h"
#include "components/sync/engine_impl/conflict_resolver.h"
#include "components/sync/engine_impl/cycle/data_type_debug_info_emitter.h"
#include "components/sync/engine_impl/cycle/status_controller.h"
#include "components/sync/engine_impl/update_applicator.h"
#include "components/sync/syncable/directory.h"
#include "components/sync/syncable/model_neutral_mutable_entry.h"
#include "components/sync/syncable/syncable_changes_version.h"
#include "components/sync/syncable/syncable_model_neutral_write_transaction.h"
#include "components/sync/syncable/syncable_write_transaction.h"
namespace syncer {
using syncable::SYNCER;
DirectoryUpdateHandler::DirectoryUpdateHandler(
syncable::Directory* dir,
ModelType type,
scoped_refptr<ModelSafeWorker> worker,
DataTypeDebugInfoEmitter* debug_info_emitter)
: dir_(dir),
type_(type),
worker_(worker),
debug_info_emitter_(debug_info_emitter) {}
DirectoryUpdateHandler::~DirectoryUpdateHandler() {}
bool DirectoryUpdateHandler::IsInitialSyncEnded() const {
return dir_->InitialSyncEndedForType(type_);
}
void DirectoryUpdateHandler::GetDownloadProgress(
sync_pb::DataTypeProgressMarker* progress_marker) const {
dir_->GetDownloadProgress(type_, progress_marker);
}
void DirectoryUpdateHandler::GetDataTypeContext(
sync_pb::DataTypeContext* context) const {
syncable::ModelNeutralWriteTransaction trans(FROM_HERE, SYNCER, dir_);
dir_->GetDataTypeContext(&trans, type_, context);
}
SyncerError DirectoryUpdateHandler::ProcessGetUpdatesResponse(
const sync_pb::DataTypeProgressMarker& progress_marker,
const sync_pb::DataTypeContext& mutated_context,
const SyncEntityList& applicable_updates,
StatusController* status) {
syncable::ModelNeutralWriteTransaction trans(FROM_HERE, SYNCER, dir_);
if (progress_marker.ByteSize() > 0) {
SyncRecordDatatypeBin("DataUse.Sync.ProgressMarker.Bytes",
ModelTypeToHistogramInt(type_),
progress_marker.ByteSize());
}
if (mutated_context.has_context()) {
sync_pb::DataTypeContext local_context;
dir_->GetDataTypeContext(&trans, type_, &local_context);
// Only update the local context if it is still relevant. If the local
// version is higher, it means a local change happened while the mutation
// was in flight, and the local context takes priority.
if (mutated_context.version() >= local_context.version() &&
local_context.context() != mutated_context.context()) {
dir_->SetDataTypeContext(&trans, type_, mutated_context);
// TODO(zea): trigger the datatype's UpdateDataTypeContext method.
} else if (mutated_context.version() < local_context.version()) {
// A GetUpdates using the old context was in progress when the context was
// set. Fail this get updates cycle, to force a retry.
DVLOG(1) << "GU Context conflict detected, forcing GU retry.";
debug_info_emitter_->EmitUpdateCountersUpdate();
return DATATYPE_TRIGGERED_RETRY;
}
}
// Auto-create permanent folder for the type if the progress marker
// changes from empty to non-empty.
if (IsTypeWithClientGeneratedRoot(type_) &&
dir_->HasEmptyDownloadProgress(type_) &&
IsValidProgressMarker(progress_marker)) {
CreateTypeRoot(&trans);
}
UpdateSyncEntities(&trans, applicable_updates, status);
if (IsValidProgressMarker(progress_marker)) {
ExpireEntriesIfNeeded(&trans, progress_marker);
UpdateProgressMarker(progress_marker);
}
debug_info_emitter_->EmitUpdateCountersUpdate();
return SYNCER_OK;
}
void DirectoryUpdateHandler::CreateTypeRoot(
syncable::ModelNeutralWriteTransaction* trans) {
syncable::ModelNeutralMutableEntry entry(
trans, syncable::CREATE_NEW_TYPE_ROOT, type_);
if (!entry.good()) {
// This fails only if a matching entry already exists, for example
// if the type gets disabled and its progress marker gets cleared,
// and then the type gets re-enabled.
DVLOG(1) << "Type root folder " << ModelTypeToRootTag(type_)
<< " already exists.";
return;
}
entry.PutServerIsDir(true);
entry.PutUniqueServerTag(ModelTypeToRootTag(type_));
}
void DirectoryUpdateHandler::ApplyUpdates(StatusController* status) {
if (IsApplyUpdatesRequired()) {
// This will invoke handlers that belong to the model and its thread, so we
// switch to the appropriate thread before we start this work.
WorkCallback c =
base::Bind(&DirectoryUpdateHandler::ApplyUpdatesImpl,
// We wait until the callback is executed. We can safely use
// Unretained.
base::Unretained(this), base::Unretained(status));
worker_->DoWorkAndWaitUntilDone(c);
debug_info_emitter_->EmitUpdateCountersUpdate();
debug_info_emitter_->EmitStatusCountersUpdate();
}
PostApplyUpdates();
}
void DirectoryUpdateHandler::PassiveApplyUpdates(StatusController* status) {
if (IsApplyUpdatesRequired()) {
// Just do the work here instead of deferring to another thread.
ApplyUpdatesImpl(status);
debug_info_emitter_->EmitUpdateCountersUpdate();
debug_info_emitter_->EmitStatusCountersUpdate();
}
PostApplyUpdates();
}
SyncerError DirectoryUpdateHandler::ApplyUpdatesImpl(StatusController* status) {
syncable::WriteTransaction trans(FROM_HERE, syncable::SYNCER, dir_);
std::vector<int64_t> handles;
dir_->GetUnappliedUpdateMetaHandles(&trans, FullModelTypeSet(type_),
&handles);
// First set of update application passes.
UpdateApplicator applicator(dir_->GetCryptographer(&trans));
applicator.AttemptApplications(&trans, handles);
// The old StatusController counters.
status->increment_num_updates_applied_by(applicator.updates_applied());
status->increment_num_hierarchy_conflicts_by(
applicator.hierarchy_conflicts());
status->increment_num_encryption_conflicts_by(
applicator.encryption_conflicts());
// The new UpdateCounter counters.
UpdateCounters* counters = debug_info_emitter_->GetMutableUpdateCounters();
counters->num_updates_applied += applicator.updates_applied();
counters->num_hierarchy_conflict_application_failures =
applicator.hierarchy_conflicts();
counters->num_encryption_conflict_application_failures +=
applicator.encryption_conflicts();
if (applicator.simple_conflict_ids().size() != 0) {
// Resolve the simple conflicts we just detected.
ConflictResolver resolver;
resolver.ResolveConflicts(&trans, dir_->GetCryptographer(&trans),
applicator.simple_conflict_ids(), status,
counters);
// Conflict resolution sometimes results in more updates to apply.
handles.clear();
dir_->GetUnappliedUpdateMetaHandles(&trans, FullModelTypeSet(type_),
&handles);
UpdateApplicator conflict_applicator(dir_->GetCryptographer(&trans));
conflict_applicator.AttemptApplications(&trans, handles);
// We count the number of updates from both applicator passes.
status->increment_num_updates_applied_by(
conflict_applicator.updates_applied());
counters->num_updates_applied += conflict_applicator.updates_applied();
// Encryption conflicts should remain unchanged by the resolution of simple
// conflicts. Those can only be solved by updating our nigori key bag.
DCHECK_EQ(conflict_applicator.encryption_conflicts(),
applicator.encryption_conflicts());
// Hierarchy conflicts should also remain unchanged, for reasons that are
// more subtle. Hierarchy conflicts exist when the application of a pending
// update from the server would make the local folder hierarchy
// inconsistent. The resolution of simple conflicts could never affect the
// hierarchy conflicting item directly, because hierarchy conflicts are not
// processed by the conflict resolver. It could, in theory, modify the
// local hierarchy on which hierarchy conflict detection depends. However,
// the conflict resolution algorithm currently in use does not allow this.
DCHECK_EQ(conflict_applicator.hierarchy_conflicts(),
applicator.hierarchy_conflicts());
// There should be no simple conflicts remaining. We know this because the
// resolver should have resolved all the conflicts we detected last time
// and, by the two previous assertions, no conflicts have been downgraded
// from encryption or hierarchy to simple.
DCHECK(conflict_applicator.simple_conflict_ids().empty());
}
return SYNCER_OK;
}
void DirectoryUpdateHandler::PostApplyUpdates() {
// If this is a type with client generated root, the root node has been
// created locally and didn't go through ApplyUpdatesImpl.
// Mark it as having the initial download completed so that the type
// reports as properly initialized (which is done by changing the root's
// base version to a value other than CHANGES_VERSION).
// This does nothing if the root's base version is already other than
// CHANGES_VERSION.
if (IsTypeWithClientGeneratedRoot(type_)) {
syncable::ModelNeutralWriteTransaction trans(FROM_HERE, SYNCER, dir_);
dir_->MarkInitialSyncEndedForType(&trans, type_);
}
}
bool DirectoryUpdateHandler::IsApplyUpdatesRequired() {
if (IsControlType(type_)) {
return false; // We don't process control types here.
}
return dir_->TypeHasUnappliedUpdates(type_);
}
void DirectoryUpdateHandler::UpdateSyncEntities(
syncable::ModelNeutralWriteTransaction* trans,
const SyncEntityList& applicable_updates,
StatusController* status) {
UpdateCounters* counters = debug_info_emitter_->GetMutableUpdateCounters();
counters->num_updates_received += applicable_updates.size();
ProcessDownloadedUpdates(dir_, trans, type_, applicable_updates, status,
counters);
}
bool DirectoryUpdateHandler::IsValidProgressMarker(
const sync_pb::DataTypeProgressMarker& progress_marker) const {
if (progress_marker.token().empty()) {
return false;
}
int field_number = progress_marker.data_type_id();
ModelType model_type = GetModelTypeFromSpecificsFieldNumber(field_number);
if (!IsRealDataType(model_type) || type_ != model_type) {
NOTREACHED() << "Update handler of type " << ModelTypeToString(type_)
<< " asked to process progress marker with invalid type "
<< field_number;
return false;
}
return true;
}
void DirectoryUpdateHandler::UpdateProgressMarker(
const sync_pb::DataTypeProgressMarker& progress_marker) {
if (progress_marker.has_gc_directive() || !cached_gc_directive_) {
dir_->SetDownloadProgress(type_, progress_marker);
} else {
sync_pb::DataTypeProgressMarker merged_marker = progress_marker;
merged_marker.mutable_gc_directive()->CopyFrom(*cached_gc_directive_);
dir_->SetDownloadProgress(type_, merged_marker);
}
}
void DirectoryUpdateHandler::ExpireEntriesIfNeeded(
syncable::ModelNeutralWriteTransaction* trans,
const sync_pb::DataTypeProgressMarker& progress_marker) {
if (!cached_gc_directive_) {
sync_pb::DataTypeProgressMarker current_marker;
GetDownloadProgress(&current_marker);
if (current_marker.has_gc_directive()) {
cached_gc_directive_ =
base::MakeUnique<sync_pb::GarbageCollectionDirective>(
current_marker.gc_directive());
}
}
if (!progress_marker.has_gc_directive())
return;
const sync_pb::GarbageCollectionDirective& new_gc_directive =
progress_marker.gc_directive();
if (new_gc_directive.has_version_watermark() &&
(!cached_gc_directive_ ||
cached_gc_directive_->version_watermark() <
new_gc_directive.version_watermark())) {
ExpireEntriesByVersion(dir_, trans, type_,
new_gc_directive.version_watermark());
}
cached_gc_directive_ =
base::MakeUnique<sync_pb::GarbageCollectionDirective>(new_gc_directive);
}
} // namespace syncer