1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430
|
# Cluster API documentation
This document describes using `libvalkey` in cluster mode, including an overview of the synchronous and asynchronous APIs.
It is not intended as a complete reference. For that it's always best to refer to the source code.
## Table of Contents
- [Synchronous API](#synchronous-api)
- [Connecting](#connecting)
- [Connection options](#connection-options)
- [Executing commands](#executing-commands)
- [Executing commands on a specific node](#executing-commands-on-a-specific-node)
- [Disconnecting/cleanup](#disconnecting-cleanup)
- [Pipelining](#pipelining)
- [Events](#events)
- [Asynchronous API](#asynchronous-api)
- [Connecting](#connecting-1)
- [Connection options](#connection-options-1)
- [Executing commands](#executing-commands-1)
- [Executing commands on a specific node](#executing-commands-on-a-specific-node-1)
- [Disconnecting/cleanup](#disconnecting-cleanup-1)
- [Events](#events-1)
- [Miscellaneous](#miscellaneous)
- [TLS support](#tls-support)
- [Cluster node iterator](#cluster-node-iterator)
- [Extend the list of supported commands](#extend-the-list-of-supported-commands)
- [Random number generator](#random-number-generator)
## Synchronous API
### Connecting
There are a few alternative ways to setup and connect to a cluster.
The basic alternatives lacks most options, but can be enough for some use cases.
```c
valkeyClusterContext *valkeyClusterConnect(const char *addrs);
valkeyClusterContext *valkeyClusterConnectWithTimeout(const char *addrs,
const struct timeval tv);
```
There is also a convenience struct to specify various options.
```c
valkeyClusterContext *valkeyClusterConnectWithOptions(const valkeyClusterOptions *options);
```
When connecting to a cluster, `NULL` is returned when the context can't be allocated, or `err` and `errstr` are set in the returned allocated context when there are issues.
So when connecting it's simple to handle error states.
```c
valkeyClusterContext *cc = valkeyClusterConnect("127.0.0.1:6379,127.0.0.1:6380");
if (cc == NULL || cc->err) {
fprintf(stderr, "Error: %s\n", cc ? cc->errstr : "OOM");
}
```
### Connection options
There are a variety of options you can specify using the `valkeyClusterOptions` struct when connecting to a cluster.
This includes information about how to connect to the cluster and defining optional callbacks and other options.
See [include/valkey/cluster.h](../include/valkey/cluster.h) for more details.
```c
valkeyClusterOptions opt = {
.initial_nodes = "127.0.0.1:6379,127.0.0.1:6380"; // Addresses to initially connect to.
.options = VALKEY_OPT_USE_CLUSTER_NODES; // See available option flags below.
.password = "password"; // Authenticate connections using the `AUTH` command.
};
valkeyClusterContext *cc = valkeyClusterConnectWithOptions(&opt);
if (cc == NULL || cc->err) {
fprintf(stderr, "Error: %s\n", cc ? cc->errstr : "OOM");
}
```
There are also several flags you can specify in `valkeyClusterOptions.options`. It's a bitwise OR of the following flags:
| Flag | Description |
| --- | --- |
| `VALKEY_OPT_USE_CLUSTER_NODES` | Tells libvalkey to use the command `CLUSTER NODES` when updating its slot map (cluster topology).<br>Libvalkey uses `CLUSTER SLOTS` by default. |
| `VALKEY_OPT_USE_REPLICAS` | Tells libvalkey to keep parsed information of replica nodes. |
| `VALKEY_OPT_BLOCKING_INITIAL_UPDATE` | **ASYNC**: Tells libvalkey to perform the initial slot map update in a blocking fashion. The function call will wait for a slot map update before returning so that the returned context is immediately ready to accept commands. |
| `VALKEY_OPT_REUSEADDR` | Tells libvalkey to set the [SO_REUSEADDR](https://man7.org/linux/man-pages/man7/socket.7.html) socket option |
| `VALKEY_OPT_PREFER_IPV4`<br>`VALKEY_OPT_PREFER_IPV6`<br>`VALKEY_OPT_PREFER_IP_UNSPEC` | Informs libvalkey to either prefer IPv4 or IPv6 when invoking [getaddrinfo](https://man7.org/linux/man-pages/man3/gai_strerror.3.html). `VALKEY_OPT_PREFER_IP_UNSPEC` will cause libvalkey to specify `AF_UNSPEC` in the getaddrinfo call, which means both IPv4 and IPv6 addresses will be searched simultaneously.<br>Libvalkey prefers IPv4 by default. |
| `VALKEY_OPT_MPTCP` | Tells libvalkey to use multipath TCP (MPTCP). Note that only when both the server and client are using MPTCP do they establish an MPTCP connection between them; otherwise, they use a regular TCP connection instead. |
### Executing commands
The primary command interface is a `printf`-like function that takes a format string along with a variable number of arguments.
This will construct a `RESP` command and deliver it to the correct node in the cluster.
```c
valkeyReply *reply = valkeyClusterCommand(cc, "SET %s %d", "counter", 42);
if (reply == NULL) {
fprintf(stderr, "Communication error: %s\n", cc->err ? cc->errstr : "Unknown error");
} else if (reply->type == VALKEY_REPLY_ERROR) {
fprintf(stderr, "Error response from node: %s\n", reply->str);
} else {
// Handle reply..
}
freeReplyObject(reply);
```
Commands will be sent to the cluster node that the client perceives handling the given key.
If the cluster topology has changed the Valkey node might respond with a redirection error which the client will handle, update its slot map and resend the command to correct node.
The reply will in this case arrive from the correct node.
If a node is unreachable, for example if the command times out or if the connect times out, it can indicate that there has been a failover and the node is no longer part of the cluster.
In this case, `valkeyClusterCommand` returns NULL and sets `err` and `errstr` on the cluster context, but additionally, libvalkey schedules a slot map update to be performed when the next command is sent.
That means that if you try the same command again, there is a good chance the command will be sent to another node and the command may succeed.
### Executing commands on a specific node
When there is a need to send commands to a specific node, the following low-level API can be used.
```c
valkeyReply *reply = valkeyClusterCommandToNode(cc, node, "DBSIZE");
```
This function handles `printf`-like arguments similar to `valkeyClusterCommand`, but will only attempt to send the command to the given node and will not perform redirects or retries.
If the command times out or the connection to the node fails, a slot map update is scheduled to be performed when the next command is sent.
`valkeyClusterCommandToNode` also performs a slot map update if it has previously been scheduled.
### Disconnecting/cleanup
To disconnect and free the context the following function can be used:
```c
valkeyClusterFree(cc);
```
This function closes the sockets and deallocates the context.
### Pipelining
For pipelined commands, every command is simply appended to the output buffer but not delivered to the server, until you attempt to read the first response, at which point the entire buffer will be delivered.
The `valkeyClusterAppendCommand` function can be used to append a command, which is identical to the `valkeyClusterCommand` family, apart from not returning a reply.
After calling an append function `valkeyClusterGetReply` can be used to receive the subsequent replies.
The following example shows a simple cluster pipeline.
```c
if (valkeyClusterAppendCommand(cc, "SET foo bar") != VALKEY_OK) {
fprintf(stderr, "Error appending command: %s\n", cc->errstr);
exit(1);
}
if (valkeyClusterAppendCommand(cc, "GET foo") != VALKEY_OK) {
fprintf(stderr, "Error appending command: %s\n", cc->errstr);
exit(1);
}
valkeyReply *reply;
if (valkeyClusterGetReply(cc,&reply) != VALKEY_OK) {
fprintf(stderr, "Error reading reply %zu: %s\n", i, c->errstr);
exit(1);
}
// Handle the reply for SET here.
freeReplyObject(reply);
if (valkeyClusterGetReply(cc,&reply) != VALKEY_OK) {
fprintf(stderr, "Error reading reply %zu: %s\n", i, c->errstr);
exit(1);
}
// Handle the reply for GET here.
freeReplyObject(reply);
```
### Events
#### Events per cluster context
There is a hook to get notified when certain events occur.
```c
/* Function to be called when events occur. */
void event_cb(const valkeyClusterContext *cc, int event, void *privdata) {
switch (event) {
// Handle event
}
}
valkeyClusterOptions opt = {
.event_callback = event_cb;
.event_privdata = my_privdata; // User defined data can be provided to the callback.
};
valkeyClusterContext *cc = valkeyClusterConnectWithOptions(&opt);
```
The callback is called with `event` set to one of the following values:
* `VALKEYCLUSTER_EVENT_SLOTMAP_UPDATED` when the slot mapping has been updated;
* `VALKEYCLUSTER_EVENT_READY` when the slot mapping has been fetched for the first
time and the client is ready to accept commands, useful when initiating the
client using `valkeyClusterAsyncConnectWithOptions` without enabling the option
`VALKEY_OPT_BLOCKING_INITIAL_UPDATE` where a client is not immediately ready
after a successful call;
* `VALKEYCLUSTER_EVENT_FREE_CONTEXT` when the cluster context is being freed, so
that the user can free the event `privdata`.
#### Events per connection
There is a hook to get notified about connect and reconnect attempts.
This is useful for applying socket options or access endpoint information for a connection to a particular node.
The callback is registered using an option.
```c
void connect_cb(const valkeyContext *c, int status) {
// Perform desired action
}
valkeyClusterOptions opt = {
.connect_callback = connect_cb;
};
valkeyClusterContext *cc = valkeyClusterConnectWithOptions(&opt);
```
The callback is called just after connect, before TLS handshake and authentication.
On successful connection, `status` is set to `VALKEY_OK` and the `valkeyContext` can be used, for example,
to see which IP and port it's connected to or to set socket options directly on the file descriptor which can be accessed as `c->fd`.
On failed connection attempt, this callback is called with `status` set to `VALKEY_ERR`.
The `err` field in the `valkeyContext` can be used to find out the cause of the error.
## Asynchronous API
The asynchronous API supports a wide range of event libraries and uses [adapters](../include/valkey/adapters/) to attach to a specific event library.
Each adapter provide a convenience function that configures which event loop instance the created context will be attached to.
### Connecting
To asynchronously connect to a cluster a `valkeyClusterOptions` should first be initiated with initial nodes and more,
but it's also important to configure which event library to use before calling `valkeyClusterAsyncConnectWithOptions`.
```c
valkeyClusterOptions options = {
.initial_nodes = "127.0.0.1:7000";
};
// Use convenience function to set which event library to use.
valkeyClusterOptionsUseLibev(&options, EV_DEFAULT);
// Initiate the context and start connecting to the initial nodes.
valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options);
```
Since an initial slot map update is performed asynchronously any command sent directly after `valkeyClusterAsyncConnectWithOptions` may fail
because the initial slot map has not yet been retrieved and the client doesn't know which cluster node to send the command to.
You may use the [`eventCallback`](#events-per-cluster-context-1) to be notified when the slot map is updated and the client is ready to accept commands.
A crude example of using the `eventCallback` can be found in [this test case](../tests/ct_async.c).
Another option is to enable blocking initial slot map updates using the option `VALKEY_OPT_BLOCKING_INITIAL_UPDATE`.
When enabled `valkeyClusterAsyncConnectWithOptions` will initially connect to the cluster in a blocking fashion and wait for the slot map before returning.
Any command sent by the user thereafter will create a new non-blocking connection, unless a non-blocking connection already exists to the destination.
The function returns a pointer to a newly created `valkeyClusterAsyncContext` struct and its `err` field should be checked to make sure the initial slot map update was successful.
### Connection options
There is a variety of options you can specify using the `valkeyClusterOptions` struct when connecting to a cluster.
One asynchronous API specific option is `VALKEY_OPT_BLOCKING_INITIAL_UPDATE` which enables the initial slot map update to be performed in a blocking fashion.
The connect function will wait for a slot map update before returning so that the returned context is immediately ready to accept commands.
See previous [Connection options](#connection-options) section for common options.
### Executing commands
Executing commands in an asynchronous context work similarly to the synchronous context, except that you can pass a callback that will be invoked when the reply is received.
```c
int status = valkeyClusterAsyncCommand(cc, commandCallback, privdata,
"SET %s %s", "key", "value");
```
The return value is `VALKEY_OK` when the command was successfully added to the output buffer and `VALKEY_ERR` otherwise.
When the connection is being disconnected per user-request, no new commands may be added to the output buffer and `VALKEY_ERR` is returned.
Commands will be sent to the cluster node that the client perceives handling the given key.
If the cluster topology has changed the Valkey node might respond with a redirection error which the client will handle, update its slot map and resend the command to correct node.
The reply will in this case arrive from the correct node.
The reply callback, that is called when the reply is received, should have the following prototype:
```c
void(valkeyClusterAsyncContext *acc, void *reply, void *privdata);
```
The `privdata` argument can be used to carry arbitrary data to the callback.
All pending callbacks are called with a `NULL` reply when the context encountered an error.
### Executing commands on a specific node
When there is a need to send commands to a specific node, the following low-level API can be used.
```c
status = valkeyClusterAsyncCommandToNode(acc, node, commandCallback, privdata, "DBSIZE");
```
This functions will only attempt to send the command to a specific node and will not perform redirects or retries, but communication errors will trigger a slot map update just like the commonly used API.
### Disconnecting/cleanup
Asynchronous cluster connections can be terminated using:
```c
valkeyClusterAsyncDisconnect(acc);
```
When this function is called, connections are **not** immediately terminated.
Instead, new commands are no longer accepted and connections are only terminated when all pending commands have been written to a socket, their respective replies have been read and their respective callbacks have been executed.
After this, the disconnection callback is executed with the `VALKEY_OK` status and the context object is freed.
### Events
#### Events per cluster context
Use [`event_callback` in `valkeyClusterOptions`](#events-per-cluster-context) to get notified when certain events occur.
When the callback function requires the current `valkeyClusterAsyncContext`, it can typecast the given `valkeyClusterContext` to a `valkeyClusterAsyncContext`.
The `valkeyClusterAsyncContext` struct is an extension of the `valkeyClusterContext` struct.
```c
void eventCallback(const valkeyClusterContext *cc, int event, void *privdata) {
valkeyClusterAsyncContext *acc = (valkeyClusterAsyncContext *)cc;
}
```
#### Events per connection
Because the connections that will be created are non-blocking, the kernel is not able to instantly return if the specified host and port is able to accept a connection.
Instead, use a connect callback to be notified when a connection is established or failed.
Similarly, a disconnect callback can be used to be notified about a disconnected connection (either because of an error or per user request).
The callbacks can be enabled using the following options when calling `valkeyClusterAsyncConnectWithOptions`:
```c
valkeyClusterOptions opt = {
.async_connect_callback = connect_cb;
.async_disconnect_callback = disconnect_cb;
}
```
The connect callback function should have the following prototype, aliased to `valkeyConnectCallback`:
```c
void(valkeyAsyncContext *ac, int status);
```
On a connection attempt, the `status` argument is set to `VALKEY_OK` when the connection was successful.
The file description of the connection socket can be retrieved from a `valkeyAsyncContext` as `ac->c->fd`.
The disconnect callback function should have the following prototype, aliased to `valkeyDisconnectCallback`:
```c
void(const valkeyAsyncContext *ac, int status);
```
On a disconnection the `status` argument is set to `VALKEY_OK` if it was initiated by the user, or to `VALKEY_ERR` when it was caused by an error.
When caused by an error the `err` field in the context can be accessed to get the error cause.
You don't need to reconnect in the disconnect callback since libvalkey will reconnect by itself when the next command is handled.
## Miscellaneous
### TLS support
TLS support is not enabled by default and requires an explicit build flag as described in [`README.md`](../README.md#building).
When support is enabled, TLS can be enabled on a cluster context using a prepared `valkeyTLSContext` and the options `valkeyClusterOptions.tls` and `valkeyClusterOptions.tls_init_fn`.
```c
// Initialize the OpenSSL library.
valkeyInitOpenSSL();
// Initialize a valkeyTLSContext, which holds an OpenSSL context.
valkeyTLSContext *tls = valkeyCreateTLSContext("ca.crt", NULL, "client.crt",
"client.key", NULL, NULL);
// Set options to enable TLS on context.
valkeyClusterOptions opt = {
.tls = tls;
.tls_init_fn = &valkeyInitiateTLSWithContext;
};
valkeyClusterContext *cc = valkeyClusterConnectWithOptions(&opt);
if (cc == NULL || cc->err) {
fprintf(stderr, "Error: %s\n", cc ? cc->errstr : "OOM");
}
```
### Cluster node iterator
A `valkeyClusterNodeIterator` can be used to iterate on all known master nodes in a cluster context.
First it needs to be initiated using `valkeyClusterInitNodeIterator` and then you can repeatedly call `valkeyClusterNodeNext` to get the next node from the iterator.
```c
valkeyClusterNodeIterator ni;
valkeyClusterInitNodeIterator(&ni, cc);
valkeyClusterNode *node;
while ((node = valkeyClusterNodeNext(&ni)) != NULL) {
valkeyReply *reply = valkeyClusterCommandToNode(cc, node, "DBSIZE");
// Handle reply..
}
```
The iterator will handle changes due to slot map updates by restarting the iteration, but on the new set of master nodes.
There is no bookkeeping for already iterated nodes when a restart is triggered, which means that a node can be iterated over more than once depending on when the slot map update happened and the change of cluster nodes.
Note that when `valkeyClusterCommandToNode` is called, a slot map update can happen if it has been scheduled by the previous command, for example if the previous call to `valkeyClusterCommandToNode` timed out or the node wasn't reachable.
To detect when the slot map has been updated, you can check if the slot map version (`iter.route_version`) is equal to the current cluster context's slot map version (`cc->route_version`).
If it isn't, it means that the slot map has been updated and the iterator will restart itself at the next call to `valkeyClusterNodeNext`.
Another way to detect that the slot map has been updated is to [register an event callback](#events-per-cluster-context) and look for the event `VALKEYCLUSTER_EVENT_SLOTMAP_UPDATED`.
### Extend the list of supported commands
The list of commands and the position of the first key in the command line is defined in [`src/cmddef.h`](../src/cmddef.h) which is included in this repository.
It has been generated using the `JSON` files describing the syntax of each command in the Valkey repository, which makes sure we support all commands in Valkey, at least in terms of cluster routing.
To add support for custom commands defined in modules, you can regenerate `cmddef.h` using the script [`gencommands.py`](../script/gencommands.py).
Use the `JSON` files from Valkey and any additional files on the same format as arguments to the script.
For details, see the comments inside `gencommands.py`.
### Random number generator
This library uses [random()](https://linux.die.net/man/3/random) while selecting a node used for requesting the cluster topology (slot map).
A user should seed the random number generator using [`srandom()`](https://linux.die.net/man/3/srandom) to get less predictability in the node selection.
|