File: sectorupdate.go

package info (click to toggle)
sia 1.3.0-4
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 6,340 kB
  • sloc: makefile: 80; sh: 52
file content (491 lines) | stat: -rw-r--r-- 14,576 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
package contractmanager

import (
	"errors"
	"sync"
	"sync/atomic"

	"github.com/NebulousLabs/Sia/build"
	"github.com/NebulousLabs/Sia/crypto"
	"github.com/NebulousLabs/Sia/modules"
)

// commitUpdateSector applies a committed sector update to the storage folder
// named by the update. A non-zero count writes the sector metadata to disk and
// then marks the sector as in use; a zero count clears the sector's usage
// flag. If the storage folder is missing or flagged unavailable, or if the
// metadata write fails, the problem is logged and nothing else is changed. The
// update is idempotent.
func (wal *writeAheadLog) commitUpdateSector(su sectorUpdate) {
	folder, ok := wal.cm.storageFolders[su.Folder]
	if !ok || atomic.LoadUint64(&folder.atomicUnavailable) == 1 {
		wal.cm.log.Printf("ERROR: unable to locate storage folder for a committed sector update.")
		return
	}

	if su.Count != 0 {
		// The sector still exists: persist its metadata first, and only mark
		// the usage once the metadata has reached the disk.
		if err := wal.writeSectorMetadata(folder, su); err != nil {
			wal.cm.log.Printf("ERROR: unable to write sector metadata for %v: %v\n", folder.path, err)
			return
		}
		folder.setUsage(su.Index)
		return
	}

	// A count of zero means the sector is being cleaned from disk, so the
	// usage flag is released.
	folder.clearUsage(su.Index)
}

