File: rabbit_federation_queue.erl

package info (click to toggle)
rabbitmq-server 3.3.5-1.1
  • links: PTS
  • area: main
  • in suites: jessie-kfreebsd
  • size: 12,004 kB
  • sloc: erlang: 78,203; python: 3,187; xml: 2,843; makefile: 903; sh: 831; java: 660; perl: 64; ruby: 63
file content (114 lines) | stat: -rw-r--r-- 4,206 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ Federation.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2014 GoPivotal, Inc.  All rights reserved.
%%

-module(rabbit_federation_queue).

%% Boot step for this module: calls rabbit_registry:register/3 to register
%% ?MODULE as the queue_decorator named <<"federation">>. The step is
%% declared as requiring rabbit_registry and enabling recovery.
%% NOTE(review): presumably this ordering ensures the decorator is
%% registered before queues are recovered - confirm against the boot step
%% semantics.
-rabbit_boot_step({?MODULE,
                   [{description, "federation queue decorator"},
                    {mfa, {rabbit_registry, register,
                           [queue_decorator, <<"federation">>, ?MODULE]}},
                    {requires, rabbit_registry},
                    {enables, recovery}]}).

-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_federation.hrl").

-behaviour(rabbit_queue_decorator).

%% Public API of the decorator (presumably the rabbit_queue_decorator
%% behaviour callbacks - confirm against the behaviour definition).
-export([startup/1, shutdown/1, policy_changed/2, active_for/1,
         consumer_state_changed/3]).
%% policy_changed_local/2 is invoked through rpc:call/4 from
%% policy_changed/2, which is why it has to be exported.
-export([policy_changed_local/2]).

-import(rabbit_misc, [pget/2]).

%%----------------------------------------------------------------------------

%% Starts federation for queue Q when active_for/1 says it applies, by
%% asking rabbit_federation_queue_link_sup_sup to start a child for Q.
%% The outcome of starting the child is ignored; ok is always returned.
startup(Q) ->
    start_links_if(active_for(Q), Q),
    ok.

%% Dispatches on the result of active_for/1: start a child or do nothing.
start_links_if(true, Q) ->
    rabbit_federation_queue_link_sup_sup:start_child(Q);
start_links_if(false, _Q) ->
    ok.

%% Stops federation for queue Q when active_for/1 says it applies: the
%% child for Q is stopped via rabbit_federation_queue_link_sup_sup and the
%% queue name is removed from rabbit_federation_status. The results of
%% both calls are ignored; ok is always returned.
shutdown(#amqqueue{name = QName} = Q) ->
    stop_links_if(active_for(Q), Q, QName),
    ok.

%% Dispatches on the result of active_for/1: tear down or do nothing.
stop_links_if(true, Q, QName) ->
    rabbit_federation_queue_link_sup_sup:stop_child(Q),
    rabbit_federation_status:remove_exchange_or_queue(QName);
stop_links_if(false, _Q, _QName) ->
    ok.

%% Handles a policy change for a queue. The queue is looked up by the name
%% taken from Q1; if it exists, policy_changed_local/2 is executed through
%% rpc:call/4 on the node that hosts the queue process and the result of
%% that call is returned as-is. If the queue is not found, nothing is done
%% and ok is returned.
policy_changed(#amqqueue{name = QName} = Q1, Q2) ->
    case rabbit_amqqueue:lookup(QName) of
        {error, not_found} ->
            ok;
        {ok, #amqqueue{pid = QPid}} ->
            %% Run on the node where the queue process lives.
            QNode = node(QPid),
            rpc:call(QNode, rabbit_federation_queue,
                     policy_changed_local, [Q1, Q2])
    end.

%% Applies a policy change on the local node: runs shutdown/1 for the
%% first queue record and then startup/1 for the second one. Both calls
%% always yield ok, which is also the return value here.
%% NOTE(review): presumably the first argument is the queue as it was
%% before the policy change and the second as it is afterwards - confirm
%% against the caller of policy_changed/2.
policy_changed_local(StopQ, StartQ) ->
    ok = shutdown(StopQ),
    ok = startup(StartQ).

%% Decides whether queue Q should be federated.
%%
%% A queue whose arguments contain a longstr <<"x-internal-purpose">>
%% entry is never federated. Currently the only "internal purpose" is
%% federation, but I suspect if we introduce another one it will also be
%% for something that doesn't want to be federated.
%%
%% For every other queue the decision is delegated to
%% rabbit_federation_upstream:federate/1 and its result is returned.
active_for(#amqqueue{arguments = Args} = Q) ->
    InternalPurpose = rabbit_misc:table_lookup(Args, <<"x-internal-purpose">>),
    case InternalPurpose of
        {longstr, _Purpose} ->
            false;
        _NoInternalPurpose ->
            rabbit_federation_upstream:federate(Q)
    end.

%% We need to reconsider whether we need to run or pause every time
%% the consumer state changes in the queue. But why can the state
%% change?
%%
%% consumer blocked   | We may have no more active consumers, and thus need to
%%                    | pause
%%                    |
%% consumer unblocked | We don't care
%%                    |
%% queue empty        | The queue has become empty therefore we need to run to
%%                    | get more messages
%%                    |
%% basic consume      | We don't care
%%                    |
%% basic cancel       | We may have no more active consumers, and thus need to
%%                    | pause
%%                    |
%% refresh            | We asked for it (we have started a new link after
%%                    | failover and need something to prod us into action
%%                    | (or not)).
%%
%% In the cases where we don't care it's not prohibitively expensive
%% for us to be here anyway, so never mind.
%%
%% Note that there is no "queue became non-empty" state change - that's
%% because of the queue invariant. If the queue transitions from empty to
%% non-empty then it must have no active consumers - in which case it stays
%% the same from our POV.

%% Runs the federation link for the queue when IsEmpty is true and
%% active_unfederated/1 accepts MaxActivePriority; pauses the link in
%% every other case. Always returns ok.
consumer_state_changed(#amqqueue{name = QName}, MaxActivePriority, IsEmpty) ->
    ShouldRun = IsEmpty andalso active_unfederated(MaxActivePriority),
    run_or_pause(ShouldRun, QName),
    ok.

%% Dispatches the run/pause decision to rabbit_federation_queue_link.
run_or_pause(true, QName) ->
    rabbit_federation_queue_link:run(QName);
run_or_pause(false, QName) ->
    rabbit_federation_queue_link:pause(QName).

%% Returns true when MaxActivePriority is not the atom empty and compares
%% greater than or equal to 0; returns false otherwise. The explicit check
%% for empty is needed because an atom compares greater than any number in
%% Erlang's term order, so `empty >= 0` would otherwise be true.
active_unfederated(MaxActivePriority) ->
    (MaxActivePriority =/= empty) andalso (MaxActivePriority >= 0).