File: parlist.g

package info (click to toggle)
gap-scscp 2.1.4%2Bds-3
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 1,624 kB
  • ctags: 7
  • sloc: xml: 1,232; sh: 454; makefile: 21
file content (286 lines) | stat: -rw-r--r-- 10,297 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
#############################################################################
##
#W parlist.g                The SCSCP package             Alexander Konovalov
#W                                                               Steve Linton
##
#############################################################################

# Processes started by the most recent parallel SCSCP call
# (ParQuickWithSCSCP / ParListWithSCSCP); SCSCPreset closes their streams.
SCSCPprocesses := [ ];

###########################################################################
##
#F  SCSCPreset
##
##  <#GAPDoc Label="SCSCPreset">
##  
##  <ManSection>
##  <Func Name="SCSCPreset" Arg=""/>
##  <Returns>
##    nothing
##  </Returns>          
##  <Description>
##  If an error occurs during a call of <Ref Func="ParQuickWithSCSCP" />
##  or <Ref Func="ParListWithSCSCP" />, some parallel requests may
##  still be running at the remaining services, making them inaccessible
##  for further procedure calls. <Ref Func="SCSCPreset" /> resets them
##  by closing all open streams to &SCSCP; servers.
##  </Description>
##  </ManSection>
##  <#/GAPDoc>
##
SCSCPreset := function( )
  local k, stream;
  # Walk the global SCSCPprocesses list by position. Unbound entries
  # (holes) are skipped, just as a 'for ... in' loop over a list does.
  for k in [ 1 .. Length( SCSCPprocesses ) ] do
    if IsBound( SCSCPprocesses[k] ) then
      # the first positional component of a process is its stream
      stream := SCSCPprocesses[k]![1];
      if not IsClosedStream( stream ) then
        CloseStream( stream );
      fi;
    fi;
  od;
end;

#############################################################################
#
# ParQuickWithSCSCP( commands, listargs )
#
# The idea of ParQuickWithSCSCP is to apply various methods from the first 
# argument 'commands' containing the list of names of SCSCP procedures to 
# the list of arguments 'listargs', where i-th SCSCP procedure will be 
# executed at SCSCPservers[i]. Hence 'commands' must be non-empty and must
# not contain more procedures than there are entries in SCSCPservers.
#
# All calls are started via NewProcess, and the value returned by
# FirstProcess on the list of processes is returned. Afterwards SCSCPreset
# closes the streams of all started processes, abandoning the calls that
# are still running.
#
# Example of usage (the time of computation by these two methods
# is approximately the same, so you should expect results from both
# methods in some random order from repeated calls):
#
# ParQuickWithSCSCP( [ "WS_FactorsECM", "WS_FactorsMPQS" ], [ 2^150+1 ] );
# ParQuickWithSCSCP( [ "WS_FactorsCFRAC", "WS_FactorsMPQS" ], [ 2^150+1 ] );
#
InstallGlobalFunction( ParQuickWithSCSCP, function( commands, listargs )
local nr, res;
if Length( commands ) = 0 then
  Error( "ParQuickWithSCSCP : the list of procedures is empty!!!\n" );
fi;
# the nr-th procedure is sent to SCSCPservers[nr], so there must be at
# least as many services as procedures (having more services is fine)
if Length( commands ) > Length( SCSCPservers ) then
  Error( "ParQuickWithSCSCP : the number of procedures exceeds the number of services!!!\n" );
fi;
SCSCPprocesses := [ ];
for nr in [ 1 .. Length( commands ) ] do
  SCSCPprocesses[nr] := NewProcess( commands[nr], listargs,
                                    SCSCPservers[nr][1], SCSCPservers[nr][2] );
od;
res := FirstProcess( SCSCPprocesses );
SCSCPreset(); # we want this to be a tiny bit later to prevent broken pipes
return res;
end);


