RELAY - replication between data centers
========================================

Author: Bela Ban

This is an enhanced version of DataCenterReplication.txt with the ability to send unicast messages and to provide views
to the application, which list members of all local clusters.

We have data centers, each with a local cluster, in New York (NYC) and San Francisco (SFO). The idea is to relay
traffic from NYC to SFO, and vice versa.

In case of a site failure of NYC, the state is available in SFO, and all clients can be switched over to SFO and
continue working with (almost) up-to-date data. The failing over of clients to SFO is outside the scope of this
proposal, and could be done for example by changing DNS entries, load balancers etc.

The data centers in NYC and SFO are *completely autonomous local clusters*. There are no stability, flow control or
retransmission messages exchanged between NYC and SFO. This is critical because we don't want the SFO cluster to block,
for example, while waiting for credits from a node in the NYC cluster !

For the example, we assume that each site uses a UDP based stack, and relaying between the sites uses a
TCP based stack, see figure RELAY.png.

There is a local cluster, based on UDP, at each site and one global cluster, based on TCP, which connects the
two sites. Each coordinator of the local cluster is also a member of the global cluster, e.g. member A in NYC
(assuming it is the coordinator) is also member X of the TCP cluster. This is called a *relay* member. A relay
member is always a member of both the local and the global cluster, and therefore joins 2 channels.

A relay member has a UDP stack which additionally contains a protocol RELAY at the top (shown in the bottom part
of the figure). RELAY has a JChannel which connects to the TCP group, but *only* when it is (or becomes) coordinator
of the local cluster. The configuration of the TCP channel is done via a property in RELAY.

A multicast message received by RELAY traveling up the stack is wrapped and sent via the TCP channel to the
other site. When received there, the corresponding RELAY protocol unwraps the original message and changes the sender
of the message to a ProxyUUID, which wraps the original sender and the local sender.

A ProxyUUID extends UUID and behaves like a normal UUID, but it also contains the original sender.
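
As an illustration only, the pairing of local and original sender might look like the sketch below. It is not the
JGroups class: ProxyId, local() and original() are hypothetical names, java.util.UUID stands in for a JGroups address,
and composition is used instead of inheritance because java.util.UUID is final.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical stand-in for ProxyUUID: pairs the relay's local address
// with the address of the original sender in the other site.
final class ProxyId {
    private final UUID local;     // relay that put the message on the local cluster
    private final UUID original;  // sender of the message in the other site

    ProxyId(UUID local, UUID original) {
        this.local = Objects.requireNonNull(local, "local");
        this.original = Objects.requireNonNull(original, "original");
    }

    UUID local()    { return local; }
    UUID original() { return original; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ProxyId)) return false;
        ProxyId other = (ProxyId) o;
        return local.equals(other.local) && original.equals(other.original);
    }

    @Override
    public int hashCode() { return Objects.hash(local, original); }

    @Override
    public String toString() { return "ProxyId(" + local + "," + original + ")"; }
}
```

A reply addressed to such a proxy carries enough information to be routed first to the relay (local) and then to the
original sender in the other site (original).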

A unicast message received by RELAY traveling down the stack is forwarded to the current relay if the destination is
a ProxyUUID. The relay will then wrap the message and forward it to the other site via TCP.

When broadcasting a relayed message on the local cluster, RELAY adds a header. When RELAY then receives a multicast
message that carries this header (i.e. one it put on the local cluster itself), it does *not* relay it back to the
other site. Otherwise, we would have a cycle.

When a coordinator crashes or leaves, the next-in-line becomes coordinator and activates the RELAY protocol,
connecting to the TCP channel and starting to relay messages.

However, if we receive messages from the local cluster while the coordinator has crashed and the new one hasn't taken
over yet, we'd lose messages. Therefore, we need additional functionality in RELAY which buffers the last N messages
(or M bytes, or for T seconds) and numbers all messages sent. This is done by the second-in-line.

When there is a coordinator failover, the new coordinator communicates briefly with the other site to determine
the highest-numbered message the other site received from the previous coordinator. It then forwards the buffered
messages with higher numbers (those not yet received) and removes the remaining messages from the buffer. During this
replay, message relaying is suspended.
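
The buffering and replay described above could be sketched as follows. This is a simplified illustration under stated
assumptions, not the RELAY implementation: RelayBuffer, add() and drainAfter() are hypothetical names, only the
"last N messages" bound is modelled, sequence numbers are assigned in arrival order, and the messages to replay are
taken to be those numbered above the highest sequence number the other site reports having received.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical bounded buffer kept by the second-in-line member.
final class RelayBuffer {
    static final class Entry {
        final long seqno;
        final byte[] payload;
        Entry(long seqno, byte[] payload) { this.seqno = seqno; this.payload = payload; }
    }

    private final int capacity;   // N: maximum number of buffered messages
    private final Deque<Entry> entries = new ArrayDeque<Entry>();
    private long nextSeqno = 1;

    RelayBuffer(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be > 0");
        this.capacity = capacity;
    }

    // Numbers the message and buffers it, evicting the oldest entry when full.
    synchronized long add(byte[] payload) {
        if (entries.size() == capacity) entries.removeFirst();
        long seqno = nextSeqno++;
        entries.addLast(new Entry(seqno, payload));
        return seqno;
    }

    // After failover: returns the entries numbered above the highest seqno
    // the other site received, then empties the buffer.
    synchronized List<Entry> drainAfter(long highestReceivedByRemote) {
        List<Entry> toReplay = new ArrayList<Entry>();
        for (Entry e : entries) {
            if (e.seqno > highestReceivedByRemote) toReplay.add(e);
        }
        entries.clear();
        return toReplay;
    }

    synchronized int size() { return entries.size(); }
}
```

The new coordinator would call drainAfter() with the number obtained from the other site and relay the returned
entries before resuming normal relaying.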

Therefore, a relay has to handle 3 types of messages from the global (TCP) cluster:
 (1) Regular multicast messages
 (2) A message asking for the highest sequence number received from another relay, and the response to that request
 (3) A message stating that the other side will go down gracefully (no need to replay buffered messages)


Example walkthrough
-------------------

Multicasting a message:

- C (in the NYC cluster, with coordinator A) multicasts a message
- A, B and C receive the multicast
- A is the relay. The byte buffer is extracted and a new message M is created. M's source is C, the dest is null
  (= send to all). Note that the original headers are *not* sent with M. If this is needed, we need to revisit.
- A then wraps M into a message sent from X to Y
- X receives M and drops it (because it is the sender, as determined by the header).
- Y receives M, and unwraps it.
- Y replaces the sender (C) with a ProxyUUID(D,C) (D is the sender and C the original sender), adds a RelayHeader and
  sends it down its local cluster
- D, E and F receive M and deliver it
- D does not relay M because M has a header

Sending a unicast reply:

- When F receives the multicast message M, it sends a unicast reply message R
- R.dest=ProxyUUID(D,C) and R.src=F
- RELAY.down() sees that R.dest is a ProxyUUID and therefore forwards R to the current relay (which is D)
- RELAY.up() in D sees that the destination is a ProxyUUID and relays the message, via Y to X
- D sets the destination of R to C, wraps the message and sends it to X (via the TCP cluster)
- A receives R (from X) and replaces R.src with a ProxyUUID(A,F) (A is the local sender and F the original sender)
- A puts R on the local channel where it is sent to C


Implementation
--------------

Becoming coordinator:
- Join TCP channel
- Register receiver

Ceasing to be coordinator:
- Leave TCP channel


RELAY.up(msg):
- If RelayHeader present:  // coord
  - If FORWARD && coordinator: forward(msg.buf); return
  - If DISSEMINATE: pass up and return
  - If VIEW: // see below
  - Return
- Else:
  - If multicast message && coordinator:
    - Copy msg to M (don't copy headers)
    - Serialize M into a buffer buf
    - forward(buf)
  - Pass up // unicast or multicast messages


RELAY.down(msg):
- If msg.dest is not a ProxyUUID: pass down, return
- forwardToCoord(msg)
- Return // don't pass down !
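
A minimal sketch of this down path, including the address rewriting done by forwardToCoord(), is given here. The Addr
and Msg classes and the method name forwardIfProxy() are hypothetical stand-ins for the JGroups types, and a proxy
address is modelled simply as an address that carries a non-null original part.

```java
final class RelayDownSketch {
    // Hypothetical address: original == null means a plain (non-proxy) address.
    static final class Addr {
        final String local;
        final String original;
        Addr(String local, String original) { this.local = local; this.original = original; }
        boolean isProxy() { return original != null; }
    }

    // Hypothetical message: dest == null means multicast.
    static final class Msg {
        final Addr src;
        final Addr dest;
        final String payload;
        Msg(Addr src, Addr dest, String payload) { this.src = src; this.dest = dest; this.payload = payload; }
    }

    // Returns the copy that would be sent to the current relay, or null when
    // the message should just be passed down the stack unchanged.
    static Msg forwardIfProxy(Msg msg, Addr localAddr) {
        if (msg.dest == null || !msg.dest.isProxy()) {
            return null;                                  // not a ProxyUUID: pass down
        }
        Addr realDest = new Addr(msg.dest.original, null); // M.dst = msg.dst.original
        return new Msg(localAddr, realDest, msg.payload);  // M.src = local_addr
    }
}
```

In the unicast walkthrough, F calling forwardIfProxy() on a reply addressed to ProxyUUID(D,C) yields a copy with source
F and destination C, which is then handed to the relay D.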


Receive message M from TCP channel:
- Switch RelayHeader:
  - Case FORWARD:
    - If sender = self: discard
    - Else: deserialize M.buf into message M2 and putOnLocalCluster(M2)
  - Case VIEW:  // see below


forward(buf): // buf is the serialized message to be forwarded 
- Create a message M with M.buf=buf, M.dst=null
- Add RelayHeader.FORWARD to M
- Put M onto the TCP channel


forwardToCoord(msg):
- Copy msg to M (don't copy headers)
- Set M.dst=msg.dst.original, M.src=local_addr
- Serialize M into a buffer buf
- Create message M2 (M2.buf=buf)
- Add RelayHeader.FORWARD to M2
- Send M2 to the current relay (coordinator)

putOnLocalCluster(M):
- Set M.src=ProxyUUID(local_addr,M.src)
- Add a RelayHeader.DISSEMINATE to M
- Put M on the local channel


View changes
------------

Local view changed:
- Set local view
- If coordinator:
  - Broadcast remote and global view to local cluster
  - Send remote view to remote cluster
- Every node (on reception):
  - Update(RV, GV)

Remote view (RV) changed:
- Broadcast remote and global view to local cluster
- Every node (on reception):
  - Update(RV, GV)


Update(RV,GV):
- Update remote view from RV if needed
- Install GV:
  - If GV != current global view: set current view = GV and viewAccepted(GV)


Bridge view changed:
- If coordinator: send local view to remote
- If remote coordinator ('creator of remote view') crashed:
  - Generate empty remote view V, generate global view and send RV and GV to local cluster
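
The view handling above can be sketched as follows. This is only an illustration: RelayViewsSketch and its methods are
hypothetical, members are plain strings, a counter stands in for the viewAccepted() callback, the broadcast to the
local cluster is replaced by a direct call, and the global view is assumed to be the local members followed by the
remote members (the text does not spell out how it is generated).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class RelayViewsSketch {
    private List<String> localView = Collections.emptyList();
    private List<String> remoteView = Collections.emptyList();
    private List<String> globalView = Collections.emptyList();
    private int viewAcceptedCalls = 0;   // stand-in for the application's viewAccepted()

    // Assumption: global view = local members followed by remote members.
    static List<String> merge(List<String> local, List<String> remote) {
        List<String> gv = new ArrayList<String>(local);
        gv.addAll(remote);
        return gv;
    }

    void setLocalView(List<String> lv) { localView = new ArrayList<String>(lv); }

    // Update(RV, GV): install the views only when they actually changed.
    void update(List<String> rv, List<String> gv) {
        if (!rv.equals(remoteView)) {
            remoteView = new ArrayList<String>(rv);
        }
        if (!gv.equals(globalView)) {
            globalView = new ArrayList<String>(gv);
            viewAcceptedCalls++;         // viewAccepted(GV)
        }
    }

    // Bridge view changed and the remote coordinator crashed:
    // use an empty remote view and regenerate the global view.
    void remoteCoordinatorCrashed() {
        List<String> emptyRemote = Collections.emptyList();
        update(emptyRemote, merge(localView, emptyRemote));
    }

    List<String> currentGlobalView() { return globalView; }
    int viewAcceptedCalls() { return viewAcceptedCalls; }
}
```

Installing the same global view twice does not trigger a second callback, which matches the "If GV != current global
view" check in Update(RV,GV).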





Issues:
- Do we copy the headers of a message M when M is relayed ? If not, an app won't be able to add their own headers
- Should we pass logical name information between the clusters ? Or should this be part of ProxyAddress ?

Todo:
#3 Handling temp coordinator outage - how do we prevent message loss ?
#4 State transfer - replication across clusters, to bootstrap initial coords in a local cluster