File: update.go

package info (click to toggle)
sia 1.3.0-4
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 6,340 kB
  • sloc: makefile: 80; sh: 52
file content (312 lines) | stat: -rw-r--r-- 10,248 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
package host

// TODO: Need to check that 'RevisionConfirmed' is sensitive to whether or not
// it was the *most recent* revision that got confirmed.

import (
	"encoding/binary"
	"encoding/json"

	"github.com/NebulousLabs/Sia/crypto"
	"github.com/NebulousLabs/Sia/modules"
	"github.com/NebulousLabs/Sia/types"

	"github.com/NebulousLabs/bolt"
)

// initRescan is a helper function of initConsensusSubscribe, and is called when
// the host and the consensus set have become desynchronized. Desynchronization
// typically happens if the user is replacing or altering the persistent files
// in the consensus set or the host.
func (h *Host) initRescan() error {
	// Reset all of the variables that have relevance to the consensus set.
	var allObligations []storageObligation
	// Reset all of the consensus-relevant variables in the host.
	h.blockHeight = 0

	// Reset all of the storage obligations.
	err := h.db.Update(func(tx *bolt.Tx) error {
		bsu := tx.Bucket(bucketStorageObligations)
		c := bsu.Cursor()
		for k, soBytes := c.First(); soBytes != nil; k, soBytes = c.Next() {
			var so storageObligation
			err := json.Unmarshal(soBytes, &so)
			if err != nil {
				return err
			}
			so.OriginConfirmed = false
			so.RevisionConfirmed = false
			so.ProofConfirmed = false
			allObligations = append(allObligations, so)
			soBytes, err = json.Marshal(so)
			if err != nil {
				return err
			}
			err = bsu.Put(k, soBytes)
			if err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		return err
	}

	// Subscribe to the consensus set. This is a blocking call that will not
	// return until the host has fully caught up to the current block.
	//
	// Convention dictates that the host should not make external calls while
	// under lock, but this function happens at startup while blocking. Because
	// it happens while blocking, and because there is no actual host lock held
	// at this time, none of the host external functions are exposed, so it is
	// save to make the exported call.
	err = h.cs.ConsensusSetSubscribe(h, modules.ConsensusChangeBeginning)
	if err != nil {
		return err
	}
	h.tg.OnStop(func() {
		h.cs.Unsubscribe(h)
	})

	// Re-queue all of the action items for the storage obligations.
	for i, so := range allObligations {
		soid := so.id()
		err1 := h.queueActionItem(h.blockHeight+resubmissionTimeout, soid)
		err2 := h.queueActionItem(so.expiration()-revisionSubmissionBuffer, soid)
		err3 := h.queueActionItem(so.expiration()+resubmissionTimeout, soid)
		err = composeErrors(err1, err2, err3)
		if err != nil {
			h.log.Println("dropping storage obligation during rescan, id", so.id())
		}

		// AcceptTransactionSet needs to be called in a goroutine to avoid a
		// deadlock.
		go func(i int) {
			err := h.tpool.AcceptTransactionSet(allObligations[i].OriginTransactionSet)
			if err != nil {
				h.log.Println("Unable to submit contract transaction set after rescan:", soid)
			}
		}(i)
	}
	return nil
}

// initConsensusSubscription subscribes the host to the consensus set.
func (h *Host) initConsensusSubscription() error {
	// Convention dictates that the host should not make external calls while
	// under lock, but this function happens at startup while blocking. Because
	// it happens while blocking, and because there is no actual host lock held
	// at this time, none of the host external functions are exposed, so it is
	// save to make the exported call.
	err := h.cs.ConsensusSetSubscribe(h, h.recentChange)
	if err == modules.ErrInvalidConsensusChangeID {
		// Perform a rescan of the consensus set if the change id that the host
		// has is unrecognized by the consensus set. This will typically only
		// happen if the user has been replacing files inside the Sia folder
		// structure.
		return h.initRescan()
	}
	if err != nil {
		return err
	}
	h.tg.OnStop(func() {
		h.cs.Unsubscribe(h)
	})
	return nil
}

// ProcessConsensusChange will be called by the consensus set every time there
// is a change to the blockchain.
func (h *Host) ProcessConsensusChange(cc modules.ConsensusChange) {
	// Add is called at the beginning of the function, but Done cannot be
	// called until all of the threads spawned by this function have also
	// terminated. This function should not block while these threads wait to
	// terminate.
	h.mu.Lock()
	defer h.mu.Unlock()

	// Wrap the whole parsing into a single large database tx to keep things
	// efficient.
	var actionItems []types.FileContractID
	err := h.db.Update(func(tx *bolt.Tx) error {
		for _, block := range cc.RevertedBlocks {
			// Look for transactions relevant to open storage obligations.
			for _, txn := range block.Transactions {
				// Check for file contracts.
				if len(txn.FileContracts) > 0 {
					for j := range txn.FileContracts {
						fcid := txn.FileContractID(uint64(j))
						so, err := getStorageObligation(tx, fcid)
						if err != nil {
							// The storage folder may not exist, or the disk
							// may be having trouble. Either way, we ignore the
							// problem. If the disk is having trouble, the user
							// will have to perform a rescan.
							continue
						}
						so.OriginConfirmed = false
						err = putStorageObligation(tx, so)
						if err != nil {
							continue
						}
					}
				}

				// Check for file contract revisions.
				if len(txn.FileContractRevisions) > 0 {
					for _, fcr := range txn.FileContractRevisions {
						so, err := getStorageObligation(tx, fcr.ParentID)
						if err != nil {
							// The storage folder may not exist, or the disk
							// may be having trouble. Either way, we ignore the
							// problem. If the disk is having trouble, the user
							// will have to perform a rescan.
							continue
						}
						so.RevisionConfirmed = false
						err = putStorageObligation(tx, so)
						if err != nil {
							continue
						}
					}
				}

				// Check for storage proofs.
				if len(txn.StorageProofs) > 0 {
					for _, sp := range txn.StorageProofs {
						// Check database for relevant storage proofs.
						so, err := getStorageObligation(tx, sp.ParentID)
						if err != nil {
							// The storage folder may not exist, or the disk
							// may be having trouble. Either way, we ignore the
							// problem. If the disk is having trouble, the user
							// will have to perform a rescan.
							continue
						}
						so.ProofConfirmed = false
						err = putStorageObligation(tx, so)
						if err != nil {
							continue
						}
					}
				}
			}

			// Height is not adjusted when dealing with the genesis block because
			// the default height is 0 and the genesis block height is 0. If
			// removing the genesis block, height will already be at height 0 and
			// should not update, lest an underflow occur.
			if block.ID() != types.GenesisID {
				h.blockHeight--
			}
		}
		for _, block := range cc.AppliedBlocks {
			// Look for transactions relevant to open storage obligations.
			for _, txn := range block.Transactions {
				// Check for file contracts.
				if len(txn.FileContracts) > 0 {
					for i := range txn.FileContracts {
						fcid := txn.FileContractID(uint64(i))
						so, err := getStorageObligation(tx, fcid)
						if err != nil {
							// The storage folder may not exist, or the disk
							// may be having trouble. Either way, we ignore the
							// problem. If the disk is having trouble, the user
							// will have to perform a rescan.
							continue
						}
						so.OriginConfirmed = true
						err = putStorageObligation(tx, so)
						if err != nil {
							continue
						}
					}
				}

				// Check for file contract revisions.
				if len(txn.FileContractRevisions) > 0 {
					for _, fcr := range txn.FileContractRevisions {
						so, err := getStorageObligation(tx, fcr.ParentID)
						if err != nil {
							// The storage folder may not exist, or the disk
							// may be having trouble. Either way, we ignore the
							// problem. If the disk is having trouble, the user
							// will have to perform a rescan.
							continue
						}
						so.RevisionConfirmed = true
						err = putStorageObligation(tx, so)
						if err != nil {
							continue
						}
					}
				}

				// Check for storage proofs.
				if len(txn.StorageProofs) > 0 {
					for _, sp := range txn.StorageProofs {
						so, err := getStorageObligation(tx, sp.ParentID)
						if err != nil {
							// The storage folder may not exist, or the disk
							// may be having trouble. Either way, we ignore the
							// problem. If the disk is having trouble, the user
							// will have to perform a rescan.
							continue
						}
						so.ProofConfirmed = true
						err = putStorageObligation(tx, so)
						if err != nil {
							continue
						}
					}
				}
			}

			// Height is not adjusted when dealing with the genesis block because
			// the default height is 0 and the genesis block height is 0. If adding
			// the genesis block, height will already be at height 0 and should not
			// update.
			if block.ID() != types.GenesisID {
				h.blockHeight++
			}

			// Handle any action items relevant to the current height.
			bai := tx.Bucket(bucketActionItems)
			heightBytes := make([]byte, 8)
			binary.BigEndian.PutUint64(heightBytes, uint64(h.blockHeight)) // BigEndian used so bolt will keep things sorted automatically.
			existingItems := bai.Get(heightBytes)

			// From the existing items, pull out a storage obligation.
			knownActionItems := make(map[types.FileContractID]struct{})
			obligationIDs := make([]types.FileContractID, len(existingItems)/crypto.HashSize)
			for i := 0; i < len(existingItems); i += crypto.HashSize {
				copy(obligationIDs[i/crypto.HashSize][:], existingItems[i:i+crypto.HashSize])
			}
			for _, soid := range obligationIDs {
				_, exists := knownActionItems[soid]
				if !exists {
					actionItems = append(actionItems, soid)
					knownActionItems[soid] = struct{}{}
				}
			}
		}
		return nil
	})
	if err != nil {
		h.log.Println(err)
	}
	for i := range actionItems {
		go h.threadedHandleActionItem(actionItems[i])
	}

	// Update the host's recent change pointer to point to the most recent
	// change.
	h.recentChange = cc.ID

	// Save the host.
	err = h.saveSync()
	if err != nil {
		h.log.Println("ERROR: could not save during ProcessConsensusChange:", err)
	}
}