1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
|
DeclareInfoClass("InfoIO");
DoChild := function( pipefd, func, arglist )
local file,ppid,ret;
ppid := IO_getppid();
ret := CallFuncList(func,arglist);
file := IO_WrapFD(pipefd,false,1024);
IO_Pickle(file,ret);
IO_Close(file);
IO_exit(0);
end;
DoParallelOptions := rec(
TimeOutSecs := false,
TimeOutuSecs := false,
);
DoParallelByFork := function(jobs,opt)
local answered,answers,file,i,j,n,pid,pids,pipes,pipescopy,r;
if not(IsEvenInt(Length(jobs))) or Length(jobs) < 4 or
not(IsRecord(opt)) then
Error(Concatenation("Usage: DoParallelByFork(jobs,opt); where ",
"jobs is [func,arglist{,func,arglist})"));
return fail;
fi;
IO_InstallSIGCHLDHandler();
for n in RecNames(DoParallelOptions) do
if not(IsBound(opt.(n))) then opt.(n) := DoParallelOptions.(n); fi;
od;
n := Length(jobs)/2;
pipes := EmptyPlist(n);
for i in [1..n] do
pipes[i] := IO_pipe();
if pipes[i] = fail then
for j in [1..i-1] do
IO_close(pipes[j].towrite);
IO_close(pipes[j].toread);
od;
Error("Cannot make pipes");
fi;
od;
pids := EmptyPlist(n);
for i in [1..n] do
pid := IO_fork();
if pid = 0 then
# we are in the child:
for j in [1..n] do
if j <> i then
IO_close(pipes[j].towrite);
IO_close(pipes[j].toread);
else
IO_close(pipes[j].toread);
fi;
od;
DoChild( pipes[i].towrite, jobs[2*i-1], jobs[2*i] );
IO_exit(0);
fi;
pids[i] := pid;
Info(InfoIO,2,"Started child, pid=",pid);
IO_close(pipes[i].towrite);
od;
pipes := List(pipes,x->x.toread);
pipescopy := ShallowCopy(pipes);
r := IO_select(pipescopy,[],[],opt.TimeOutSecs,opt.TimeOutuSecs);
answered := [];
answers := EmptyPlist(n);
for i in [1..n] do
if pipescopy[i] = fail then
IO_close(pipes[i]);
IO_kill(pids[i],IO.SIGTERM);
IO_WaitPid(pids[i],true);
Info(InfoIO,2,"Child ",pids[i]," terminated.");
else
Add(answered,i);
fi;
od;
Info(InfoIO,2,"Getting answers...");
for i in answered do
file := IO_WrapFD(pipes[i],1024,false);
answers[i] := IO_Unpickle(file);
IO_Close(file);
IO_WaitPid(pids[i],true);
Info(InfoIO,2,"Child ",pids[i]," terminated with answer.");
od;
return answers;
end;
|