1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
|
RELAY - replication between data centers
========================================
Author: Bela Ban
This is an enhanced version of DataCenterReplication.txt with the ability to send unicast messages and to provide views
to the application, which list members of all local clusters.
We have data centers, each with a local cluster, in New York (NYC) and San Francisco (SFO). The idea is to relay
traffic from NYC to SFO, and vice versa.
In case of a site failure of NYC, the state is available in SFO, and all clients can be switched over to SFO and
continue working with (almost) up-to-date data. The failing over of clients to SFO is outside the scope of this
proposal, and could be done for example by changing DNS entries, load balancers etc.
The data centers in NYC and SFO are *completely autonomous local clusters*. There are no stability, flow control or
retransmission messages exchanged between NYC and SFO. This is critical because we don't want the SFO cluster to block
for example on waiting for credits from a node in the NYC cluster !
For the example, we assume that each site uses a UDP based stack, and relaying between the sites uses a
TCP based stack, see figure RELAY.png.
There is a local cluster, based on UDP, at each site and one global cluster, based on TCP, which connects the
two sites. Each coordinator of the local cluster is also a member of the global cluster, e.g. member A in NYC
(assuming it is the coordinator) is also member X of the TCP cluster. This is called a *relay* member. A relay
member is always member of the local and global cluster, and therefore has 2 channels it joins.
A relay member has a UDP stack which additionally contains a protocol RELAY at the top (shown in the bottom part
of the figure). RELAY has a JChannel which connects to the TCP group, but *only* when it is (or becomes) coordinator
of the local cluster. The configuration of the TCP channel is done via a property in RELAY.
A multicast message received by RELAY traveling up the stack is wrapped and sent via the TCP channel to the
other site. When received there, the corresponding RELAY protocol unwraps the original message and changes the sender
of the message to a ProxyUUID, which wraps the original sender and the local sender.
A ProxyUUID extends UUID and behaves like a normal UUID, but it also contains the original sender.
A unicast message received by RELAY traveling down the stack is forwarded to the current relay if the destination is
a ProxyUUID. The relay will then wrap the message and forward it to the other site via TCP.
When boradcasting a relayed message on the local cluster, RELAY adds a header. When it receives the multicast message it
forwarded itself, and a header is present, it does *not* relay it back to the other site but simply drops it.
Otherwise, we would have a cycle.
When a coordinator crashes or leaves, the next-in-line becomes coordinator and activates the RELAY protocol,
connecting to the TCP channel and starting to relay messages.
However, if we receive messages from the local cluster while the coordinator has crashed and the new one hasn't taken
over yet, we'd lose messages. Therefore, we need additional functionality in RELAY which buffers the last N messages
(or M bytes, or for T seconds) and numbers all messages sent. This is done by the second-in-line.
When there is a coordinator failover, the new coordinator communicates briefly with the other site to determine
which was the highest message relayed by it. It then forwards buffered messages with lower numbers and removes the
remaining messages in the buffer. During this replay, message relaying is suspended.
Therefore, a relay has to handle 3 types of messages from the global (TCP) cluster:
(1) Regular multicast messages
(2) A message asking for the highest sequence number received from another relay, and the response to this
(3) A message stating that the other side will go down gracefully (no need to replay buffered messages)
Example walkthrough
-------------------
Multicasting a message:
- C (in the NYC cluster, with coordinator A) multicasts a message
- A, B and C receive the multicast
- A is the relay. The byte buffer is extracted and a new message M is created. M's source is C, the dest is null
(= send to all). Note that the original headers are *not* sent with M. If this is needed, we need to revisit.
- A then wraps M into a message sent from X to Y
- X receives M, drops it (because it is the sender, determined by the header).
- Y receives M, and unwraps it.
- Y replaces the sender (C) with a ProxyUUID(D,C) (D is the sender and C the origial sender), adds a RelayHeader and
sends it down its local cluster
- D, E and F receive M and deliver it
- D does not relay M because M has a header
Sending a unicast reply:
- When F receives the multicast message M, it sends a unicast reply message R
- R.dest=ProxyUUID(D,C) and R.src=F
- RELAY.down() sees that R.dest is a ProxyUUID and therefore forwards R to the current relay (which is D)
- RELAY.up() in D sees that the destination is a ProxyUUID and relays the message, via Y to X
- D sets the destination of R to C, wraps the message and sends it to X (via the TCP cluster)
- A receives R (from X) and replaces R.src with a ProxyUUID(C,F)
- A puts R on the local channel where it is sent to C
Implementation
--------------
Becoming coordinator:
- Join TCP channel
- Register receiver
Ceasing to be coordinator:
- Leave TCP channel
RELAY.up(msg):
- If RelayHeader present: // coord
- If FORWARD && coordinator: forward(msg.buf); return
- If DISSEMINATE: pass up and return
- If VIEW: // see below
- Return
- Else:
- If multicast message && coordinator:
- Copy msg to M (don't copy headers)
- Serialize M into a buffer buf
- forward(buf)
- Pass up // unicast or multicast messages
RELAY.down(msg):
- If msg.dest is not a ProxyUUID: pass down, return
- forwardToCoord(msg)
- Return // don't pass down !
Receive message M from TCP channel:
- Switch RelayHeader:
- Case FORWARD:
- If sender = self: discard
- Else: deserialize M.buf into message M2 and putOnLocalCluster(M2)
- Case VIEW: // see below
forward(buf): // buf is the serialized message to be forwarded
- Create a message M with M.buf=buf, M.dst=null
- Add RelayHeader.FORWARD to M
- Put M onto the TCP channel
forwardToCoord(msg):
- Copy msg to M (don't copy headers)
- Set M.dst=msg.dst.original, M.src=local_addr
- Serialize M into a buffer buf
- Create message M2 (M2.buf=buf)
- Add RelayHeader.FORWARD to M2
- Send M2 to the current relay (coordinator)
putOnLocalCluster(M):
- Set M.src=ProxyUUID(local_addr,M.src)
- Add a RelayHeader.DISSEMINATE to M
- Put M on the local channel
View changes
------------
Local view changed:
- Set local view
- If coordinator:
- Broadcast remote and global view to local cluster
- Send remote view to remote cluster
- Every node (on reception):
- Update(RV, GV)
Remote view (RV) changed:
- Broadcast remote and global view to local cluster
- Every node (on reception):
- Update(RV, GV)
Update(RV,GV):
- Update remote view from RV if needed
- Install GV:
- If GV != current global view: set current view = GV and viewAccepted(GV)
Bridge view changed:
- If coordinator: send local view to remote
- If remote coordinator ('creator of remote view') crashed:
- Generate empty remote view V, generate global view and send RV and GV to local cluster
Issues:
- Do we copy the headers of a message M when M is relayed ? If not, an app won't be able to add their own headers
- Should we pass logical name information between the clusters ? Or should this be part of ProxyAddress ?
Todo:
#3 Handling temp coordinator outage - how do we prevent message loss ?
#4 State transfer - replication across clusters, to bootstrap initial coords in a local cluster
|