# RabbitMQ Consistent Hash Exchange Type
## Introduction
This plugin adds a consistent-hash exchange type to RabbitMQ. This
exchange type uses consistent hashing (intro blog posts: [one](http://www.martinbroadhurst.com/Consistent-Hash-Ring.html), [two](http://michaelnielsen.org/blog/consistent-hashing/), [three](https://akshatm.svbtle.com/consistent-hash-rings-theory-and-implementation)) to distribute
messages between the bound queues. It is recommended to get a basic understanding of the
concept before evaluating this plugin and its alternatives.
[rabbitmq-sharding](https://github.com/rabbitmq/rabbitmq-sharding) is another plugin
that provides a way to partition a stream of messages among a set of consumers
while trading off total stream ordering for processing parallelism.
## Problem Definition
In various scenarios it may be desired to ensure that messages sent to an
exchange are reasonably [uniformly distributed](https://en.wikipedia.org/wiki/Uniform_distribution_(discrete)) across a number of
queues based on the routing key of the message, a [nominated
header](#routing-on-a-header), or a [message property](#routing-on-a-header).
Technically this can be accomplished using a direct or topic exchange,
binding queues to that exchange and then publishing messages to that exchange that
match the various binding keys.
However, arranging things this way can be problematic:
1. It is difficult to ensure that all queues bound to the exchange
will receive a (roughly) equal number of messages (distribution uniformity)
without baking quite a lot of knowledge about the number of queues and
their bindings into the publishers.
2. When the number of queues changes, it is not easy to ensure that the
new topology still distributes messages between the different queues
roughly evenly.
[Consistent Hashing](https://en.wikipedia.org/wiki/Consistent_hashing)
is a hashing technique whereby each bucket appears at multiple points
throughout the hash space, and the bucket selected is the nearest
higher (or lower, it doesn't matter, provided it's consistent) bucket
to the computed hash (and the hash space wraps around). The effect of
this is that when a new bucket is added or an existing bucket removed,
only a very few hashes change which bucket they are routed to.
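
The ring described above can be sketched in a few lines of Python. This is only an illustration of the general technique, not the plugin's implementation: the `HashRing` class, the use of `md5`, and the figure of 100 points per bucket are all assumptions made for this example.

``` python
import bisect
import hashlib


def point(value: str) -> int:
    """Map a string to a ring position (md5 is an arbitrary choice here)."""
    return int.from_bytes(hashlib.md5(value.encode("utf-8")).digest()[:8], "big")


class HashRing:
    """Hypothetical ring where every bucket appears at several points."""

    def __init__(self, points_per_bucket: int = 100):
        self.points_per_bucket = points_per_bucket
        self._points = []  # sorted ring positions
        self._owner = {}   # ring position -> bucket name

    def add(self, bucket: str) -> None:
        for i in range(self.points_per_bucket):
            p = point(f"{bucket}#{i}")
            self._owner[p] = bucket
            bisect.insort(self._points, p)

    def lookup(self, key: str) -> str:
        # pick the nearest higher point, wrapping around to the first one
        i = bisect.bisect_right(self._points, point(key)) % len(self._points)
        return self._owner[self._points[i]]


ring = HashRing()
for name in ["a", "b", "c"]:
    ring.add(name)

keys = [f"key-{i}" for i in range(10_000)]
before = {k: ring.lookup(k) for k in keys}
ring.add("d")
after = {k: ring.lookup(k) for k in keys}

# only keys claimed by the new bucket change their destination
moved = [k for k in keys if before[k] != after[k]]
print(f"{len(moved)} of {len(keys)} keys moved after adding a bucket")
```

Every key that moves lands in the newly added bucket; keys owned by the other buckets stay where they were.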
## Purpose
The purpose of this exchange type is to help developers achieve
a reasonably even message flow distribution between a number of queues.
## Installation
This plugin ships with RabbitMQ.
## Enabling the Plugin
Like all other [RabbitMQ plugins](https://www.rabbitmq.com/plugins.html),
this plugin has to be enabled before it can be used:
``` sh
rabbitmq-plugins enable rabbitmq_consistent_hash_exchange
```
## Provided Exchange Type
The exchange type is `"x-consistent-hash"`.
## How It Works
In the case of Consistent Hashing as an exchange type, the hash is
calculated from a message property (most commonly the routing key).
When a queue is bound to this exchange, it is assigned one or more
partitions on the consistent hashing ring depending on its binding weight
(covered below).
For every hashed property value (e.g. a routing key), a hash position is computed
and a corresponding hash ring partition is picked. That partition corresponds
to a bound queue, and the message is routed to that queue.
Assuming a reasonably even routing key distribution of inbound messages,
routed messages should be reasonably evenly distributed across all
ring partitions, and thus queues according to their binding weights.
### Bindings and Hash Ring Buckets
#### One Binding Per Queue
This exchange type **assumes a single binding between a queue and an exchange**.
This will be enforced in the code:
when multiple bindings are created, only the first one will actually update the ring.
This limitation makes most semantic sense: the purpose is to achieve
a reasonably even message flow distribution between queues.
#### Weights
When a queue is bound to a Consistent Hash exchange, the binding key
is a number-as-a-string which indicates the binding weight: the number
of buckets (sections of the range) that will be associated with the
target queue.
In most environments, using **one bucket per binding** (and thus queue)
is highly recommended as it is the simplest way to achieve reasonably
even balancing.
### Consistent Hashing-based Routing
The hashing distributes *routing keys* among queues, not *message payloads*
among queues; all messages with the same routing key will go to the
same queue. So, if you wish for queue A to receive twice as many
routing keys as queue B, bind queue A with a binding key that is
twice the number (as a string, since binding keys are always strings)
used in the binding key of the binding
to queue B. Note this is only the case if your routing keys are
evenly distributed in the hash space. If, for example, only two
distinct routing keys are used on all the messages, there's a chance
both keys will route (consistently!) to the same queue, even though
other queues have higher values in their binding key. With a larger
set of routing keys used, the statistical distribution of routing
keys approaches the ratios of the binding keys.
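
The effect of binding weights can be illustrated with a small simulation. This is a sketch under stated assumptions: the queue names are hypothetical, and `md5` followed by a modulo is used as a simple stand-in for the plugin's actual hash function, so only the proportions are meaningful.

``` python
import hashlib
from collections import Counter

# binding keys are numbers-as-strings; queue names are hypothetical
bindings = {"q0": "1", "q1": "1", "q2": "2", "q3": "2"}

# every binding contributes as many buckets as its weight
buckets = [q for q, weight in bindings.items() for _ in range(int(weight))]


def bucket_for(routing_key: str) -> int:
    # stand-in hashing: md5 + modulo, not the plugin's algorithm
    h = int.from_bytes(hashlib.md5(routing_key.encode("utf-8")).digest()[:8], "big")
    return h % len(buckets)


# route 60,000 distinct routing keys and count them per queue
counts = Counter(buckets[bucket_for(str(i))] for i in range(60_000))
for q in sorted(counts):
    print(q, counts[q])
```

With many distinct routing keys, the queues bound with `"2"` should receive roughly twice as many keys as the queues bound with `"1"`.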
Each message gets delivered to at most one queue. On average, a
message gets delivered to exactly one queue. Concurrent binding changes
and queue primary replica failures can temporarily affect this but
over the long term, assuming equal weights of every binding,
the distribution should be roughly even.
### Node Restart Effects
The consistent hashing ring is stored in memory and will be re-populated
from exchange bindings when the node boots. Relative positioning of queues
on the ring is not guaranteed to be the same between restarts. In practice
this means that after a restart, all queues will still receive roughly
the same number of messages routed to them (assuming routing key distribution
does not change) but a given routing key now **may route to a different queue**.
In other words, this exchange type provides consistent message distribution
between queues but cannot guarantee stable queue routing locality for messages
with a given routing key.
## Usage Example
### The Topology
In the examples below, the queues `q0` and `q1` are each bound to the exchange `e`
with a weight of 1, which means they'll each get
roughly the same number of routing keys. The queues `q2` and `q3`,
however, get 2 buckets each (their weight is 2), which means they'll each get roughly the
same number of routing keys too, but approximately twice
as many as `q0` and `q1`. Queue and exchange names vary slightly between the language
versions: the Python, Java, and Ruby versions name the queues `q1` through `q4`.
Note the `routing_key`s in the bindings are numbers-as-strings. This
is because AMQP 0-9-1 specifies the `routing_key` field must be a string.
### Choosing Appropriate Weight Values
The example uses low weight values intentionally.
Higher values will reduce throughput of the exchange, primarily for
workloads that experience a high binding churn (queues are bound to
and unbound from a consistent hash exchange frequently).
Equal weights of 1 for all bindings are recommended (and sufficient for most use cases).
### Inspecting Message Counts
The example then publishes 100,000 messages to the
exchange with distinct or random routing keys. Each queue will get a share of
messages roughly proportional to its binding key. After publishing has
completed, message distribution between queues can be inspected using
RabbitMQ's management UI and `rabbitmqctl list_queues`.
## Routing Keys and Uniformity of Distribution
It is important to ensure that the messages being published
to the exchange have varying routing keys: if a very
small set of routing keys is being used, then there's a possibility of
messages not being evenly distributed between the bound queues. With a
large number of bound queues, some queues may get no messages routed to
them at all.
If pseudo-random or unique values such as client/session/request identifiers
are used for routing keys (or another property used for hashing) then
reasonably uniform distribution should be observed.
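
The following sketch shows why a small set of routing keys leaves queues idle. The queue names and routing keys are made up for illustration, and `md5` with a modulo stands in for the plugin's hash function.

``` python
import hashlib

queues = [f"q{i}" for i in range(8)]              # eight bound queues, weight 1 each
routing_keys = ["orders", "payments", "refunds"]  # only three distinct keys


def queue_for(routing_key: str) -> str:
    # stand-in hashing: md5 + modulo, not the plugin's algorithm
    h = int.from_bytes(hashlib.md5(routing_key.encode("utf-8")).digest()[:8], "big")
    return queues[h % len(queues)]


# publish 9,000 messages that cycle through the three routing keys
hits = {q: 0 for q in queues}
for i in range(9_000):
    hits[queue_for(routing_keys[i % len(routing_keys)])] += 1

idle = [q for q, n in hits.items() if n == 0]
print("queues that received nothing:", idle)
```

Three distinct keys can reach at most three queues, so at least five of the eight queues stay empty no matter how many messages are published.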
### Executable Versions
Executable versions of some of the code examples can be found under [./examples](./examples).
### Code Example in Python
This version of the example uses [Pika](https://pika.readthedocs.io/en/stable/), the most widely used Python client for RabbitMQ:
``` python
#!/usr/bin/env python
import pika
import time

conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
ch = conn.channel()

ch.exchange_declare(exchange="e", exchange_type="x-consistent-hash", durable=True)

for q in ["q1", "q2", "q3", "q4"]:
    ch.queue_declare(queue=q, durable=True)
    ch.queue_purge(queue=q)

# q1 and q2 get one bucket each
for q in ["q1", "q2"]:
    ch.queue_bind(exchange="e", queue=q, routing_key="1")

# q3 and q4 get two buckets each
for q in ["q3", "q4"]:
    ch.queue_bind(exchange="e", queue=q, routing_key="2")

n = 100000

# use the message index as the routing key so that keys vary
for rk in map(str, range(n)):
    ch.basic_publish(exchange="e", routing_key=rk, body="")

print("Done publishing.")
print("Waiting for routing to finish...")
# in order to keep this example simpler and focused,
# wait for a few seconds instead of using publisher confirms and waiting for those
time.sleep(5)

print("Done.")
conn.close()
```
### Code Example in Java
Below is a version of the example that uses
the official [RabbitMQ Java client](https://www.rabbitmq.com/api-guide.html):
``` java
package com.rabbitmq.examples;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;

public class ConsistentHashExchangeExample1 {
  private static final String CONSISTENT_HASH_EXCHANGE_TYPE = "x-consistent-hash";

  public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException {
    ConnectionFactory cf = new ConnectionFactory();
    Connection conn = cf.newConnection();
    Channel ch = conn.createChannel();

    for (String q : Arrays.asList("q1", "q2", "q3", "q4")) {
      ch.queueDeclare(q, true, false, false, null);
      ch.queuePurge(q);
    }

    ch.exchangeDeclare("e1", CONSISTENT_HASH_EXCHANGE_TYPE, true, false, null);

    for (String q : Arrays.asList("q1", "q2")) {
      ch.queueBind(q, "e1", "1");
    }

    for (String q : Arrays.asList("q3", "q4")) {
      ch.queueBind(q, "e1", "2");
    }

    ch.confirmSelect();

    AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
    for (int i = 0; i < 100000; i++) {
      ch.basicPublish("e1", String.valueOf(i), bldr.build(), "".getBytes("UTF-8"));
    }

    ch.waitForConfirmsOrDie(10000);

    System.out.println("Done publishing!");
    System.out.println("Evaluating results...");
    // wait for one stats emission interval so that queue counters
    // are up-to-date in the management UI (Thread.sleep takes milliseconds)
    Thread.sleep(5000);

    System.out.println("Done.");
    conn.close();
  }
}
```
### Code Example in Ruby
Below is a version that uses [Bunny](http://rubybunny.info), the most widely used
Ruby client for RabbitMQ:
``` ruby
#!/usr/bin/env ruby

require 'bunny'

conn = Bunny.new
conn.start

ch = conn.create_channel
ch.confirm_select

q1 = ch.queue("q1", durable: true)
q2 = ch.queue("q2", durable: true)
q3 = ch.queue("q3", durable: true)
q4 = ch.queue("q4", durable: true)

[q1, q2, q3, q4].each(&:purge)

x = ch.exchange("chx", type: "x-consistent-hash", durable: true)

[q1, q2].each { |q| q.bind(x, routing_key: "1") }
[q3, q4].each { |q| q.bind(x, routing_key: "2") }

n = 100_000
n.times do |i|
  x.publish(i.to_s, routing_key: i.to_s)
end

ch.wait_for_confirms
puts "Done publishing!"

# wait for queue stats to be emitted so that management UI numbers
# are up-to-date
sleep 5
conn.close
puts "Done"
```
### Code Example in Erlang
Below is a version of the example that uses
the [RabbitMQ Erlang client](https://www.rabbitmq.com/erlang-client-user-guide.html):
``` erlang
-include_lib("amqp_client/include/amqp_client.hrl").

test() ->
    {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
    {ok, Chan} = amqp_connection:open_channel(Conn),
    Queues = [<<"q0">>, <<"q1">>, <<"q2">>, <<"q3">>],
    amqp_channel:call(Chan,
                      #'exchange.declare'{
                        exchange = <<"e">>, type = <<"x-consistent-hash">>
                      }),
    [amqp_channel:call(Chan, #'queue.declare'{queue = Q}) || Q <- Queues],
    [amqp_channel:call(Chan, #'queue.bind'{queue = Q,
                                           exchange = <<"e">>,
                                           routing_key = <<"1">>})
     || Q <- [<<"q0">>, <<"q1">>]],
    [amqp_channel:call(Chan, #'queue.bind'{queue = Q,
                                           exchange = <<"e">>,
                                           routing_key = <<"2">>})
     || Q <- [<<"q2">>, <<"q3">>]],
    Msg = #amqp_msg{props = #'P_basic'{}, payload = <<>>},
    %% pick a new random routing key for every message: a single key
    %% computed up front would route all messages to the same queue
    [amqp_channel:call(Chan,
                       #'basic.publish'{
                         exchange = <<"e">>,
                         routing_key = integer_to_binary(rand:uniform(1000000))
                       }, Msg) || _ <- lists:seq(1, 100000)],
    amqp_connection:close(Conn),
    ok.
```
## Configuration
### Routing on a Header
Under most circumstances the routing key is a good choice for something to
hash. However, in some cases it is necessary to use the routing key for some other
purpose (for example with more complex routing involving exchange to
exchange bindings). In this case it is possible to configure the consistent hash
exchange to route based on a named header instead. To do this, declare the
exchange with a string argument called `"hash-header"` naming the header to
be used.
When a `"hash-header"` is specified, the chosen header should be provided.
If published messages do not contain the header, they will all get
routed to the same **arbitrarily chosen** queue.
#### Code Example in Python
``` python
#!/usr/bin/env python
import pika
import time

conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
ch = conn.channel()

# hash on the value of the "hash-on" header instead of the routing key
args = {u'hash-header': u'hash-on'}
ch.exchange_declare(exchange='e2',
                    exchange_type='x-consistent-hash',
                    arguments=args,
                    durable=True)

for q in ['q1', 'q2', 'q3', 'q4']:
    ch.queue_declare(queue=q, durable=True)
    ch.queue_purge(queue=q)

for q in ['q1', 'q2']:
    ch.queue_bind(exchange='e2', queue=q, routing_key='1')

for q in ['q3', 'q4']:
    ch.queue_bind(exchange='e2', queue=q, routing_key='2')

n = 100000

for rk in map(str, range(n)):
    hdrs = {u'hash-on': rk}
    ch.basic_publish(exchange='e2',
                     routing_key='',
                     body='',
                     properties=pika.BasicProperties(content_type='text/plain',
                                                     delivery_mode=2,
                                                     headers=hdrs))

print('Done publishing.')
print('Waiting for routing to finish...')
# in order to keep this example simpler and focused,
# wait for a few seconds instead of using publisher confirms and waiting for those
time.sleep(5)

print('Done.')
conn.close()
```
#### Code Example in Java
``` java
package com.rabbitmq.examples;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class ConsistentHashExchangeExample2 {
  public static final String EXCHANGE = "e2";
  private static final String EXCHANGE_TYPE = "x-consistent-hash";

  public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException {
    ConnectionFactory cf = new ConnectionFactory();
    Connection conn = cf.newConnection();
    Channel ch = conn.createChannel();

    for (String q : Arrays.asList("q1", "q2", "q3", "q4")) {
      ch.queueDeclare(q, true, false, false, null);
      ch.queuePurge(q);
    }

    // hash on the value of the "hash-on" header instead of the routing key
    Map<String, Object> args = new HashMap<>();
    args.put("hash-header", "hash-on");
    ch.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE, true, false, args);

    for (String q : Arrays.asList("q1", "q2")) {
      ch.queueBind(q, EXCHANGE, "1");
    }

    for (String q : Arrays.asList("q3", "q4")) {
      ch.queueBind(q, EXCHANGE, "2");
    }

    ch.confirmSelect();

    for (int i = 0; i < 100000; i++) {
      AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
      Map<String, Object> hdrs = new HashMap<>();
      hdrs.put("hash-on", String.valueOf(i));
      ch.basicPublish(EXCHANGE, "", bldr.headers(hdrs).build(), "".getBytes("UTF-8"));
    }

    ch.waitForConfirmsOrDie(10000);

    System.out.println("Done publishing!");
    System.out.println("Evaluating results...");
    // wait for one stats emission interval so that queue counters
    // are up-to-date in the management UI (Thread.sleep takes milliseconds)
    Thread.sleep(5000);

    System.out.println("Done.");
    conn.close();
  }
}
```
#### Code Example in Ruby
``` ruby
#!/usr/bin/env ruby

require 'bundler'
Bundler.setup(:default, :test)
require 'bunny'

conn = Bunny.new
conn.start

ch = conn.create_channel
ch.confirm_select

q1 = ch.queue("q1", durable: true)
q2 = ch.queue("q2", durable: true)
q3 = ch.queue("q3", durable: true)
q4 = ch.queue("q4", durable: true)

[q1, q2, q3, q4].each(&:purge)

x = ch.exchange("x2", type: "x-consistent-hash", durable: true, arguments: {"hash-header" => "hash-on"})

[q1, q2].each { |q| q.bind(x, routing_key: "1") }
[q3, q4].each { |q| q.bind(x, routing_key: "2") }

n = 100_000
# publish exactly n messages; the routing key is ignored for hashing here
n.times do |i|
  x.publish(i.to_s, routing_key: rand.to_s, headers: {"hash-on" => i.to_s})
end

ch.wait_for_confirms
puts "Done publishing!"

# wait for queue stats to be emitted so that management UI numbers
# are up-to-date
sleep 5
conn.close
puts "Done"
```
#### Code Example in Erlang
With RabbitMQ Erlang client:
``` erlang
-include_lib("amqp_client/include/amqp_client.hrl").

test() ->
    {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
    {ok, Chan} = amqp_connection:open_channel(Conn),
    Queues = [<<"q0">>, <<"q1">>, <<"q2">>, <<"q3">>],
    %% a separate exchange name avoids redeclaring <<"e">> with different arguments
    amqp_channel:call(
      Chan, #'exchange.declare'{
               exchange = <<"e2">>,
               type = <<"x-consistent-hash">>,
               arguments = [{<<"hash-header">>, longstr, <<"hash-on">>}]
              }),
    [amqp_channel:call(Chan, #'queue.declare'{queue = Q}) || Q <- Queues],
    [amqp_channel:call(Chan, #'queue.bind'{queue = Q,
                                           exchange = <<"e2">>,
                                           routing_key = <<"1">>})
     || Q <- [<<"q0">>, <<"q1">>]],
    [amqp_channel:call(Chan, #'queue.bind'{queue = Q,
                                           exchange = <<"e2">>,
                                           routing_key = <<"2">>})
     || Q <- [<<"q2">>, <<"q3">>]],
    %% give every message its own random "hash-on" header value
    [begin
         HV = integer_to_binary(rand:uniform(1000000)),
         Msg = #amqp_msg{props = #'P_basic'{headers = [{<<"hash-on">>, longstr, HV}]},
                         payload = <<>>},
         amqp_channel:call(Chan,
                           #'basic.publish'{
                             exchange = <<"e2">>,
                             routing_key = <<"">>
                           }, Msg)
     end || _ <- lists:seq(1, 100000)],
    amqp_connection:close(Conn),
    ok.
```
### Routing on a Message Property
Instead of a value in the header property, you can route on the
``message_id``, ``correlation_id``, or ``timestamp`` message properties. To do so,
declare the exchange with a string argument called ``"hash-property"`` naming the
property to be used.
The `"hash-header"` and `"hash-property"` are mutually exclusive.
When a `"hash-property"` is specified, the chosen property should be provided.
If published messages do not contain the property, they will all get
routed to the same **arbitrarily chosen** queue.
#### Code Example in Python
``` python
#!/usr/bin/env python
import pika
import time

conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
ch = conn.channel()

# hash on the message_id property instead of the routing key
args = {u'hash-property': u'message_id'}
ch.exchange_declare(exchange='e3',
                    exchange_type='x-consistent-hash',
                    arguments=args,
                    durable=True)

for q in ['q1', 'q2', 'q3', 'q4']:
    ch.queue_declare(queue=q, durable=True)
    ch.queue_purge(queue=q)

for q in ['q1', 'q2']:
    ch.queue_bind(exchange='e3', queue=q, routing_key='1')

for q in ['q3', 'q4']:
    ch.queue_bind(exchange='e3', queue=q, routing_key='2')

n = 100000

for rk in map(str, range(n)):
    ch.basic_publish(exchange='e3',
                     routing_key='',
                     body='',
                     properties=pika.BasicProperties(content_type='text/plain',
                                                     delivery_mode=2,
                                                     message_id=rk))

print('Done publishing.')
print('Waiting for routing to finish...')
# in order to keep this example simpler and focused,
# wait for a few seconds instead of using publisher confirms and waiting for those
time.sleep(5)

print('Done.')
conn.close()
```
#### Code Example in Java
``` java
package com.rabbitmq.examples;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class ConsistentHashExchangeExample3 {
  public static final String EXCHANGE = "e3";
  private static final String EXCHANGE_TYPE = "x-consistent-hash";

  public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException {
    ConnectionFactory cf = new ConnectionFactory();
    Connection conn = cf.newConnection();
    Channel ch = conn.createChannel();

    for (String q : Arrays.asList("q1", "q2", "q3", "q4")) {
      ch.queueDeclare(q, true, false, false, null);
      ch.queuePurge(q);
    }

    // hash on the message_id property instead of the routing key
    Map<String, Object> args = new HashMap<>();
    args.put("hash-property", "message_id");
    ch.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE, true, false, args);

    for (String q : Arrays.asList("q1", "q2")) {
      ch.queueBind(q, EXCHANGE, "1");
    }

    for (String q : Arrays.asList("q3", "q4")) {
      ch.queueBind(q, EXCHANGE, "2");
    }

    ch.confirmSelect();

    for (int i = 0; i < 100000; i++) {
      AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
      ch.basicPublish(EXCHANGE, "", bldr.messageId(String.valueOf(i)).build(), "".getBytes("UTF-8"));
    }

    ch.waitForConfirmsOrDie(10000);

    System.out.println("Done publishing!");
    System.out.println("Evaluating results...");
    // wait for one stats emission interval so that queue counters
    // are up-to-date in the management UI (Thread.sleep takes milliseconds)
    Thread.sleep(5000);

    System.out.println("Done.");
    conn.close();
  }
}
```
#### Code Example in Ruby
``` ruby
#!/usr/bin/env ruby

require 'bundler'
Bundler.setup(:default, :test)
require 'bunny'

conn = Bunny.new
conn.start

ch = conn.create_channel
ch.confirm_select

q1 = ch.queue("q1", durable: true)
q2 = ch.queue("q2", durable: true)
q3 = ch.queue("q3", durable: true)
q4 = ch.queue("q4", durable: true)

[q1, q2, q3, q4].each(&:purge)

x = ch.exchange("x3", type: "x-consistent-hash", durable: true, arguments: {"hash-property" => "message_id"})

[q1, q2].each { |q| q.bind(x, routing_key: "1") }
[q3, q4].each { |q| q.bind(x, routing_key: "2") }

n = 100_000
# publish exactly n messages; the routing key is ignored for hashing here
n.times do |i|
  x.publish(i.to_s, routing_key: rand.to_s, message_id: i.to_s)
end

ch.wait_for_confirms
puts "Done publishing!"

# wait for queue stats to be emitted so that management UI numbers
# are up-to-date
sleep 5
conn.close
puts "Done"
```
#### Code Example in Erlang
``` erlang
-include_lib("amqp_client/include/amqp_client.hrl").

test() ->
    {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
    {ok, Chan} = amqp_connection:open_channel(Conn),
    Queues = [<<"q0">>, <<"q1">>, <<"q2">>, <<"q3">>],
    %% a separate exchange name avoids redeclaring <<"e">> with different arguments
    amqp_channel:call(Chan,
                      #'exchange.declare'{
                        exchange = <<"e3">>, type = <<"x-consistent-hash">>,
                        %% arguments must be a list of {Name, Type, Value} tuples
                        arguments = [{<<"hash-property">>, longstr, <<"message_id">>}]
                      }),
    [amqp_channel:call(Chan, #'queue.declare'{queue = Q}) || Q <- Queues],
    [amqp_channel:call(Chan, #'queue.bind'{queue = Q,
                                           exchange = <<"e3">>,
                                           routing_key = <<"1">>})
     || Q <- [<<"q0">>, <<"q1">>]],
    [amqp_channel:call(Chan, #'queue.bind'{queue = Q,
                                           exchange = <<"e3">>,
                                           routing_key = <<"2">>})
     || Q <- [<<"q2">>, <<"q3">>]],
    %% give every message its own random message_id
    [begin
         Id = integer_to_binary(rand:uniform(1000000)),
         Msg = #amqp_msg{props = #'P_basic'{message_id = Id}, payload = <<>>},
         amqp_channel:call(Chan,
                           #'basic.publish'{
                             exchange = <<"e3">>,
                             routing_key = <<"">>
                           }, Msg)
     end || _ <- lists:seq(1, 100000)],
    amqp_connection:close(Conn),
    ok.
```
## Getting Help
If you have questions or need help, feel free to ask on the
[RabbitMQ mailing list](https://groups.google.com/forum/#!forum/rabbitmq-users).
## Implementation Details
As of RabbitMQ 3.7.8, this plugin uses the jump consistent hash function described in
[A Fast, Minimal Memory, Consistent Hash Algorithm](https://arxiv.org/abs/1406.2294) by Lamping and Veach.
Erlang's `phash2` function is used to convert non-integer values to
an integer that the jump consistent hash function can use.
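
For readers who want to experiment with the algorithm outside of RabbitMQ, below is a Python transcription of the jump consistent hash function from the Lamping and Veach paper. It is a sketch, not the plugin's Erlang code: the `to_int` helper uses `md5` as an assumed stand-in for `phash2`, so bucket numbers will not match what a broker computes.

``` python
import hashlib


def jump_consistent_hash(key: int, num_buckets: int) -> int:
    """Map a 64-bit integer key to a bucket in [0, num_buckets)."""
    if num_buckets < 1:
        raise ValueError("num_buckets must be positive")
    key &= 0xFFFFFFFFFFFFFFFF
    b, j = -1, 0
    while j < num_buckets:
        b = j
        # 64-bit linear congruential step, as in the paper
        key = (key * 2862933555777941757 + 1) & 0xFFFFFFFFFFFFFFFF
        j = int((b + 1) * ((1 << 31) / ((key >> 33) + 1)))
    return b


def to_int(value: str) -> int:
    # assumed stand-in for Erlang's phash2: any string-to-integer hash works here
    return int.from_bytes(hashlib.md5(value.encode("utf-8")).digest()[:8], "big")


keys = [to_int(f"rk-{i}") for i in range(5_000)]
before = [jump_consistent_hash(k, 4) for k in keys]
after = [jump_consistent_hash(k, 5) for k in keys]
moved = sum(1 for old, new in zip(before, after) if old != new)
print(f"{moved} of {len(keys)} keys moved when going from 4 to 5 buckets")
```

When the number of buckets grows by one, a key either keeps its bucket or moves to the newly added one, which is the property that makes the function "consistent".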
### Distribution Uniformity
A Chi-squared test was used to evaluate distribution uniformity. Below are the
results for 19 bucket counts and how they compare to the critical values for two
commonly used `p-value` thresholds:
|Number of buckets|Chi-squared test result|Degrees of freedom|Critical value at p = 0.05|Critical value at p = 0.01|
|-|-----------|------------------|--------|--------|
|2|0.5|1|3.84|6.64|
|3|0.946|2|5.99|9.21|
|4|2.939|3|7.81|11.35|
|5|2.163|4|9.49|13.28|
|6|2.592|5|11.07|15.09|
|7|4.654|6|12.59|16.81|
|8|7.566|7|14.07|18.48|
|9|5.847|8|15.51|20.09|
|10|9.790|9|16.92|21.67|
|11|13.448|10|18.31|23.21|
|12|12.432|11|19.68|24.73|
|13|12.338|12|21.02|26.22|
|14|9.898|13|22.36|27.69|
|15|8.513|14|23.69|29.14|
|16|6.997|15|24.99|30.58|
|17|6.279|16|26.30|32.00|
|18|10.373|17|27.59|33.41|
|19|12.935|18|28.87|34.81|
|20|11.895|19|30.14|36.19|
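
The statistic in the second column is Pearson's chi-squared statistic against a uniform expectation. The sketch below shows how such a value can be computed; the bucket counts are generated with `md5` and a modulo as assumed stand-ins for the plugin's hashing, so the result will not reproduce the numbers in the table.

``` python
import hashlib
from collections import Counter


def chi_squared(observed):
    """Pearson's chi-squared statistic, assuming a uniform expected distribution."""
    expected = sum(observed) / len(observed)
    return sum((o - expected) ** 2 / expected for o in observed)


def bucket(key: str, num_buckets: int) -> int:
    # stand-in hashing: md5 + modulo, not the plugin's algorithm
    h = int.from_bytes(hashlib.md5(key.encode("utf-8")).digest()[:8], "big")
    return h % num_buckets


num_buckets = 4
counts = Counter(bucket(str(i), num_buckets) for i in range(20_000))
observed = [counts[b] for b in range(num_buckets)]

statistic = chi_squared(observed)
critical_005 = 7.81  # 3 degrees of freedom, as listed in the table
print(observed, round(statistic, 3), "below critical value:", statistic < critical_005)
```

A statistic below the critical value means the observed counts are consistent with a uniform distribution at that threshold.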
### Binding Operations and Bucket Management
When a queue is bound to a consistent hash exchange, the protocol method, `queue.bind`,
carries a weight in the routing (binding) key. The binding is given
a number of buckets on the hash ring (hash space) equal to the weight.
When a queue is unbound, the buckets added for the binding are deleted.
These two operations use linear algorithms to update the ring.
To perform routing, the exchange extracts the appropriate value for hashing,
hashes it, retrieves a bucket number from the ring, and then looks up the
queue associated with that bucket.
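
The routing steps above can be sketched as follows. This is a simplified model, not the plugin's code: the function names are hypothetical, `md5` with a modulo stands in for the real hash function, and treating a missing header or property as an empty string is an assumption made only so that all such messages hash alike.

``` python
import hashlib


def value_to_hash(exchange_args, routing_key, headers, properties):
    """Pick the value to hash based on the exchange's declaration arguments."""
    if "hash-header" in exchange_args:
        value = headers.get(exchange_args["hash-header"])
    elif "hash-property" in exchange_args:
        value = properties.get(exchange_args["hash-property"])
    else:
        value = routing_key
    # assumption: a missing value is hashed as an empty string
    return "" if value is None else str(value)


def route(buckets, value):
    """Hash the value, pick a bucket, and return the queue that owns it."""
    # stand-in hashing: md5 + modulo, not the plugin's algorithm
    h = int.from_bytes(hashlib.md5(value.encode("utf-8")).digest()[:8], "big")
    return buckets[h % len(buckets)]


# q0 and q1 bound with weight 1, q2 and q3 with weight 2
buckets = ["q0", "q1", "q2", "q2", "q3", "q3"]
args = {"hash-header": "hash-on"}

v = value_to_hash(args, routing_key="ignored",
                  headers={"hash-on": "session-42"}, properties={})
print(route(buckets, v))
```

Messages that carry the same header value always reach the same queue, while the routing key plays no part once `hash-header` is set.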
The implementation assumes there is only one binding between a consistent hash
exchange and a queue. Having more than one binding is unnecessary because
queue weight can be provided at the time of binding.
### Clustered Environments
The state of the hash space is distributed across all cluster nodes.
## Copyright and License
(c) 2007-2024 Broadcom. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
Released under the Mozilla Public License 2.0, same as RabbitMQ.
See [LICENSE](./LICENSE) for details.