1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646
|
// Copyright (c) 2019, Salvatore Sanfilippo <antirez at gmail dot com>
// SPDX-FileCopyrightText: 2024 Redict Contributors
// SPDX-FileCopyrightText: 2024 Salvatore Sanfilippo <antirez at gmail dot com>
//
// SPDX-License-Identifier: BSD-3-Clause
// SPDX-License-Identifier: LGPL-3.0-only
#include "server.h"
/* The tracking table is constituted by a radix tree of keys, each pointing
* to a radix tree of client IDs, used to track the clients that may have
* certain keys in their local, client side, cache.
*
* When a client enables tracking with "CLIENT TRACKING on", each key served to
* the client is remembered in the table mapping the keys to the client IDs.
* Later, when a key is modified, all the clients that may have local copy
* of such key will receive an invalidation message.
*
* Clients will normally take frequently requested objects in memory, removing
* them when invalidation messages are received. */
/* Radix tree mapping each tracked key name to a nested radix tree that holds
 * the IDs of the clients that may have that key in their local cache. It is
 * created lazily by enableTracking(). */
rax *TrackingTable = NULL;
/* Radix tree mapping each broadcast prefix to its bcastState. It is created
 * together with TrackingTable. */
rax *PrefixTable = NULL;
uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across
                                         the whole tracking table. This gives
                                         a hint about the total memory we
                                         are using server side for CSC. */
/* Channel name object passed to addReplyPubsubMessage() when an invalidation
 * is delivered as a Pub/Sub message to a redirected RESP2 client. It is
 * allocated by enableTracking(). */
robj *TrackingChannelName;
/* Value type stored in PrefixTable: there is one instance per registered
 * prefix. It records the keys matching the prefix that were modified and are
 * still waiting to be broadcast, together with the clients that must be
 * notified about them. */
typedef struct bcastState {
/* Keys modified in the current event loop cycle. The value associated with
 * each key is the client that modified it last (possibly NULL), which is
 * what the NOLOOP filtering compares against. */
rax *keys;
/* Clients subscribed to the notification events for this prefix. Entries
 * are keyed by the raw bytes of the client pointer. */
rax *clients;
} bcastState;
/* Turn off key tracking for client 'c'.
 *
 * For a client in BCAST mode every prefix subscription is dropped, and a
 * prefix whose last subscriber goes away is removed from PrefixTable
 * together with its bcastState.
 *
 * For the non-broadcast case nothing is removed from the tracking table
 * here: only client IDs are stored there, so stale IDs are discarded lazily
 * later. Doing an eager cleanup for a client with many entries would be too
 * expensive. The function therefore just clears the tracking flags and
 * decrements the count of tracking clients. */
void disableTracking(client *c) {
    if (c->flags & CLIENT_TRACKING_BCAST) {
        raxIterator it;

        raxStart(&it,c->client_tracking_prefixes);
        raxSeek(&it,"^",NULL,0);
        while (raxNext(&it)) {
            void *entry;
            int found = raxFind(PrefixTable,it.key,it.key_len,&entry);
            serverAssert(found);

            bcastState *state = entry;
            raxRemove(state->clients,(unsigned char*)&c,sizeof(c),NULL);

            /* Other clients are still subscribed: keep the prefix. */
            if (raxSize(state->clients) != 0) continue;

            /* That was the last subscriber: release the state and drop
             * the prefix from the table. */
            raxFree(state->clients);
            raxFree(state->keys);
            zfree(state);
            raxRemove(PrefixTable,it.key,it.key_len,NULL);
        }
        raxStop(&it);

        raxFree(c->client_tracking_prefixes);
        c->client_tracking_prefixes = NULL;
    }

    /* Nothing else to do if tracking was not enabled in the first place. */
    if (!(c->flags & CLIENT_TRACKING)) return;

    /* Adjust the global count and reset every tracking related flag. */
    server.tracking_clients--;
    c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR|
                  CLIENT_TRACKING_BCAST|CLIENT_TRACKING_OPTIN|
                  CLIENT_TRACKING_OPTOUT|CLIENT_TRACKING_CACHING|
                  CLIENT_TRACKING_NOLOOP);
}
/* Return 1 if one of the two byte strings is a prefix of the other (which
 * includes the case of equal strings and of an empty string), 0 otherwise.
 * Only the first min(s1_len, s2_len) bytes are compared. */