// managedAddPhysicalSector is a WAL operation to add a physical sector to the
// contract manager.
//
// The sector data must be exactly modules.SectorSize bytes long, otherwise a
// "malformed sector" error is returned. The function walks the available
// storage folders, and for each candidate it reserves a free sector index,
// writes the sector data, writes the sector metadata, and finally records the
// update in the WAL and in wal.cm.sectorLocations. A folder that fails any of
// the disk writes is dropped from the candidate list and the next folder is
// tried. Once a folder succeeds, the function blocks on the WAL sync channel
// before returning nil.
//
// errInsufficientStorageForSector is returned when no folder has room, and
// also when every candidate folder was dropped because of a failure.
func (wal *writeAheadLog) managedAddPhysicalSector(id sectorID, data []byte, count uint16) error {
	// Sanity check - data should have modules.SectorSize bytes.
	if uint64(len(data)) != modules.SectorSize {
		wal.cm.log.Critical("sector has the wrong size", modules.SectorSize, len(data))
		return errors.New("malformed sector")
	}

	// Find a committed storage folder that has enough space to receive
	// this sector. Keep trying new storage folders if some return
	// errors during disk operations.
	wal.mu.Lock()
	storageFolders := wal.cm.availableStorageFolders()
	wal.mu.Unlock()
	// syncChan is set by the closure below on success, and is waited on after
	// the loop.
	var syncChan chan struct{}
	for len(storageFolders) >= 1 {
		// storageFolderIndex is the position within storageFolders of the
		// folder picked for this attempt; it is compared against -1 below to
		// detect that no folder was picked at all.
		var storageFolderIndex int
		err := func() error {
			// NOTE: Convention is broken when working with WAL lock here, due
			// to the complexity required with managing both the WAL lock and
			// the storage folder lock. Pay close attention when reviewing and
			// modifying.

			// Grab a vacant storage folder.
			wal.mu.Lock()
			var sf *storageFolder
			sf, storageFolderIndex = vacancyStorageFolder(storageFolders)
			if sf == nil {
				// None of the storage folders have enough room to house the
				// sector.
				wal.mu.Unlock()
				return errInsufficientStorageForSector
			}
			// NOTE(review): presumably vacancyStorageFolder returns the folder
			// with its read lock already held, which is what this deferred
			// RUnlock releases - confirm against vacancyStorageFolder.
			defer sf.mu.RUnlock()

			// Grab a sector from the storage folder. WAL lock cannot be
			// released between grabbing the storage folder and grabbing a
			// sector lest another thread request the final available sector in
			// the storage folder.
			sectorIndex, err := randFreeSector(sf.usage)
			if err != nil {
				wal.mu.Unlock()
				wal.cm.log.Critical("a storage folder with full usage was returned from emptiestStorageFolder")
				return err
			}
			// Set the usage, but mark it as uncommitted by also listing the
			// sector in the folder's availableSectors map.
			sf.setUsage(sectorIndex)
			sf.availableSectors[id] = sectorIndex
			wal.mu.Unlock()

			// NOTE: The usage has been set, in the event of failure the usage
			// must be cleared.

			// Try writing the new sector to disk. On failure, undo the
			// reservation made above under the WAL lock.
			err = writeSector(sf.sectorFile, sectorIndex, data)
			if err != nil {
				wal.cm.log.Printf("ERROR: Unable to write sector for folder %v: %v\n", sf.path, err)
				atomic.AddUint64(&sf.atomicFailedWrites, 1)
				wal.mu.Lock()
				sf.clearUsage(sectorIndex)
				delete(sf.availableSectors, id)
				wal.mu.Unlock()
				return errDiskTrouble
			}

			// Try writing the sector metadata to disk. On failure, undo the
			// reservation made above under the WAL lock.
			su := sectorUpdate{
				Count:  count,
				ID:     id,
				Folder: sf.index,
				Index:  sectorIndex,
			}
			err = wal.writeSectorMetadata(sf, su)
			if err != nil {
				wal.cm.log.Printf("ERROR: Unable to write sector metadata for folder %v: %v\n", sf.path, err)
				// NOTE(review): wal.writeSectorMetadata already increments
				// atomicFailedWrites on failure, so this failure is counted
				// twice - confirm whether that is intended.
				atomic.AddUint64(&sf.atomicFailedWrites, 1)
				wal.mu.Lock()
				sf.clearUsage(sectorIndex)
				delete(sf.availableSectors, id)
				wal.mu.Unlock()
				return errDiskTrouble
			}

			// Sector added successfully, update the WAL and the state.
			sl := sectorLocation{
				index:         sectorIndex,
				storageFolder: sf.index,
				count:         count,
			}
			wal.mu.Lock()
			wal.appendChange(stateChange{
				SectorUpdates: []sectorUpdate{su},
			})
			// The sector is no longer uncommitted: drop it from the folder's
			// availableSectors map and record its location.
			delete(wal.cm.storageFolders[su.Folder].availableSectors, id)
			wal.cm.sectorLocations[id] = sl
			syncChan = wal.syncChan
			wal.mu.Unlock()
			return nil
		}()
		if err != nil {
			// End the loop if no storage folder proved suitable.
			if storageFolderIndex == -1 {
				storageFolders = nil
				break
			}

			// Remove the storage folder that failed and try the next one.
			storageFolders = append(storageFolders[:storageFolderIndex], storageFolders[storageFolderIndex+1:]...)
			continue
		}
		// Sector added successfully, break.
		break
	}
	// An empty candidate list at this point means the loop ended without a
	// successful add: either no folder had room, or every folder failed.
	if len(storageFolders) < 1 {
		return errInsufficientStorageForSector
	}

	// Wait on the sync channel captured while appending the sector update to
	// the WAL before reporting success.
	<-syncChan
	return nil
}

// managedAddVirtualSector will add a virtual sector to the contract manager.
//
// location is the caller's copy of the sector's current location. Its count is
// incremented, the new count is appended to the WAL and stored in
// wal.cm.sectorLocations, and after waiting on the WAL sync channel the new
// count is written to the on-disk sector metadata.
//
// Returns errMaxVirtualSectors if the count is already 65535 (count is a
// uint16, so it cannot be incremented further), errStorageFolderNotFound if
// the sector's storage folder is missing or flagged unavailable, and an
// extended error if the metadata write fails. In the last case the increment
// is rolled back both in the WAL and in the in-memory sector location.
func (wal *writeAheadLog) managedAddVirtualSector(id sectorID, location sectorLocation) error {
	// Update the location count.
	if location.count == 65535 {
		return errMaxVirtualSectors
	}
	location.count++

	// Prepare the sector update.
	su := sectorUpdate{
		Count:  location.count,
		ID:     id,
		Folder: location.storageFolder,
		Index:  location.index,
	}

	// Append the sector update to the WAL.
	wal.mu.Lock()
	sf, exists := wal.cm.storageFolders[su.Folder]
	if !exists || atomic.LoadUint64(&sf.atomicUnavailable) == 1 {
		// Need to check that the storage folder exists before syncing the
		// commit that increases the virtual sector count.
		wal.mu.Unlock()
		return errStorageFolderNotFound
	}
	wal.appendChange(stateChange{
		SectorUpdates: []sectorUpdate{su},
	})
	wal.cm.sectorLocations[id] = location
	syncChan := wal.syncChan
	wal.mu.Unlock()
	<-syncChan

	// Update the metadata on disk. Metadata is updated on disk after the sync
	// so that there is no risk of obliterating the previous count in the event
	// that the change is not fully committed during unclean shutdown.
	err := wal.writeSectorMetadata(sf, su)
	if err != nil {
		// Revert the sector update in the WAL to reflect the fact that adding
		// the sector has failed. The in-memory location must be reverted as
		// well, otherwise sectorLocations would keep the incremented count
		// while the WAL records the original one.
		su.Count--
		location.count--
		wal.mu.Lock()
		wal.appendChange(stateChange{
			SectorUpdates: []sectorUpdate{su},
		})
		wal.cm.sectorLocations[id] = location
		wal.mu.Unlock()
		// NOTE(review): this waits on the channel captured before the first
		// sync, which has presumably already been signalled - confirm whether
		// the revert is meant to wait for a fresh sync.
		<-syncChan
		return build.ExtendErr("unable to write sector metadata during addSector call", err)
	}
	return nil
}

