1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
|
{
**********************************************************************
This file is part of the Free Pascal run time library.
See the file COPYING.FPC, included in this distribution,
for details about the license.
**********************************************************************
Utilities using light weight threads.
Copyright (C) 2008 Mattias Gaertner mattias@freepascal.org
Abstract:
Utility functions using mtprocs.
For example a parallel sort.
}
unit MTPUtils;
{$mode objfpc}{$H+}
interface
uses
Classes, SysUtils, MTProcs;
type
TSortPartEvent = procedure(aList: PPointer; aCount: PtrInt);
{ TParallelSortPointerList }
TParallelSortPointerList = class
protected
fBlockSize: PtrInt;
fBlockCntPowOf2Offset: PtrInt;
FMergeBuffer: PPointer;
procedure MTPSort(Index: PtrInt; {%H-}Data: Pointer; Item: TMultiThreadProcItem);
public
List: PPointer;
Count: PtrInt;
Compare: TListSortCompare;
BlockCnt: PtrInt;
OnSortPart: TSortPartEvent;
constructor Create(aList: PPointer; aCount: PtrInt; const aCompare: TListSortCompare;
MaxThreadCount: integer = 0);
procedure Sort;
end;
{ Sort a list in parallel using merge sort.
You must provide a compare function.
You can provide your own sort function for the blocks which are sorted in a
single thread, for example a normal quicksort. }
procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare;
MaxThreadCount: integer = 0; const OnSortPart: TSortPartEvent = nil);
implementation
procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare;
MaxThreadCount: integer; const OnSortPart: TSortPartEvent);
var
Sorter: TParallelSortPointerList;
begin
if List.Count<=1 then exit;
Sorter:=TParallelSortPointerList.Create(@List.List[0],List.Count,Compare,
MaxThreadCount);
try
Sorter.OnSortPart:=OnSortPart;
Sorter.Sort;
finally
Sorter.Free;
end;
end;
{ TParallelSortPointerList }
procedure TParallelSortPointerList.MTPSort(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem);
procedure MergeSort(L, M, R: PtrInt; Recursive: boolean);
var
Src1: PtrInt;
Src2: PtrInt;
Dest1: PtrInt;
begin
if R-L<=1 then begin
// sort lists of 1 and 2 items directly
if L<R then begin
if Compare(List[L],List[R])>0 then begin
FMergeBuffer[L]:=List[L];
List[L]:=List[R];
List[R]:=FMergeBuffer[L];
end;
end;
exit;
end;
// sort recursively
if Recursive then begin
MergeSort(L,(L+M) div 2,M-1,true);
MergeSort(M,(M+R+1) div 2,R,true);
end;
// merge both blocks
Src1:=L;
Src2:=M;
Dest1:=L;
repeat
if (Src1<M)
and ((Src2>R) or (Compare(List[Src1],List[Src2])<=0)) then begin
FMergeBuffer[Dest1]:=List[Src1];
inc(Dest1);
inc(Src1);
end else if (Src2<=R) then begin
FMergeBuffer[Dest1]:=List[Src2];
inc(Dest1);
inc(Src2);
end else
break;
until false;
// write the mergebuffer back
Src1:=L;
Dest1:=l;
while Src1<=R do begin
List[Dest1]:=FMergeBuffer[Src1];
inc(Src1);
inc(Dest1);
end;
end;
var
L, M, R: PtrInt;
i: integer;
NormIndex: Integer;
Range: integer;
MergeIndex: Integer;
begin
L:=fBlockSize*Index;
R:=L+fBlockSize-1;
if R>=Count then
R:=Count-1; // last block
//WriteLn('TParallelSortPointerList.LWTSort Index=',Index,' sort block: ',L,' ',(L+R+1) div 2,' ',R);
if Assigned(OnSortPart) then
OnSortPart(@List[L],R-L+1)
else
MergeSort(L,(L+R+1) div 2,R,true);
// merge
// 0 1 2 3 4 5 6 7
// \/ \/ \/ \/
// \/ \/
// \/
// For example: BlockCnt = 5 => Index in 0..4
// fBlockCntPowOf2Offset = 3 (=8-5)
// NormIndex = Index + 3 => NormIndex in 3..7
NormIndex:=Index+fBlockCntPowOf2Offset;
i:=0;
repeat
Range:=1 shl i;
if NormIndex and Range=0 then break;
// merge left and right block(s)
MergeIndex:=NormIndex-Range-fBlockCntPowOf2Offset;
if (MergeIndex+Range-1>=0) then begin
// wait until left blocks have finished
//WriteLn('TParallelSortPointerList.LWTSort Index=',Index,' wait for block ',MergeIndex);
if (MergeIndex>=0) and (not Item.WaitForIndex(MergeIndex)) then exit;
// compute left and right block bounds
M:=L;
L:=(MergeIndex-Range+1)*fBlockSize;
if L<0 then L:=0;
//WriteLn('TParallelSortPointerList.LWTSort Index=',Index,' merge blocks ',L,' ',M,' ',R);
MergeSort(L,M,R,false);
end;
inc(i);
until false;
//WriteLn('TParallelSortPointerList.LWTSort END Index='+IntToStr(Index));
end;
constructor TParallelSortPointerList.Create(aList: PPointer; aCount: PtrInt;
const aCompare: TListSortCompare; MaxThreadCount: integer);
begin
List:=aList;
Count:=aCount;
Compare:=aCompare;
BlockCnt:=Count div 100; // at least 100 items per thread
if BlockCnt>ProcThreadPool.MaxThreadCount then
BlockCnt:=ProcThreadPool.MaxThreadCount;
if (MaxThreadCount>0) and (BlockCnt>MaxThreadCount) then
BlockCnt:=MaxThreadCount;
if BlockCnt<1 then BlockCnt:=1;
end;
procedure TParallelSortPointerList.Sort;
begin
if (Count<=1) then exit;
fBlockSize:=(Count+BlockCnt-1) div BlockCnt;
fBlockCntPowOf2Offset:=1;
while fBlockCntPowOf2Offset<BlockCnt do
fBlockCntPowOf2Offset:=fBlockCntPowOf2Offset*2;
fBlockCntPowOf2Offset:=fBlockCntPowOf2Offset-BlockCnt;
//WriteLn('TParallelSortPointerList.Sort BlockCnt=',BlockCnt,' fBlockSize=',fBlockSize,' fBlockCntPowOf2Offset=',fBlockCntPowOf2Offset);
GetMem(FMergeBuffer,SizeOf(Pointer)*Count);
try
ProcThreadPool.DoParallel(@MTPSort,0,BlockCnt-1);
finally
FreeMem(FMergeBuffer);
FMergeBuffer:=nil;
end;
end;
end.
|