File: sge_task_depend.c

package info (click to toggle)
gridengine 8.1.9%2Bdfsg-13.1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 57,140 kB
  • sloc: ansic: 432,689; java: 87,068; cpp: 31,958; sh: 29,445; jsp: 7,757; perl: 6,336; xml: 5,828; makefile: 4,704; csh: 3,934; ruby: 2,221; tcl: 1,676; lisp: 669; yacc: 519; python: 503; lex: 361; javascript: 200
file content (506 lines) | stat: -rw-r--r-- 17,274 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
/*___INFO__MARK_BEGIN__*/
/*************************************************************************
 *
 *  The Contents of this file are made available subject to the terms of
 *  the Sun Industry Standards Source License Version 1.2
 *
 *  Sun Microsystems Inc., March, 2001
 *
 *
 *  Sun Industry Standards Source License Version 1.2
 *  =================================================
 *  The contents of this file are subject to the Sun Industry Standards
 *  Source License Version 1.2 (the "License"); You may not use this file
 *  except in compliance with the License. You may obtain a copy of the
 *  License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
 *
 *  Software provided under this License is provided on an "AS IS" basis,
 *  WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
 *  WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
 *  MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
 *  See the License for the specific provisions governing your rights and
 *  obligations concerning the Software.
 *
 *  The Initial Developer of the Original Code is: Sun Microsystems, Inc.
 *
 *  Copyright: 2007 by Rising Sun Pictures
 *
 *  All Rights Reserved.
 *
 ************************************************************************/
/*___INFO__MARK_END__*/

#include <stdio.h>

#include "uti/sge_rmon.h"

#include "sgeobj/sge_ja_task.h"
#include "sgeobj/sge_range.h"
#include "sgeobj/sge_job.h"
#include "sgeobj/sge_answer.h"

#include "sge_task_depend.h"
#include "sge_job_qmaster.h"
#include "sge_event_master.h"

/* Floor division for unsigned operands: yields floor(a / b).
   The divisor b is used as-is, so callers must pass a non-zero value. */
static u_long32 task_depend_div_floor(u_long32 a, u_long32 b)
{
   const u_long32 quotient = a / b;

   return quotient;
}


/* Snap index i down onto the sequence t0, t0 + sa, t0 + 2 * sa, ...
   The arithmetic is unsigned, so i is expected to be >= t0. */
static u_long32 nearest_index_in_A(u_long32 i, u_long32 t0, u_long32 sa) 
{
   const u_long32 offset = i - t0;
   const u_long32 whole_steps = task_depend_div_floor(offset, sa);

   return t0 + whole_steps * sa;
}


/* Core dependence computation.
   For the chunk of B that starts at step_b_id and spans sb ids, store in
   *lb and *ub the ids of sequence A (start t0, step sa) onto which the
   first and the last id of that chunk snap down. */
static void task_depend(u_long32 *lb, u_long32 *ub, u_long32 t0, 
                        u_long32 sa, u_long32 sb, u_long32 step_b_id)
{
   /* step_b_id plays the role of i = t0 + n * sb, n in {0, 1, ..., N-1} */
   const u_long32 chunk_first = step_b_id;
   const u_long32 chunk_last = step_b_id + sb - 1;

   *lb = nearest_index_in_A(chunk_first, t0, sa);
   *ub = nearest_index_in_A(chunk_last, t0, sa);
}