// managedDeleteSector will delete a sector (physical) from the contract
// manager, regardless of how many virtual copies of it are recorded.
//
// A zero-count sector update is appended to the WAL and the sector is removed
// from wal.cm.sectorLocations; the usage flag in the storage folder is only
// cleared after waiting on the WAL sync channel. Returns ErrSectorNotFound if
// the sector is unknown and errStorageFolderNotFound if its storage folder is
// missing or flagged unavailable.
func (wal *writeAheadLog) managedDeleteSector(id sectorID) error {
	var (
		loc    sectorLocation
		folder *storageFolder
		synced chan struct{}
	)

	// stageDelete performs everything that must happen atomically under the
	// WAL lock, filling in loc, folder and synced for the code that follows.
	stageDelete := func() error {
		wal.mu.Lock()
		defer wal.mu.Unlock()

		// Look up where the sector lives.
		var ok bool
		if loc, ok = wal.cm.sectorLocations[id]; !ok {
			return ErrSectorNotFound
		}
		folder, ok = wal.cm.storageFolders[loc.storageFolder]
		if !ok || atomic.LoadUint64(&folder.atomicUnavailable) == 1 {
			wal.cm.log.Critical("deleting a sector from a storage folder that does not exist?")
			return errStorageFolderNotFound
		}

		// Record the deletion in the WAL as an update with a count of zero.
		removal := sectorUpdate{
			Count:  0,
			ID:     id,
			Folder: loc.storageFolder,
			Index:  loc.index,
		}
		wal.appendChange(stateChange{
			SectorUpdates: []sectorUpdate{removal},
		})

		// Forget the sector's location, and list it in availableSectors until
		// the usage has actually been cleared.
		delete(wal.cm.sectorLocations, id)
		folder.availableSectors[id] = loc.index

		// Remember the sync channel so the caller can wait for the commit.
		synced = wal.syncChan
		return nil
	}
	if err := stageDelete(); err != nil {
		return err
	}
	<-synced

	// The usage is released only once the delete has been committed to disk
	// fully.
	wal.mu.Lock()
	delete(folder.availableSectors, id)
	folder.clearUsage(loc.index)
	wal.mu.Unlock()
	return nil
}

