1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
|
Multiplexing functionality
===========================
Author: Bela Ban
JIRA: http://jira.jboss.com/jira/browse/JGRP-119, http://jira.jboss.com/jira/browse/JGRP-112
Overview
--------
In JBoss we have multiple JGroups channels, one for each application (e.g. JBossCache, ClusterPartition etc).
The goal of the Multiplexer is to combine all stacks with the *same* configuration into one, and have multiple
apps on top of that same channel.
To do this we have to introduce multiplexing and demultiplexing functionality, ie. each app will have to have
a unique application ID (a string), and when sending a message, the message has to be tagged with that ID. When
receiving a message, it will be dispatched to the right app based on the ID attached to the message.
We require special handling for VIEW and SUSPECT messages: those need to be dispatched to *all* apps.
State transfer also needs to be handled specially, here we probably have to use thread locals, or change the API (TBD).
When deployed into JBoss, the Multiplexer will be exposed as an MBean, and all apps that depend on it will be deployed
with dependency injection on the Multiplexer. Of course, the old configuration will still be supported.
The config of the Multiplexer is done via a config file, which lists a number of stacks, each keyed by a name, e.g.
"udp", "tcp", "tcp-nio" etc. See ./conf/stacks.xml for an example. An app is configured with the name of a stack, e.g.
"udp", and a reference to the Multiplexer MBean. It will get a proxy channel through which all of its communication
will take place. The proxy channel (MuxChannel) will mux/demux messages to the real JGroups channel.
The advantage of the Multiplexer is that we can reduce N channels into M where M < N. This means fewer threads, therefore
fewer context switches, less memory consumption and easier configuration and better support.
Design
------
The Multiplexer is implemented by extending the JChannelFactory class. When an app wants to fetch a channel, it calls
Channel createChannel(String stack_name, String id) on the factory. The arguments are the stack
name (e.g. "udp") and the application ID.
The factory then returns a MuxChannel (a subclass of Channel). This channel is the handle for the application to
send and receive messages. There can be multiple MuxChannel instances per JChannel, but each application has only
1 MuxChannel. The application can choose between the pull or push style of receiving messages;
it can call Channel.receive() to pull messages out of the channel, or register as a Receiver, in which case messages
are pushed to the application when received.
All calls made against the MuxChannel are redirected to the factory for multiplexing and de-multiplexing.
Each JChannel has a Multiplexer class sitting on top of it (an UpHandler impl). This instance maintains the
mappings between application IDs and MuxChannels and, upon receiving a message from the JChannel, looks up
the correct MuxChannel according to the application ID stripped from the message header, and dispatches the message
to that MuxChannel.
TBD: we may buffer messages internally in the Multiplexer before dispatching them into the MuxChannel, should be
enabled or disabled via a configuration option (or programmatically). This may be desired if the application registers
as a Receiver with the MuxChannel; a receive() callback taking some time to be processed would block all other
applications from receiving messages until completion. The configuration should be per channel.
The buffering should be handled in the MuxChannel; MuxChannel.up() would then simply put the event into a queue,
or - if a Receiver is configured - directly convert the event into a callback for Receiver.
The subsequent sections describe the various use cases, e.g. life cycle for channel creation, message sending and
reception, channel close/disconnect, view reception and state transfer.
Configuration
-------------
Configuration is via XML, example is ./conf/stacks.xml. The file defines a number of stacks, each is given a unique
name. This name is used in JChannelFactory.createChannel() as stack name.
Structures
----------
In JChannelFactory:
- channels: Map<String, Entry>. A hashmap of channels and their associated MuxChannels. The keys are the channel
stack names, e.g. "udp", or "tcp". The values are ChannelEntries, which contain both the JChannel and associated Multiplexer.
The Multiplexer maintains a hashtable of application IDs and MuxChannels (Map<String,MuxChannel>), see Multiplexer below.
<obsolete>
We need to keep the stack names *and* channel names, because multiple applications residing on top of the same
channel stack could be connected to different groups, e.g.
Appl-1 uses the "udp" stack and connects to group "group-A", whereas appl-2 uses the same "udp" stack, but connects
to "group-B". Since a channel can be connected to only *one* group at a time, this requires 2 different channel
instances.
Only if 2 applications use the same channel stack *and* group can we share the same channel. We do provide this
flexibility, however, we expect that all applications sharing the same channel stack will use the same group name !
</obsolete>
This is not needed, as "udp" as stack name implies that all applications which use "udp" will use the *same* stack and
join the *same* group. If an application wants to use the same stack, but join a different group, it has to create
its own JChannel and cannot use the Multiplexer mechanism.
In Multiplexer:
- apps: Map<String, MuxChannel>. A hashmap of application IDs as keys and MuxChannels as values. Used for dispatching
incoming messages. The Multiplexer implements UpHandler and registers with the associated JChannel (there can only
be 1 Multiplexer per JChannel). When up() is called with a message, the header of the message is removed and the
MuxChannel corresponding to the header's application ID is retrieved from the map, and MuxChannel.up() is called
with the message.
JChannelFactory initialization
------------------------------
JChannelFactory has a reference to the configuration which is set either on creation (e.g. via "MultiplexerConfig"
JMX attribute) when used as an MBean, or via programmatically calling one of the config() methods.
The JChannelFactory MBean has to support the JMX life cycle methods (create(), start(), stop(), destroy()).
On redeployment, a JChannelFactory re-reads its configuration file. The method to read the config file (config()) can
also be called via JMX to force a re-read without redeploying the factory.
When a JChannelFactory is destroyed or redeployed, all of its dependent MBeans (usually all applications) will be
redeployed/destroyed as well. We recommend dependency definition/injection between application MBeans and their
underlying shared JChannelFactory MBean.
MuxChannel creation
-------------------
When JChannelFactory.createChannel() is called, the following happens:
- If the JChannel already exists:
- Create a MuxChannel (only if the application ID doesn't yet exists), referencing the JChannel,
add it to the Multiplexer's apps hashmap, and return it to the application
- Else:
- Create a new JChannel, add it to the channels hashmap
- Create a new MuxChannel, referencing the JChannel, add it to the Multiplexer's apps hashmap, and return it
Note that a newly created JChannel is not connected yet.
MuxChannel connect
------------------
- Calling JChannel.connect(stack_name). If the JChannel is already connected, this is a no-op. The name of the group
is always the name of the stack, e.g. "udp" or "tcp"
Sending a message on MuxChannel
-------------------------------
- The MuxChannel adds a MuxHeader, with the application ID and then calls JChannel.send()/down()
Receiving an event on Multiplexer (up() method)
-----------------------------------------------
- The Multiplexer removes the MuxHeader (if present)
- For SUSPECT and VIEW: pass it on to all MuxChannels.
- For messages and state transfer events: fetch the MuxChannel associated with the MuxHeader's application ID and
pass the message to it
- A MuxChannel can be configured to use a queue when the pull model is chosen (no Receiver present). In that case,
up() simply places the event into the queue, and MuxChannel.receive() dequeues the event. The queue can be bounded.
If a Receiver is set, we can either directly invoke the callback, or use a QueuedExecutor to queue all events and
invoke them in FIFO order on the Receiver. Note that we *cannot* use a PooledExecutor since the delivery order might
be destroyed by a thread pool.
MuxChannel disconnect/close
---------------------------
- Remove the MuxChannel from the Multiplexer's apps hashmap
- If the apps hashmap is empty:
- Disconnect and close the JChannel
- Remove the Multiplexer from the channels hashmap in Multiplexer
State transfer for *all* applications
-------------------------------------
- If we transfer the state for each application separately, we have to run the stop-the-world (FLUSH) protocol
N times, once for each application
- This could be improved by fetching *all* states after the last application has been started
- It could be implemented by adding a getState(Address coord, long timeout, Object state_id) method to Channel:
(same as partial state transfer)
- the state transfer event would have the state_id as parameter
- a new method Multiplexer.getAllApplicationStates() would call JChannel.getState() with a state_id of "ALL"
- when the Multiplexer receives the state event, and the state_id is "ALL", it asks all the registered applications
for their state, and returns all appl states as one state. That state would contains all substates, prefixed
with the substate ID (which is the application ID)
- on the receiving Multiplexer, we demultiplex the state into individual appl states and call setState() on the
application corresponding to the substate ID
- this has the advantage that the digest would have to be transferred and set only *once*, not once per application
- Actually, transferring all substates as *one* single combined state is a bad idea, this might become too big. For example.
if we have 2 substates A and B, each 500MB, and assuming we have to generate a byte[] array for the serialized state, which
is 500MB as well (simplified assumption), then if we combine the state, we have 500MB for A and 500MB for B, for a total
of 1GB. This 1GB byte[] buffer can only be discarded once we have fragmented it and placed all fragments on the wire.
If we send the substates individually, then we only have to generate 500MB at a time.
Preventing multiple FLUSH phases
--------------------------------
- When we have multiple applications on top of the same channel, each application will call MuxChannel.getState() after
connecting. Let's say we have 3 applications, which all require state transfer, then we will have 3 FLUSH phases when
starting. This will slow down the startup of JBoss
- The goal is to reduce this to just a single FLUSH phase (the use case is JBoss)
- Let's assume there are 4 applications that reside on the same JChannel. 3 of them require state
- All 4 applications are MBeans which have a dependency to the JChannelFactory MBean
- Each application which requires state has to do the following:
- On create(): register with the JChannelFactory for state transfer. This basically tells the factory that application
X is interested in state transfer
- An application must *not* call MuxChannel.getState() after it connected, but should rely on the JChannelFactory
to push the state to it when all applications that require state have connected. As an alternative, we could
simply ignore MuxChannel.getState() if the state push model is chosen.
- When all applications that registered for state transfer have connected (we can find out because all
MuxChannel.connect() calls go through the JChannelFactory), the factory tells the Multiplexer to fetch *all*
substates for that JChannel. This is essentially the same as fetching the entire state, but the returned state
contains a list of <application-id/state> pairs, so the Multiplexer will call setState() on each application
NOTE: this design doesn't work in JBoss as the dependency management does *not* guarantee that all create() methods
will be called before all start() methods. Currently, this is disabled in JBoss
Service views with the Multiplexer
==================================
Issue
-----
When switching to a services-oriented architecture, where multiple services can run on the same JGroups channel, then
we need to adapt the notion of 'view' slightly. The problem is that if not all services are deployed on all nodes in
a cluster, certain services might run into problems (JIRA issue: http://jira.jboss.com/jira/browse/JGRP-247). Consider
a cluster {A,B,C,D}. If service S1 is deployed on A and C only, then - when that service wants to make a cluster-wide
synchronous call - it cannot wait for responses from B and D because S1 is not deployed on those 2 nodes.
Solution
--------
S1's view should therefore be {A,C}, rather than {A,B,C,D}. This is called a *service view* (as compared to a cluster view).
A service view is always derived from the current cluster view, except that the hosts not running the given service
will be excluded from the service view, so the service view acts like a filter for the cluster view.
A service view is always applicable only to a given service, e.g. service S1 might see a different service view than S2.
Design of Multiplexer
---------------------
Data structures
---------------
- Every Multiplexer maintains a Map<Service,List<Host>>, where it keeps track of which hosts are currently running a
service. This is called the *service state*
New JChannel is connected (JChannelFactory.connect())
-----------------------------------------------------
- Gets service state from coordinator, blocks until state is available (doesn't block if coordinator)
- Loops until state has been received, or no members available
New service S is started (JChannelFactory.connect())
---------------------------------------------------------
- Multicast serviceUp(S,H). The message includes the host H (JGroups address) on which the service was started
On Multiplexer.disconnect()/close()/shutdown()
----------------------------------------------
- Multicast serviceDown(S,H) for the service whose MuxChannel called disconnect()/close()/shutdown()
On Multiplexer.closeAll()
-------------------------
- Multicast serviceDown(S,H) for all services currently registered with the Multiplexer
On reception of serviceUp(S,H)
------------------------------
- Create new service view SV (based on view V and service state)
- If service S is affected (hosts joined or left): call viewAccepted(SV) on S
On reception of serviceDown(S,H)
--------------------------------
- Create new service view SV (based on view V and service state)
- If service S is affected (hosts joined or left): call viewAccepted(SV) on S
On view V
---------
- If coordinator:
- for each host H which left:
- multicast serviceDown(S,H) for each service S that was running on H
- Else:
- if viewChange() has not yet been called: call viewChange() in all services which have hosts in V
On MergeView
------------
- If coordinator:
- for each new host H:
- multicast serviceUp(S,H) for all services that are running on S
|