static int stringCheckPrefix(unsigned char *s1, size_t s1_len, unsigned char *s2, size_t s2_len) {
    size_t common_len = s2_len;

    if (s1_len < s2_len) {
        common_len = s1_len;
    }
    if (memcmp(s1,s2,common_len) != 0) {
        return 0;
    }
    return 1;
}
/* Check if any of the provided prefixes collide with one another or
 * with an existing prefix for the client. A collision is defined as two
 * prefixes that will emit an invalidation for the same key, that is, one
 * of the two is a prefix of the other.
 *
 * Parameters:
 *   c         - client issuing the request; it receives the error reply.
 *   prefixes  - array of string objects holding the candidate prefixes.
 *   numprefix - number of elements in 'prefixes'.
 *
 * Returns 1 if no collision is found. Otherwise an error describing the
 * first collision is added to the client's reply and 0 is returned. */
int checkPrefixCollisionsOrReply(client *c, robj **prefixes, size_t numprefix) {
    for (size_t i = 0; i < numprefix; i++) {
        /* Check input list has no overlap with existing prefixes. */
        if (c->client_tracking_prefixes) {
            raxIterator ri;
            raxStart(&ri,c->client_tracking_prefixes);
            raxSeek(&ri,"^",NULL,0);
            while(raxNext(&ri)) {
                if (stringCheckPrefix(ri.key,ri.key_len,
                    prefixes[i]->ptr,sdslen(prefixes[i]->ptr)))
                {
                    /* The rax key is not null terminated: copy it into
                     * an sds string so it can be formatted with %s. */
                    sds collision = sdsnewlen(ri.key,ri.key_len);
                    addReplyErrorFormat(c,
                        "Prefix '%s' overlaps with an existing prefix '%s'. "
                        "Prefixes for a single client must not overlap.",
                        (unsigned char *)prefixes[i]->ptr,
                        (unsigned char *)collision);
                    sdsfree(collision);
                    raxStop(&ri);
                    return 0;
                }
            }
            raxStop(&ri);
        }
        /* Check input has no overlap with itself. */
        for (size_t j = i + 1; j < numprefix; j++) {
            if (stringCheckPrefix(prefixes[i]->ptr,sdslen(prefixes[i]->ptr),
                prefixes[j]->ptr,sdslen(prefixes[j]->ptr)))
            {
                addReplyErrorFormat(c,
                    "Prefix '%s' overlaps with another provided prefix '%s'. "
                    "Prefixes for a single client must not overlap.",
                    (unsigned char *)prefixes[i]->ptr,
                    (unsigned char *)prefixes[j]->ptr);
                /* Always report the collision with 0. Returning the loop
                 * index would be non-zero for every i > 0, so the caller
                 * would treat the request as valid even though an error
                 * was already sent to the client. */
                return 0;
            }
        }
    }
    return 1;
}
/* Subscribe client 'c' to the broadcast prefix 'prefix' of 'plen' bytes.
 *
 * The prefix entry is created in PrefixTable when no client has registered
 * it yet. If 'c' is already subscribed to the prefix the call has no further
 * effect; otherwise the prefix is also recorded in the client's own set of
 * tracked prefixes, which is allocated on first use. */
