File: async_dataloader.rb

package info (click to toggle)
ruby-graphql 2.2.17-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 9,584 kB
  • sloc: ruby: 67,505; ansic: 1,753; yacc: 831; javascript: 331; makefile: 6
file content (85 lines) | stat: -rw-r--r-- 2,491 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# frozen_string_literal: true
module GraphQL
  class Dataloader
    class AsyncDataloader < Dataloader
      def yield
        if (condition = Thread.current[:graphql_dataloader_next_tick])
          condition.wait
        else
          Fiber.yield
        end
        nil
      end

      def run
        job_fibers = []
        next_job_fibers = []
        source_tasks = []
        next_source_tasks = []
        first_pass = true
        sources_condition = Async::Condition.new
        manager = spawn_fiber do
          while first_pass || job_fibers.any?
            first_pass = false

            while (f = (job_fibers.shift || spawn_job_fiber))
              if f.alive?
                finished = run_fiber(f)
                if !finished
                  next_job_fibers << f
                end
              end
            end
            job_fibers.concat(next_job_fibers)
            next_job_fibers.clear

            Sync do |root_task|
              while source_tasks.any? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) }
                while (task = source_tasks.shift || spawn_source_task(root_task, sources_condition))
                  if task.alive?
                    root_task.yield # give the source task a chance to run
                    next_source_tasks << task
                  end
                end
                sources_condition.signal
                source_tasks.concat(next_source_tasks)
                next_source_tasks.clear
              end
            end
          end
        end

        manager.resume
        if manager.alive?
          raise "Invariant: Manager didn't terminate successfully: #{manager}"
        end

      rescue UncaughtThrowError => e
        throw e.tag, e.value
      end

      private

      def spawn_source_task(parent_task, condition)
        pending_sources = nil
        @source_cache.each_value do |source_by_batch_params|
          source_by_batch_params.each_value do |source|
            if source.pending?
              pending_sources ||= []
              pending_sources << source
            end
          end
        end

        if pending_sources
          fiber_vars = get_fiber_variables
          parent_task.async do
            set_fiber_variables(fiber_vars)
            Thread.current[:graphql_dataloader_next_tick] = condition
            pending_sources.each(&:run_pending_keys)
          end
        end
      end
    end
  end
end