1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
|
// SPDX-FileCopyrightText: Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen
// SPDX-License-Identifier: BSD-3-Clause
#include "vtkPartitionBalancer.h"
#include "vtkCommunicator.h"
#include "vtkCompositeDataSet.h"
#include "vtkDataObject.h"
#include "vtkInformation.h"
#include "vtkInformationVector.h"
#include "vtkMultiProcessController.h"
#include "vtkObjectFactory.h"
#include "vtkPartitionedDataSet.h"
#include "vtkStreamingDemandDrivenPipeline.h"
#include <algorithm>
#include <numeric>
#include <vector>
VTK_ABI_NAMESPACE_BEGIN
vtkStandardNewMacro(vtkPartitionBalancer);
vtkCxxSetObjectMacro(vtkPartitionBalancer, Controller, vtkMultiProcessController);
namespace
{
//------------------------------------------------------------------------------
void CompositeShallowCopy(vtkPartitionedDataSet* inputPDS, vtkPartitionedDataSet* outputPDS,
int numberOfNonNullPartitionsInInput, int offset = 0)
{
for (int outPartitionId = 0, inPartitionId = 0; outPartitionId < numberOfNonNullPartitionsInInput;
++inPartitionId, ++outPartitionId)
{
vtkDataObject* inputDO = inputPDS->GetPartitionAsDataObject(inPartitionId);
while (!inputDO)
{
++inPartitionId;
inputDO = inputPDS->GetPartitionAsDataObject(inPartitionId);
}
outputPDS->SetPartition(outPartitionId + offset, inputDO);
}
}
} // anonymous namespace
//----------------------------------------------------------------------------
vtkPartitionBalancer::vtkPartitionBalancer()
: Controller(nullptr)
, Mode(vtkPartitionBalancer::Squash)
{
this->SetController(vtkMultiProcessController::GetGlobalController());
}
//----------------------------------------------------------------------------
vtkPartitionBalancer::~vtkPartitionBalancer()
{
this->SetController(nullptr);
}
//----------------------------------------------------------------------------
int vtkPartitionBalancer::RequestData(
vtkInformation*, vtkInformationVector** inputVector, vtkInformationVector* outputVector)
{
vtkPartitionedDataSet* inputPDS = vtkPartitionedDataSet::GetData(inputVector[0], 0);
vtkPartitionedDataSet* outputPDS = vtkPartitionedDataSet::GetData(outputVector, 0);
int numberOfNonNullPartitionsInInput = 0;
for (unsigned int partitionId = 0; partitionId < inputPDS->GetNumberOfPartitions(); ++partitionId)
{
numberOfNonNullPartitionsInInput +=
(inputPDS->GetPartitionAsDataObject(partitionId) != nullptr) ? 1 : 0;
}
if (!this->Controller)
{
outputPDS->CompositeShallowCopy(inputPDS);
outputPDS->RemoveNullPartitions();
return 1;
}
std::vector<int> recvBuf(this->Controller->GetNumberOfProcesses());
this->Controller->AllGather(&numberOfNonNullPartitionsInInput, recvBuf.data(), 1);
if (this->Mode == vtkPartitionBalancer::Expand)
{
const int localProcessId = this->Controller->GetLocalProcessId();
const int numberOfPartitions = std::accumulate(recvBuf.begin(), recvBuf.end(), 0);
const int offset = std::accumulate(recvBuf.begin(), recvBuf.begin() + localProcessId, 0);
outputPDS->SetNumberOfPartitions(numberOfPartitions);
::CompositeShallowCopy(inputPDS, outputPDS, numberOfNonNullPartitionsInInput, offset);
}
else if (this->Mode == vtkPartitionBalancer::Squash)
{
const int numberOfPartitions = *std::max_element(recvBuf.begin(), recvBuf.end());
outputPDS->SetNumberOfPartitions(numberOfPartitions);
::CompositeShallowCopy(inputPDS, outputPDS, numberOfNonNullPartitionsInInput);
}
else
{
vtkErrorMacro(<< "Unknown mode: " << this->Mode);
return 0;
}
return 1;
}
//----------------------------------------------------------------------------
void vtkPartitionBalancer::PrintSelf(ostream& os, vtkIndent indent)
{
this->Superclass::PrintSelf(os, indent);
os << indent << "Controller: " << this->Controller << endl;
switch (this->Mode)
{
case vtkPartitionBalancer::Expand:
os << indent << "Mode: Expand" << endl;
break;
case vtkPartitionBalancer::Squash:
os << indent << "Mode: Squash" << endl;
break;
default:
os << indent << "Mode: Wrong value" << endl;
break;
}
}
VTK_ABI_NAMESPACE_END
|