Concurrent stack
================
Author: Bela Ban
JIRAs:
http://jira.jboss.com/jira/browse/JGRP-180 (harden stack)
http://jira.jboss.com/jira/browse/JGRP-181 (concurrent stack)
http://jira.jboss.com/jira/browse/JGRP-205 (out-of-band messages)
Concurrent stack
----------------
We will get rid of all queues in the protocols, and of their associated threads. This is equivalent to setting all
down_thread and up_thread variables to false, except that it is now the default.
On the receiver's side there will be 2 thread pools (Executors): one for regular messages and one for OOB messages.
Whenever an OOB message is encountered, the receiver thread (which called receive()) calls Executor.execute() on the
OOB thread pool with the message as argument; if it is a regular message, it calls Executor.execute() on the
regular thread pool.
The type of Executor is chosen by the user (through configuration): a DirectExecutor means we do *not* use a separate
thread, whereas a PooledExecutor uses a real thread pool plus a queue in front of it.
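The choice between the two pools could be sketched as follows (class names, pool sizes and the Message stub are illustrative, not JGroups' actual API; the DirectExecutor simply runs the task on the caller's thread):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: a message with an OOB flag is handed to one of two executors.
public class DispatchSketch {
    static class Message {
        final boolean oob;
        Message(boolean oob) { this.oob = oob; }
    }

    // DirectExecutor: no separate thread, the caller's thread does the work itself
    static class DirectExecutor implements Executor {
        public void execute(Runnable r) { r.run(); }
    }

    final Executor oobPool;
    final Executor regularPool;

    DispatchSketch(Executor oobPool, Executor regularPool) {
        this.oobPool = oobPool;
        this.regularPool = regularPool;
    }

    // the receiver thread calls this for every incoming message
    void dispatch(Message msg, Runnable handler) {
        (msg.oob ? oobPool : regularPool).execute(handler);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        DispatchSketch d = new DispatchSketch(pool, new DirectExecutor());
        CountDownLatch done = new CountDownLatch(2);
        d.dispatch(new Message(true),  () -> { System.out.println("OOB handled"); done.countDown(); });
        d.dispatch(new Message(false), () -> { System.out.println("regular handled"); done.countDown(); });
        done.await();
        pool.shutdown();
    }
}
```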
Both thread pools have their own instance of SchedulingPolicy (whose main method is handle(msg)). The purpose of the
SchedulingPolicy is to schedule the handling of messages relative to one another. For example, we could have a
sender-based policy, which uses queues and places all messages from the same sender into the same queue. Combined with
message priorities, the processing thread could then, after the message has been placed into the correct queue, pick
the message with the highest priority first. Other policies: longest queue first, round robin, random, FIFO, etc.
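A sender-based policy combined with longest-queue-first selection could look roughly like this (a sketch under assumed names; handle(msg) and next() are illustrative, and a real implementation would also need blocking/notification):

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Sketch of a sender-based SchedulingPolicy: messages from the same sender
// go into the same FIFO queue, preserving per-sender ordering, while a
// processing thread picks from the longest queue first.
public class SenderBasedPolicy {
    static class Message {
        final String sender; final String payload;
        Message(String sender, String payload) { this.sender = sender; this.payload = payload; }
    }

    private final Map<String, Queue<Message>> queues = new HashMap<>();

    // handle(msg): place the message into its sender's queue
    synchronized void handle(Message msg) {
        queues.computeIfAbsent(msg.sender, s -> new ArrayDeque<>()).add(msg);
    }

    // a processing thread picks the next message: longest queue first
    synchronized Message next() {
        Queue<Message> longest = null;
        for (Queue<Message> q : queues.values())
            if (!q.isEmpty() && (longest == null || q.size() > longest.size()))
                longest = q;
        return longest == null ? null : longest.poll();
    }

    public static void main(String[] args) {
        SenderBasedPolicy p = new SenderBasedPolicy();
        p.handle(new Message("A", "a1"));
        p.handle(new Message("B", "b1"));
        p.handle(new Message("A", "a2"));
        Message m = p.next(); // sender A's queue is longest, so a1 comes first
        System.out.println(m.sender + ":" + m.payload); // prints A:a1
    }
}
```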
Out-of-band messages (OOB)
--------------------------
Could be the same as priorities, e.g. prio=0 is always OOB
Adding priorities to messages
-----------------------------
Reliable events
---------------
Implementation
==============
Message
-------
We add a flags byte to a message. Values are
- OOB (if not set, we have a regular message)
- HIGH_PRIO (if not set, the message has a default priority)
- (LOW_PRIO): TBD
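The flags byte could be encoded as bit values, e.g. (bit positions here are an assumption for illustration, not necessarily the actual encoding):

```java
// Sketch of the flags byte on Message; the flag values come from the text
// above, the concrete bit positions are assumed.
public class MessageFlags {
    static final byte OOB       = 1;      // if not set: regular message
    static final byte HIGH_PRIO = 1 << 1; // if not set: default priority

    static boolean isSet(byte flags, byte flag) {
        return (flags & flag) != 0;
    }

    public static void main(String[] args) {
        byte flags = 0;
        flags |= OOB; // mark the message out-of-band
        System.out.println(isSet(flags, OOB));       // true
        System.out.println(isSet(flags, HIGH_PRIO)); // false
    }
}
```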
Unmarshaller
------------
Executor (DirectExecutor or PooledExecutor (default)) which
- unmarshals byte buffers into messages
- performs the version check and
- discards messages from different groups
When done, the Message is passed either to the OOB or regular thread pool for passing up the stack. The decision is
made based on whether or not the OOB flag is set in the message.
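A minimal sketch of the Unmarshaller's checks and hand-off (all names are illustrative; in reality the version, group and flags would be read from the marshalled byte buffer):

```java
import java.util.concurrent.Executor;

// Sketch: version check, group check, then hand-off to the OOB or
// regular thread pool based on the message's OOB flag.
public class UnmarshallerSketch {
    static final short VERSION = 1; // assumed wire version

    static class Message {
        final boolean oob; final String group;
        Message(boolean oob, String group) { this.oob = oob; this.group = group; }
    }

    final Executor oobPool, regularPool;
    final String myGroup;

    UnmarshallerSketch(Executor oobPool, Executor regularPool, String group) {
        this.oobPool = oobPool; this.regularPool = regularPool; this.myGroup = group;
    }

    void process(short version, Message msg) {
        if (version != VERSION) return;         // version check
        if (!myGroup.equals(msg.group)) return; // discard messages from different groups
        (msg.oob ? oobPool : regularPool).execute(() -> passUp(msg));
    }

    void passUp(Message msg) {
        System.out.println((msg.oob ? "OOB" : "regular") + " up");
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run; // run on the caller's thread for the demo
        UnmarshallerSketch u = new UnmarshallerSketch(direct, direct, "demo");
        u.process((short) 1, new Message(false, "demo"));  // prints "regular up"
        u.process((short) 1, new Message(true,  "other")); // discarded: wrong group
        u.process((short) 2, new Message(true,  "demo"));  // discarded: wrong version
    }
}
```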
Buffer pool
-----------
The unicast and multicast receiver threads have access to a buffer pool of fixed size. Before every receive(),
they get a buffer from the pool (blocking until a free one is available). The buffer is locked and passed into the
queue of the first thread pool. (Ideally, there are as many buffers as there are max threads in that pool.)
The allotted thread from the thread pool then unmarshals the buffer into a Message and returns the buffer to the
buffer pool, releasing the lock so it can be reused for a new packet.
Advantage: we don't need to *copy* the packet (unlike in the old solution, where we copied the byte buffer into the
incoming queue).
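Such a fixed-size pool could be sketched with a blocking queue of pre-allocated buffers (sizes and names are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of a fixed-size buffer pool: receiver threads block until a free
// buffer is available; the unmarshalling thread returns the buffer when
// done, so packets need not be copied.
public class BufferPool {
    private final BlockingQueue<byte[]> free;

    BufferPool(int numBuffers, int bufSize) {
        free = new ArrayBlockingQueue<>(numBuffers);
        for (int i = 0; i < numBuffers; i++)
            free.add(new byte[bufSize]); // pre-allocate all buffers
    }

    byte[] acquire() throws InterruptedException {
        return free.take(); // blocks while no buffer is free
    }

    void release(byte[] buf) {
        free.offer(buf); // back into the pool for the next packet
    }

    public static void main(String[] args) throws Exception {
        BufferPool pool = new BufferPool(2, 8192);
        byte[] a = pool.acquire();
        byte[] b = pool.acquire();
        System.out.println(pool.free.size()); // 0: exhausted, next acquire would block
        pool.release(a);
        System.out.println(pool.free.size()); // 1
    }
}
```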
Todos
------
- Unmarshaller should reuse a pool of InputStreams, not create a new one for each unmarshalling task
- In TP.receive() we unconditionally *copy* the buffer. This doesn't need to be done if
  - we use a BufferPool or
- Remove TP.use_concurrent_stack
- Thread naming in thread pools: better naming (e.g. for unmarshalling threads, the sender of the message that is
  being unmarshalled). Don't use increasing numbers for threads (?)
- Append (channel="<GroupName>") to each pool thread's name
- Dynamic resizing of thread pools, e.g. according to number of nodes in a cluster ?
- Enable setting the thread pools programmatically
- Remove configurability of rejection policy ?
Done
====
- Thread pool shut-downs: correct ? Should we use something else rather than shutdownNow() ?
- Use of shutdown() rather than shutdownNow() ?
- We continue using shutdownNow()
- Handle dispatching of messages from Unmarshaller to OOB or regular thread pool based on flags. Currently not done;
we dispatch all messages to the regular thread pool.
- Add flags to Message
- Review which messages should be flagged as OOB
- Switch all queues in thread pools from number of elements to number of bytes
Created JIRA issue
- How are OOB messages handled by UNICAST and NAKACK ? Are these 2 effectively ignoring OOB flags ? Does it make
  sense to use OOB messages in protocols *above* UNICAST or NAKACK ? Example: GMS sending VIEW_ACK messages.
Logic in UNICAST:
- Receive OOB message, place in receiver table
- Pass up the OOB message *immediately*, regardless of whether it is in sequence !
- When message is finally removed, do *NOT* pass it up when marked as OOB !
JGRP-379 (NAKACK) / JGRP-377 (UNICAST)
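The UNICAST logic above can be sketched as follows (a simplified model with assumed names, not the actual UNICAST code; the receiver table is a seqno-ordered map):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Sketch: an OOB message is stored in the receiver table and passed up
// immediately, regardless of sequencing; when it is later removed in
// sequence, it is NOT passed up a second time.
public class OobUnicastSketch {
    static class Entry {
        final String payload; final boolean oob;
        Entry(String payload, boolean oob) { this.payload = payload; this.oob = oob; }
    }

    final SortedMap<Long, Entry> table = new TreeMap<>();
    long nextToDeliver = 1;
    final List<String> delivered = new ArrayList<>();

    void receive(long seqno, String payload, boolean oob) {
        table.put(seqno, new Entry(payload, oob));
        if (oob)
            delivered.add(payload); // pass up *immediately*, even out of sequence
        removeInSequence();
    }

    private void removeInSequence() {
        Entry e;
        while ((e = table.remove(nextToDeliver)) != null) {
            if (!e.oob)
                delivered.add(e.payload); // OOB entries were already passed up
            nextToDeliver++;
        }
    }

    public static void main(String[] args) {
        OobUnicastSketch u = new OobUnicastSketch();
        u.receive(2, "m2", true);  // out-of-sequence OOB: delivered immediately
        u.receive(1, "m1", false); // fills the gap; m2 is removed but not re-delivered
        System.out.println(u.delivered); // [m2, m1]
    }
}
```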
- Check UNICAST and NAKACK for correct synchronization: block messages from the *same* sender, other messages can be
processed in parallel [JGRP-378]
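The synchronization rule above (serialize per sender, parallelize across senders) could be sketched with one lock object per sender (illustrative only; the map would also need cleanup when members leave):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of per-sender serialization: messages from the same sender are
// processed under that sender's lock (preserving their order), while
// messages from different senders can run in parallel.
public class PerSenderLocks {
    private final Map<String, Object> locks = new ConcurrentHashMap<>();

    void process(String sender, Runnable work) {
        Object lock = locks.computeIfAbsent(sender, s -> new Object());
        synchronized (lock) { // blocks only messages from the *same* sender
            work.run();
        }
    }

    public static void main(String[] args) {
        PerSenderLocks p = new PerSenderLocks();
        StringBuilder out = new StringBuilder();
        p.process("A", () -> out.append("a1 "));
        p.process("A", () -> out.append("a2 "));
        System.out.println(out); // same-sender order preserved: a1 a2
    }
}
```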
- Expose JMX information for all 3 thread pools
- Allow resizing of thread pools (and other changes) via JMX
- Thread pool queues are always showing 0 size although they *should* be filled: CORRECT