File: vtkPartitionBalancer.cxx

package info (click to toggle)
vtk9 9.5.2%2Bdfsg3-6
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 205,984 kB
  • sloc: cpp: 2,336,570; ansic: 327,116; python: 111,200; yacc: 4,104; java: 3,977; sh: 3,032; xml: 2,771; perl: 2,189; lex: 1,787; makefile: 181; javascript: 165; objc: 153; tcl: 59
file content (129 lines) | stat: -rw-r--r-- 4,167 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// SPDX-FileCopyrightText: Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen
// SPDX-License-Identifier: BSD-3-Clause

#include "vtkPartitionBalancer.h"

#include "vtkCommunicator.h"
#include "vtkCompositeDataSet.h"
#include "vtkDataObject.h"
#include "vtkInformation.h"
#include "vtkInformationVector.h"
#include "vtkMultiProcessController.h"
#include "vtkObjectFactory.h"
#include "vtkPartitionedDataSet.h"
#include "vtkStreamingDemandDrivenPipeline.h"

#include <algorithm>
#include <numeric>
#include <vector>

VTK_ABI_NAMESPACE_BEGIN
vtkStandardNewMacro(vtkPartitionBalancer);
vtkCxxSetObjectMacro(vtkPartitionBalancer, Controller, vtkMultiProcessController);

namespace
{
//------------------------------------------------------------------------------
// Places the non-null partitions of inputPDS into consecutive slots of outputPDS, starting at
// slot `offset`. The data objects themselves are not duplicated: the very same vtkDataObject
// pointers found in the input are handed to outputPDS->SetPartition().
//
// inputPDS                          Source of the partitions; null entries are skipped.
// outputPDS                         Destination; slots [offset, offset + count) are written.
// numberOfNonNullPartitionsInInput  Number of non-null partitions the caller expects to find in
//                                   inputPDS.
// offset                            Index of the first output slot to fill (defaults to 0).
//
// The scan over the input is bounded by inputPDS->GetNumberOfPartitions(). If the caller's count
// exceeds what the input actually holds, copying stops once the input is exhausted instead of
// walking past the last partition.
void CompositeShallowCopy(vtkPartitionedDataSet* inputPDS, vtkPartitionedDataSet* outputPDS,
  int numberOfNonNullPartitionsInInput, int offset = 0)
{
  const unsigned int numberOfInputPartitions = inputPDS->GetNumberOfPartitions();
  unsigned int inPartitionId = 0;

  for (int outPartitionId = 0; outPartitionId < numberOfNonNullPartitionsInInput;
       ++outPartitionId, ++inPartitionId)
  {
    // Advance to the next non-null input partition, never reading beyond the last one.
    vtkDataObject* inputDO = nullptr;
    for (; inPartitionId < numberOfInputPartitions; ++inPartitionId)
    {
      inputDO = inputPDS->GetPartitionAsDataObject(inPartitionId);
      if (inputDO)
      {
        break;
      }
    }

    if (!inputDO)
    {
      // The input ran out before the requested count was reached: nothing more to copy.
      return;
    }

    outputPDS->SetPartition(outPartitionId + offset, inputDO);
  }
}
} // anonymous namespace

//----------------------------------------------------------------------------
// Builds a balancer in Squash mode that uses the process-wide global controller
// (whatever vtkMultiProcessController::GetGlobalController() returns at construction time).
vtkPartitionBalancer::vtkPartitionBalancer()
  : Controller(nullptr)
  , Mode(vtkPartitionBalancer::Squash)
{
  // Controller starts out null above so that the setter sees a well-defined previous value.
  vtkMultiProcessController* globalController = vtkMultiProcessController::GetGlobalController();
  this->SetController(globalController);
}

//----------------------------------------------------------------------------
// Detaches the controller on destruction by routing a null pointer through SetController().
vtkPartitionBalancer::~vtkPartitionBalancer()
{
  // NOTE(review): SetController is generated by vtkCxxSetObjectMacro; presumably this releases
  // the reference held on the previous controller - confirm against the macro definition.
  vtkMultiProcessController* const noController = nullptr;
  this->SetController(noController);
}

