#<cldoc:Design Notes>
Design Notes
============
Automatic master promotion
--------------------------
The standard set of yrmcds consists of a master server and two slave
servers. A set of yrmcds servers is prepared for each server farm.
To elect the master server automatically, keepalived or a similar
virtual IP address allocation tool is used. The server owning the
virtual IP address becomes the master server. The others are slaves.
Slaves connect to the master via the virtual IP address. The master
will *not* connect to slaves. A new slave will receive all data from
the master server once the connection has been established.
If, for some reason, the master suddenly notices it is no longer
the master (i.e. the server has lost the virtual IP address), it kills
itself so that it can be reborn as a new slave (restarted by upstart).
If a slave gets disconnected from the master, it waits for a few
seconds to see if the virtual IP address is assigned to it. If it is,
the slave is promoted to the new master. If not, the slave discards all
replicated data and then tries to connect to the new master. The new
master should close any connection still established to the old master.
Replication protocol
--------------------
The replication protocol is the same as [the binary protocol of memcached][1].
Specifically, slaves receive only "SetQ" and "DeleteQ" requests.
This allows any memcached-compatible program to become a yrmcds slave
with only slight modifications.
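
For reference, the requests a slave parses use the standard 24-byte
memcached binary protocol header; SetQ and DeleteQ correspond to opcodes
0x11 and 0x14. The following is a minimal C++ sketch of that header
layout (the struct name is illustrative, not a yrmcds identifier):

```cpp
// The 24-byte memcached binary protocol request header.
// All multi-byte fields are transmitted in network byte order.
#include <cstdint>

struct binary_request_header {
    std::uint8_t  magic;             // 0x80 for requests
    std::uint8_t  opcode;            // 0x11 = SetQ, 0x14 = DeleteQ
    std::uint16_t key_length;
    std::uint8_t  extras_length;
    std::uint8_t  data_type;
    std::uint16_t vbucket_id;
    std::uint32_t total_body_length; // extras + key + value
    std::uint32_t opaque;
    std::uint64_t cas;
} __attribute__((packed));           // GCC/Clang-specific; the fields
                                     // are naturally packed anyway

static_assert(sizeof(binary_request_header) == 24,
              "the binary protocol header must be 24 bytes");
```
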
The number of connections
-------------------------
The master will have tens of thousands of client connections from
all the PHP-FPM processes in a server farm. A thread-per-connection
model is therefore not very efficient.
This encourages us to adopt asynchronous sockets and [the reactor pattern][2].
The reactor and workers
-----------------------
So we adopt [the reactor pattern with a thread-pool][3] for the best
performance.
The reactor is implemented with asynchronous sockets and [epoll][] in
edge-triggered mode. Basically, with edge-triggered mode, a socket needs
to be read until [recv][] returns an `EWOULDBLOCK` error. If, for some
reason such as fairness, not all available data are received from a socket,
the reactor needs to remember that socket so that it can be handled later.
To remember such sockets, the reactor manages a list of _readable_ sockets
internally.
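
The following is a minimal sketch of such an edge-triggered read loop,
assuming a non-blocking socket registered with `EPOLLIN|EPOLLET`; the
`budget` limit and the function name are illustrative, not yrmcds
identifiers:

```cpp
#include <cerrno>
#include <sys/socket.h>
#include <sys/types.h>

// Read a non-blocking, edge-triggered socket until EWOULDBLOCK or
// until a fairness budget is used up.  Returns true if the socket was
// drained, false if it must be kept on the readable list.
bool drain_socket(int fd, int budget = 16) {
    char buf[4096];
    for( int i = 0; i < budget; ++i ) {
        ssize_t n = recv(fd, buf, sizeof(buf), 0);
        if( n == -1 ) {
            // EWOULDBLOCK/EAGAIN means no more data for now; other
            // errors would invalidate the socket (omitted here).
            return true;
        }
        if( n == 0 )
            return true;            // the peer closed the connection
        // ... hand buf[0..n) to the protocol parser here ...
    }
    return false;                   // more data may remain unread
}
```
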
When the reactor detects some data are available for a socket, it
dispatches the socket to an **idle** _worker_ thread. The worker then
receives and processes the data. While a worker is handling a socket,
that socket must not be passed to another worker, in order to preserve
the protocol semantics.
To implement this, the following properties are required:
* Every worker thread has an **idle** flag.
* Every socket has a **busy** (in-use) flag.
Both flags are set by the reactor thread, and cleared by a worker thread.
If the reactor finds that a socket is readable but the socket is busy or
there are no idle workers, the socket is remembered in the readable socket
list mentioned above. This keeps the reactor thread from being blocked.
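
The sketch below illustrates this dispatch logic, assuming one atomic
flag per worker and per socket. For simplicity it uses a `busy` flag
whose cleared state means idle; the names and flag polarity are
illustrative, not the actual yrmcds identifiers:

```cpp
#include <atomic>

struct worker {
    std::atomic<bool> busy{false};   // cleared by the worker when done
};

struct client_socket {
    std::atomic<bool> busy{false};   // set while a worker handles it
};

// Called by the reactor when a socket becomes readable.  Returns false
// if the socket must be kept on the readable list instead.
bool dispatch(client_socket& s, worker* workers, int n_workers) {
    if( s.busy.load(std::memory_order_acquire) )
        return false;                        // already being handled
    for( int i = 0; i < n_workers; ++i ) {
        if( ! workers[i].busy.load(std::memory_order_acquire) ) {
            s.busy.store(true, std::memory_order_release);
            workers[i].busy.store(true, std::memory_order_release);
            // ... hand the socket and its data to workers[i],
            //     then wake it up (e.g. via eventfd) ...
            return true;
        }
    }
    return false;                            // no idle worker available
}
```
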
### Efficient allocation of readable lists
The readable lists used in the reactor can be implemented with very few
memory allocations by preparing two lists that pre-allocate a certain
amount of memory and swapping one list with the other at each iteration
of the epoll loop.
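
A minimal sketch of this double-buffering, with illustrative names; each
list keeps its reserved capacity across epoll iterations, so allocations
happen only when a list outgrows it:

```cpp
#include <cstddef>
#include <utility>
#include <vector>

class readable_list {
public:
    explicit readable_list(std::size_t cap) {
        m_current.reserve(cap);
        m_spare.reserve(cap);
    }
    void add(int fd) { m_current.push_back(fd); }

    // Hand out the sockets collected so far and start an empty list.
    // Called once per epoll loop; the returned memory is reused later.
    std::vector<int>& swap_out() {
        m_spare.clear();
        std::swap(m_current, m_spare);
        return m_spare;
    }

private:
    std::vector<int> m_current;
    std::vector<int> m_spare;
};
```
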
### Use of eventfd to dispatch jobs
To notify an idle worker of a new job (or to wait for a new job), we use
the Linux-specific [eventfd][] rather than pipes or pthread condition
variables. eventfd is lighter than a pipe. Unlike condition variables,
eventfd remembers events, so workers never fail to catch them.
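
A minimal sketch of this eventfd-based notification, with illustrative
function names and error handling omitted. Because eventfd keeps a
counter, a notification written before the worker blocks is still
observed by the next read:

```cpp
#include <cstdint>
#include <sys/eventfd.h>
#include <unistd.h>

int make_event_fd() {
    return eventfd(0, EFD_CLOEXEC);     // initial counter = 0
}

void notify(int efd) {                  // called by the reactor
    std::uint64_t one = 1;
    write(efd, &one, sizeof(one));      // adds 1 to the counter
}

void wait_for_job(int efd) {            // called by a worker
    std::uint64_t count;
    read(efd, &count, sizeof(count));   // blocks until the counter is
                                        // non-zero, then resets it to 0
}
```
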
The hash
--------
The biggest and most important data structure in yrmcds is clearly
the hash map of objects. To reduce contention between worker threads,
the hash does not use a single lock to protect the data. Instead, each
bucket has a lock to protect objects in the bucket and the bucket itself.
One drawback is that resizing the hash is almost impossible. In the real
implementation, yrmcds statically allocates a large number of buckets.
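
The following is a minimal sketch of this layout, assuming a statically
sized bucket array; `object_hash`, `apply`, and the field names are
illustrative, not the actual yrmcds identifiers:

```cpp
#include <cstddef>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <string>

struct object {
    std::string  key;
    std::string  data;
    unsigned int gc_age;            // LRU counter used by the GC thread
};

struct bucket {
    std::mutex        lock;         // protects `chain` and its objects
    std::list<object> chain;
};

class object_hash {
public:
    explicit object_hash(std::size_t n)
        : m_buckets(new bucket[n]), m_size(n) {}

    // Run `f` on the object for `key` under the bucket lock,
    // inserting a new object first if the key does not exist.
    template<typename F>
    void apply(const std::string& key, F f) {
        bucket& b = m_buckets[std::hash<std::string>()(key) % m_size];
        std::lock_guard<std::mutex> g(b.lock);
        for( object& o : b.chain ) {
            if( o.key == key ) { f(o); return; }
        }
        b.chain.push_back(object{key, std::string(), 0});
        f(b.chain.back());
    }

private:
    std::unique_ptr<bucket[]> m_buckets;   // fixed size; never resized
    std::size_t               m_size;
};
```
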
Housekeeping
------------
yrmcds is essentially an object _cache_ system. Objects whose lifetime
has expired need to be removed from the cache. If the cache is
full of objects, the least recently used (LRU) objects should be removed.
For such housekeeping, a dedicated thread called the **GC thread** is used.
It scans the whole hash to detect and remove expired objects.
To implement the LRU cache, each object has a counter that is incremented
at every GC run. The counter is reset to zero whenever a worker accesses
the object. When the cache is full, the GC thread removes objects whose
LRU counter value is high.
A GC thread is started by the reactor thread only when there is no
running GC thread. The reactor thread passes the current list of slaves
to the GC thread in order to replicate object removals. If a slave
is added while a GC thread is running, that slave may miss some of the
removals, which is *not* a big problem.
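
A minimal sketch of one GC pass over a single bucket, with illustrative
types: `expire` is an absolute expiration time (0 means no expiry),
`gc_age` is the LRU counter described above, and `flush_threshold` is a
hypothetical age limit used only when the cache is full:

```cpp
#include <ctime>
#include <list>
#include <mutex>

struct cache_object {
    std::time_t  expire;    // absolute expiration time; 0 = never
    unsigned int gc_age;    // reset to 0 whenever a worker touches it
};

// One GC pass over a single bucket.  Expired objects are always
// removed; old objects are removed only when the cache is full.
void gc_bucket(std::list<cache_object>& chain, std::mutex& lock,
               bool cache_full, unsigned int flush_threshold) {
    std::lock_guard<std::mutex> g(lock);
    std::time_t now = std::time(nullptr);
    for( auto it = chain.begin(); it != chain.end(); ) {
        bool expired = (it->expire != 0 && it->expire <= now);
        bool too_old = cache_full && (it->gc_age >= flush_threshold);
        if( expired || too_old ) {
            // ... send a DeleteQ to each slave to replicate the removal ...
            it = chain.erase(it);
        } else {
            ++(it->gc_age);       // the object survives one more GC cycle
            ++it;
        }
    }
}
```
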
Replication
-----------
When a slave connects to the master, it needs to receive all the data in
the master. yrmcds reuses the GC thread for this initial replication
because the GC thread already scans the entire hash.
While GC is running, the reactor and worker threads also keep running.
The GC thread and worker threads therefore need to be synchronized when
they send data for the same object. This can be achieved by keeping the
lock of the hash bucket held while an object is being sent.
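
A minimal sketch of this rule, with illustrative types; `send` stands for
whatever routine serializes an object onto the slave socket and is an
assumption of this sketch, not a yrmcds API:

```cpp
#include <list>
#include <mutex>
#include <string>

struct object {
    std::string key;
    std::string data;
};

// Send every object in one bucket to a new slave while holding the
// bucket lock, so no worker can update or send these objects
// concurrently with the initial replication.
template<typename SendFn>
void replicate_bucket(std::list<object>& chain, std::mutex& lock,
                      SendFn send) {
    std::lock_guard<std::mutex> g(lock);
    for( const object& o : chain )
        send(o);
}
```
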
Sockets for replication
-----------------------
Sockets connected to slaves are used to send a lot of replication data.
Further, worker threads serving client requests need to access replication
sockets to send updates to slaves.
Another difference from client sockets is the possible number of sockets.
The number of replication sockets can be limited to a fairly small value,
say 5.
Worker threads need to have references to all replication sockets.
This can be done without any locks; the reactor passes a copy of the
references along with the socket and received data. However, this can
introduce a race between workers and the GC thread.
If a new GC thread starts the initial replication process for a new slave
while some worker threads holding old copies of the replication sockets
are still executing their jobs, those workers would fail to send some
updates to the new slave.
To resolve this race, we need to guarantee that all worker threads
are idle or have the latest copies of replication sockets. This can be
implemented as:
1. Accept a new replication socket and add it to the list of
   replication sockets.
   At this point, the GC thread must not begin the initial replication
   for this new slave.
2. The reactor thread puts a synchronization request.
   The request is satisfied once the reactor thread has observed every
   worker thread become idle at least once.
3. The reactor thread adds the new replication socket to the confirmed
   list.
4. At the next GC, the reactor thread requests initial replication for
   sockets stored in the confirmed list.
Sending data
------------
As yrmcds needs to handle a lot of clients, giving every socket a
large send buffer is not a good idea. Sockets connected to clients
therefore only allocate memory for pending data temporarily.
On the other hand, sockets connected to slaves are few, so they can
statically allocate large buffers for pending send data.
In either case, a sending thread needs to wait for the reactor to send
the pending data and clear the buffer when there is not enough room.
To achieve the best TCP performance, we use the [`TCP_CORK`][4] option
even though it is Linux-specific.
With `TCP_CORK` turned on, the TCP send buffer needs to be flushed
explicitly. The buffer should be flushed only when all data have been
successfully sent without seeing `EWOULDBLOCK`.
To put out a complete response with one call, the socket object should
provide an API similar to [writev][].
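
A minimal sketch of corking and writev-style sending on Linux; the
function names are illustrative, and a real implementation would track
how much of the iovec was actually sent:

```cpp
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>

// Turn TCP_CORK on or off.  Clearing it flushes any partial frame
// held back by the kernel.
void cork(int fd, bool on) {
    int v = on ? 1 : 0;
    setsockopt(fd, IPPROTO_TCP, TCP_CORK, &v, sizeof(v));
}

// Send a header and a body with one call, similar to the writev-style
// API suggested above.  May send less than requested; the remainder
// would stay pending for the reactor to send later.
ssize_t send_response(int fd, const void* hdr, size_t hlen,
                      const void* body, size_t blen) {
    struct iovec iov[2];
    iov[0].iov_base = const_cast<void*>(hdr);
    iov[0].iov_len  = hlen;
    iov[1].iov_base = const_cast<void*>(body);
    iov[1].iov_len  = blen;
    return writev(fd, iov, 2);
}
```
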
Reclamation strategy of shared sockets
--------------------------------------
### Sockets are shared
Sockets connected to clients may be shared by the reactor thread and
a worker thread. Sockets connected to slaves may be shared by the
reactor thread, worker threads, and the initial replication thread.
### Sockets can be closed only by the reactor thread
This is because the reactor thread manages a mapping between file
descriptors and resource objects including sockets. If another thread
closes a file descriptor of a resource, the same file descriptor number
may be reused by the operating system, which would break the mapping.
### Strategy to reclaim shared sockets
1. Sockets can be invalidated by any thread.
   Invalidated sockets refuse further access to them.
2. Inform the reactor thread of the invalidated sockets.
3. The reactor thread
    1. removes invalidated sockets from the internal mapping,
    2. closes their file descriptors, then
    3. adds them to a list of pending destruction resources.
At some point when there is no running GC thread, the reactor thread can
put a new synchronization request. To optimize memory allocations,
the reactor thread will not put such a request if there are any pending
requests. This way, the reactor can save the current pending destruction
list by swapping its contents with a pre-allocated save list.
Once the reactor observes that all worker threads are idle, the resources
in the save list can be destructed safely.
No blocking job queues, no barrier synchronization
--------------------------------------------------
To avoid excessive contention between the reactor and worker threads,
a blocking job queue is not used between them. Instead, the reactor
thread loops over an atomic flag for each worker thread to see whether
the worker is idle or busy.
For the same reason, we do not use barrier synchronization between
the reactor and worker threads. Instead, when there is a synchronization
request, the reactor thread checks whether every worker thread is idle or
becomes idle. Once it has confirmed that every worker thread was idle at
least once after the synchronization request was made, it starts the job
associated with the request.
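
A minimal sketch of this synchronization mechanism, assuming one atomic
busy flag per worker as in the dispatch sketch above; all names are
illustrative:

```cpp
#include <atomic>
#include <cstddef>
#include <functional>
#include <vector>

// A pending synchronization request.  `pending[i]` stays true until
// worker i has been observed idle at least once after the request.
struct sync_request {
    std::vector<bool>     pending;
    std::function<void()> job;      // run once every worker is confirmed
};

// Called from the reactor loop on every iteration.  `worker_busy[i]`
// is the atomic flag of worker i (true = busy, false = idle).
void poll_sync(sync_request& req,
               const std::atomic<bool>* worker_busy, std::size_t n) {
    bool done = true;
    for( std::size_t i = 0; i < n; ++i ) {
        if( req.pending[i] &&
            ! worker_busy[i].load(std::memory_order_acquire) )
            req.pending[i] = false;      // seen idle at least once
        if( req.pending[i] )
            done = false;
    }
    if( done && req.job ) {
        req.job();        // e.g. start the pending initial replication
        req.job = nullptr;
    }
}
```
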
Slaves are essentially single-threaded
--------------------------------------
Slaves do accept connection requests from memcache clients, but they
close the connections immediately. This way, slaves can essentially
be single-threaded. As long as the server is a slave, the internal hash
need not be guarded by locks. For this reason, the internal hash should
provide lock-free versions of its methods.
Joining threads
---------------
All threads are started by the reactor thread. The reactor thread is
therefore responsible for joining those threads.
All threads except for the reactor thread may block on either a
condition variable of a socket or the [eventfd][] file descriptor.
For sockets, the reactor thread signals the condition variable to unblock
threads when it closes the socket. For eventfd descriptors, the reactor
thread simply writes to them.
When the program exits, the reactor thread executes the following
termination process:
1. Sets the termination flags of all worker threads.
2. Writes to the eventfd descriptors to unblock worker threads.
3. Invalidates all resources to unblock worker threads.
4. Joins with the GC thread, if any.
5. Destructs the resources.
Large data
----------
Data that is too large should be stored in a temporary file.
The temporary file is unlinked soon after creation so that it is
automatically removed when the program exits.
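
A minimal sketch of creating such a temporary file on Linux; the
directory and the function name are illustrative:

```cpp
#include <cstdlib>
#include <unistd.h>

int make_temp_file() {
    char path[] = "/tmp/yrmcds-XXXXXX";
    int fd = mkstemp(path);     // creates and opens a unique file
    if( fd != -1 )
        unlink(path);           // the name is gone; the data lives
                                // until close(fd) or program exit
    return fd;
}
```
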
Summary
-------
### Threads
In the master,
* the reactor thread,
* worker threads to process client requests, and
* a GC thread.
Slaves run the reactor (main) thread only.
### Sockets
In the master,
* a listening socket for clients,
* a listening socket for slaves,
* sockets connected to clients, and
* sockets connected to slaves.
In a slave,
* a listening socket for clients, and
* a socket connected to the master.
### Shared data structures
* Connected sockets.
* The object hash map.
* Close request queue in the reactor.
* Other less significant resources such as statistics counters.
### Locks and ordering
* The reactor thread
    * acquires a lock of a socket to send data.
    * acquires a spinlock of the socket close request queue.
* A worker thread
    * acquires a lock of a hash bucket.
    * acquires a lock of a socket.
    * acquires a lock of a socket to send data independent of cached objects.
    * acquires a spinlock of the reactor to put a socket close request.
* A GC thread
    * acquires a lock of a hash bucket.
    * acquires locks of sockets connected to slaves.
    * acquires a spinlock of the reactor to put a socket close request.
[1]: https://code.google.com/p/memcached/wiki/BinaryProtocolRevamped
[2]: http://en.wikipedia.org/wiki/Reactor_pattern
[3]: http://stackoverflow.com/questions/14317992/thread-per-connection-vs-reactor-pattern-with-a-thread-pool
[4]: http://manpages.ubuntu.com/manpages/precise/en/man7/tcp.7.html
[epoll]: http://manpages.ubuntu.com/manpages/precise/en/man7/epoll.7.html
[eventfd]: http://manpages.ubuntu.com/manpages/precise/en/man2/eventfd.2.html
[recv]: http://manpages.ubuntu.com/manpages/precise/en/man2/recv.2.html
[signalfd]: http://manpages.ubuntu.com/manpages/precise/en/man2/signalfd.2.html
[writev]: http://manpages.ubuntu.com/manpages/precise/en/man2/writev.2.html