1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
|
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
defmodule RabbitMQ.CLI.Ctl.RpcStream do
  @moduledoc """
  Turns messages emitted back to the calling process into a lazy stream.

  For every `{mod, fun, args}` requested, an emitter is started through
  `:rabbit_control_misc.spawn_emitter_caller/7`. The messages tagged with the
  reference generated for the request are then consumed with `Stream.unfold/2`
  and formatted with `RabbitMQ.CLI.Ctl.InfoKeys.info_for_keys/2`.
  """

  alias RabbitMQ.CLI.Ctl.InfoKeys

  @doc """
  Streams the items emitted for a single `mod.fun(args)` request, waiting for
  exactly one `:finished` marker.
  """
  def receive_list_items(node, mod, fun, args, timeout, info_keys) do
    receive_list_items(node, mod, fun, args, timeout, info_keys, 1)
  end

  @doc """
  Same as `receive_list_items/6`, but waits for `chunks` `:finished` markers
  before the stream ends.
  """
  def receive_list_items(node, mod, fun, args, timeout, info_keys, chunks) do
    receive_list_items(node, [{mod, fun, args}], timeout, info_keys, chunks)
  end

  @doc """
  Streams the items emitted for a list of `{mod, fun, args}` tuples.

  Returns `nil` (not a stream) when the number of expected chunks is `0`;
  otherwise returns the lazy stream built by `receive_list_items_with_fun/6`
  with an identity response function.
  """
  def receive_list_items(_node, _mfas, _timeout, _info_keys, 0), do: nil

  def receive_list_items(node, mfas, timeout, info_keys, chunks_init) do
    receive_list_items_with_fun(node, mfas, timeout, info_keys, chunks_init, & &1)
  end

  @doc """
  Starts one emitter per `{mod, fun, args}` in `mfas` and returns a lazy stream
  of the formatted results.

  `response_fun` is applied to the outcome of every unfold step: either
  `{item, next_state}` or `nil` once the stream is over. Whatever it returns
  is handed to `Stream.unfold/2`, so it has to keep that shape.

  Messages are read from the mailbox only while the stream is being consumed.
  """
  def receive_list_items_with_fun(node, mfas, timeout, info_keys, chunks_init, response_fun) do
    caller = self()
    ref = make_ref()

    Enum.each(mfas, fn {m, f, a} ->
      init_items_stream(node, m, f, a, timeout, caller, ref)
    end)

    next_step = fn
      # An error was already emitted; terminate the stream on the next pull.
      :finished ->
        response_fun.(nil)

      {chunks, :continue} ->
        response_fun.(await_emission(ref, chunks, timeout))
    end

    {chunks_init, :continue}
    |> Stream.unfold(next_step)
    |> display_list_items(info_keys)
  end

  @doc """
  Strips the `:badrpc` / `:EXIT` / `:nocatch` wrapping from an emission error,
  leaving `{error, error_details}`. Any other term is returned unchanged.
  """
  def simplify_emission_error(reason) do
    case reason do
      {:badrpc, {:EXIT, {{:nocatch, error}, error_details}}} -> {error, error_details}
      {{:nocatch, error}, error_details} -> {error, error_details}
      other -> other
    end
  end

  # Blocks until the next relevant message arrives and translates it into a
  # `Stream.unfold/2` step: `{item, next_state}` or `nil` to stop.
  # Clause order matters: the more specific patterns must come first.
  defp await_emission(ref, chunks, timeout) do
    receive do
      # Last expected chunk is done: end of stream.
      {^ref, :finished} when chunks === 1 ->
        nil

      # One chunk is done, others are still pending.
      {^ref, :finished} ->
        {[], {chunks - 1, :continue}}

      # Timer message scheduled by set_stream_timeout/3.
      {^ref, {:timeout, t}} ->
        {{:error, {:badrpc, {:timeout, t / 1000}}}, :finished}

      {^ref, []} ->
        {[], {chunks, :continue}}

      {^ref, :error, {:badrpc, :timeout}} ->
        {{:error, {:badrpc, {:timeout, timeout / 1000}}}, :finished}

      {^ref, result, :continue} ->
        {result, {chunks, :continue}}

      # Note: this clause is not tagged with the reference.
      {:error, _} = error ->
        {error, :finished}

      {^ref, :error, error} ->
        {{:error, simplify_emission_error(error)}, :finished}

      # A process that exited normally produces no item.
      {:DOWN, _mref, :process, _pid, :normal} ->
        {[], {chunks, :continue}}

      {:DOWN, _mref, :process, _pid, reason} ->
        {{:error, simplify_emission_error(reason)}, :finished}
    end
  end

  # Drops the empty placeholder items and formats everything else.
  defp display_list_items(items, info_keys) do
    items
    |> Stream.reject(&(&1 == []))
    |> Stream.map(&format_item(&1, info_keys))
  end

  # Errors are unwrapped and passed through as they are.
  defp format_item({:error, error}, _info_keys), do: error

  # Here the item is a list of keyword lists: format each one.
  defp format_item([[{_, _} | _] | _] = rows, info_keys) do
    Enum.map(rows, &InfoKeys.info_for_keys(&1, info_keys))
  end

  defp format_item(item, info_keys), do: InfoKeys.info_for_keys(item, info_keys)

  # With a timeout of exactly 0 no emitter is started; only the (immediate)
  # timeout message is scheduled.
  defp init_items_stream(node, mod, fun, args, timeout, pid, ref) do
    if timeout !== 0 do
      :rabbit_control_misc.spawn_emitter_caller(node, mod, fun, args, ref, pid, timeout)
    end

    set_stream_timeout(pid, ref, timeout)
  end

  # Schedules the `{ref, {:timeout, timeout}}` message unless the timeout is
  # `:infinity`.
  defp set_stream_timeout(pid, ref, timeout) do
    case timeout do
      :infinity -> :ok
      millis -> Process.send_after(pid, {ref, {:timeout, millis}}, millis)
    end
  end
end
|