void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) {
    unsigned char *pkey = (unsigned char*)prefix;
    bcastState *state;
    void *existing;

    if (raxFind(PrefixTable,pkey,plen,&existing)) {
        state = existing;
    } else {
        /* First subscriber for this prefix: set up its broadcast state. */
        state = zmalloc(sizeof(*state));
        state->keys = raxNew();
        state->clients = raxNew();
        raxInsert(PrefixTable,pkey,plen,state,NULL);
    }

    /* The client pointer itself is the key: if it is already there the
     * client was subscribed before and we are done. */
    if (!raxTryInsert(state->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) {
        return;
    }

    if (c->client_tracking_prefixes == NULL) {
        c->client_tracking_prefixes = raxNew();
    }
    raxInsert(c->client_tracking_prefixes,pkey,plen,NULL,NULL);
}
/* Enable the tracking state for the client 'c', and as a side effect allocate
 * the tracking tables and the invalidation channel name if needed.
 *
 * Parameters:
 *   c           - client for which tracking is enabled.
 *   redirect_to - if non zero, the invalidation messages for this client are
 *                 sent to the client with this ID. If that client is later
 *                 freed, a message is sent to the original client to inform
 *                 it of the condition. Multiple clients can redirect the
 *                 invalidation messages to the same client ID.
 *   options     - CLIENT_TRACKING_* flags selecting BCAST, OPTIN, OPTOUT and
 *                 NOLOOP behavior; any other bit is ignored.
 *   prefix      - prefixes to register in BCAST mode.
 *   numprefix   - number of elements in 'prefix'; zero in BCAST mode means
 *                 that the empty prefix is registered.
 *
 * The previous BROKEN_REDIR, BCAST, OPTIN, OPTOUT and NOLOOP flags of the
 * client are cleared before the new options are applied. */
void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix) {
    if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++;
    c->flags |= CLIENT_TRACKING;
    c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST|
                  CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT|
                  CLIENT_TRACKING_NOLOOP);
    c->client_tracking_redirection = redirect_to;

    /* This may be the first client we ever enable. Create the tracking
     * table if it does not exist. */
    if (TrackingTable == NULL) {
        /* The length is derived from the literal itself: a hard-coded
         * length that does not match the string would silently truncate
         * the channel name. */
        static const char channel[] = "__redict__:invalidate";

        TrackingTable = raxNew();
        PrefixTable = raxNew();
        TrackingChannelName = createStringObject(channel,sizeof(channel)-1);
    }

    /* For broadcasting, set the list of prefixes in the client. */
    if (options & CLIENT_TRACKING_BCAST) {
        c->flags |= CLIENT_TRACKING_BCAST;
        /* No prefix given: subscribe to the empty prefix. */
        if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0);
        for (size_t j = 0; j < numprefix; j++) {
            sds sdsprefix = prefix[j]->ptr;
            enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix));
        }
    }

    /* Set the remaining flags that don't need any special handling. */
    c->flags |= options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT|
                           CLIENT_TRACKING_NOLOOP);
}
/* This function is called after the execution of a readonly command in the
 * case the client has keys tracking enabled and the tracking is not
 * in BCAST mode. It populates the tracking invalidation table according
 * to the keys the user fetched, so that Redict will know which clients
 * should receive an invalidation message when those keys are modified.
 *
 * Parameters:
 *   tracking  - client whose tracking flags are checked and whose ID is
 *               stored in the table.
 *   executing - client whose current command and arguments are inspected
 *               to extract the key names.
 *
 * Nothing is recorded when the OPTIN/OPTOUT mode combined with the CACHING
 * flag says this command must not be tracked, when the command has no keys,
 * or when the command is flagged CMD_PUBSUB. */