/**************** qmaster/task/sge_task_depend_get_range() *******************
*  NAME
*     sge_task_depend_get_range() -- get predecessor job task dependencies 
*
*  SYNOPSIS
*     int sge_task_depend_get_range(lListElem **range, lList **alpp, 
*                                   const lListElem *pre_jep, 
*                                   const lListElem *suc_jep, 
*                                   u_long32 task_id) 
*
*  FUNCTION
*     This function determines the range of sub-tasks of job pre_jep that
*     suc_jep.task_id will be dependent on when suc_jep has an explicit
*     array dependency hold on pre_jep (with -hold_jid_ad option).
*
*  INPUTS
*     lListElem **range        - RN_Type pointer
*     lList **alpp             - AN_Type list pointer
*     const lListElem *pre_jep - const JB_Type element
*     const lListElem *suc_jep - const JB_Type element
*     u_long32 task_id         - a valid suc_jep task id  
*
*  RESULT
*     int - 0 on success
*
*  NOTES
*     Let div(a, b) = floor(a / b)
*     Let nearest_index_in_A(i) = t0 + div(i - t0, sa) * sa
*
*     Sub-task B.i will be dependent on all tasks in A between
*     nearest_index_in_A(i) and nearest_index_in_A(i + sb - 1) where 
*     i = t0 + n * sb and n is a non-negative integer.
*
*     It is safe to swap pre_jep and suc_jep provided that the given task id
*     belongs to pre_jep (therefore reversing the sense of the dependence).
*
*     MT-NOTE: sge_task_depend_get_range() is MT safe
*
******************************************************************************/
int sge_task_depend_get_range(lListElem **range, lList **alpp, 
                              const lListElem *pre_jep, 
                              const lListElem *suc_jep, u_long32 task_id) 
{
   u_long32 a0, a1, b0, b1, sa, sb, rmin, rmax;

   DENTER(TOP_LAYER, "sge_task_depend_get_range");

   /* all three pointers are dereferenced or handed on below */
   if (range == NULL || 
         pre_jep == NULL || 
         suc_jep == NULL) {
      DRETURN(STATUS_EUNKNOWN);
   }

   /* submit ranges: a* describe the predecessor, b* the successor */
   job_get_submit_task_ids(pre_jep, &a0, &a1, &sa);
   job_get_submit_task_ids(suc_jep, &b0, &b1, &sb);

   /* sanity check on task ranges */
   if (!sge_task_depend_is_same_range(pre_jep, suc_jep)) {
      DRETURN(STATUS_EUNKNOWN);
   }

   /* a zero step would make the modulo below and the division inside
      task_depend() undefined; report the range as undeterminable instead */
   if (sa == 0 || sb == 0) {
      DRETURN(STATUS_EUNKNOWN);
   }

   /* make sure task_id is the first task in a range */
   if (task_id < b0 || ((task_id - b0) % sb) != 0) {
      DRETURN(STATUS_EUNKNOWN);
   }

   /* make the actual call to the core dependence function */
   task_depend(&rmin, &rmax, a0, sa, sb, task_id);

   /* do some basic checks on the output */
   if (rmin < a0 || rmax > a1) {
      DRETURN(STATUS_EUNKNOWN);
   }

   /* if an existing range was not given, create one; the caller then owns
      the element stored in *range */
   if (*range == NULL) {
      *range = lCreateElem(RN_Type);
      if (*range == NULL) {
         DRETURN(STATUS_EUNKNOWN);
      }
   }

   /* the resulting range walks the predecessor's ids, hence step sa */
   range_set_all_ids(*range, rmin, rmax, sa);

   DRETURN(0);
}

/*****************************************************************************
 Decide whether task task_id of job counts as finished for dependence
 purposes.

 Returns false when the task is not enrolled, when its JAT_status is not
 JFINISHED, or when any entry of its JAT_task_list is not JFINISHED.
 Returns true otherwise, including the case where the task is enrolled but
 no task element can be found for it.

 NOTE(review): entries of JAT_task_list are read through JB_ja_tasks, i.e.
 they are treated as job-like elements -- confirm that this matches the
 element type actually stored in that list.
 *****************************************************************************/
static bool task_depend_is_finished(const lListElem *job, u_long32 task_id)
{
   const lListElem *ja_task = NULL;
   const lListElem *pe_task = NULL;

   DENTER(TOP_LAYER, "task_depend_is_finished");

   /* tasks that are not enrolled are never considered finished */
   if (!job_is_enrolled(job, task_id)) {
      DRETURN(false);
   }

   ja_task = job_search_task(job, NULL, task_id);
   if (ja_task == NULL) {
      /* enrolled, but no element to inspect */
      DRETURN(true);
   }

   if (lGetUlong(ja_task, JAT_status) != JFINISHED) {
      DRETURN(false);
   }

   /* every sub-task entry has to be finished as well */
   for_each(pe_task, lGetList(ja_task, JAT_task_list)) {
      const lListElem *first_ja_task = lFirst(lGetList(pe_task, JB_ja_tasks));

      if (lGetUlong(first_ja_task, JAT_status) != JFINISHED) {
         DRETURN(false);
      }
   }

   DRETURN(true);
}


