File: net_parallel.h

package info (click to toggle)
pytorch 1.13.1%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 139,252 kB
  • sloc: cpp: 1,100,274; python: 706,454; ansic: 83,052; asm: 7,618; java: 3,273; sh: 2,841; javascript: 612; makefile: 323; xml: 269; ruby: 185; yacc: 144; objc: 68; lex: 44
file content (85 lines) | stat: -rw-r--r-- 2,144 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#ifndef CAFFE2_CORE_NET_PARALLEL_H
#define CAFFE2_CORE_NET_PARALLEL_H

#include "caffe2/core/net_async_base.h"
#include "caffe2/core/net_async_task_graph.h"

C10_DECLARE_string(caffe2_task_graph_engine);

namespace caffe2 {

class ParallelNetExecutorHelper;

class TORCH_API ParallelNet : public NetBase {
 public:
  ParallelNet(const std::shared_ptr<const NetDef>& net_def, Workspace* ws);

  bool RunAsync() override;
  void Wait() override;

  bool SupportsAsync() override;
  std::vector<OperatorBase*> GetOperators() const override;

  TaskThreadPoolBase* Pool(const DeviceOption& device_option);

 protected:
  bool handleRunError() override;
  virtual void finishRun();
  virtual void reset();

  ExecutionOptions options_;
  int num_workers_;

  std::unique_ptr<ParallelNetExecutorHelper> helper_;
  std::shared_ptr<AsyncTaskGraphBase> task_graph_;
  AsyncTaskFuture* run_future_;

  std::vector<dag_utils::OperatorNode> operator_nodes_;
  std::vector<OperatorBase*> operators_;

  std::mutex pools_mutex_;
  typedef std::unordered_map<
      int,
      std::unordered_map<int, std::shared_ptr<TaskThreadPoolBase>>>
      PoolsMap;
  PoolsMap cpu_pools_;
  PoolsMap gpu_pools_;
  TaskThreadPoolBase*
  poolGetter(PoolsMap& pools, int device_type, int device_id, int pool_size);

  friend class ParallelNetExecutorHelper;
  C10_DISABLE_COPY_AND_ASSIGN(ParallelNet);
};

C10_DECLARE_SHARED_REGISTRY(
    TaskGraphRegistry,
    AsyncTaskGraphBase,
    ExecutorHelper*,
    const ExecutionOptions&);

std::shared_ptr<AsyncTaskGraphBase> GetAsyncTaskGraph(
    ExecutorHelper* helper,
    const ExecutionOptions& options);

class ParallelNetExecutorHelper : public ExecutorHelper {
 public:
  explicit ParallelNetExecutorHelper(ParallelNet* net) : net_(net) {}
  TaskThreadPoolBase* GetPool(const DeviceOption& option) const override {
    return net_->Pool(option);
  }

  std::vector<OperatorBase*> GetOperators() const override {
    return net_->GetOperators();
  }

  int GetNumWorkers() const override {
    return net_->num_workers_;
  }

 private:
  ParallelNet* net_;
};

} // namespace caffe2

#endif // CAFFE2_CORE_NET_PARALLEL_H