void trackingRememberKeys(client *tracking, client *executing) {
    /* Return if we are in optin/out mode and the right CACHING command
     * was/wasn't given in order to modify the default behavior. */
    uint64_t optin = tracking->flags & CLIENT_TRACKING_OPTIN;
    uint64_t optout = tracking->flags & CLIENT_TRACKING_OPTOUT;
    uint64_t caching_given = tracking->flags & CLIENT_TRACKING_CACHING;
    if ((optin && !caching_given) || (optout && caching_given)) return;

    getKeysResult result = GETKEYS_RESULT_INIT;
    int numkeys = getKeysFromCommand(executing->cmd,executing->argv,executing->argc,&result);

    /* Nothing to track if the command has no keys. Shard channels are
     * treated as special keys for client libraries to rely on the `COMMAND`
     * command to discover the node to connect to, and don't need to be
     * tracked either. In both cases the key result must still be released,
     * otherwise it is leaked. */
    if (!numkeys || (executing->cmd->flags & CMD_PUBSUB)) {
        getKeysFreeResult(&result);
        return;
    }

    keyReference *keys = result.keys;
    for (int j = 0; j < numkeys; j++) {
        int idx = keys[j].pos;
        sds sdskey = executing->argv[idx]->ptr;
        void *found;
        rax *ids;

        /* Fetch the set of client IDs for this key, creating it if this is
         * the first time the key is tracked. */
        if (!raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey),&found)) {
            ids = raxNew();
            int inserted = raxTryInsert(TrackingTable,(unsigned char*)sdskey,
                                        sdslen(sdskey),ids,NULL);
            serverAssert(inserted == 1);
        } else {
            ids = found;
        }

        /* Only count the ID if it was not already associated with the key. */
        if (raxTryInsert(ids,(unsigned char*)&tracking->id,sizeof(tracking->id),NULL,NULL))
            TrackingTableTotalItems++;
    }
    getKeysFreeResult(&result);
}
/* Given a key name, this function sends an invalidation message in the
 * proper channel (depending on RESP version: PubSub or Push message) and
 * to the proper client (in case of redirection), in the context of the
 * client 'c' with tracking enabled.
 *
 * Parameters:
 *   c       - tracking client the invalidation is about. If it has a
 *             redirection configured, the message goes to the redirection
 *             target instead.
 *   keyname - key name, or pre-built protocol when 'proto' is non zero.
 *   keylen  - length in bytes of 'keyname'.
 *   proto   - when non zero, 'keyname' is appended to the reply verbatim.
 *
 * In case the 'proto' argument is non zero, the function will assume that
 * 'keyname' points to a buffer of 'keylen' bytes already expressed in the
 * form of Redict RESP protocol. This is used for:
 * - In BCAST mode, to send an array of invalidated keys to all
 * applicable clients
 * - Following a flush command, to send a single RESP NULL to indicate
 * that all keys are now invalid.
 *
 * CLIENT_PUSHING is set on the client being written to for the duration of
 * the call, and cleared on every exit path unless it was already set when
 * the function was entered. */
void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
/* Remember the flags so that CLIENT_PUSHING is restored to its previous
 * state when we are done with this client. */
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
int using_redirection = 0;
if (c->client_tracking_redirection) {
client *redir = lookupClientByID(c->client_tracking_redirection);
if (!redir) {
c->flags |= CLIENT_TRACKING_BROKEN_REDIR;
/* We need to signal to the original connection that we
 * are unable to send invalidation messages to the redirected
 * connection, because the client no longer exists. The signal is a
 * push message, so it is only sent to RESP3 (or newer) clients. */
if (c->resp > 2) {
addReplyPushLen(c,2);
addReplyBulkCBuffer(c,"tracking-redir-broken",21);
addReplyLongLong(c,c->client_tracking_redirection);
}
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
return;
}
/* Restore the pushing flag of the original client, then switch to the
 * redirection target and repeat the save/set sequence on it. */
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
c = redir;
using_redirection = 1;
old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
}
/* Only send such info for clients in RESP version 3 or more. However
 * if redirection is active, and the connection we redirect to is
 * in Pub/Sub mode, we can support the feature with RESP 2 as well,
 * by sending Pub/Sub messages in the __redict__:invalidate channel. */