/**************** qmaster/task/sge_task_depend_update() *******************
*  NAME
*     sge_task_depend_update() -- update job array dependencies for a task
*
*  SYNOPSIS
*     bool sge_task_depend_update(lListElem *jep, lList **alpp,
                                  u_long32 task_id)
*
*  FUNCTION
*     This function recalculates array dependency hold information
*     for a particular task id of job jep (-hold_jid_ad option).
*     If the task is independent (i.e., has no predecessor tasks that
*     are in an unfinished state), its id will be removed from the
*     JB_ja_a_h_ids hold range and potentially moved to JB_ja_n_h_ids.
*     Enrolled tasks may be held or unheld if they are not finished,
*     causing task mod events to be generated if the task was updated.
*
*  INPUTS
*     lListElem *jep   - JB_Type element
*     lList **alpp     - AN_Type list pointer
*     u_long32 task_id - a task id in job jep
*
*  RESULT
*     bool - true if the job was modified, otherwise false
*
*  NOTES
*     If array dependency information cannot be determined, then
*     this function will assume that a task dependence still exists.
*
*     A false result from this function should not be considered failure.
*     Update status is returned to help the caller decide whether modify
*     event code should be emitted.
*
*     If the job argument jep is NULL, or the task indicated by task_id
*     in jep is finished, false is returned.
*
*     MT-NOTE: Is not thread safe. Reads from the global Job-List
*
******************************************************************************/
bool sge_task_depend_update(lListElem *jep, lList **alpp, u_long32 task_id)
{
   const lListElem *pre = NULL;  /* JRE_Type */
   u_long32 hold_state, new_state;
   int Depend = 0;               /* set once any unfinished predecessor task is seen */

   DENTER(TOP_LAYER, "sge_task_depend_update");

   /* this should not really be necessary */
   if (jep == NULL) {
      DPRINTF(("got NULL for job argument\n"));
      DRETURN(false);
   }

   /* JA: FIXME: now emulating qalter -h; just hold the task even if
      it is running, which handles task failed followed by qmod -c */
   if (task_depend_is_finished(jep, task_id)) {
      DRETURN(false);
   }

   /* process the resolved predecessor list */
   for_each(pre, lGetList(jep, JB_ja_ad_predecessor_list)) {
      u_long32 sa, pred_task_id, amin, amax;
      const lListElem *pred_jep = NULL; /* JB_Type */
      lListElem *dep_range = NULL;      /* RN_Type */

      /* locate the job id in the master list, if not found we can't do much */
      pred_jep = job_list_locate(*(object_type_get_master_list(SGE_TYPE_JOB)),
                                 lGetUlong(pre, JRE_job_number));
      if (!pred_jep) continue;

      /* use the RSP functions to determine dependent predecessor task range */
      if (sge_task_depend_get_range(&dep_range, alpp, pred_jep, jep, task_id)) {
         /* since we can't calculate it, we must assume dependence */
         lFreeElem(&dep_range);
         Depend = 1;
         break;
      }

      /* fetch predecessor job dependency range ids */
      range_get_all_ids(dep_range, &amin, &amax, &sa);

      if (sa == 0) {
         /* a zero step would never advance the scan below; the range is
            unusable, so fall back to assuming dependence */
         Depend = 1;
      } else {
         /* all tasks between {amin, ..., amax} are dependencies unless
            finished; one unfinished task settles it, so stop there */
         for (pred_task_id = amin; pred_task_id <= amax; pred_task_id += sa) {
            if (!task_depend_is_finished(pred_jep, pred_task_id)) {
               Depend = 1;
               break;
            }
         }
      }

      /* cleanup, if a dep range was allocated */
      lFreeElem(&dep_range);

      /* minor speed optimization */
      if (Depend) break;
   }

   /* alter the hold state based on dependence info */
   hold_state = job_get_hold_state(jep, task_id);
   if (Depend) {
      new_state = hold_state | MINUS_H_TGT_JA_AD;
   } else {
      new_state = hold_state & ~MINUS_H_TGT_JA_AD;
   }

   /* update the hold state, possibly moving the task between n_h_ids and
      a_h_ids (or maybe update JAT_hold and JAT_state for enrolled tasks) */
   if (new_state != hold_state) {
      job_set_hold_state(jep, alpp, task_id, new_state);
      if (job_is_enrolled(jep, task_id)) {
         /* all task mod events will need to be added individually */
         lListElem *ja_task = job_search_task(jep, NULL, task_id);
         if (ja_task != NULL) {
            sge_add_jatask_event(sgeE_JATASK_MOD, jep, ja_task);
         }
         /* the change was handled at task level here, so false is returned
            rather than a job-level "modified" result */
         DRETURN(false);
      }
      DRETURN(true);
   }

   DRETURN(false);
}