// managedRemoveSector will remove a sector (virtual or physical) from the
// contract manager by decrementing its count by one.
//
// The decremented count is appended to the WAL and reflected in memory under
// the WAL lock. After waiting on the WAL sync channel, a sector whose count
// reached zero has its usage cleared, while a sector with remaining copies has
// its on-disk metadata rewritten. If that metadata write fails the decrement
// is reverted in the WAL and in memory and an extended error is returned.
// Returns ErrSectorNotFound if the sector is unknown and
// errStorageFolderNotFound if its storage folder is missing or flagged
// unavailable.
func (wal *writeAheadLog) managedRemoveSector(id sectorID) error {
	var (
		loc    sectorLocation
		update sectorUpdate
		folder *storageFolder
		synced chan struct{}
	)

	// stageRemoval informs the WAL of the removed sector and updates the
	// in-memory state, all under the WAL lock.
	stageRemoval := func() error {
		wal.mu.Lock()
		defer wal.mu.Unlock()

		// Look up the committed location, which carries the number of copies
		// recorded for this root.
		var ok bool
		if loc, ok = wal.cm.sectorLocations[id]; !ok {
			return ErrSectorNotFound
		}
		folder, ok = wal.cm.storageFolders[loc.storageFolder]
		if !ok || atomic.LoadUint64(&folder.atomicUnavailable) == 1 {
			wal.cm.log.Critical("deleting a sector from a storage folder that does not exist?")
			return errStorageFolderNotFound
		}

		// Record the reduced count in the WAL.
		loc.count--
		update = sectorUpdate{
			Count:  loc.count,
			ID:     id,
			Folder: loc.storageFolder,
			Index:  loc.index,
		}
		wal.appendChange(stateChange{
			SectorUpdates: []sectorUpdate{update},
		})

		// Update the in-memory representation of the sector.
		if loc.count != 0 {
			// Copies remain; store the reduced count.
			wal.cm.sectorLocations[id] = loc
		} else {
			// Last copy removed; forget the location and list the sector in
			// availableSectors until its usage has been cleared.
			delete(wal.cm.sectorLocations, id)
			folder.availableSectors[id] = loc.index
		}
		synced = wal.syncChan
		return nil
	}
	if err := stageRemoval(); err != nil {
		return err
	}

	// Synchronize before updating the metadata or clearing the usage.
	<-synced

	if loc.count == 0 {
		// Only update the usage after the sector removal has been committed
		// to disk entirely. The usage is not updated until after the commit
		// has completed to prevent the actual sector data from being
		// overwritten in the event of unclean shutdown.
		wal.mu.Lock()
		folder.clearUsage(loc.index)
		delete(folder.availableSectors, id)
		wal.mu.Unlock()
		return nil
	}

	// Copies remain, so the on-disk metadata must carry the new count.
	if err := wal.writeSectorMetadata(folder, update); err != nil {
		// Revert the previous change, both in the WAL and in memory.
		wal.mu.Lock()
		update.Count++
		loc.count++
		wal.appendChange(stateChange{
			SectorUpdates: []sectorUpdate{update},
		})
		wal.cm.sectorLocations[id] = loc
		wal.mu.Unlock()
		return build.ExtendErr("failed to write sector metadata", err)
	}
	return nil
}

// writeSectorMetadata writes the index, ID and count of a sector update to the
// metadata file of the given storage folder, and records the outcome in the
// folder's atomic write counters. A failed write is logged, counted in
// atomicFailedWrites and returned; a successful write is counted in
// atomicSuccessfulWrites.
func (wal *writeAheadLog) writeSectorMetadata(sf *storageFolder, su sectorUpdate) error {
	err := writeSectorMetadata(sf.metadataFile, su.Index, su.ID, su.Count)
	if err == nil {
		atomic.AddUint64(&sf.atomicSuccessfulWrites, 1)
		return nil
	}

	wal.cm.log.Printf("ERROR: unable to write sector metadata to folder %v when adding sector: %v\n", su.Folder, err)
	atomic.AddUint64(&sf.atomicFailedWrites, 1)
	return err
}

// AddSector will add a sector to the contract manager. If the sector's root is
// already known, the existing sector gains one more virtual copy; otherwise
// sectorData is stored as a new physical sector with a count of one. Any
// failure is logged and returned, including the error from the thread group
// when the contract manager is shutting down.
func (cm *ContractManager) AddSector(root crypto.Hash, sectorData []byte) error {
	// Register with the thread group so shutdown waits for this call.
	if err := cm.tg.Add(); err != nil {
		return err
	}
	defer cm.tg.Done()

	// The sector lock is held for the whole remainder of the function.
	id := cm.managedSectorID(root)
	cm.wal.managedLockSector(id)
	defer cm.wal.managedUnlockSector(id)

	// A sector with a recorded location is virtual; anything else is physical.
	cm.wal.mu.Lock()
	known, found := cm.sectorLocations[id]
	cm.wal.mu.Unlock()

	var addErr error
	if found {
		addErr = cm.wal.managedAddVirtualSector(id, known)
	} else {
		addErr = cm.wal.managedAddPhysicalSector(id, sectorData, 1)
	}
	if addErr == nil {
		return nil
	}
	cm.log.Println("ERROR: Unable to add sector:", addErr)
	return addErr
}

