File: ConcurrentStack.txt


Concurrent stack
================

Author: Bela Ban
JIRAs:
http://jira.jboss.com/jira/browse/JGRP-180 (harden stack)
http://jira.jboss.com/jira/browse/JGRP-181 (concurrent stack)
http://jira.jboss.com/jira/browse/JGRP-205 (out-of-band messages)


Concurrent stack
----------------

We will get rid of all queues in the protocols, and their associated threads. This is the same as setting all
down_thread and up_thread vars to false, but now it is the default.

On the receiver's side there will be 2 thread pools (Executors): one for regular messages and one for OOB messages.
The receiver thread (which called receive()) hands each message to a pool via Executor.execute(): an OOB message goes
to the OOB thread pool, a regular message goes to the regular thread pool.

The type of Executor is chosen by the user (through configuration): DirectExecutor means we will *not* use a separate
thread, whereas PooledExecutor uses a real thread pool plus a queue in front.
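
A minimal sketch of the dispatch described above, assuming java.util.concurrent.Executor as the executor abstraction.
SketchMessage, DirectExecutorSketch and ReceiverDispatchSketch are hypothetical names used for illustration only;
they are not JGroups classes.

```java
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical stand-in for a received message; only the OOB flag matters here.
final class SketchMessage {
    final boolean oob;
    final String payload;

    SketchMessage(boolean oob, String payload) {
        this.oob = oob;
        this.payload = payload;
    }
}

// Runs the task on the calling thread, i.e. no separate thread is used.
final class DirectExecutorSketch implements Executor {
    @Override
    public void execute(Runnable task) {
        task.run();
    }
}

// Chooses the OOB or the regular executor per message and hands the message over.
final class ReceiverDispatchSketch {
    private final Executor oobPool;
    private final Executor regularPool;
    private final Consumer<SketchMessage> upHandler; // assumed callback that passes the message up

    ReceiverDispatchSketch(Executor oobPool, Executor regularPool, Consumer<SketchMessage> upHandler) {
        this.oobPool = oobPool;
        this.regularPool = regularPool;
        this.upHandler = upHandler;
    }

    // Called by the receiver thread for every message.
    void receive(final SketchMessage msg) {
        Executor target = msg.oob ? oobPool : regularPool;
        target.execute(() -> upHandler.accept(msg));
    }
}
```

With two DirectExecutorSketch instances everything runs on the receiver thread; passing a
java.util.concurrent.ThreadPoolExecutor with a bounded queue instead would give the pooled behaviour.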

Both thread pools have their own instance of SchedulingPolicy (whose main method is handle(msg)). The purpose of the
SchedulingPolicy is to schedule handling of messages with respect to others. For example, we could have a sender-based
policy, which uses queues and places all messages from the same sender into the same queue. Combined with a message
priority, after placing the message into the correct queue, the processing thread could then pick the message with the
highest priority first. Other policies: longest queue first, round robin, random, FIFO etc.
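
As an illustration of the sender-based policy with priorities, here is a rough sketch. It assumes that a lower
priority value means "more urgent" and that messages from the same sender must stay in FIFO order; PrioMessage,
SenderBasedPolicySketch and next() are hypothetical names (only handle(msg) is mentioned above).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical message carrying a sender and a priority (lower value = more urgent, an assumption).
final class PrioMessage {
    final String sender;
    final int priority;
    final String payload;

    PrioMessage(String sender, int priority, String payload) {
        this.sender = sender;
        this.priority = priority;
        this.payload = payload;
    }
}

final class SenderBasedPolicySketch {
    // One FIFO queue per sender; LinkedHashMap keeps senders in first-seen order for tie-breaking.
    private final Map<String, Deque<PrioMessage>> queues = new LinkedHashMap<>();

    // Places the message into the queue of its sender.
    synchronized void handle(PrioMessage msg) {
        queues.computeIfAbsent(msg.sender, s -> new ArrayDeque<>()).addLast(msg);
    }

    // Returns the head-of-queue message with the most urgent priority, or null if all queues are empty.
    // Only queue heads are compared, so per-sender ordering is never violated.
    synchronized PrioMessage next() {
        Deque<PrioMessage> best = null;
        for (Deque<PrioMessage> q : queues.values()) {
            PrioMessage head = q.peekFirst();
            if (head != null && (best == null || head.priority < best.peekFirst().priority)) {
                best = q;
            }
        }
        return best == null ? null : best.pollFirst();
    }
}
```

Other policies (longest queue first, round robin, ...) would only change how next() selects the queue.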



Out-of-band messages (OOB)
--------------------------

Could be the same as priorities, e.g. prio=0 is always OOB


Adding priorities to messages
-----------------------------


Reliable events
---------------




Implementation
==============


Message
-------
We add a flags byte to a message. Values are
- OOB (if not set, we have a regular message)
- HIGH_PRIO (if not set, the message has a default priority)
- (LOW_PRIO): TBD
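
A flags byte like this is usually handled with bit masks. The sketch below shows the idea; the concrete bit values
and the method names are assumptions made for illustration, not the values used by the Message class.

```java
// Hypothetical holder for the flags byte; bit values are assumed, not taken from JGroups.
final class MessageFlagsSketch {
    static final byte OOB = 1;        // bit 0: out-of-band message
    static final byte HIGH_PRIO = 2;  // bit 1: high priority

    private byte flags; // 0 = regular message with default priority

    void setFlag(byte flag) {
        flags |= flag;
    }

    void clearFlag(byte flag) {
        flags &= ~flag;
    }

    boolean isFlagSet(byte flag) {
        return (flags & flag) == flag;
    }

    byte getFlags() {
        return flags;
    }
}
```

A single byte leaves room for up to 8 independent flags, so LOW_PRIO could be added later without changing the
wire format.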


Unmarshaller
------------
Executor (DirectExecutor or PooledExecutor (default)) which
- unmarshals byte buffers into messages
- performs the version check and
- discards messages from different groups

When done, the Message is passed either to the OOB or regular thread pool for passing up the stack. The decision is
made based on whether or not the OOB flag is set in the message.
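
The three unmarshalling steps can be sketched as follows. The packet layout used here (version, group name, flags,
payload length, payload) is invented for the example and is *not* the JGroups wire format; UnmarshallerSketch, its
constants and marshal() are hypothetical as well.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class UnmarshallerSketch {
    static final short VERSION = 1; // assumed version number
    static final byte OOB = 1;      // assumed bit value of the OOB flag

    // Result of a successful unmarshalling step.
    static final class Unmarshalled {
        final byte flags;
        final byte[] payload;

        Unmarshalled(byte flags, byte[] payload) {
            this.flags = flags;
            this.payload = payload;
        }

        boolean isOob() {
            return (flags & OOB) == OOB;
        }
    }

    private final String localGroup;

    UnmarshallerSketch(String localGroup) {
        this.localGroup = localGroup;
    }

    // Helper that writes the assumed layout; only needed to produce input for the sketch.
    static byte[] marshal(short version, String group, byte flags, byte[] payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeShort(version);
            out.writeUTF(group);
            out.writeByte(flags);
            out.writeInt(payload.length);
            out.write(payload);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns the unmarshalled message, or null if the packet must be discarded.
    Unmarshalled unmarshal(byte[] buf, int offset, int length) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf, offset, length));
            if (in.readShort() != VERSION) {
                return null; // version check failed
            }
            if (!localGroup.equals(in.readUTF())) {
                return null; // message from a different group
            }
            byte flags = in.readByte();
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            return new Unmarshalled(flags, payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The caller would then check isOob() on the result to pick the OOB or the regular thread pool.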


Buffer pool
-----------
The unicast and multicast threads have access to a buffer pool with a fixed size. Before every receive(),
they get a buffer from the pool (blocking until they get a free one). The buffer is locked and passed into the
queue of the first thread pool. (Ideally, there are as many buffers as max threads in that pool).
The allotted thread from the thread pool then unmarshals the buffer into a Message and returns the buffer to the
buffer pool, releasing the lock so it can be used for a new packet.

Advantage: we don't need to *copy* the packet (unlike in the old solution where we copied the byte buffer into the
incoming queue)
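
A fixed-size buffer pool with a blocking acquire could look roughly like the sketch below. It assumes that "locking"
a buffer simply means taking it out of the pool until it is released; BufferPoolSketch and its method names are
hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class BufferPoolSketch {
    private final BlockingQueue<byte[]> free;

    // Pre-allocates numBuffers buffers of bufferSize bytes each; the pool never grows.
    BufferPoolSketch(int numBuffers, int bufferSize) {
        free = new ArrayBlockingQueue<>(numBuffers);
        for (int i = 0; i < numBuffers; i++) {
            free.add(new byte[bufferSize]);
        }
    }

    // Blocks until a buffer is free. The buffer is owned by the caller until release() is called.
    byte[] acquire() {
        try {
            return free.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a buffer", e);
        }
    }

    // Returns a buffer to the pool so it can be used for a new packet.
    // Throws IllegalStateException if more buffers are released than were acquired.
    void release(byte[] buf) {
        free.add(buf);
    }

    int available() {
        return free.size();
    }
}
```

The receiver thread would call acquire() before each receive(), and the pool thread would call release() once the
buffer has been unmarshalled into a Message.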


Todos
-----



- Unmarshaller should reuse a pool of InputStreams, not create a new one for each unmarshalling task

- In TP.receive() we unconditionally *copy* the buffer. This doesn't need to be done if
  - we use a BufferPool or

- Remove TP.use_concurrent_stack

- Thread naming in thread pool: use more descriptive names (e.g. for unmarshalling threads, the sender of the message
  that is being unmarshalled). Don't use increasing numbers for threads (?)
  - Append (channel="<GroupName>") to each pool thread's name

- Dynamic resizing of thread pools, e.g. according to number of nodes in a cluster ?

- Enable setting the thread pools programmatically

- Remove configurability of rejection policy ?





Done
====

- Thread pool shut-downs: correct ? Should we use something else rather than shutdownNow() ?
  - Use of shutdown() rather than shutdownNow() ?
  - We continue using shutdownNow()

- Handle dispatching of messages from Unmarshaller to OOB or regular thread pool based on flags. Currently not done;
  we dispatch all messages to the regular thread pool.
  - Add flags to Message
  - Review which messages should be flagged as OOB

- Switch all queues in thread pools from number of elements to number of bytes
  Created JIRA issue

- How are OOB messages handled by UNICAST and NAKACK ? Are these 2 effectively ignoring OOB flags ? Does it make
  sense to use OOB messages in protocols *above* UNICAST or NAKACK ? Example: GMS sending VIEW_ACK messages.
  Logic in UNICAST:
  - Receive OOB message, place in receiver table
  - Pass up the OOB message *immediately*, regardless of whether it is in sequence !
  - When message is finally removed, do *NOT* pass it up when marked as OOB !
  JGRP-379 (NAKACK) / JGRP-377 (UNICAST)

- Check UNICAST and NAKACK for correct synchronization: block messages from the *same* sender, other messages can be
  processed in parallel [JGRP-378]

- Expose JMX information for all 3 thread pools
- Allow resizing of thread pools (and other changes) via JMX
- Thread pool queues are always showing 0 size although they *should* be filled: CORRECT
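

The UNICAST logic for OOB messages listed above (place in receiver table, pass up immediately, do not pass up again
on removal) can be sketched as follows. This is a simplified model with a single sender and seqnos starting at 1;
OobReceiverTableSketch and its members are hypothetical and do not mirror the actual UNICAST code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

final class OobReceiverTableSketch {
    // Hypothetical table entry for one received message.
    private static final class Entry {
        final boolean oob;
        final String payload;

        Entry(boolean oob, String payload) {
            this.oob = oob;
            this.payload = payload;
        }
    }

    private final TreeMap<Long, Entry> table = new TreeMap<>();
    private final List<String> delivered = new ArrayList<>(); // stands in for "passed up the stack"
    private long nextToDeliver = 1; // assumed first seqno

    synchronized void receive(long seqno, boolean oob, String payload) {
        if (seqno < nextToDeliver || table.containsKey(seqno)) {
            return; // duplicate, already seen
        }
        // 1. Every message, OOB or not, is placed in the receiver table.
        table.put(seqno, new Entry(oob, payload));
        // 2. An OOB message is passed up immediately, regardless of whether it is in sequence.
        if (oob) {
            delivered.add(payload);
        }
        // 3. Remove messages in sequence; OOB messages are skipped because they were already passed up.
        Entry next;
        while ((next = table.remove(nextToDeliver)) != null) {
            if (!next.oob) {
                delivered.add(next.payload);
            }
            nextToDeliver++;
        }
    }

    synchronized List<String> delivered() {
        return new ArrayList<>(delivered);
    }
}
```

Keeping the OOB message in the table until its seqno comes up means that retransmission and duplicate detection
still work on it, while its delivery no longer waits for earlier messages.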