//----------------------------------------------------------------------------
// Produces the output partitioned data set from the input according to this->Mode.
//
// Without a controller, the output is a composite shallow copy of the input with its null
// partitions removed.
//
// With a controller, every process first shares its number of non-null input partitions through
// an AllGather, then:
//  - Expand: the output has as many partitions as the sum of all gathered counts, and the local
//    non-null partitions are written starting at an offset equal to the sum of the counts of the
//    processes with a lower process id.
//  - Squash: the output has as many partitions as the largest gathered count, and the local
//    non-null partitions are written starting at slot 0.
//
// Returns 1 on success, 0 if the gather fails or if this->Mode is not a known mode.
int vtkPartitionBalancer::RequestData(
  vtkInformation*, vtkInformationVector** inputVector, vtkInformationVector* outputVector)
{
  vtkPartitionedDataSet* inputPDS = vtkPartitionedDataSet::GetData(inputVector[0], 0);
  vtkPartitionedDataSet* outputPDS = vtkPartitionedDataSet::GetData(outputVector, 0);

  // Count the local partitions that actually hold a data object.
  int numberOfNonNullPartitionsInInput = 0;

  const unsigned int numberOfInputPartitions = inputPDS->GetNumberOfPartitions();
  for (unsigned int partitionId = 0; partitionId < numberOfInputPartitions; ++partitionId)
  {
    numberOfNonNullPartitionsInInput +=
      (inputPDS->GetPartitionAsDataObject(partitionId) != nullptr) ? 1 : 0;
  }

  if (!this->Controller)
  {
    // No controller: nothing to balance against, just drop the null partitions locally.
    outputPDS->CompositeShallowCopy(inputPDS);
    outputPDS->RemoveNullPartitions();
    return 1;
  }

  // One slot per process; slot i receives the non-null partition count of process i.
  std::vector<int> recvBuf(this->Controller->GetNumberOfProcesses());

  // The layout of the output depends entirely on the gathered counts, so a failed gather must
  // not be ignored: recvBuf would still hold its zero-initialized values.
  if (!this->Controller->AllGather(&numberOfNonNullPartitionsInInput, recvBuf.data(), 1))
  {
    vtkErrorMacro(<< "Failed to gather the number of non-null partitions across processes.");
    return 0;
  }

  if (this->Mode == vtkPartitionBalancer::Expand)
  {
    const int localProcessId = this->Controller->GetLocalProcessId();
    const int numberOfPartitions = std::accumulate(recvBuf.begin(), recvBuf.end(), 0);
    // Slots before `offset` belong to the processes with a lower id.
    const int offset = std::accumulate(recvBuf.begin(), recvBuf.begin() + localProcessId, 0);

    outputPDS->SetNumberOfPartitions(numberOfPartitions);
    ::CompositeShallowCopy(inputPDS, outputPDS, numberOfNonNullPartitionsInInput, offset);
  }
  else if (this->Mode == vtkPartitionBalancer::Squash)
  {
    // recvBuf has one entry per process, so it is never empty here.
    const int numberOfPartitions = *std::max_element(recvBuf.begin(), recvBuf.end());

    outputPDS->SetNumberOfPartitions(numberOfPartitions);
    ::CompositeShallowCopy(inputPDS, outputPDS, numberOfNonNullPartitionsInInput);
  }
  else
  {
    vtkErrorMacro(<< "Unknown mode: " << this->Mode);
    return 0;
  }

  return 1;
}

//----------------------------------------------------------------------------
// Prints the superclass state, then the controller pointer and a readable name for the
// current mode ("Wrong value" when the mode is neither Expand nor Squash).
void vtkPartitionBalancer::PrintSelf(ostream& os, vtkIndent indent)
{
  this->Superclass::PrintSelf(os, indent);
  os << indent << "Controller: " << this->Controller << endl;

  // Pick the line describing the mode first, then emit it once.
  const char* modeLine = "Mode: Wrong value";
  if (this->Mode == vtkPartitionBalancer::Expand)
  {
    modeLine = "Mode: Expand";
  }
  else if (this->Mode == vtkPartitionBalancer::Squash)
  {
    modeLine = "Mode: Squash";
  }

  os << indent << modeLine << endl;
}
VTK_ABI_NAMESPACE_END