1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
|
// Author: Bela Ban
Flow control (FC)
=================
Constants:
----------
- max_credits (example 2000000)
- min_credits (example 200000)
Vars:
-----
- sent_msgs: Map<Address,Long>, maps destinations to credits
- received_msgs: Map<Address,Long>, maps receivers to credits
- running: termination condition
- insufficient_credit: condition, message sending blocks when true
- creditors: list of destination for which we current don't have enough credits to send messages
- lowest_credit: the lowest credits of any destination (sent_msgs)
- mutex: for blocking of a sender
Init:
-----
- running = true
- insufficient_credit = false
- lowest_credit = max_credits
On message send:
----------------
- length = length of message
- if (lowest_credit - length <= 0):
- insufficient_credit = true
[- determine creditors]
- while (insufficient_credit and running):
- wait on mutex (timeout)
- if timeout: send credit request to all creditors
- else
- in sent_msgs: decrement length from all members (if multicast) or specific member (if unicast)
- adjust lowest_credit
- send message
On message receive:
-------------------
- length = length of message
- sender = sender of message
- in received_msgs: subtract length from sender M
- if M doesn't have enough credits (credits left <= min_credits):
- send credits to M
- replenish credits for M (in received_msgs)
On credit receive from P:
-------------------------
- in sent_msgs: update credits for P
- compute lowest_credit
- remove P from creditors
- if (lowest_credit > 0 and creditor.size() == 0):
- set insufficient_credit = false
- notify mutex
On credit request receive from P:
---------------------------------
- send credits to P
On view change:
---------------
- Remove all entries from sent_msgs, received_msgs which are not in the new view
- Add entries for all members of the new view for which there are currently no entries
- Remove all creditors which are not in the new view
- Compute lowest_credit
- if (lowest_credit > 0 and creditor.size() == 0):
- set insufficient_credit = false
- notify mutex
On stop:
--------
- Set running = false
- Notify mutex
|