if (c->resp > 2) {
addReplyPushLen(c,2);
addReplyBulkCBuffer(c,"invalidate",10);
} else if (using_redirection && c->flags & CLIENT_PUBSUB) {
/* We use a static object to speedup things, however we assume
 * that addReplyPubsubMessage() will not take a reference. */
addReplyPubsubMessage(c,TrackingChannelName,NULL,shared.messagebulk);
} else {
/* If we are here, the destination client is not using RESP3 and is
 * not a redirection target in Pub/Sub mode. We can't send anything to
 * it since RESP2 does not support push messages in the same
 * connection. */
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
return;
}
/* Send the "value" part, which is the array of keys. With 'proto' set the
 * caller already provides it in protocol form; otherwise a one element
 * array holding the key name is generated here. */
if (proto) {
addReplyProto(c,keyname,keylen);
} else {
addReplyArrayLen(c,1);
addReplyBulkCBuffer(c,keyname,keylen);
}
updateClientMemUsageAndBucket(c);
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}
/* Record a modified key in the broadcast state of every prefix it matches.
 *
 * This is called when a key is modified and at least one prefix is
 * registered in the prefix table. The key is only accumulated here: the
 * invalidation messages for the clients subscribed to each prefix are sent
 * later, once we return to the event loop.
 *
 * 'c' is the client that modified the key (it may be NULL), 'keyname' and
 * 'keylen' describe the key name. */
void trackingRememberKeyToBroadcast(client *c, char *keyname, size_t keylen) {
    raxIterator it;

    raxStart(&it,PrefixTable);
    raxSeek(&it,"^",NULL,0);
    while (raxNext(&it)) {
        /* The key matches when the prefix is not longer than the key and
         * its bytes are equal to the initial bytes of the key. The empty
         * prefix matches every key. */
        int matches = it.key_len <= keylen &&
                      (it.key_len == 0 ||
                       memcmp(it.key,keyname,it.key_len) == 0);
        if (!matches) continue;

        /* The client pointer is stored as the value associated with the
         * key, so that we remember who changed the key last and can skip
         * the notification for that client when it uses the NOLOOP mode. */
        bcastState *state = it.data;
        raxInsert(state->keys,(unsigned char*)keyname,keylen,c,NULL);
    }
    raxStop(&it);
}
/* Notify every client that may have the key 'keyobj' in its local cache and
 * forget the key. This is called from signalModifiedKey() or other places
 * in Redict when a key changes value.
 *
 * Parameters:
 *   c      - client that modified the key. It may be NULL when the change
 *            did not happen in the context of a client (for instance when
 *            a key is deleted because of expire).
 *   keyobj - string object holding the key name.
 *   bcast  - if non zero, the key is also scheduled for broadcasting to
 *            the clients in BCAST mode. This is what the core does when a
 *            key is really modified. The function is also used to evict
 *            keys from the table under memory pressure: the key did not
 *            change in that case, so only the clients recorded in the
 *            table for this key are notified, since they would otherwise
 *            miss the fact that we no longer track the key for them.
 *
 * After the notifications are handled, the set of client IDs associated
 * with the key is released and the key is removed from the tracking table. */