/****************** qmaster/task/sge_task_depend_init() **********************
*  NAME
*     sge_task_depend_init() -- initialize job array task dependencies
*
*  SYNOPSIS
*     bool sge_task_depend_init(lListElem *jep, lList **alpp)
*
*  FUNCTION
*     This function inits the JB_ja_a_h_ids dependence cache when the 
*     array dependency request list is non-empty (-hold_jid_ad option).
*     It might also update the JHELD flag of the JAT_state field for enrolled 
*     tasks, placing or clearing the MINUS_H_TGT_JA_AD bits of the JAT_hold
*     field. Task mod events are generated for the modified enrolled tasks.
*
*  INPUTS
*     lListElem *jep - JB_Type element
*     lList **alpp   - AN_Type list pointer
*
*  RESULT
*     bool - true if the job was modified, otherwise false
*
*  NOTES
*     Use this function to completely recalculate array dependencies when
*     the ja_ad_predecessors list is modified, or upon qmaster restart.
*
*     If the job was never an array successor, this function has no effect.
*
*     MT-NOTE: sge_task_depend_init() is not MT safe (calls MT unsafe method)
*
******************************************************************************/
bool sge_task_depend_init(lListElem *jep, lList **alpp)
{
   bool modified = false;

   DENTER(TOP_LAYER, "sge_task_depend_init");

   if (jep == NULL) {
      DPRINTF(("got NULL for job argument\n"));
      DRETURN(false);
   }

   /* only jobs carrying an array dependency request are touched */
   if (lGetNumberOfElem(lGetList(jep, JB_ja_ad_request_list)) > 0) {
      if (lGetNumberOfElem(lGetList(jep, JB_ja_ad_predecessor_list)) != 0) {
         /* predecessors remain: re-evaluate every submitted task id */
         u_long32 first_id, last_id, step, id;

         job_get_submit_task_ids(jep, &first_id, &last_id, &step);
         id = first_id;
         while (id <= last_id) {
            if (sge_task_depend_update(jep, alpp, id)) {
               modified = true;
            }
            id += step;
         }
      } else if (sge_task_depend_flush(jep, alpp)) {
         /* fast case where all predecessors are "gone" */
         modified = true;
      }
   }

   DRETURN(modified);
}

