File: subscribe.go

package info (click to toggle)
sia 1.3.0-4
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 6,340 kB
  • sloc: makefile: 80; sh: 52
file content (108 lines) | stat: -rw-r--r-- 3,576 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package transactionpool

import (
	"github.com/NebulousLabs/Sia/build"
	"github.com/NebulousLabs/Sia/encoding"
	"github.com/NebulousLabs/Sia/modules"
	"github.com/NebulousLabs/Sia/types"
)

// updateSubscribersTransactions sends a new transaction pool update to all
// subscribers.
func (tp *TransactionPool) updateSubscribersTransactions() {
	diff := new(modules.TransactionPoolDiff)
	// Create all of the diffs for reverted sets.
	for id := range tp.subscriberSets {
		// The transaction set is still in the transaction pool, no need to
		// create an update.
		_, exists := tp.transactionSets[id]
		if exists {
			continue
		}

		// Report that this set has been removed. Negative diffs don't have all
		// fields filled out.
		diff.RevertedTransactions = append(diff.RevertedTransactions, modules.TransactionSetID(id))
	}

	// Clear the subscriber sets map.
	for _, revert := range diff.RevertedTransactions {
		delete(tp.subscriberSets, TransactionSetID(revert))
	}

	// Create all of the diffs for sets that have been recently created.
	for id, set := range tp.transactionSets {
		_, exists := tp.subscriberSets[id]
		if exists {
			// The transaction set has already been sent in an update.
			continue
		}

		// Report that this transaction set is new to the transaction pool.
		ids := make([]types.TransactionID, 0, len(set))
		sizes := make([]uint64, 0, len(set))
		for i := range set {
			encodedTxn := encoding.Marshal(set[i])
			sizes = append(sizes, uint64(len(encodedTxn)))
			ids = append(ids, set[i].ID())
		}
		ut := &modules.UnconfirmedTransactionSet{
			Change: tp.transactionSetDiffs[id],
			ID:     modules.TransactionSetID(id),

			IDs:          ids,
			Sizes:        sizes,
			Transactions: set,
		}
		// Add this diff to our set of subscriber diffs.
		tp.subscriberSets[id] = ut
		diff.AppliedTransactions = append(diff.AppliedTransactions, ut)
	}

	for _, subscriber := range tp.subscribers {
		subscriber.ReceiveUpdatedUnconfirmedTransactions(diff)
	}
}

// TransactionPoolSubscribe adds a subscriber to the transaction pool.
// Subscribers will receive the full transaction set every time there is a
// significant change to the transaction pool.
func (tp *TransactionPool) TransactionPoolSubscribe(subscriber modules.TransactionPoolSubscriber) {
	tp.mu.Lock()
	defer tp.mu.Unlock()

	// Check that this subscriber is not already subscribed.
	for _, s := range tp.subscribers {
		if s == subscriber {
			build.Critical("refusing to double-subscribe subscriber")
		}
	}

	// Add the subscriber to the subscriber list.
	tp.subscribers = append(tp.subscribers, subscriber)

	// Send the new subscriber the transaction pool set.
	diff := new(modules.TransactionPoolDiff)
	diff.AppliedTransactions = make([]*modules.UnconfirmedTransactionSet, 0, len(tp.subscriberSets))
	for _, ut := range tp.subscriberSets {
		diff.AppliedTransactions = append(diff.AppliedTransactions, ut)
	}
	subscriber.ReceiveUpdatedUnconfirmedTransactions(diff)
}

// Unsubscribe removes a subscriber from the transaction pool. If the
// subscriber is not in tp.subscribers, Unsubscribe does nothing. If the
// subscriber occurs more than once in tp.subscribers, only the earliest
// occurrence is removed (unsubscription fails).
func (tp *TransactionPool) Unsubscribe(subscriber modules.TransactionPoolSubscriber) {
	tp.mu.Lock()
	defer tp.mu.Unlock()

	// Search for and remove subscriber from list of subscribers.
	for i := range tp.subscribers {
		if tp.subscribers[i] == subscriber {
			tp.subscribers = append(tp.subscribers[0:i], tp.subscribers[i+1:]...)
			break
		}
	}
}