File: parallel.ex

package info (click to toggle)
erlang-hex 2.0.6-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,204 kB
  • sloc: erlang: 2,950; sh: 203; makefile: 10
file content (140 lines) | stat: -rw-r--r-- 3,449 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
defmodule Hex.Parallel do
  @moduledoc false

  # Runs a number of jobs (with an upper bound) in parallel and
  # awaits them to finish.
  #
  # Server state:
  #
  #   * `max_jobs`      - upper bound on concurrently running tasks, read from
  #                       `Hex.State.fetch!(:http_concurrency)` when the server starts
  #   * `running`       - map of `%Task{}` => job id for tasks currently executing
  #   * `finished`      - map of job id => result for jobs nobody has awaited yet
  #   * `waiting`       - `:queue` of `{id, fun}` jobs that exceed `max_jobs`
  #   * `waiting_reply` - map of job id => `{:gen, from}` (a blocked `await/3` caller)
  #                       or `{:send, pid}` (a process that asked for a plain message)

  use GenServer
  require Logger

  # Starts the job runner registered under `name`.
  def start_link([name]) do
    GenServer.start_link(__MODULE__, [], name: name)
  end

  # Schedules `fun` under `id`. The job starts immediately when a slot is free,
  # otherwise it is queued. Always returns `:ok`.
  #
  # Options:
  #
  #   * `:await` - defaults to `true`, meaning the result is collected with
  #     `await/3`. When `false`, the result is sent as a plain message to the
  #     calling process once the job completes.
  def run(name, id, opts \\ [], fun) do
    GenServer.call(name, {:run, id, opts, fun})
  end

  # Blocks until the job `id` has produced a result and returns that result.
  # `timeout` is the `GenServer.call/3` timeout.
  def await(name, id, timeout) do
    GenServer.call(name, {:await, id}, timeout)
  end

  # Kills all running tasks and drops every queued job, stored result and
  # pending reply registration.
  def clear(name) do
    GenServer.call(name, :clear)
  end

  @impl true
  def init([]) do
    {:ok, new_state()}
  end

  @impl true
  def handle_call({:run, id, opts, fun}, {pid, _ref}, state) do
    await? = Keyword.get(opts, :await, true)
    state = run_task(id, fun, state)

    state =
      if await? do
        state
      else
        # The caller will not call await/3; remember its pid so the result can
        # be delivered with send/2 when the job completes.
        %{state | waiting_reply: Map.put(state.waiting_reply, id, {:send, pid})}
      end

    {:reply, :ok, state}
  end

  def handle_call({:await, id}, from, state) do
    # Map.fetch/2 (rather than a truthiness check) so that a job whose result
    # is `nil` or `false` is still recognised as finished.
    case Map.fetch(state.finished, id) do
      {:ok, result} ->
        {:reply, result, %{state | finished: Map.delete(state.finished, id)}}

      :error ->
        # Not finished yet: park the caller, reply/3 answers it later.
        {:noreply, %{state | waiting_reply: Map.put(state.waiting_reply, id, {:gen, from})}}
    end
  end

  def handle_call(:clear, _from, state) do
    Enum.each(state.running, fn {%Task{pid: pid}, _} ->
      # Unlink first so killing the task does not take this server down.
      Process.unlink(pid)
      Process.exit(pid, :kill)
    end)

    state = %{state | running: %{}, finished: %{}, waiting: :queue.new(), waiting_reply: %{}}
    {:reply, :ok, state}
  end

  # A task started with Task.async/1 reports its result as `{ref, result}`.
  @impl true
  def handle_info({ref, message}, state) when is_reference(ref) do
    case find_running(state, ref) do
      {task, id} ->
        # The result has arrived, so the task monitor is no longer needed;
        # :flush also discards a :DOWN message that may already be queued.
        Process.demonitor(ref, [:flush])

        state =
          %{state | running: Map.delete(state.running, task)}
          |> reply(id, message)
          |> next_task()

        {:noreply, state}

      nil ->
        Logger.error("[Hex] Hex.Parallel received unknown reply: #{inspect({ref, message})}")
        {:noreply, state}
    end
  end

  def handle_info({:DOWN, ref, _, proc, reason}, state) do
    case find_running(state, ref) do
      {task, _id} ->
        Logger.error(
          "[Hex] Hex.Parallel task #{inspect(proc)} died with reason: #{inspect(reason)}"
        )

        # `running` is keyed by the task struct, not by the monitor reference.
        # Removing the task frees its slot, so let a queued job take it.
        state =
          %{state | running: Map.delete(state.running, task)}
          |> next_task()

        {:noreply, state}

      nil ->
        {:noreply, state}
    end
  end

  # Returns the `{task, id}` entry of the running task monitored by `ref`,
  # or `nil` when no running task has that reference.
  defp find_running(state, ref) do
    Enum.find(state.running, fn {task, _id} -> task.ref == ref end)
  end

  # Delivers `message` (the result of job `id`) to whoever registered interest,
  # or stores it in `finished` until somebody awaits it.
  defp reply(state, id, message) do
    case Map.pop(state.waiting_reply, id) do
      {{:gen, from}, waiting_reply} ->
        GenServer.reply(from, message)
        %{state | waiting_reply: waiting_reply}

      {{:send, pid}, waiting_reply} ->
        send(pid, message)
        %{state | waiting_reply: waiting_reply}

      {nil, _waiting_reply} ->
        %{state | finished: Map.put(state.finished, id, message)}
    end
  end

  # Starts the oldest queued job, if there is one.
  defp next_task(state) do
    case :queue.out(state.waiting) do
      {{:value, {id, fun}}, waiting} ->
        state = %{state | waiting: waiting}
        run_task(id, fun, state)

      {:empty, _} ->
        state
    end
  end

  # Starts `fun` as a task when below the concurrency limit, otherwise queues it.
  defp run_task(id, fun, state) do
    if map_size(state.running) >= state.max_jobs do
      %{state | waiting: :queue.in({id, fun}, state.waiting)}
    else
      task = Task.async(fun)
      %{state | running: Map.put(state.running, task, id)}
    end
  end

  defp new_state() do
    %{
      max_jobs: Hex.State.fetch!(:http_concurrency),
      running: %{},
      finished: %{},
      waiting: :queue.new(),
      waiting_reply: %{}
    }
  end
end