File: inline_fork_wait.cpp

package info (click to toggle)
pytorch 1.13.1%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 139,252 kB
  • sloc: cpp: 1,100,274; python: 706,454; ansic: 83,052; asm: 7,618; java: 3,273; sh: 2,841; javascript: 612; makefile: 323; xml: 269; ruby: 185; yacc: 144; objc: 68; lex: 44
file content (65 lines) | stat: -rw-r--r-- 2,053 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <torch/csrc/jit/jit_log.h>
#include <torch/csrc/jit/passes/inline_fork_wait.h>

namespace torch {
namespace jit {

void InlineForkWait(
    Block* b,
    std::unordered_map<Value*, Value*>& future_remap) {
  auto nodes = b->nodes();

  // Track the futures returned by prim::fork.
  for (auto it = nodes.begin(); it != nodes.end(); it++) {
    auto node = *it;
    if (node->kind() != prim::fork) {
      continue;
    }
    WithInsertPoint insert_guard(node);
    auto graph = b->owningGraph();
    auto subgraph = node->g(attr::Subgraph);

    auto output = insertGraph(*graph, *subgraph, node->inputs());

    future_remap[node->output()] = output.at(0);
  }

  // Remove aten::wait if its input future is returned by prim::fork.
  auto reversed = b->nodes().reverse();
  for (auto it = reversed.begin(); it != reversed.end(); it++) {
    auto node = *it;
    if (node->kind() == prim::fork) {
      // Account for the case where the aten::wait call isn't present in
      // the current graph.
      node->output()->replaceAllUsesWith(future_remap.at(node->output()));
      it.destroyCurrent();
    } else if (node->kind() == aten::wait) {
      AT_ASSERT(node->inputs().size() == 1);
      AT_ASSERT(node->outputs().size() == 1);
      // If the future does not map to a prim::fork, it could be
      // returned from prim::rpc_async, which has side effect, so it shouldn't
      // be dead code eliminated.
      if (future_remap.count(node->input())) {
        node->output()->replaceAllUsesWith(future_remap.at(node->input()));
        it.destroyCurrent();
      }
    }
  }

  // Recursively inline fork/wait.
  for (auto it = nodes.begin(); it != nodes.end(); it++) {
    auto node = *it;
    for (auto sub_b : node->blocks()) {
      InlineForkWait(sub_b, future_remap);
    }
  }
}

void InlineForkWait(const std::shared_ptr<Graph>& graph) {
  std::unordered_map<Value*, Value*> future_remap;
  InlineForkWait(graph->block(), future_remap);
  GRAPH_DUMP("After InlineForkWait: ", graph);
}

} // namespace jit
} // namespace torch