void trackingInvalidateKey(client *c, robj *keyobj, int bcast) {
    if (TrackingTable == NULL) return;

    size_t keylen = sdslen(keyobj->ptr);
    unsigned char *key = (unsigned char*)keyobj->ptr;

    if (bcast && raxSize(PrefixTable) > 0) {
        trackingRememberKeyToBroadcast(c,(char *)key,keylen);
    }

    void *found;
    if (!raxFind(TrackingTable,key,keylen,&found)) return;
    rax *ids = found;

    raxIterator it;
    raxStart(&it,ids);
    raxSeek(&it,"^",NULL,0);
    while (raxNext(&it)) {
        uint64_t client_id;
        memcpy(&client_id,it.key,sizeof(client_id));
        client *target = lookupClientByID(client_id);

        /* Skip IDs of clients that are gone or no longer tracking. Clients
         * in BCAST mode are skipped too: they may have enabled tracking
         * normally and switched to BCAST later, and we don't want to send
         * them the invalidations that were pending from the old mode. */
        if (target == NULL) continue;
        if (!(target->flags & CLIENT_TRACKING)) continue;
        if (target->flags & CLIENT_TRACKING_BCAST) continue;

        int is_current = (target == server.current_client);

        /* With NOLOOP the client is not notified about keys it changed
         * itself. */
        if (is_current && (target->flags & CLIENT_TRACKING_NOLOOP)) continue;

        if (is_current &&
            (server.current_client->flags & CLIENT_EXECUTING_COMMAND))
        {
            /* The current client is in the middle of a command: queue the
             * key so that the invalidation is sent after the command
             * response instead of being interleaved with it. */
            incrRefCount(keyobj);
            listAddNodeTail(server.tracking_pending_keys, keyobj);
        } else {
            sendTrackingMessage(target,(char *)key,keylen,0);
        }
    }
    raxStop(&it);

    /* Drop the per-key set of IDs: it will be created and populated again
     * if the key gets tracked again. */
    TrackingTableTotalItems -= raxSize(ids);
    raxFree(ids);
    raxRemove(TrackingTable,key,keylen,NULL);
}
/* Send to the current client the invalidation messages that were queued
 * while it was executing a command, then empty the queue.
 *
 * Each queued element is either a key object, for which a regular
 * invalidation is sent and one reference is released, or NULL, which stands
 * for "send a RESP NULL". Nothing is done while we are inside a nested
 * call, so that the messages are not interleaved with a transaction
 * response. */
void trackingHandlePendingKeyInvalidations(void) {
    if (listLength(server.tracking_pending_keys) == 0) return;
    if (server.execution_nesting) return;

    listIter iter;
    listNode *node;

    listRewind(server.tracking_pending_keys,&iter);
    while ((node = listNext(&iter)) != NULL) {
        robj *pending = listNodeValue(node);

        /* The current client may have been freed in the meantime: in that
         * case there is nobody to notify, but key references must still
         * be released. */
        if (pending == NULL) {
            if (server.current_client != NULL) {
                sendTrackingMessage(server.current_client,
                    shared.null[server.current_client->resp]->ptr,
                    sdslen(shared.null[server.current_client->resp]->ptr),1);
            }
            continue;
        }

        if (server.current_client != NULL) {
            sendTrackingMessage(server.current_client,
                (char *)pending->ptr,sdslen(pending->ptr),0);
        }
        decrRefCount(pending);
    }
    listEmpty(server.tracking_pending_keys);
}
/* Flush handling, implemented by trackingInvalidateKeysOnFlush() below.
 *
 * When one or all the Redict databases are flushed, keys are not
 * invalidated one by one. Caching keys are not specific to each DB but are
 * global, so what we currently do is send clients with tracking enabled a
 * single special notification carrying a RESP NULL, which means "all the
 * keys". This avoids flooding clients with one invalidation message for
 * every key they may hold.
 *
 * The two helpers that follow release the tracking table itself. */
/* Callback handed to raxFreeWithCallback() by freeTrackingRadixTree(): 'rt'
 * is the value stored for one entry of the tree being released, which is
 * itself a radix tree and is freed with raxFree(). */
void freeTrackingRadixTreeCallback(void *rt) {
raxFree(rt);
}
/* Free the radix tree 'rt' together with the nested radix trees stored as
 * its values, which are released via freeTrackingRadixTreeCallback(). */
void freeTrackingRadixTree(rax *rt) {
raxFreeWithCallback(rt,freeTrackingRadixTreeCallback);
}
/* Tell every tracking client that all the keys are now invalid, by sending
 * a RESP NULL instead of one message per key, then reset the tracking table.
 *
 * For the current client the message is not sent right away: a NULL element
 * is appended to the pending keys list, which is the marker meaning that a
 * RESP NULL has to be sent later.
 *
 * If 'async' is non zero the old tracking table is released with
 * freeTrackingRadixTreeAsync(), otherwise it is freed in place. */
