Design Issues
=============

Digest returned to client
-------------------------
The digest returned to clients determines the seqnos they have to expect from other group members.
If, in the absence of a retransmission protocol, a client gets back a lower seqno for a member P
than is actually the case, the client will never make any progress, because lower seqnos need to
be delivered before those with higher values. Therefore the client would hang (again, in the
absence of retransmission).

The problem at hand occurs in the coordinator when a new member joins: the coordinator returns its
own digest, which contains the highest seqnos seen so far from all current members (including
itself). It also includes the new member, with a seqno of 0 (starting seqno). Let's assume the
coordinator has a seqno of 35. Now, the new view is mcast before the call (handleJoin()) returns
the digest to the client. The client therefore gets a wrong seqno for the coordinator. The two
scenarios are described below:

A: The coordinator gets its digest and then mcasts the view. The seqno for the coordinator in the
   digest returned to the client would therefore be 35. The client will expect the next message
   from the coordinator to be labeled with seqno=35. However, the coordinator mcasts the new view,
   thereby incrementing its seqno to 36. The client discards this mcast, because it is not yet
   operational (it is only operational after setting the digest returned from the coordinator).
   Therefore the client will be missing seqno=35 when it receives seqno=36 or higher, which will
   cause it not to deliver any message higher than 35 until 35 is finally received. In the absence
   of retransmission, this would effectively hang the client.

B: The coordinator mcasts the new view and then gets the digest. Here, the hope is that the mcast
   is sent *and* received before the GET_DIGEST event returns. Currently this is the case.
   However, on a slow machine, the GET_DIGEST might actually return before the view mcast is
   received, giving us a seqno=35 again, compared to seqno=36 in the former case.

Since the latter case is highly timing-dependent, we chose to adopt scenario A. The kludge in
scenario A is that, in order to avoid the client getting a lower seqno, we actually have to
increment the coordinator's seqno by 1 in the digest returned to the client. This is done towards
the end of the client's Join() method.
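A minimal sketch of that kludge, modeling the digest as a plain map of member address to highest
seqno. The class and method names below are illustrative, not the actual JavaGroups classes:

    import java.util.HashMap;
    import java.util.Map;

    public class DigestAdjustment {

        // Bump the coordinator's seqno by one, because the client discarded the view mcast
        // (seqno 35 in the example above) while it was not yet operational.
        static void adjustForDiscardedView(Map<String,Long> digest, String coord) {
            Long highest=digest.get(coord);
            if(highest != null)
                digest.put(coord, highest + 1);   // expect 36 as the coordinator's next seqno
        }

        public static void main(String[] args) {
            Map<String,Long> digest=new HashMap<String,Long>();
            digest.put("coord", 35L);        // coordinator's highest seqno seen so far
            digest.put("new-member", 0L);    // new member starts at 0
            adjustForDiscardedView(digest, "coord");
            System.out.println(digest);      // coord is now 36, new-member stays at 0
        }
    }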
'Highest delivered' vs. 'highest deliverable'
---------------------------------------------
Currently, we always return the highest delivered message seqno from NakReceiverWindow rather than
the highest deliverable seqno. This would lead to discrepancies if, for example, a consumer did
not consume messages, or did not consume them fast enough. However, in PBCAST, HandleUpMessage()
ensures that, as soon as it adds a message, it tries to remove as many as possible. Therefore,
deliverable messages are always consumed, making highest deliverable == highest delivered.

Local messages
--------------
The assumption is that messages sent to self or mcast to the group are always received by self. If
this is not the case, the PBCAST layer would have to store not only received messages, but also
sent ones. (This is currently not the case; it would be an easy change.)

Garbage collection lag-behind
-----------------------------
When a gossip is received by a member, the member asks for retransmission of yet unseen messages
(if necessary) and removes itself from the not-seen list of the gossip. Then it re-broadcasts the
gossip (if not yet re-broadcast previously). When the member removes itself from the not-seen
list, and the list is subsequently empty, this means that the gossip has been seen by all the
members. Therefore, all messages lower than the ones in the gossip's message digest can be
discarded. However, as the removal from the not-seen list only signals gossip reception, but not
actual reception of the requested missing messages, we cannot really be sure that those messages
have really been received. Therefore, the strategy adopted here is to introduce some 'lag', which
is the number of messages that we should wait before really garbage-collecting messages in the
digest. This number is subtracted from the digest's highest seqnos, so if for example the received
digest is [P:25, Q:3, R:100] and the lag is 10, then we would garbage-collect messages lower than
P:15 and R:90, and none of Q's messages.

The lag value can be set differently for different protocol stacks; therefore we could designate a
number of members as "late-join-handlers" (see Birman et al). Those members would have a high lag
value, to make sure that at least some of the group members have almost all of the messages ready,
e.g. for new joiners who want to catch up. As a matter of fact, the late-join-handlers may choose
to store their message digests in stable storage, e.g. a database.
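A small sketch of the lag computation described above, reproducing the [P:25, Q:3, R:100] example
with a lag of 10. The names are illustrative, not the actual PBCAST code:

    import java.util.HashMap;
    import java.util.Map;

    public class GcLag {

        // For each member, subtract the lag from the digest's highest seqno. Messages with a
        // seqno lower than the returned threshold may be garbage-collected; members whose
        // threshold would be <= 0 are skipped (none of their messages are collected).
        static Map<String,Long> gcThresholds(Map<String,Long> digest, long lag) {
            Map<String,Long> thresholds=new HashMap<String,Long>();
            for(Map.Entry<String,Long> entry: digest.entrySet()) {
                long threshold=entry.getValue() - lag;
                if(threshold > 0)
                    thresholds.put(entry.getKey(), threshold);
            }
            return thresholds;
        }

        public static void main(String[] args) {
            Map<String,Long> digest=new HashMap<String,Long>();
            digest.put("P", 25L);
            digest.put("Q", 3L);
            digest.put("R", 100L);
            // Prints something like {P=15, R=90}: Q is absent because 3 - 10 is negative,
            // so none of Q's messages are collected.
            System.out.println(gcThresholds(digest, 10));
        }
    }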
GET_DIGEST vs. GET_DIGEST_STATE event
-------------------------------------
The latter is used by the STATE_TRANSFER layer to fetch the digest from the PBCAST layer. However,
as GMS (below STATE_TRANSFER) also uses the GET_DIGEST family of events, there would be
interference. E.g. when STATE_TRANSFER sends down a GET_DIGEST event and waits for the
GET_DIGEST_OK event, it will never receive it, because GMS catches and handles it and does not
pass it up, although it was destined not for GMS but for STATE_TRANSFER. This is the reason why
there is yet another event, GET_DIGEST_STATE. Clearly a kludge; the better solution would be to
tag an event with its sending protocol, have the receiver construct a response event from it, and
have every receiver of the response discriminate according to its own identity. This is a similar
scheme to the one used for sending messages.

Handling of received gossips (PBCAST.handleGossip() method)
-----------------------------------------------------------
First make sure that the sender of the gossip is not null. If it were null, we wouldn't be able to
ask it for retransmission of missing messages. We check for this right at the beginning of the
method, so we can return immediately and don't have to do all the processing if the sender is
null.

Then a check is made whether the sender really is a member. If it is not, we return as well. There
are 2 cases when this can happen: (a) when a member left the group and (b) when a new member
joined, but the membership is not yet updated because the current member has not yet received the
new membership list. In the first case, we can safely discard the gossip because we won't need any
of the messages of the dead member. Gossips sent by dead members will only be in the system for a
short time, until everybody has received them. In the second case, which is assumed to be very
rare, a new member S has joined the group, received its digest from the coordinator and, before
the coordinator's new view was received by a member P, that member receives a gossip from S. This
is highly unlikely, but P will just discard S's gossip. This is not a problem, because S will
continue sending gossips, and at some point will be a member, therefore its gossips will not be
dropped any longer.

Now, we check whether the gossip has been received before. We keep a list of gossips received (a
bounded cache in which the oldest gossips are removed before adding a new one). If the gossip has
been seen before, it is discarded and the method returns. Otherwise we add the gossip to the end
of the list. Since a previously received gossip has already caused us to remove ourselves from the
not-seen list (see below) and re-send the gossip, we don't need to handle it again. This also
prevents network flooding by gossips, e.g. caused by spurious retransmission requests, all to the
same sender (the one who sent the gossip).

Now we compare the digest received as part of the gossip against our own, looking for messages
(from any senders) that we haven't seen yet. If there are any missing messages, we send a
retransmission request (a hashtable of senders and the messages missing from those senders) to the
sender of the gossip.

Next, we check who has already seen this gossip. If the not-seen list is empty (after removing
ourselves, because we have now seen the gossip), we can assume that every other member in the
group has received the messages in the gossip's digest, and we can therefore garbage-collect the
messages seen by all members. If the not-seen list is empty, we also do not need to resend this
gossip, because everybody has already seen it. Therefore, the method returns.

If there are still members in the not-seen list, we pick a random subset of those members and
resend the gossip to them. Great care has to be taken not to flood the network. For example, if
the group size is big and the gossips are resent to 30% of the members in a relatively short time
interval (gossip_interval), then the network might easily become flooded. This is because every
receiver potentially resends the gossip to another 30% of the members, and because each receiver
might request retransmission from a single sender of a gossip, which might lead to a message
implosion problem. On the todo list for JavaGroups is therefore a mechanism which dynamically
adjusts the subset percentage and the gossip interval with increasing or decreasing group size.
Another additional line of defense is to have a bounded buffer in PBCAST to which all
PBCAST-related messages (e.g. gossips, xmit-reqs, xmit-rsps) are added. When the buffer is full,
new requests will just be discarded. This prevents flooding of both the buffers and the network
(e.g. by not forwarding gossips). Regular unicast messages are passed up the stack, and multicast
messages are directly handled (higher priority than gossips).
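A sketch of picking the random subset of not-seen members to which the gossip is resent. The
subset percentage parameter and all names are illustrative, not the actual PBCAST code:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class GossipSubset {

        // Shuffle the not-seen list and take roughly 'subsetPercentage' of it (at least one
        // member), so that a gossip is resent to only a few members and does not flood the
        // network when the group is large.
        static List<String> pickSubset(List<String> notSeen, double subsetPercentage) {
            List<String> shuffled=new ArrayList<String>(notSeen);
            Collections.shuffle(shuffled);
            int size=Math.max(1, (int)(notSeen.size() * subsetPercentage));
            return shuffled.subList(0, Math.min(size, shuffled.size()));
        }

        public static void main(String[] args) {
            List<String> notSeen=new ArrayList<String>();
            Collections.addAll(notSeen, "B", "C", "D", "E", "F", "G", "H", "I", "J");
            System.out.println(pickSubset(notSeen, 0.3));  // 30% -> 2 of the 9 members here
        }
    }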
Dynamic adjustment of gossip_interval in PBCAST
-----------------------------------------------
The gossip_interval (the frequency with which gossips are sent by each member) can optionally be
determined dynamically by JavaGroups as a function of the group size. We assume that almost every
gossip reaches every other member (definitely if use_mcast_for_gossip is true). Therefore a gossip
is used by each member which received it to adjust the time it hasn't heard from the members which
have received the same gossip. This is done by resetting the times for all members which are in
the gossip's path, i.e. the seen_list. The desired average gossip time (desired_avg_gossip)
defines the average time between gossip receptions for a member. E.g. if it is set to 30 seconds,
then the member will receive a gossip every 30 seconds on average.

The gossip_interval is computed as follows:

    gossip_interval = Random(desired_avg_gossip * 2 * num_mbrs)

For example: if the desired average gossip reception time is 30 secs, and we have 10 members in
the group, then the gossip_interval will be randomly chosen from the range [1 - 600 secs]. The
average random value will be 300 secs. Divided by 10 members, the average gossip reception will be
30 secs. The gossip_interval is determined anew before each gossip, so that members vary their
gossip_interval. Thus, gossip_interval increases with group size. (A small sketch of this
computation follows the rules below.)

The following rules govern whether static or dynamic determination of gossip_interval will be
used:

- if dynamic=true: dynamic computation of gossip_interval
  --> gossip_interval is ignored
  --> desired_avg_gossip and the number of members are used to determine gossip_interval
- if dynamic=false: static use of gossip_interval
  --> the configured gossip_interval is used
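A minimal sketch of the computation above, assuming time values in seconds (illustrative names,
not the actual PBCAST code):

    import java.util.Random;

    public class GossipInterval {

        static final Random random=new Random();

        // gossip_interval = Random(desired_avg_gossip * 2 * num_mbrs): uniformly chosen from
        // [1, desired_avg_gossip * 2 * num_mbrs]. With 30 secs and 10 members this yields a
        // value in [1, 600]; the mean is about 300 secs, so each member receives a gossip
        // roughly every 30 secs on average.
        static long computeGossipInterval(long desiredAvgGossip, int numMbrs) {
            long upper=desiredAvgGossip * 2L * numMbrs;
            return 1 + (long)(random.nextDouble() * upper);
        }

        public static void main(String[] args) {
            System.out.println(computeGossipInterval(30, 10));  // e.g. 437
        }
    }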
Tracing
-------
Most diagnostic output is through JavaGroups.Common.Trace, e.g.

    Trace.println("FD.run()", Trace.WARN, "could not ping " + dest);

The Trace.setOutput() methods enable switching tracing on/off, so the above message will be
written (e.g. to a file) or not. However, even if we don't want any output, the 2 strings
"FD.run()" and "could not ping " + dest will still be created, which results in a lot of garbage.
Therefore, the convention was adopted in the Protocols and Protocols/pbcast directories to check
the boolean flag 'trace' first, and only if it is true to actually call Trace.println(). The above
example can therefore be rewritten as

    if(trace)
        Trace.println("FD.run()", Trace.WARN, "could not ping " + dest);

The 2 strings will only be created if 'trace' is true, which saves a lot of accumulated creation
time and memory when JavaGroups needs to be run without tracing enabled. Note that code that will
only be executed in exceptional cases does not necessarily need to be preceded by the "if(trace)"
conditional statement, e.g.

    try {
        // try out something
    }
    catch(Exception ex) {
        // should never be reached
        Trace.println("FD.PingDest()", Trace.DEBUG, ex.toString());
    }

To balance the need to avoid excessive string creation on the one hand against the overhead and
the adverse effect of trace statements on readability on the other, as a rule of thumb the
"if(trace)" statement should be used in places where a big number of strings would be generated if
trace is disabled.

Membership Updating for JOIN/LEAVE requests
-------------------------------------------
When many members joined a group at the same time, inconsistent views were mcast by the
coordinator (P). The reason was that P made a *copy* of its membership M1, added the new member
and mcast the new membership M2 to all members. P would only update its membership upon reception
of M2. However, if there were client JOINs before receiving M2, P would compute the new membership
based on its (old) M1, which would not take into account members who joined between M1 and M2.

Solution: P updates its membership locally before mcasting the new membership. As soon as M2 is
received, the membership will be set to M2 (which is the same).

There is a little twist on the above solution: when a coordinator P receives a number of
simultaneous JOIN requests (e.g. from A, B, C), the membership will be {P, A, B, C} after C's
request has been processed. However, if we then receive the view for the first join, which is
{P, A}, the membership would be reset to {P, A}. Further JOIN requests received before view
{P, A, B, C} would operate on {P, A} instead of the correct {P, A, B, C}.

Solution: a temporary membership list keeps track of the unofficial membership. This is a
membership that hasn't yet been made official by means of a view change, but which keeps the
joining members that are not yet contained in the official membership.

[bela Nov 22 2003]: implemented the same mechanism for LEAVE requests (using a 'leaving' list in
pbcast.GMS)
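A rough sketch of the temporary 'joining' list idea: the coordinator computes new views from the
official membership plus all members that have joined but are not yet covered by an installed
view. The class and method names are illustrative only; the real logic lives in pbcast.GMS and
differs in detail:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class MembershipSketch {

        final List<String> members=new ArrayList<String>();  // official membership (last view)
        final List<String> joining=new ArrayList<String>();  // joined, but not yet in a view

        // JOIN request: the next view is computed from members + joining + the new member,
        // so concurrent joins are not lost even before their views have been installed.
        List<String> handleJoin(String newMbr) {
            joining.add(newMbr);
            List<String> newView=new ArrayList<String>(members);
            for(String mbr: joining)
                if(!newView.contains(mbr))
                    newView.add(mbr);
            return newView;
        }

        // View reception: the view becomes the official membership, and members now covered
        // by it are removed from the joining list.
        void handleViewChange(List<String> view) {
            members.clear();
            members.addAll(view);
            joining.removeAll(view);
        }

        public static void main(String[] args) {
            MembershipSketch coord=new MembershipSketch();
            coord.members.add("P");
            System.out.println(coord.handleJoin("A"));   // [P, A]
            System.out.println(coord.handleJoin("B"));   // [P, A, B]
            coord.handleViewChange(Arrays.asList("P", "A"));
            System.out.println(coord.handleJoin("C"));   // [P, A, B, C]: B is still pending
        }
    }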
Last Message dropped in NAKACK
------------------------------
When a negative acknowledgment scheme (NACK or NAK) is used, senders send monotonically increasing
sequence numbers (seqnos) and receivers expect them in the same sequence. If a gap is detected at
a receiver R, R will send a retransmit request to the sender of that message. However, there is a
problem: if a receiver R does not receive the last message M sent by P, and P does not send more
messages, then R will not know that P sent M and will therefore not request retransmission. This
will be the case until P sends another message M'. At that point, R will request retransmission of
M from P and only deliver M' after M has been received. Since this may never happen, or may take a
long time, the following solution has been adopted: the STABLE layer includes an array of the
highest seqnos received for each member. When a gossip has been received from each member, the
stability vector will be sent by the STABLE layer up the stack to the NAKACK layer. The NAKACK
protocol will then do its garbage collection based on the stability vector received. In addition,
it will also check whether it has a copy of the highest messages for each sender, as indicated in
the stability vector. If it doesn't, it will request retransmission of the missing message(s). A
retransmission would only occur if (a) a message was not received and (b) it was the last message.

MERGE2 protocol
---------------
This is a description of the new merge protocol, consisting of the MERGE2 protocol and the
pbcast.GMS and pbcast.Coord/Part/Client modules. The MERGE2 protocol periodically fetches the
initial membership (using the FIND_INITIAL_MBRS event, handled e.g. by the PING protocol). When it
discovers that there is more than 1 coordinator, it sends a MERGE event up the stack with the list
of coordinators as argument. For details see the javadoc for MERGE2.

Changes to the group membership protocol were made in the pbcast directory. Those modifications
will be backported to the main protocol directory. The following discussion refers to the changes
in pbcast.

Overview: First a merge leader is determined. This is done by lexically sorting the coordinators'
addresses and taking the first one (see the sketch after the pseudo code below). Because this is
deterministic, and the outcome is the same at all coordinators, we don't need an additional round
of messaging for leader election. The leader then sends a MERGE_REQ to all coordinators. Each
coordinator returns its view and digest. The leader merges them into one view/digest. This data is
subsequently sent to each coordinator, who in turn rebroadcasts it to the members of its subgroup.
Each member (including the coordinator) will then install the (same) new digest and view, thus
agreeing on the membership and forming one single group. On view installation, each coordinator
checks whether it is still coordinator (of the new group), and becomes a participant if not.

It is possible that, during a merge, members send messages (e.g. after the digest has been
returned). The digest would then contain lower sequence numbers than were actually sent. For
example, member P's digest entry shows P's highest seqno to be 14, but it actually sent 15 and 16
after the digest was returned. In this case, there may be retransmissions of messages 15 and 16
from P.

The following section describes the implementation in pseudo code. When GMS receives a MERGE
event, it calls impl.Merge(evt.GetArg()). Only the coordinators react.

CoordGmsImpl.Merge():
- Return if a merge is already in progress
- Else: set merging to true, create merge id
- Determine merge leader
- If leader: start MergeTask (needs to be done asynchronously to avoid blocking)

MergeTask:
- Send MERGE_REQ (with merge id) to the other coords and wait for all responses (including self)
- Remove the rejected or missing MERGE_RSPs from the list of coordinators
- If the resulting list is empty (containing only self): abort the thread, set merging to false
- Else:
  - Merge all digests. They should be mutually exclusive (different subgroups). If not, take the
    maximum of all digests
  - Establish the new view (max of all ViewIds + 1)
  - Establish the new membership (merge all members and sort)
  - The new coordinator is the first member of the new membership
  - Send INSTALL_MERGE_VIEW to all coordinators
  - Terminate the MergeTask

Reception of MERGE_REQ (HandleMerge()):
- If a merge is already in progress (merging is true): return MERGE_RSP with rejected=true
- Else:
  - Set merging to true
  - Set merge id
  - Set a timer. If the timer goes off, cancel the merge (to prevent hangs if the merge leader
    dies)
  - Get digest and view
  - Return MERGE_RSP with digest/view to the sender

Reception of INSTALL_MERGE_VIEW:
- If merging == true and the merge ids are the same:
  - Cast the view (MergeView) to all members of the subgroup (including view and digest)

Reception of MergeView:
- If there is a digest: merge it with the current digest
- Install the view
- If not coordinator anymore: become participant
- Send VIEW_CHANGE (with the MergeView as parameter) up the stack
- Set merging to false
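A sketch of the deterministic merge leader determination mentioned in the overview above.
Addresses are modeled as plain strings here; the real code sorts JavaGroups Address objects:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class MergeLeader {

        // Sort the coordinators' addresses lexically and take the first one. Every
        // coordinator computes the same result, so no extra election round is needed.
        static String determineMergeLeader(List<String> coords) {
            List<String> sorted=new ArrayList<String>(coords);
            Collections.sort(sorted);
            return sorted.get(0);
        }

        public static void main(String[] args) {
            List<String> coords=Arrays.asList("192.168.1.7:7800", "192.168.1.3:7800");
            System.out.println(determineMergeLeader(coords));  // 192.168.1.3:7800
        }
    }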
pbcast.STABLE protocol
----------------------
- Every member maintains a digest: the highest seqno *delivered* for each member, e.g. A:22, B:2,
  C:19. This means the highest delivered messages were 22 for A, 2 for B and 19 for C. Note that
  the highest delivered message is always <= the highest *seen* (or *received*) message
- Periodically (or after having received a certain number of bytes), every member multicasts its
  digest
- When another member receives the digest, it updates its own digest such that, for each member,
  the minimum of its own seqno and the seqno from the other digest becomes the new seqno
- Every member also maintains a heard_from list, which is initialized to all members
- When a digest is received, we update our own digest, then remove the sender from the heard_from
  list
- When the heard_from list is empty, because we have heard from all the members, we send our
  digest as a *stability* message. This is done in a randomly staggered fashion so that we can
  avoid everyone sending their stability digest at the same time. When a member receives a
  stability digest, it cancels the sending of its own stability digest
- When a member receives a STABILITY message, it can be sure that everyone agreed on it, and
  therefore removes all seqnos <= the ones in the stability digest
- One problem is that everyone needs to agree; we cannot have unilateral action. The result could
  be that someone removes messages from its store that have not yet been seen by everyone, and
  therefore it cannot serve those messages when a retransmission is requested

One way in which this can happen is as follows:
- 2 members: A, B
- A receives a stable message from B, and removes B from its heard_from list
- A sends its own digest, but this time it has 3 members! (A, B, C)
- A removes itself from the heard_from list, and since that list is now empty, it sends a
  stability message {A, B, C}. However, B has *not* agreed on C's highest delivered seqnos, so
  this is unilateral action and may cause the above problem.

Solution: when any member receives a digest that doesn't have the exact same members, it does the
following:
- Reset the heard_from list
- This will ensure that (a) no stable message is sent and (b) no stability message is sent
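A sketch of the digest handling described above, including the heard_from reset when the member
sets differ. The class and method names are illustrative, and the real pbcast.STABLE logic differs
in detail:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class StableSketch {

        final Map<String,Long> digest=new HashMap<String,Long>(); // highest *delivered* seqno per member
        final Set<String> heardFrom=new HashSet<String>();        // initialized to all members

        // Handle a digest received from 'sender': if the member sets differ, reset heard_from
        // (so neither a stable nor a stability message will be sent); otherwise take the
        // per-member minimum and remove the sender from heard_from.
        boolean handleDigest(String sender, Map<String,Long> received) {
            if(!received.keySet().equals(digest.keySet())) {
                resetHeardFrom();
                return false;
            }
            for(Map.Entry<String,Long> entry: received.entrySet()) {
                String mbr=entry.getKey();
                digest.put(mbr, Math.min(digest.get(mbr), entry.getValue()));
            }
            heardFrom.remove(sender);
            return heardFrom.isEmpty();  // true: heard from everyone, a stability message may be sent
        }

        void resetHeardFrom() {
            heardFrom.clear();
            heardFrom.addAll(digest.keySet());
        }

        public static void main(String[] args) {
            StableSketch a=new StableSketch();
            a.digest.put("A", 22L); a.digest.put("B", 2L); a.digest.put("C", 19L);
            a.resetHeardFrom();

            Map<String,Long> fromB=new HashMap<String,Long>();
            fromB.put("A", 20L); fromB.put("B", 2L); fromB.put("C", 19L);
            // Prints false (A and C not heard from yet); the digest is now A=20, B=2, C=19
            System.out.println(a.handleDigest("B", fromB) + " " + a.digest);
        }
    }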