/****************** qmaster/task/sge_task_depend_flush() **********************
*  NAME
*     sge_task_depend_flush() -- flush job array task dependencies
*
*  SYNOPSIS
*     bool sge_task_depend_flush(lListElem *jep, lList **alpp)
*
*  FUNCTION
*     This function clears the JB_ja_a_h_ids dependence cache when it
*     contains one or more task ranges and the job array dependency
*     predecessor list is empty (-hold_jid_ad option). It might also
*     release the JHELD flag of the JAT_state field for enrolled tasks
*     with MINUS_H_TGT_JA_AD in the JAT_hold field. Task mod events will
*     be generated if the enrolled task was updated.
*
*  INPUTS
*     lListElem *jep - JB_Type element
*     lList **alpp   - AN_Type list pointer
*
*  RESULT
*     bool - true if the job was modified, otherwise false
*
*  NOTES
*     It is useful to call this function when jobs are removed from the
*     JB_ja_ad_predecessor_list as a way of ensuring that dependence state
*     information in both structures is consistent. It is more efficient
*     to call this function than to update dependency information across
*     the entire range of sub-tasks of job jep.
*
*     If the job was never an array successor, this function has no effect.
*
*     MT-NOTE: sge_task_depend_flush() is MT safe
*
******************************************************************************/
bool sge_task_depend_flush(lListElem *jep, lList **alpp)
{
   bool flushed = false;
   lListElem *ja_task = NULL;  /* JAT_Type */

   DENTER(TOP_LAYER, "sge_task_depend_flush");

   /* this should not really be necessary */
   if (jep == NULL) {
      DPRINTF(("got NULL for job argument\n"));
      DRETURN(false);
   }

   /* act only when an array dependency was requested and no predecessor
      is left; with an empty request list the cache is left untouched */
   if (lGetNumberOfElem(lGetList(jep, JB_ja_ad_request_list)) > 0 &&
       lGetNumberOfElem(lGetList(jep, JB_ja_ad_predecessor_list)) == 0) {
      const lList *held_ids = lGetList(jep, JB_ja_a_h_ids);

      if (held_ids != NULL) {
         /* iterate over a copy of the list while the hold states of the
            job are being rewritten */
         lList *snapshot = lCopyList("", held_ids);
         const lListElem *range;

         for_each(range, snapshot) {
            u_long32 id, last_id, step;

            range_get_all_ids(range, &id, &last_id, &step);
            while (id <= last_id) {
               u_long32 cleared = job_get_hold_state(jep, id);

               cleared &= ~MINUS_H_TGT_JA_AD;
               job_set_hold_state(jep, alpp, id, cleared);
               id += step;
            }
         }
         lFreeList(&snapshot);

         /* if a list is still attached, swap in NULL and release it */
         if (lGetList(jep, JB_ja_a_h_ids) != NULL) {
            lList *leftover = NULL;

            lXchgList(jep, JB_ja_a_h_ids, &leftover);
            lFreeList(&leftover);
         }
         flushed = true;
      }

      /* release the array hold on enrolled tasks and announce each change */
      for_each(ja_task, lGetList(jep, JB_ja_tasks)) {
         const u_long32 task_id = lGetUlong(ja_task, JAT_task_number);
         const u_long32 current = job_get_hold_state(jep, task_id);

         if ((current & MINUS_H_TGT_JA_AD) == 0) {
            continue;
         }
         job_set_hold_state(jep, alpp, task_id, current & ~MINUS_H_TGT_JA_AD);
         sge_add_jatask_event(sgeE_JATASK_MOD, jep, ja_task);
      }
   }

   DRETURN(flushed);
}


/********** qmaster/task/sge_task_depend_is_same_range() ****************
*  NAME
*     sge_task_depend_is_same_range() -- determine array job equivalence 
*
*  SYNOPSIS
*     bool sge_task_depend_is_same_range(const lListElem *pre_jep, 
*                                        const lListElem *suc_jep) 
*
*  FUNCTION
*     This function determines if the job arguments are suitable predecessor
*     and successor jobs for an array dependency pair (-hold_jid_ad option).
*
*  INPUTS
*     lListElem *pre_jep - const JB_Type element
*     lListElem *suc_jep - const JB_Type element
*
*  RESULT
*     bool - true if jobs are compatible array jobs, otherwise false
*
*  MT-NOTE
*    sge_task_depend_is_same_range() is MT safe
*
******************************************************************************/
bool sge_task_depend_is_same_range(const lListElem *pre_jep, 
                                   const lListElem *suc_jep)
{
   u_long32 pre_first, pre_last, pre_step;
   u_long32 suc_first, suc_last, suc_step;
   bool same_bounds;

   DENTER(TOP_LAYER, "sge_task_depend_is_same_range");

   /* both jobs must be given */
   if (pre_jep == NULL || suc_jep == NULL) {
      DPRINTF(("got NULL pre_jep or suc_jep job argument\n"));
      DRETURN(false);
   }

   /* both jobs must be array jobs (-t option) */
   if (!(job_is_array(pre_jep) && job_is_array(suc_jep))) {
      DRETURN(false);
   }

   /* fetch the submit ranges; the steps are read but not compared */
   job_get_submit_task_ids(pre_jep, &pre_first, &pre_last, &pre_step);
   job_get_submit_task_ids(suc_jep, &suc_first, &suc_last, &suc_step);

   /* compatible jobs share the same first and last sub-task id */
   same_bounds = (pre_first == suc_first) && (pre_last == suc_last);

   DRETURN(same_bounds);
}