package contractmanager

import (
"encoding/json"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/NebulousLabs/Sia/build"
)

// syncResources will call Sync on all resources that the WAL has open. The
// storage folder files will be left open, as they are not updated atomically.
// The settings file and WAL tmp files will be synced and closed, to perform an
// atomic update to the files.
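//
// The ordering is important: the settings tmp file and the storage folder
// files are synced in parallel, but the WAL tmp file is only renamed over the
// committed WAL after every other Sync call has returned, so the rename acts
// as the commit point of the update.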
func (wal *writeAheadLog) syncResources() {
// Syncing occurs over multiple files and disks, and is done in parallel to
// minimize the amount of time that a lock is held over the contract
// manager.
var wg sync.WaitGroup
// Sync the settings file.
wg.Add(1)
go func() {
defer wg.Done()
tmpFilename := filepath.Join(wal.cm.persistDir, settingsFileTmp)
filename := filepath.Join(wal.cm.persistDir, settingsFile)
err := wal.fileSettingsTmp.Sync()
if err != nil {
wal.cm.log.Severe("ERROR: unable to sync the contract manager settings:", err)
}
err = wal.fileSettingsTmp.Close()
if err != nil {
wal.cm.log.Println("unable to close the temporary contract manager settings file:", err)
}
// For testing, provide a place to interrupt the saving of the settings
// file. This makes it easy to simulate certain types of unclean
// shutdown.
if wal.cm.dependencies.disrupt("settingsSyncRename") {
// The current settings file that is being re-written will not be
// saved.
return
}
err = wal.cm.dependencies.renameFile(tmpFilename, filename)
if err != nil {
wal.cm.log.Severe("ERROR: unable to atomically copy the contract manager settings:", err)
}
}()
// Sync all of the storage folders.
for _, sf := range wal.cm.storageFolders {
// Skip operation on unavailable storage folders.
if atomic.LoadUint64(&sf.atomicUnavailable) == 1 {
continue
}
wg.Add(2)
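// Two goroutines are used per storage folder, one to sync the metadata file
// and one to sync the sector file. The folder is passed in as an argument so
// that each goroutine operates on its own copy of the loop variable.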
go func(sf *storageFolder) {
defer wg.Done()
err := sf.metadataFile.Sync()
if err != nil {
wal.cm.log.Severe("ERROR: unable to sync a storage folder:", err)
}
}(sf)
go func(sf *storageFolder) {
defer wg.Done()
err := sf.sectorFile.Sync()
if err != nil {
wal.cm.log.Severe("ERROR: unable to sync a storage folder:", err)
}
}(sf)
}
// Sync the temp WAL file, but do not perform the atomic rename - the
// atomic rename must be guaranteed to happen after all of the other files
// have been synced.
wg.Add(1)
go func() {
defer wg.Done()
err := wal.fileWALTmp.Sync()
if err != nil {
wal.cm.log.Severe("Unable to sync the write-ahead-log:", err)
}
err = wal.fileWALTmp.Close()
if err != nil {
// Log that the host is having trouble saving the uncommitted changes.
// Crash if the list of uncommitted changes has grown very large.
wal.cm.log.Println("ERROR: could not close temporary write-ahead-log in contract manager:", err)
return
}
}()
// Wait for all of the sync calls to finish.
wg.Wait()
// Now that all the Sync calls have completed, rename the WAL tmp file to
// update the WAL.
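// For testing, the rename can be disrupted to simulate an unclean shutdown
// in which the updated WAL never replaces the previous WAL on disk.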
if !wal.cm.dependencies.disrupt("walRename") {
walTmpName := filepath.Join(wal.cm.persistDir, walFileTmp)
walFileName := filepath.Join(wal.cm.persistDir, walFile)
err := wal.cm.dependencies.renameFile(walTmpName, walFileName)
if err != nil {
// Log that the host is having trouble saving the uncommitted changes.
// Crash if the list of uncommitted changes has grown very large.
wal.cm.log.Severe("ERROR: could not rename temporary write-ahead-log in contract manager:", err)
}
}
// Perform any cleanup actions on the updates.
for _, sc := range wal.uncommittedChanges {
for _, sfe := range sc.StorageFolderExtensions {
wal.commitStorageFolderExtension(sfe)
}
for _, sfr := range sc.StorageFolderReductions {
wal.commitStorageFolderReduction(sfr)
}
for _, sfr := range sc.StorageFolderRemovals {
wal.commitStorageFolderRemoval(sfr)
}
// TODO: Virtual sector handling here.
}
// Now that the WAL is sync'd and updated, any calls waiting on ACID
// guarantees can safely return.
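//
// Each commit closes the old syncChan and replaces it with a fresh channel,
// so a caller that needs durability grabs the current channel under the WAL
// mutex and then blocks on it - the same pattern used by the AfterStop
// routine in spawnSyncLoop:
//
//	wal.mu.Lock()
//	syncChan := wal.syncChan
//	wal.mu.Unlock()
//	<-syncChan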
close(wal.syncChan)
wal.syncChan = make(chan struct{})
}

// commit will take all of the changes that have been added to the WAL and
// atomically commit the WAL to disk, then apply the actions in the WAL to the
// state. commit will do lots of syncing disk I/O, and so can take a while,
// especially if there are a large number of actions queued up.
//
// commit should only be called from threadedSyncLoop.
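//
// commit assumes that the caller holds the WAL mutex, as threadedSyncLoop
// does when invoking it.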
func (wal *writeAheadLog) commit() {
// Sync all open, non-WAL files on the host.
wal.syncResources()
// Extract any unfinished long-running jobs from the list of WAL items.
unfinishedAdditions := findUnfinishedStorageFolderAdditions(wal.uncommittedChanges)
unfinishedExtensions := findUnfinishedStorageFolderExtensions(wal.uncommittedChanges)
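// These unfinished jobs are re-appended to the fresh WAL below, so they are
// not lost when the set of uncommitted changes is cleared.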
// Clear the set of uncommitted changes.
wal.uncommittedChanges = nil
// Begin writing to the settings file.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// Begin writing to the settings file, which will be synced during the
// next iteration of the sync loop.
var err error
wal.fileSettingsTmp, err = wal.cm.dependencies.createFile(filepath.Join(wal.cm.persistDir, settingsFileTmp))
if err != nil {
wal.cm.log.Severe("Unable to open temporary settings file for writing:", err)
}
ss := wal.cm.savedSettings()
b, err := json.MarshalIndent(ss, "", "\t")
if err != nil {
wal.cm.log.Severe(build.ExtendErr("unable to marshal settings data", err))
}
enc := json.NewEncoder(wal.fileSettingsTmp)
if err := enc.Encode(settingsMetadata.Header); err != nil {
wal.cm.log.Severe(build.ExtendErr("unable to write header to settings temp file", err))
}
if err := enc.Encode(settingsMetadata.Version); err != nil {
wal.cm.log.Severe(build.ExtendErr("unable to write version to settings temp file", err))
}
if _, err = wal.fileSettingsTmp.Write(b); err != nil {
wal.cm.log.Severe(build.ExtendErr("unable to write data to settings temp file", err))
}
}()
// Begin writing new changes to the WAL.
wg.Add(1)
go func() {
defer wg.Done()
// Recreate the wal file so that it can receive new updates.
var err error
walTmpName := filepath.Join(wal.cm.persistDir, walFileTmp)
wal.fileWALTmp, err = wal.cm.dependencies.createFile(walTmpName)
if err != nil {
wal.cm.log.Severe("ERROR: unable to create write-ahead-log:", err)
}
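// The new tmp file will be synced and atomically renamed over the committed
// WAL during the next call to syncResources.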
// Write the metadata into the WAL.
err = writeWALMetadata(wal.fileWALTmp)
if err != nil {
wal.cm.log.Severe("Unable to properly initialize WAL file, crashing to prevent corruption:", err)
}
// Append all of the remaining long running uncommitted changes to the WAL.
wal.appendChange(stateChange{
UnfinishedStorageFolderAdditions: unfinishedAdditions,
UnfinishedStorageFolderExtensions: unfinishedExtensions,
})
}()
wg.Wait()
}

// spawnSyncLoop prepares and establishes the loop which will be running in the
// background to coordinate disk synchronizations. Disk syncing is done in a
// background loop to help with performance, and to allow multiple things to
// modify the WAL simultaneously.
func (wal *writeAheadLog) spawnSyncLoop() (err error) {
// Create a signal so we know when the sync loop has stopped, which means
// there will be no more open commits.
threadsStopped := make(chan struct{})
syncLoopStopped := make(chan struct{})
wal.syncChan = make(chan struct{})
go wal.threadedSyncLoop(threadsStopped, syncLoopStopped)
wal.cm.tg.AfterStop(func() {
// Wait for another iteration of the sync loop, so that the in-progress
// settings can be saved atomically to disk.
wal.mu.Lock()
syncChan := wal.syncChan
wal.mu.Unlock()
<-syncChan
// Close the threadsStopped channel to let the sync loop know that all
// calls to tg.Add() in the contract manager have cleaned up.
close(threadsStopped)
// Because this is being called in an 'AfterStop' routine, all open
// calls to the contract manager should have completed, and all open
// threads should have closed. The last call to change the contract
// manager should have completed, so the number of uncommitted changes
// should be zero.
<-syncLoopStopped // Wait for the sync loop to signal proper termination.
// Allow unclean shutdown to be simulated by disrupting the removal of
// the WAL file.
if !wal.cm.dependencies.disrupt("cleanWALFile") {
err = wal.cm.dependencies.removeFile(filepath.Join(wal.cm.persistDir, walFile))
if err != nil {
wal.cm.log.Println("Error removing WAL during contract manager shutdown:", err)
}
}
})
return nil
}

// threadedSyncLoop is a background thread that occasionally commits the WAL to
// the state as an ACID transaction. This process can be very slow, so
// transactions to the contract manager are batched automatically and
// occasionally committed together.
func (wal *writeAheadLog) threadedSyncLoop(threadsStopped chan struct{}, syncLoopStopped chan struct{}) {
// Provide a place for the testing to disable the sync loop.
if wal.cm.dependencies.disrupt("threadedSyncLoopStart") {
close(syncLoopStopped)
return
}
syncInterval := 500 * time.Millisecond
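// Changes are batched for up to syncInterval before being committed,
// trading commit latency for fewer expensive disk syncs.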
for {
select {
case <-threadsStopped:
close(syncLoopStopped)
return
case <-time.After(syncInterval):
// Commit all of the changes in the WAL to disk, and then apply the
// changes.
wal.mu.Lock()
wal.commit()
wal.mu.Unlock()
}
}
}