Design of SEQUENCER, a total order protocol using a sequencer
=============================================================

Author: Bela Ban
Date: Dec 29 2005


Motivation
----------

The TOTAL protocol works by asking the coordinator for a sequence number for every group message, but each member
sends the message itself. This means a round trip to the coordinator, then a multicast. In the design below, we
want the coordinator to directly multicast the message on behalf of the member: now we have 1 unicast to the
coordinator and 1 multicast. The downside is that all messages are now routed via the coordinator, which makes the
coordinator the bottleneck. However, since only the coordinator sends multicasts, there is no contention.


Overview
--------

When a member multicasts a group message, it sends it to the coordinator. The coordinator then wraps the message
into its own message and multicasts it to the group. The SEQUENCER protocol (somewhere above NAKACK) unwraps the
message so that the original sender is seen.

It is the coordinator who decides on the ordering of messages from different senders. When receiving FORWARD
messages from A, B and C, the coordinator establishes an ordered sequence, ordered by the seqno assigned by the
coordinator when sending the message. This sequence is delivered in order at all receivers, therefore establishing
total ordering.

Note that there can be cases where a sender P sends messages M1 and M2, but M2 is received first by the
coordinator, then M1. In this case, M2 would be *before* M1 in the global sequence established by the coordinator.
If this is not desired, use a group RPC which gets acked by every node before a new message is sent.

Example: the group is {A,B,C}. B wants to multicast a message M to the group. It does so by sending M to A. A then
adds a header which records the original sender (B) and replaces M's sender address with its own (A). When a
member receives M, everybody thinks the message is from A, and NAKACK will possibly retransmit from A in case M is
lost. However, SEQUENCER removes the header and replaces M's sender address with B. It has to do so on a (shallow)
*copy* of M, otherwise we would change the original message, and retransmission requests would then go to B rather
than A!


Reliability on failover of coordinator
--------------------------------------

When sending messages to a coordinator, and that coordinator dies, we simply resend the unacked messages to the
new coordinator (on a VIEW_CHANGE). Each message has a unique tag, consisting of the sender's address and a
monotonically increasing sequence number (seqno). The coordinator simply broadcasts the message, making sure that
the tag is still in the message (as a header).

When a member receives a broadcast message, it checks whether it has already received that message by comparing
the message's seqno to the highest seqno it has received from that sender. If it has, the message is simply
discarded; otherwise the member updates its highest received seqno for that sender and delivers the message.

So if a member C sends messages 1, 2, 3, 4, 5 and 6 to the coordinator, the coordinator is able to broadcast all
messages but then dies, and C doesn't receive broadcasts 5 and 6 (therefore 5 and 6 are still in the
forward-table), then C simply re-sends messages 5 and 6 to the new coordinator. The new coordinator broadcasts 5
and 6, but all members who received 5 and 6 before will simply discard them. C, who originally sent 5 and 6, will
deliver them and remove 5 and 6 from its forward-table.
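The duplicate check above can be illustrated with a short sketch. This is not the actual SEQUENCER code; the class
and field names (ReceivedTable, highestSeqnos) are made up for illustration, and a plain HashMap keyed by a String
address stands in for the protocol's received-table:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch of the per-member duplicate check performed when a
    // broadcast from the coordinator is received. Duplicates occur when unacked
    // messages are re-forwarded to a new coordinator after the old one crashed.
    public class ReceivedTable {

        // highest seqno delivered so far, keyed by the original sender's address
        private final Map<String, Long> highestSeqnos = new HashMap<String, Long>();

        // Returns true if the message should be delivered, false if it is a
        // duplicate. 'sender' and 'seqno' together form the unique tag carried
        // in the SEQUENCER header.
        public synchronized boolean shouldDeliver(String sender, long seqno) {
            Long highest = highestSeqnos.get(sender);
            if (highest != null && seqno <= highest)
                return false;                  // already delivered: discard
            highestSeqnos.put(sender, seqno);  // remember the new highest seqno
            return true;
        }
    }

In the real protocol the table would be keyed by the member's Address rather than a String; this check corresponds
to the "If M was already delivered" branch of Deliver(M,S) in the design below.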
The unit test that verifies this behavior is org.jgroups.tests.SequencerFailoverTest.


Design
------

Variables:
- forward-table: a Map of the messages to be forwarded (sorted by seqno) that have not yet been received back as
  bcasts. Access patterns: add at the end (frequent), remove by seqno (frequent), iterate through it in order of
  seqnos (infrequent); therefore a Map was preferred over a linked list.
- received-table: a Map which keeps the highest seqno seen per member. This table has N entries, where N is the
  number of members in the group. Access patterns: look up the highest seqno for a member, update it for a member.
  Therefore a Map was chosen.

On send(M):
- Pass down unicasts and return (handle only multicasts)
- Add a FORWARD header to M with the local_addr and a monotonically increasing seqno (use a ViewId)
- If not coordinator:
  - Add the message to the forward-table (when a broadcast is received, remove the corresponding message from the
    forward-table)
  - Send M to the coordinator
- Else:
  - Multicast(M, local_addr)

On stop():
- Stop handling new down messages
- Wait for all replies until the forward-table is empty, or until a timeout (?)
  ** Comment: I decided not to implement this, because this implies flushing **

On reception of M from S:
- If the header is not present:
  - Pass up
- Else (header is present):
  - If the header is a FORWARD header:
    - If not coordinator: error and return
    - Else: Multicast(M, S)
  - If the header is a DATA header:
    - Take the sender S from the header and call Deliver(M, S)

Multicast(M, S):
- Reuse M (no problem, since UNICAST's retransmission buffer will have removed M by the time it is passed up to
  SEQUENCER)
- Replace the FORWARD type in the header with the DATA type
- Multicast M

Deliver(M, S):
- If M was sent by us (local_addr == S):
  - Remove the corresponding message from the forward-table
- If M was already delivered (M's seqno is <= the one in the received-table):
  - Discard M
- Else:
  - Update the received-table with M's seqno for key=S (needs to be 1 higher than the previous seqno)
  - Shallow-copy M into M-tmp
  - Set M-tmp's sender address to S
  - Pass up M-tmp

On view change:
- If the coordinator left or crashed:
  - For each message M in the forward-table (*in increasing order of seqnos*!):
    - Change M.dest to the new coordinator
    - Send M to the new coordinator
- Remove the entries for left members from the received-table


Comments
--------

When forwarding, we don't care whether the message was forwarded before, e.g. by the previous coordinator. We
simply forward, but discard duplicate messages when receiving the bcast. This is not overly efficient when failing
over between coordinators, but it makes the implementation simpler.

SEQUENCER requires NAKACK and UNICAST: UNICAST is used for retransmission of unicast FORWARD requests, and NAKACK
is used to deliver all messages from the coordinator in order and without gaps. Since only one member (the
coordinator) multicasts, and it does so with FIFO properties, we essentially provide global order.
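A minimal sketch of the forward-table described above, assuming a TreeMap keyed by seqno. The class, the
CoordinatorStub interface and the byte[] payloads are hypothetical stand-ins for illustration, not the actual
SEQUENCER implementation:

    import java.util.Map;
    import java.util.SortedMap;
    import java.util.TreeMap;

    // Hypothetical sketch of the sender-side forward-table: messages forwarded to
    // the coordinator but not yet received back as broadcasts, sorted by seqno so
    // that they can be re-forwarded in order after a coordinator failover.
    public class ForwardTable {

        // stand-in for sending a unicast FORWARD message to the (new) coordinator
        public interface CoordinatorStub {
            void forward(long seqno, byte[] payload);
        }

        private final SortedMap<Long, byte[]> pending = new TreeMap<Long, byte[]>();
        private long nextSeqno = 1;

        // On send(M): assign the next seqno (part of the FORWARD header, together
        // with local_addr) and remember the message until its broadcast arrives.
        public synchronized long add(byte[] payload) {
            long seqno = nextSeqno++;
            pending.put(seqno, payload);
            return seqno;
        }

        // From Deliver(M,S), when our own message comes back as a broadcast.
        public synchronized void remove(long seqno) {
            pending.remove(seqno);
        }

        // On a view change in which the coordinator left or crashed: re-forward all
        // pending messages to the new coordinator, in increasing order of seqnos
        // (TreeMap iteration order).
        public synchronized void resendTo(CoordinatorStub newCoord) {
            for (Map.Entry<Long, byte[]> e : pending.entrySet())
                newCoord.forward(e.getKey(), e.getValue());
        }
    }

A sorted map gives both the frequent remove-by-seqno and the infrequent in-order iteration that the access
patterns listed under Variables call for.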