// AddSectorBatch is a non-ACID call to add a bunch of sectors at once.
// Necessary for compatibility with old renters.
//
// Each root is handled in its own goroutine. Roots that already have a
// recorded location gain one virtual copy; roots that are unknown are skipped.
// Errors from the individual additions are discarded, so the only error this
// function returns is the one from the thread group during shutdown.
//
// TODO: Make ACID, and definitely improve the performance as well.
func (cm *ContractManager) AddSectorBatch(sectorRoots []crypto.Hash) error {
	// Register with the thread group so shutdown waits for this call.
	if err := cm.tg.Add(); err != nil {
		return err
	}
	defer cm.tg.Done()

	// Launch one worker per root and wait for all of them.
	var pending sync.WaitGroup
	pending.Add(len(sectorRoots))
	for i := range sectorRoots {
		go func(root crypto.Hash) {
			defer pending.Done()

			// The sector lock is held until the worker returns.
			id := cm.managedSectorID(root)
			cm.wal.managedLockSector(id)
			defer cm.wal.managedUnlockSector(id)

			// Only sectors that already exist can be added as virtual.
			cm.wal.mu.Lock()
			known, found := cm.sectorLocations[id]
			cm.wal.mu.Unlock()
			if !found {
				return
			}
			// Best effort: the batch call does not report per-sector errors.
			_ = cm.wal.managedAddVirtualSector(id, known)
		}(sectorRoots[i])
	}
	pending.Wait()
	return nil
}

// DeleteSector will delete a sector from the contract manager. If multiple
// copies of the sector exist, all of them will be removed. This should only be
// used to remove offensive data, as it will cause corruption in the contract
// manager. This corruption puts the contract manager at risk of failing
// storage proofs. If the amount of data removed is small, the risk is small.
// This operation will not destabilize the contract manager.
//
// Returns the thread group's error if the contract manager is shutting down,
// otherwise whatever managedDeleteSector returns.
func (cm *ContractManager) DeleteSector(root crypto.Hash) error {
	// Prevent shutdown until this function completes. If registration fails,
	// bail out before touching any state so that Done is never called without
	// a matching Add.
	if err := cm.tg.Add(); err != nil {
		return err
	}
	defer cm.tg.Done()

	// Hold the sector lock for the duration of the delete.
	id := cm.managedSectorID(root)
	cm.wal.managedLockSector(id)
	defer cm.wal.managedUnlockSector(id)

	return cm.wal.managedDeleteSector(id)
}

// RemoveSector will remove a sector from the contract manager. If multiple
// copies of the sector exist, only one will be removed.
//
// Returns the thread group's error if the contract manager is shutting down,
// otherwise whatever managedRemoveSector returns.
func (cm *ContractManager) RemoveSector(root crypto.Hash) error {
	// Prevent shutdown until this function completes. If registration fails,
	// bail out before touching any state so that Done is never called without
	// a matching Add.
	if err := cm.tg.Add(); err != nil {
		return err
	}
	defer cm.tg.Done()

	// Hold the sector lock for the duration of the removal.
	id := cm.managedSectorID(root)
	cm.wal.managedLockSector(id)
	defer cm.wal.managedUnlockSector(id)

	return cm.wal.managedRemoveSector(id)
}

// RemoveSectorBatch is a non-ACID call to remove a bunch of sectors at once.
// Necessary for compatibility with old renters.
//
// Each root is handled in its own goroutine, which removes one copy of the
// sector while holding the sector lock. Errors from the individual removals
// are discarded, so the only error this function returns is the one from the
// thread group during shutdown.
//
// TODO: Make ACID, and definitely improve the performance as well.
func (cm *ContractManager) RemoveSectorBatch(sectorRoots []crypto.Hash) error {
	// Register with the thread group so shutdown waits for this call.
	if err := cm.tg.Add(); err != nil {
		return err
	}
	defer cm.tg.Done()

	// Launch one worker per root and wait for all of them.
	var pending sync.WaitGroup
	pending.Add(len(sectorRoots))
	for i := range sectorRoots {
		go func(root crypto.Hash) {
			id := cm.managedSectorID(root)
			cm.wal.managedLockSector(id)
			_ = cm.wal.managedRemoveSector(id) // Error is ignored.
			cm.wal.managedUnlockSector(id)
			pending.Done()
		}(sectorRoots[i])
	}
	pending.Wait()
	return nil
}