void trackingInvalidateKeysOnFlush(int async) {
    if (server.tracking_clients) {
        listIter iter;
        listNode *node;

        listRewind(server.clients,&iter);
        while ((node = listNext(&iter)) != NULL) {
            client *cl = listNodeValue(node);

            if (!(cl->flags & CLIENT_TRACKING)) continue;

            if (cl != server.current_client) {
                sendTrackingMessage(cl,shared.null[cl->resp]->ptr,
                    sdslen(shared.null[cl->resp]->ptr),1);
                continue;
            }

            /* Special NULL entry: it means that a null must be sent. */
            listAddNodeTail(server.tracking_pending_keys,NULL);
        }
    }

    /* Reclaim all the memory used by tracking and start again with an
     * empty table. */
    if (TrackingTable == NULL) return;

    if (async) {
        freeTrackingRadixTreeAsync(TrackingTable);
    } else {
        freeTrackingRadixTree(TrackingTable);
    }
    TrackingTable = raxNew();
    TrackingTableTotalItems = 0;
}
/* Keep the number of keys in the tracking table within the configured limit.
 *
 * Tracking forces Redict to remember which clients may have certain keys.
 * With many reads and few writes the amount of information to keep server
 * side is not bound, so the user can configure a maximum number of keys for
 * the invalidation table (zero means no limit).
 *
 * When the table is over the limit, random keys are evicted one after the
 * other, and the interested clients receive invalidation messages as if the
 * keys were modified. The amount of work done in one call is bounded, and
 * grows with the number of consecutive calls that ended while still over
 * the limit. */
void trackingLimitUsedSlots(void) {
    static unsigned int timeout_counter = 0;

    if (TrackingTable == NULL) return;
    if (server.tracking_table_max_keys == 0) return; /* No limits set. */

    size_t max_keys = server.tracking_table_max_keys;
    if (raxSize(TrackingTable) <= max_keys) {
        /* Limit not reached. */
        timeout_counter = 0;
        return;
    }

    /* Evict keys picked with a random walk until we are back under the
     * limit or the effort for this run is exhausted. */
    int under_limit = 0;
    raxIterator it;
    raxStart(&it,TrackingTable);
    for (int effort = 100 * (timeout_counter+1); effort > 0; effort--) {
        raxSeek(&it,"^",NULL,0);
        raxRandomWalk(&it,0);
        if (raxEOF(&it)) break;

        robj *keyobj = createStringObject((char*)it.key,it.key_len);
        trackingInvalidateKey(NULL,keyobj,0);
        decrRefCount(keyobj);

        if (raxSize(TrackingTable) <= max_keys) {
            /* Stop ASAP: we are again under the limit. */
            under_limit = 1;
            break;
        }
    }
    raxStop(&it);

    if (under_limit) {
        timeout_counter = 0;
    } else {
        /* The maximum effort for this run was not enough: try harder the
         * next time. */
        timeout_counter++;
    }
}
/* Append to 'dst' a protocol header line made of the single byte 'marker',
 * the decimal representation of 'value' and a CRLF. Returns the resulting
 * sds string. */
static sds trackingCatProtoHeader(sds dst, char marker, long long value) {
    char digits[32];
    size_t ndigits = ll2string(digits,sizeof(digits),value);

    dst = sdscatlen(dst,&marker,1);
    dst = sdscatlen(dst,digits,ndigits);
    return sdscatlen(dst,"\r\n",2);
}

/* Build the Redict protocol for an array of bulk strings holding the key
 * names stored in the 'keys' radix tree.
 *
 * When 'c' is not NULL, the keys whose associated value is 'c' (that is,
 * the keys modified the last time by this client) are left out, in order to
 * implement the NOLOOP option. If that leaves no key at all, NULL is
 * returned instead of an empty array.
 *
 * The returned sds string is owned by the caller. */