#############################################################################
##
## ParListWithSCSCP( inputlist, remoteprocname : noretry, timeout=int, recallfrequency=int )
##
##  Calls the SCSCP procedure <remoteprocname> once for every element of
##  <inputlist> (the element is sent as the only argument of the call),
##  distributing the calls over the services listed in SCSCPservers, and
##  returns the list of results: the i-th entry is the result for
##  inputlist[i].
##
##  Options:
##    timeout         - seconds passed to IO_select while waiting for any
##                      of the running calls (default 60*60, one hour);
##    noretry         - if true, when the timeout passes one randomly chosen
##                      waiting call is terminated and its output entry is
##                      left unbound; if false (the default), a timeout is
##                      an error, and a call that returned fail from a
##                      service that no longer answers pings is re-queued
##                      for another service;
##    recallfrequency - if non-zero, every <recallfrequency> iterations of
##                      the main loop the services that are not attached
##                      are pinged and attached if they respond (default 0,
##                      no re-pinging).
##
##  status[i] tracks SCSCPservers[i]: 0 - not responding, 1 - attached and
##  idle, 2 - attached and waiting for a result, unbound - not pinged yet.
##
InstallGlobalFunction( ParListWithSCSCP, function( inputlist, remoteprocname )
local noretry, status, i, itercount, recallfreq, output, callargspositions, 
      currentposition, inputposition, timeout, nr, waitinglist, descriptors, 
      nrdesc, retrystack, result, nrservices_alive, nrservices_needed, 
      len, infomw, connections;

if ValueOption( "timeout" ) = fail then
  timeout := 60*60; # default timeout - one hour, given in seconds
else
  timeout := ValueOption( "timeout" );
fi;

if ValueOption( "noretry" ) = fail then
  noretry := false; # by default, failed calls are retried on another service
else
  noretry := ValueOption( "noretry" );
fi;

if ValueOption( "recallfrequency" ) = fail then
  recallfreq := 0; # no periodic re-pinging of services that are not attached
else
  recallfreq := ValueOption( "recallfrequency" );
fi;

infomw := InfoLevel( InfoMasterWorker );

connections := [ ];
status := [ ];
nrservices_alive := 0;
nrservices_needed := Length( inputlist );
len := Length( inputlist );

# Attach responding services until there is one per argument. Servers after
# the point where we stop keep an unbound status (not pinged yet).
for i in [ 1 .. Length( SCSCPservers ) ] do
  if PingSCSCPservice( SCSCPservers[i][1], SCSCPservers[i][2] ) = fail then
    status[i] := 0; # the server is not alive
    Info( InfoSCSCP, 1, SCSCPservers[i], " is not responding and will not be used!" );
  else
    connections[i] := NewSCSCPconnection( SCSCPservers[i][1], SCSCPservers[i][2] );
    status[i] := 1; # the server is alive and ready to accept
    Info( InfoSCSCP, 1, SCSCPservers[i], " responded and attached to the computation!" );
    nrservices_alive := nrservices_alive + 1;
    if nrservices_alive >= nrservices_needed then
      break;
    fi;
  fi;
od;

if nrservices_alive = 0 then
  Error( "Can not start computation - no SCSCP service available!\n" );
fi;

output := [ ];
callargspositions := [ ]; # callargspositions[nr] = input position sent to service nr
retrystack := [ ];        # input positions that must be sent again
currentposition := 0;     # last position taken from inputlist
SCSCPprocesses := [ ];
itercount := 0;

while true do
  itercount := itercount + 1;
  #
  # periodically try to attach services that are not (yet) attached
  #
  if recallfreq <> 0 and IsInt( itercount / recallfreq ) then
    nrservices_needed := Length( inputlist ) - currentposition + Length( retrystack );
    for i in [ 1 .. Length( SCSCPservers ) ] do
      # an unbound status means the initial loop stopped before this server
      if not IsBound( status[i] ) or status[i] = 0 then
        if PingSCSCPservice( SCSCPservers[i][1], SCSCPservers[i][2] ) = fail then
          status[i] := 0;
          Info( InfoSCSCP, 1, SCSCPservers[i], "is still not responding and can not be used!" );
        else
          connections[i] := NewSCSCPconnection( SCSCPservers[i][1], SCSCPservers[i][2] );
          status[i] := 1; # alive and ready to accept
          Info( InfoSCSCP, 1, SCSCPservers[i], " responded and attached to the computation!" );
          nrservices_alive := nrservices_alive + 1;
          if nrservices_alive >= nrservices_needed then
            break;
          fi;
        fi;
      fi;
    od;
    itercount := 0;
  fi;
  #
  # is next task available (from the initial list or retry stack)?
  #
  while currentposition < Length( inputlist ) or Length( retrystack ) > 0 do
    #
    # search for next available service
    #
    nr := Position( status, 1 );
    if nr = fail then
      break; # if we are here all services are busy
    fi;
    #
    # there is a service number 'nr' that is ready to accept procedure call;
    # retried arguments take priority over new ones
    #
    if Length( retrystack ) > 0 then
      inputposition := retrystack[ Length( retrystack ) ];
      Unbind( retrystack[ Length( retrystack ) ] );
    else
      currentposition := currentposition + 1;
      inputposition := currentposition;
    fi;
    # remember which argument was sent to this service
    callargspositions[nr] := inputposition;
    SCSCPprocesses[nr] := NewProcess( remoteprocname, 
                                      [ inputlist[inputposition] ], 
                                      connections[nr] );
    if infomw <> 0 then
      if infomw = 1 then
        Print( inputposition, "/", len, "\r");
      elif infomw = 2 or infomw = 3 then
        Print( "#I  ", inputposition, "/", len, ":master --> ", 
               SCSCPservers[nr][1], ":", SCSCPservers[nr][2], "\n" );
      else
        Print( "#I  ", inputposition, "/", len, ":master --> ", 
               SCSCPservers[nr][1], ":", SCSCPservers[nr][2], " : ", inputlist[inputposition], "\n" );
      fi;
    fi;
    status[nr] := 2; # status 2 means that we are waiting to hear from this service
  od;
  #
  # see are there any waiting tasks
  #
  waitinglist := Filtered( [ 1 .. Length( status ) ], i -> status[i] = 2 );
  if Length( waitinglist ) = 0 then
    if Length( callargspositions ) = 0 then
      # no next tasks, no waiting tasks and no arguments sent off - computation completed!
      if not noretry and ( Length( output ) <> Length( inputlist ) or not IsDenseList( output ) ) then
        Error( "The output list does not match the input list!\n" );
      else
        # 'connections' has holes for services that were never attached
        for i in [ 1 .. Length( connections ) ] do
          if IsBound( connections[i] ) and not IsClosedStream( connections[i]![1] ) then
            CloseSCSCPconnection( connections[i] );
          fi;
        od;
        return output;
      fi;
    else
      # report the arguments themselves: the bound entries of
      # callargspositions are positions in inputlist
      Error( "Tasks for arguments ", 
             inputlist{ Compacted( callargspositions ) }, " are lost!\n");
    fi;
  fi;
  #
  # waiting until any of the running tasks will be completed
  #
  descriptors := List( SCSCPprocesses{waitinglist}, s -> IO_GetFD( s![1]![1] ) );
  if IN_SCSCP_TRACING_MODE then SCSCPTraceSuspendThread(); fi;
  # IO_select replaces the descriptors that are not ready by fail
  IO_select( descriptors, [ ], [ ], timeout, 0 );
  if IN_SCSCP_TRACING_MODE then SCSCPTraceRunThread(); fi;
  nrdesc := First( [ 1 .. Length( descriptors ) ], i -> descriptors[i] <> fail );
  # if nothing came and timeout has passed then nrdesc=fail
  # This may happen when server was terminated by ^C and is in a break loop,
  # so no procedure_terminated message will appear on the client's side
  if nrdesc = fail then
    if noretry then
      nr := Random( waitinglist );
      TerminateProcess( SCSCPprocesses[nr] );
      if not IsClosedStream( SCSCPprocesses[nr]![1] ) then
        CloseStream( SCSCPprocesses[nr]![1] );
      fi;
      # NOTE(review): this connection is closed here, but the service is
      # set back to status 1 below and connections[nr] is reused by the next
      # NewProcess call; confirm NewProcess copes with a closed connection
      # or open a fresh one here.
      CloseSCSCPconnection( connections[nr] );
      result := fail;
    else
      Error( "ParSCSCP: waited for ", timeout, " seconds with no response from ", SCSCPservers{waitinglist}, "\n" );
    fi;
  else
    nr := waitinglist[ nrdesc ];
    result := CompleteProcess( SCSCPprocesses[nr] );
  fi;

  if result = fail then
    if noretry then
      Info( InfoSCSCP, 2, SCSCPservers[nr], " : timeout exceeded, procedure call terminated" );
      status[nr] := 1;
    else
      # the service SCSCPservers[nr] seems to crash, mark it as unavailable
      if PingSCSCPservice( SCSCPservers[nr][1], SCSCPservers[nr][2] ) = fail then
        Print( SCSCPservers[nr], " is no longer available \n" );
        status[nr] := 0;
        nrservices_alive := nrservices_alive - 1;
        if nrservices_alive = 0 then
          Error( "Can not continue computation - no SCSCP service left available!\n" );
        fi;
      else
        Error("ParSCSCP: failed to get result from ", SCSCPservers[nr] );
      fi;
      # we need to retry the call with argument inputlist[callargspositions[nr] ]
      Add( retrystack, callargspositions[nr] );
    fi;
  else
    #
    # processing the result
    #
    if infomw > 2 then
      Print( "#I  ", SCSCPservers[nr][1], ":", SCSCPservers[nr][2], 
             " --> ", callargspositions[nr], "/", len, ":master" );
      if infomw > 4 then
        Print( " : ", result.object, "\n" );
      else
        Print("\n"); 
      fi;
    fi;
    status[nr] := 1;
    output[ callargspositions[nr] ] := result.object;
  fi;
  Unbind( callargspositions[nr] );
od; # end of the outer loop
end);