sds trackingBuildBroadcastReply(client *c, rax *keys) {
    raxIterator it;
    uint64_t count = 0;

    if (c != NULL) {
        /* Count only the keys that were not last modified by 'c'. */
        raxStart(&it,keys);
        raxSeek(&it,"^",NULL,0);
        while (raxNext(&it)) {
            if (it.data == c) continue;
            count++;
        }
        raxStop(&it);
        if (count == 0) return NULL;
    } else {
        count = raxSize(keys);
    }

    /* Array header first, then one bulk string for every key. */
    sds reply = sdsempty();
    reply = sdsMakeRoomFor(reply,count*15);
    reply = trackingCatProtoHeader(reply,'*',count);

    raxStart(&it,keys);
    raxSeek(&it,"^",NULL,0);
    while (raxNext(&it)) {
        if (c != NULL && it.data == c) continue;
        reply = trackingCatProtoHeader(reply,'$',it.key_len);
        reply = sdscatlen(reply,it.key,it.key_len);
        reply = sdscatlen(reply,"\r\n",2);
    }
    raxStop(&it);
    return reply;
}
/* Walk the prefixes registered by the clients in BCAST mode and, for every
 * prefix that accumulated modified keys, send the list of those keys to
 * each client subscribed to the prefix. The set of accumulated keys of
 * every prefix is replaced with a fresh empty one afterwards. */
void trackingBroadcastInvalidationMessages(void) {
    /* Nothing to do without tracking tables or tracking clients. */
    if (TrackingTable == NULL) return;
    if (!server.tracking_clients) return;

    raxIterator prefix_it;
    raxStart(&prefix_it,PrefixTable);
    raxSeek(&prefix_it,"^",NULL,0);
    while (raxNext(&prefix_it)) {
        bcastState *state = prefix_it.data;

        if (raxSize(state->keys) != 0) {
            /* The same protocol is shared by all the clients that don't
             * use the NOLOOP option, so it is generated just once. */
            sds shared_reply = trackingBuildBroadcastReply(NULL,state->keys);

            raxIterator client_it;
            raxStart(&client_it,state->clients);
            raxSeek(&client_it,"^",NULL,0);
            while (raxNext(&client_it)) {
                /* The key of each entry is the client pointer itself. */
                client *subscriber;
                memcpy(&subscriber,client_it.key,sizeof(subscriber));

                if (!(subscriber->flags & CLIENT_TRACKING_NOLOOP)) {
                    sendTrackingMessage(subscriber,shared_reply,
                                        sdslen(shared_reply),1);
                    continue;
                }

                /* NOLOOP: this client may have certain keys excluded, so
                 * it gets its own reply, if any key is left for it. */
                sds filtered = trackingBuildBroadcastReply(subscriber,state->keys);
                if (filtered == NULL) continue;
                sendTrackingMessage(subscriber,filtered,sdslen(filtered),1);
                sdsfree(filtered);
            }
            raxStop(&client_it);
            sdsfree(shared_reply);
        }

        /* Start from scratch: from now on only the keys modified after
         * this point are accumulated. */
        raxFree(state->keys);
        state->keys = raxNew();
    }
    raxStop(&prefix_it);
}
/* Return the total number of client IDs currently stored across the whole
 * tracking table, as maintained in TrackingTableTotalItems. This is just
 * used in order to access the amount of used slots in the tracking table. */
uint64_t trackingGetTotalItems(void) {
return TrackingTableTotalItems;
}
/* Return the number of keys stored in the tracking table, or 0 if the
 * table was never created. */
uint64_t trackingGetTotalKeys(void) {
    return (TrackingTable != NULL) ? raxSize(TrackingTable) : 0;
}
/* Return the number of prefixes stored in the prefix table, or 0 if the
 * table was never created. */
uint64_t trackingGetTotalPrefixes(void) {
    return (PrefixTable != NULL) ? raxSize(PrefixTable) : 0;
}
|