File: seg_sge_module.c

package info (click to toggle)
globus-gram-job-manager-sge 1.5-1
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 1,488 kB
  • sloc: sh: 9,757; ansic: 1,016; perl: 893; makefile: 324
file content (1476 lines) | stat: -rw-r--r-- 39,840 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
/*
 * Sun Grid Engine Scheduler Event Generator implementation for GT4.
 *
 * See CREDITS file for attributions.
 * See LICENSE file for license terms.
 */

/* This #define is needed for the correct operation of the GLIBC strptime
 * function. */
#define _XOPEN_SOURCE 1

#include "globus_common.h"
#include "globus_scheduler_event_generator.h"
#include "version.h"

#include <string.h>

/* Emit a debug message for the SEG_SGE debug domain.  `message` must be a
 * parenthesized printf-style argument list, e.g. ("value = %d\n", v). */
#define SEG_SGE_DEBUG(level, message) \
    GlobusDebugPrintf(SEG_SGE, level, message)

/* This error code is used to represent the
 * "we want to skip a log entry" state.
 * Parenthesized so the negative literal cannot bind to neighbouring
 * operators when the macro is expanded inside an expression. */
#define SEG_SGE_SKIP_LINE (-10)

/* This is for read control when finding the first timestamp in the logfile. */
#define SEG_SGE_FOUND_FILE_TIMESTAMP (-20)

/**
 * Debug levels.
 *
 * Each level is a distinct bit so that several levels can be combined.
 * If the environment variable SEG_SGE_DEBUG is set to a bitwise or
 * of these values, then a corresponding log message will be generated.
 * These constants are passed as the `level` argument of SEG_SGE_DEBUG().
 */
typedef enum
{
    /**
     * Information about function calls and exits.
     */
    SEG_SGE_DEBUG_INFO = (1<<0),
    /**
     * Warnings about things which may be bad.
     */
    SEG_SGE_DEBUG_WARN = (1<<1),
    /**
     * Fatal errors.
     */
    SEG_SGE_DEBUG_ERROR = (1<<2),
    /**
     * Details of function execution.
     */
    SEG_SGE_DEBUG_TRACE = (1<<3)
}
globus_l_seg_sge_debug_level_t;

/**
 * Internal error codes returned by the helper functions in this module.
 * Values start at 1 so that they never collide with 0 (success).
 */
enum
{
    /** Unexpected failure (e.g. an errno value stat() is not expected to set). */
    SEG_SGE_ERROR_UNKNOWN = 1,
    /** A memory allocation failed (some callers also reuse this code for
     *  other local failures such as a failed fopen()). */
    SEG_SGE_ERROR_OUT_OF_MEMORY,
    /** The log file path is broken (ENOTDIR, ELOOP or ENAMETOOLONG). */
    SEG_SGE_ERROR_BAD_PATH,
    /** The log file exists but cannot be accessed (EACCES). */
    SEG_SGE_ERROR_LOG_PERMISSIONS,
    /** The log file does not exist (yet); module activation checks for this
     *  code to schedule a delayed retry instead of failing. */
    SEG_SGE_ERROR_LOG_NOT_PRESENT
};

/**
 * State of the SGE log file parser.
 *
 * One instance is allocated at module activation and handed to every
 * invocation of the polling callback as its user argument.
 *
 * RJP Jan.2008: added 4 fields (need_timestamp, file_timestamp,
 * file_number, file_inode) to handle file rotation.
 */
typedef struct
{
    /** Path of the current log file being parsed (log_file, optionally
     *  followed by ".<file_number>" for a rotated file) */
    char *                              path;
    /** Timestamp of when to start generating events from */
    struct tm                           start_timestamp;
    /** Stdio file handle of the log file; NULL while no file is open */
    FILE *                              fp;
    /** Buffer of log file data */
    char *                              buffer;
    /** Callback handle for periodic file polling.
     *  NOTE(review): the oneshot registrations visible in this file pass
     *  NULL instead of this handle, so it appears to be unused. */
    globus_callback_handle_t            callback;
    /** Length of the buffer */
    size_t                              buffer_length;
    /** Starting offset of valid data in the buffer. */
    size_t                              buffer_point;
    /** Amount of valid data in the buffer */
    size_t                              buffer_valid;
    /** Simple flag: true when all we are looking for is the timestamp */
    globus_bool_t                       need_timestamp;
    /** First timestamp in the log file */
    time_t                              file_timestamp;
    /** File rotation number at first read; assumes N+1 old files labeled
     *  0,1,2,...,N.  A negative value selects the live, non-rotated file. */
    int                                 file_number;
    /** File inode, recorded from stat() for a quick test of file rotation.
     *  NOTE(review): stored as int although st_ino is ino_t - may truncate. */
    int                                 file_inode;
    /**
     * Flag indicating that a log close event for the current log was
     * found in the log.
     */
    globus_bool_t                       end_of_log;
    /**
     * Flag indicating that this log file is a rotated (old) one, so an
     * EOF on it requires us to close it and open a newer one.
     */
    globus_bool_t                       old_log;
    /**
     * Base path of the SGE reporting log file (without any rotation
     * suffix).  It is copied into `path` when the file name is built.
     */
    char *                              log_file;
} globus_l_sge_logfile_state_t;

/* Guards shutdown_called and callback_count. */
static globus_mutex_t                   globus_l_sge_mutex;
/* Signalled by the polling callback when callback_count drops to zero
 * during shutdown; module deactivation waits on it. */
static globus_cond_t                    globus_l_sge_cond;
/* Set to GLOBUS_TRUE by module deactivation to stop the polling callback. */
static globus_bool_t                    shutdown_called;
/* Number of outstanding polling callbacks that deactivation must wait for. */
static int                              callback_count;


/* Function signature declarations. */
/* rjp Jan.2008: added 3 routines for handling file rotation
 * (set_logfile_name, check_rotated, get_file_timestamp). */

/* Declares the SEG_SGE debug domain used by SEG_SGE_DEBUG(). */
GlobusDebugDefine(SEG_SGE);

/* Module activation entry point. */
static
int
globus_l_sge_module_activate(void);

/* Module deactivation entry point. */
static
int
globus_l_sge_module_deactivate(void);

/* Periodic polling callback; user_arg is the logfile state. */
static
void
globus_l_sge_read_callback(
	void *                              user_arg);

/* Parse the buffered log data. */
static
int
globus_l_sge_parse_events(
	globus_l_sge_logfile_state_t *      state);

/* Move unparsed data to the start of the buffer. */
static
int
globus_l_sge_clean_buffer(
	globus_l_sge_logfile_state_t *      state);

/* Grow the read buffer. */
static
int
globus_l_sge_increase_buffer(
	globus_l_sge_logfile_state_t *      state);

/* Split a log line into fields. */
static
int
globus_l_sge_split_into_fields(
	globus_l_sge_logfile_state_t *      state,
	char ***                            fields,
	size_t *                            nfields);

/* Choose which (possibly rotated) log file to start reading. */
static
int
globus_l_sge_find_logfile(
	globus_l_sge_logfile_state_t *      state);

/* Build state->path from state->log_file and state->file_number. */
static
int
globus_l_sge_set_logfile_name(
	 globus_l_sge_logfile_state_t *      state);

/* Check whether the open log file has been rotated. */
static
int
globus_l_sge_check_rotated(
        globus_l_sge_logfile_state_t * state);

/* Read the first timestamp of the current log file. */
static
int
globus_l_sge_get_file_timestamp(
        globus_l_sge_logfile_state_t * state);


/**** RJP 4.2 change -- the module descriptor below replaces the previous
 *    (pre-4.2) definition.  *****/

/* Extension module descriptor: module name, activation and deactivation
 * entry points, two unused (NULL) slots, and the version record. */
GlobusExtensionDefineModule(globus_seg_sge) =
{
  "globus_seg_sge",
    globus_l_sge_module_activate,
    globus_l_sge_module_deactivate,
    NULL,
    NULL,
    &local_version

};

/************** End 4.2 change ******************/


/* This function will be used by the SEG calling code to
 * initialize this module. */
static
int
globus_l_sge_module_activate(void)
{
    time_t                              timestamp_val;
    globus_l_sge_logfile_state_t *      logfile_state;
    int                                 rc;
    globus_reltime_t                    delay;
    char                               *globus_sge_conf= NULL;
    char                               *sge_config = NULL;
    char                               *sge_root = NULL, *sge_cell = NULL;
    globus_result_t                     result;

    rc = globus_module_activate(GLOBUS_COMMON_MODULE);
    if (rc != GLOBUS_SUCCESS)
    {
	goto error;
    }
    rc = globus_mutex_init(&globus_l_sge_mutex, NULL);

    if (rc != GLOBUS_SUCCESS)
    {
	goto deactivate_common_error;
    }
    rc = globus_cond_init(&globus_l_sge_cond, NULL);
    if (rc != GLOBUS_SUCCESS)
    {
	goto destroy_mutex_error;
    }
    shutdown_called = GLOBUS_FALSE;
    callback_count = 0;

    GlobusDebugInit(
	    SEG_SGE,
	    SEG_SGE_DEBUG_INFO
	    SEG_SGE_DEBUG_WARN
	    SEG_SGE_DEBUG_ERROR
	    SEG_SGE_DEBUG_TRACE);

    logfile_state = calloc(1, sizeof(globus_l_sge_logfile_state_t));

    if (logfile_state == NULL)
    {
        rc = SEG_SGE_ERROR_OUT_OF_MEMORY;
	goto destroy_cond_error;
    }

    rc = globus_l_sge_increase_buffer(logfile_state);
    if (rc != GLOBUS_SUCCESS)
    {
	goto free_logfile_state_error;
    }

    /* Configuration info */
    result = globus_scheduler_event_generator_get_timestamp(&timestamp_val);

    if (result != GLOBUS_SUCCESS)
    {
	goto free_logfile_state_buffer_error;
    }

    if (timestamp_val != 0)
    {
	if (globus_libc_localtime_r(&timestamp_val,
		    &logfile_state->start_timestamp) == NULL)
	{
	    goto free_logfile_state_buffer_error;
	}
    }

    result = globus_eval_path(
            "${sysconfdir}/globus/globus-sge.conf",
            &globus_sge_conf);

    if (result != GLOBUS_SUCCESS)
    {
        goto free_logfile_state_buffer_error;
    }

    result = globus_common_get_attribute_from_config_file(
	    "",
	    globus_sge_conf,
	    "log_path",
	    &logfile_state->log_file);

    /* Same algorithm for missing logfile finding as seg.pm:
     * 1. If sge_root and sge_cell are set in globus-sge.conf, use those
     * 2. If sge_config is set in globus-sge.conf, source it and echo out
     *    $SGE_ROOT and $SGE_CELL as needed
     * 3. If $SGE_ROOT or $SGE_CELL is set in environment, use them
     */
    if (result != GLOBUS_SUCCESS
        || logfile_state->log_file == NULL
        || logfile_state->log_file[0] == '\0')
    {
        globus_common_get_attribute_from_config_file(
            "",
	    globus_sge_conf,
            "sge_root",
            &sge_root);

        if (sge_root != NULL && strcmp(sge_root, "undefined") == 0)
        {
            free(sge_root);
            sge_root = NULL;
        }

        globus_common_get_attribute_from_config_file(
            "",
	    globus_sge_conf,
            "sge_cell",
            &sge_cell);
        if (sge_cell != NULL && strcmp(sge_cell, "undefined") == 0)
        {
            free(sge_cell);
            sge_cell = NULL;
        }

        globus_common_get_attribute_from_config_file(
            "",
	    globus_sge_conf,
            "sge_config",
            &sge_config);

        if (sge_root == NULL || sge_cell == NULL)
        {
            if (sge_config != NULL)
            {
                FILE *tf;
                int tfd;
                char *cmd;
                struct stat st;

                tf = tmpfile();
                if (tf)
                {
                    tfd = fileno(tf);
                    if (tfd > -1)
                    {
                        cmd = globus_common_create_string(
                            ". \"%s\" && printf \"$SGE_ROOT\\n$SGE_CELL\\n\" 1>&%d",
                            sge_config,
                            tfd);
                        system(cmd);
                        free(cmd);
                        fstat(tfd, &st);
                        rewind(tf);

                        if (sge_root == NULL)
                        {
                            sge_root = malloc((size_t) st.st_size);

                            if (sge_root)
                            {
                                fgets(sge_root, (int) st.st_size, tf);
                                sge_root[strlen(sge_root)-1] = 0;
                            }
                            else
                            {
                                fscanf(tf, "%*[^\n]\n");
                            }
                        }
                        else
                        {
                            fscanf(tf, "%*[^\n]\n");
                        }
                        if (sge_cell == NULL)
                        {
                            sge_cell = malloc((size_t) st.st_size);

                            if (sge_cell)
                            {
                                fgets(sge_cell, (int) st.st_size, tf);
                                sge_cell[strlen(sge_cell)-1] = 0;
                            }
                            else
                            {
                                fscanf(tf, "%*[^\n]\n");
                            }
                        }
                        else
                        {
                            fscanf(tf, "%*[^\n]\n");
                        }
                    }
                    fclose(tf);
                }
            }
            if (sge_root == NULL)
            {
                char * tmp = getenv("SGE_ROOT");

                if (tmp)
                {
                    sge_root = strdup(tmp);
                }
            }
            if (sge_cell == NULL)
            {
                char * tmp = getenv("SGE_CELL");

                if (tmp)
                {
                    sge_cell = strdup(tmp);
                }
            }
        }
        if (sge_root == NULL || sge_cell == NULL)
        {
            rc = SEG_SGE_ERROR_OUT_OF_MEMORY;
            goto free_sge_cell;
        }
        logfile_state->log_file = globus_common_create_string(
            "%s/%s/common/reporting",
            sge_root, sge_cell);

        if (logfile_state->log_file == NULL)
        {
            rc = SEG_SGE_ERROR_OUT_OF_MEMORY;
            goto free_sge_cell;
        }

        free(sge_root);
        sge_root = NULL;
        free(sge_cell);
        sge_cell = NULL;
    }
    if (logfile_state->log_file == NULL)
    {
	SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
		("unable to find log file in configuration\n"));
	goto free_logfile_state_buffer_error;
    }

    /* Locate our logfile.
     * Other DRMs need to know the current time to determine which
     * logfile to inspect.  SGE just keeps a single large 'reporting' log. */

    /* --- Above is true but we've implemented file rotation
     * within the finding logfile routine   rjp Jan.2008
     */

    rc = globus_l_sge_find_logfile(logfile_state);
    if (rc == GLOBUS_SUCCESS)
    {
	logfile_state->fp = fopen(logfile_state->path, "r");

	if (logfile_state->fp == NULL)
	{
	    rc = SEG_SGE_ERROR_OUT_OF_MEMORY;

	    goto free_logfile_state_path_error;
	}
	GlobusTimeReltimeSet(delay, 0, 0);
    }
    else if(rc == SEG_SGE_ERROR_LOG_NOT_PRESENT)
    {
	GlobusTimeReltimeSet(delay, 1, 0);
    }
    else
    {
	goto free_logfile_state_path_error;
    }

    /* Setup a callback so that our main read function will be
     * invoked at a later time.
     */
    result = globus_callback_register_oneshot(
	    NULL,
	    &delay,
	    globus_l_sge_read_callback,
	    logfile_state);
    if (result != GLOBUS_SUCCESS)
    {
	goto free_logfile_state_path_error;
    }
    callback_count++;

    return 0;

free_sge_cell:
    if (sge_cell != NULL)
    {
        free(sge_cell);
    }
free_sge_root:
    if (sge_root != NULL)
    {
        free(sge_root);
    }
free_logfile_state_path_error:
    if (logfile_state->path)
    {
	globus_libc_free(logfile_state->path);
    }
    if (logfile_state->log_file)
    {
	globus_libc_free(logfile_state->log_file);
    }
free_logfile_state_buffer_error:
    globus_libc_free(logfile_state->buffer);
    if (globus_sge_conf != NULL)
    {
        free(globus_sge_conf);
    }
free_logfile_state_error:
    globus_libc_free(logfile_state);
destroy_cond_error:
    globus_cond_destroy(&globus_l_sge_cond);
destroy_mutex_error:
    globus_mutex_destroy(&globus_l_sge_mutex);
deactivate_common_error:
    globus_module_deactivate(GLOBUS_COMMON_MODULE);
error:
    return 1;
}
/* globus_l_sge_module_activate() */

/**
 * Module deactivation: called before we are shut down so that we can
 * clean up properly.
 *
 * Raises the shutdown flag under the module mutex and then blocks on the
 * condition variable until no polling callback is outstanding any more.
 * Afterwards the debug domain is torn down and the common module is
 * deactivated.
 *
 * @retval 0
 *     Always.
 */
static
int
globus_l_sge_module_deactivate(void)
{
    globus_mutex_lock(&globus_l_sge_mutex);
    {
        /* The polling callback checks this flag under the same mutex. */
        for (shutdown_called = GLOBUS_TRUE; callback_count > 0; )
        {
            globus_cond_wait(&globus_l_sge_cond, &globus_l_sge_mutex);
        }
    }
    globus_mutex_unlock(&globus_l_sge_mutex);

    GlobusDebugDestroy(SEG_SGE);
    globus_module_deactivate(GLOBUS_COMMON_MODULE);

    return 0;
}

/*
 * This is our master read function.  It will be called periodically
 * as a result of a previous globus_callback_register_oneshot() invocation.
 */
static
void
globus_l_sge_read_callback(
	void *                              user_arg)
{
    int                                 rc;
    struct stat                         s;
    globus_l_sge_logfile_state_t *      state = user_arg;
    size_t                              max_to_read;
    globus_bool_t                       eof_hit = GLOBUS_FALSE;
    globus_reltime_t                    delay;
    globus_result_t                     result;


    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO, ("globus_l_sge_read_callback() invoked.\n"));

    globus_mutex_lock(&globus_l_sge_mutex);
    if (shutdown_called)
    {
	SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO, ("polling while deactivating"));

	globus_mutex_unlock(&globus_l_sge_mutex);
	goto error;
    }
    globus_mutex_unlock(&globus_l_sge_mutex);

    /* file may not have existed earlier  rjp Jan.2008 */
    if(state->fp == NULL)
    {
        if( state->path == NULL )
	{
  	    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO, ("no file name available"));
            goto error;
        }
	else
	{
            rc = stat(state->path,&s);
	    if(rc == 0)
	      {
	        SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO, ("opening file in callback"));
                state->fp = fopen(state->path,"r");
                state->file_inode = s.st_ino;
	      }
	}
    }


    /* Provided that we have an open log filehandle.. */
    if (state->fp != NULL)
    {
        /* Calculate how much data will fit within the read-buffer. */
	max_to_read = state->buffer_length - state->buffer_valid
	    - state->buffer_point;

	SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
	    ("Reading a maximum of %u bytes from SGE reporting file = %s\n",
		max_to_read, state->path));

	/* Actually perform the read. */
	rc = fread(state->buffer + state->buffer_point +
		state->buffer_valid, 1, max_to_read, state->fp);

	SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
	    ("Read %d bytes\n", rc));

	/* If we haven't read the most we could, we have either: */
	if (rc < max_to_read)
	{
	    /* Reached the end of the file..*/
	    if (feof(state->fp))
	    {
	        SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE, ("Reached EOF\n"));
		eof_hit = GLOBUS_TRUE;
	  	clearerr(state->fp);
	    }
	    else
	    {
		/* Or something bad has happened.
		 * This error state is currently unhandled... */

		/* XXX: Read error */
	    }
	}

	/* Update our state to record that we've added more valid data
	 * to the buffer. */
	state->buffer_valid += rc;

	/* Parse data.  This function will also generate event
	 * notifications and send them to the main server. */
	SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE, ("Parsing events in buffer.\n"));
	rc = globus_l_sge_parse_events(state);

	/* Move any remaining log data to the start of the buffer,
	 * overwriting any old log data that we have already parsed. */
	SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
	    ("Cleaning buffer of parsed events.\n"));
	rc = globus_l_sge_clean_buffer(state);

    }


    if( (eof_hit == GLOBUS_TRUE) )
      {

        /* Here we hand log-rotation possibility - by resetting file_number
         *  1. check to see if log has been rotated and
         *  2. reset file_number so that next file opened will
         *  be correctly identified  rjp Jan.2008
         */
        rc = globus_l_sge_check_rotated(state);

        if(rc != 0)       /* file has been rotated */
	  {
	    state->file_number++;
            state->old_log = GLOBUS_TRUE;
	  }

        if(state->old_log)
	  {
           if(state->fp)
	     {
  	      fclose(state->fp);
              state->fp = NULL;
	     }

	   /* decrement file number.
	    * Note if file was rotated while open,
            * the above increment of file number
            * allows this to work   rjp Jan.2008
            */

           state->file_number--;
           rc = globus_l_sge_set_logfile_name(state);

           rc = stat(state->path,&s);
           if(rc == 0)
	     {
               state->fp = fopen(state->path,"r");
               state->file_inode = s.st_ino;
	     }

           if(state->fp != NULL)
	     {
	       /* we got a new file */
               eof_hit = GLOBUS_FALSE;
	     }
	  }
      }


      /* Determine if we have reached the EOF on the logfile.
       * If we have, set a moderately long delay.
       * If not, set  zero delay so we can read the rest! */

    if (eof_hit == GLOBUS_TRUE || state->fp == NULL)
    {
	GlobusTimeReltimeSet(delay, 2, 0);
    }
    else
    {
	GlobusTimeReltimeSet(delay, 0, 0);
    }


    /* Make the call to get ourselves invoked again. */
    /* rjp --> this used to include a pointer to the callback in the logfile_state struct.
     * as &state->callback, But this causes a memory leak. Removed and put to NULL */
    result = globus_callback_register_oneshot(
    	    NULL,
	    &delay,
	    globus_l_sge_read_callback,
	    state);

    if (result != GLOBUS_SUCCESS)
    {
	goto error;
    }

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
	  ("globus_l_sge_read_callback() exited with/success \n"));

    return;
error:
    globus_mutex_lock(&globus_l_sge_mutex);
    if (shutdown_called)
    {
	callback_count--;

	if (callback_count == 0)
	{
	    globus_cond_signal(&globus_l_sge_cond);
	}
    }
    else
    {
        fprintf(stderr,
                "FATAL: Unable to register callback. SGE SEG exiting\n");
        exit(EXIT_FAILURE);
    }
    globus_mutex_unlock(&globus_l_sge_mutex);

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
	    ("globus_l_sge_read_callback() exited with/error\n"));
    return;
}
/* globus_l_sge_read_callback() */

/**
 * Determine the SGE log file name.
 *
 * For SGE the base file name does not change -- it is always the
 * 'reporting' file whose exact path is already in state->log_file.
 * Since rjp Jan.2008 simple file rotation is supported as well: starting
 * with the live file (file_number -1) and continuing with the rotated
 * files .0, .1, ... the first file whose initial timestamp is earlier
 * than the requested start timestamp is selected.  If the chain of
 * rotated files ends first, the last existing file is used.
 *
 * @param state
 *     SGE log state structure. The path, file_number, file_timestamp,
 *     file_inode and old_log fields may be modified by this function.
 *
 * @retval GLOBUS_SUCCESS
 *     The name of a log file has been found and the file exists.
 * @retval SEG_SGE_ERROR_LOG_NOT_PRESENT
 *     The selected log file does not exist (yet); state->path is set so
 *     that the caller can retry later.
 * @retval SEG_SGE_ERROR_OUT_OF_MEMORY
 *     The path could not be allocated or formatted.
 * @retval SEG_SGE_ERROR_LOG_PERMISSIONS
 *     The log file is not accessible.
 * @retval SEG_SGE_ERROR_BAD_PATH
 *     The path to the log file is broken.
 * @retval SEG_SGE_ERROR_UNKNOWN
 *     stat() failed for another reason.
 */
static
int
globus_l_sge_find_logfile(
    globus_l_sge_logfile_state_t *      state)
{
    struct stat                         s;
    int                                 rc;
    time_t                              stamp;
    globus_bool_t                       file_found = GLOBUS_FALSE;

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO, ("globus_l_sge_find_logfile()\n"));

    if (state->path == NULL)
    {
        SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE, ("allocating path\n"));
        /* Room for the base name, '.', any int rendered in decimal
         * (at most 11 characters) and the terminator. */
        state->path = malloc(strlen(state->log_file) + 16);
        if (state->path == NULL)
        {
            rc = SEG_SGE_ERROR_OUT_OF_MEMORY;
            goto error;
        }
    }


    /* log_file contains string of base file name including path */

    stamp = mktime(&state->start_timestamp);
    SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
            ("input timestamp = %ld\n", (long) stamp));

    state->file_number = -1;
    while (!file_found)
    {
        SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
                ("find file loop with file_number = %d\n",
                 state->file_number));

        rc = globus_l_sge_set_logfile_name(state);
        if (rc < 0)
        {
            SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN, ("couldn't format string\n"));
            rc = SEG_SGE_ERROR_OUT_OF_MEMORY;
            goto error;
        }

        rc = stat(state->path, &s);
        if (rc == 0)
        {
            rc = globus_l_sge_get_file_timestamp(state);
        }
        else
        {
            SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
                    ("file = %s not found\n", state->path));
            if (state->file_number >= 0)
            {
                /* rjp Jan.2008
                   here we assume input timestamp is earlier than
                   the chain of rotated log files. Back it up 1.
                */
                state->file_number--;
                rc = globus_l_sge_set_logfile_name(state);
                file_found = GLOBUS_TRUE;
            }
            else
            {
                /* it's possible the direct file (file_number = -1)
                   doesn't exist yet so set to skip over (see next if/else) */
                state->file_timestamp = 0;
            }
        }

        if (state->file_timestamp > 0 && state->file_timestamp < stamp)
        {
            SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
                    ("found our file = %s with Timestamp %ld \n",
                     state->path, (long) state->file_timestamp));
            file_found = GLOBUS_TRUE;
        }
        else
        {
            SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
                    ("Not file to use = %s with Timestamp %ld \n",
                     state->path, (long) state->file_timestamp));

            /* next file */
            /* NOTE(review): this increment also runs after the "back it
             * up 1" branch above, leaving file_number one ahead of the
             * file named in path -- confirm this is what the rotation
             * handling in the read callback expects. */
            state->file_number++;

            /** now it's possible under quick file rotations that no
             *  timestamp is put in the file, thus state->file_timestamp = 0.
             *  In this case, as written above, we'll appropriately skip
             *  that file:   rjp Jan.2008
             */
        }
    }

    rc = stat(state->path, &s);
    if (rc < 0)
    {
        /* The stat buffer is indeterminate after a failure, so never
         * copy an inode out of it. */
        state->file_inode = 0;

        switch (errno)
        {
            case ENOENT:
                /* Doesn't exist (yet).  Report that specifically so the
                 * caller can schedule a retry instead of giving up. */
                SEG_SGE_DEBUG(SEG_SGE_DEBUG_ERROR,
                        ("file %s doesn't exist\n", state->path));
                rc = SEG_SGE_ERROR_LOG_NOT_PRESENT;
                goto error;

            case EACCES:
                SEG_SGE_DEBUG(SEG_SGE_DEBUG_ERROR,
                        ("permissions needed to access logfile %s\n",
                         state->path));
                /* Permission problem (fatal) */
                rc = SEG_SGE_ERROR_LOG_PERMISSIONS;
                goto error;

            case ENOTDIR:
            case ELOOP:
            case ENAMETOOLONG:
                /* broken path (fatal) */
                SEG_SGE_DEBUG(SEG_SGE_DEBUG_ERROR,
                        ("broken path to logfile %s\n",
                         state->path));
                rc = SEG_SGE_ERROR_BAD_PATH;
                goto error;

            case EFAULT:
                SEG_SGE_DEBUG(SEG_SGE_DEBUG_ERROR,
                        ("bad pointer\n"));
                globus_assert(errno != EFAULT);
                /* fallthrough */

            case EINTR:
            case ENOMEM:

            default:
                SEG_SGE_DEBUG(SEG_SGE_DEBUG_ERROR,
                        ("unexpected errno\n"));
                rc = SEG_SGE_ERROR_UNKNOWN;
                goto error;
        }
    }

    state->file_inode = s.st_ino;

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
            ("globus_l_sge_find_logfile() exits w/out error\n"));
    return 0;

error:
    SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
            ("globus_l_sge_find_logfile() exits w/error\n"));
    return rc;
}
/* globus_l_sge_find_logfile() */


/**
 * Build the name of the log file to read (rjp Jan.2008).
 *
 * The file rotation model is simple: a negative file_number selects the
 * live file, whose name is exactly log_file; any other file_number
 * selects the rotated file "<log_file>.<file_number>".  Change this
 * routine if a different rotation model is needed.
 *
 * @param state
 *     SGE log state structure.  The name is written into state->path,
 *     and state->old_log is set to tell whether a rotated file was
 *     selected.
 *
 * @return
 *     The result of the sprintf() call that formatted the name.
 */
static
int
globus_l_sge_set_logfile_name(
	globus_l_sge_logfile_state_t *      state)
{
    int                                 written;
    globus_bool_t                       is_rotated;

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
                 ("globus_l_sge_set_logfile_name()\n"));

    is_rotated = (state->file_number < 0) ? GLOBUS_FALSE : GLOBUS_TRUE;

    if (is_rotated)
    {
        SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
                      ("rotated file file_number >= 0\n"));
        written = sprintf(state->path,"%s%s%d",state->log_file,".",state->file_number);
    }
    else
    {
        SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
                      ("non-rotated file number \n"));
        written = sprintf(state->path,"%s",state->log_file);
    }

    /* A rotated file is by definition an old log. */
    state->old_log = is_rotated;

    return written;
}
/* globus_l_sge_set_logfile_name */

/**
 * Compact the read buffer: slide the still-unparsed bytes down to offset 0
 * so the space occupied by already-parsed data can be reused.
 *
 * @param state
 *     Log state structure. When a buffer exists and buffer_point is
 *     non-zero, buffer_valid bytes are moved from buffer + buffer_point to
 *     the start of the buffer and buffer_point is reset to 0.
 *
 * @return
 *     Always 0.
 */
static
int
globus_l_sge_clean_buffer(
    globus_l_sge_logfile_state_t *      state)
{
    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
            ("globus_l_sge_clean_buffer() called\n"));

    /* Nothing to compact unless a buffer exists and its head was consumed. */
    if (state->buffer != NULL && state->buffer_point > 0)
    {
        /* Source and destination may overlap, hence memmove(). */
        if (state->buffer_valid > 0)
        {
            memmove(state->buffer,
                    &state->buffer[state->buffer_point],
                    state->buffer_valid);
        }
        state->buffer_point = 0;
    }

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
            ("globus_l_sge_clean_buffer() exits\n"));
    return 0;
}
/* globus_l_sge_clean_buffer() */

/**
 * Enlarge the log read buffer by one read-chunk (4096 bytes) if it is
 * completely full of valid data; otherwise leave it untouched.
 *
 * @param state
 *     SGE log state structure. On growth, state->buffer may be moved by the
 *     reallocation and state->buffer_length is increased by the chunk size.
 *     On failure, or when the buffer is not full, neither field is changed.
 *
 * @return
 *     0 on success (including the "nothing to do" case),
 *     SEG_SGE_ERROR_OUT_OF_MEMORY if the reallocation fails.
 */
static
int
globus_l_sge_increase_buffer(
    globus_l_sge_logfile_state_t *      state)
{
    const size_t                        GLOBUS_SGE_READ_BUFFER_SIZE = 4096;
    char *                              grown;

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
            ("globus_l_sge_increase_buffer() called\n"));

    /* If the buffer is full of valid data, enlarge it! */
    if (state->buffer_valid == state->buffer_length)
    {
        /*
         * Reallocate through a temporary so the original buffer is never
         * lost if the allocation fails.
         */
        grown = globus_libc_realloc(state->buffer,
                state->buffer_length + GLOBUS_SGE_READ_BUFFER_SIZE);
        if (grown == NULL)
        {
            SEG_SGE_DEBUG(SEG_SGE_DEBUG_ERROR, ("realloc() failed\n"));
            SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
                    ("globus_l_sge_increase_buffer() exits w/failure\n"));
            return SEG_SGE_ERROR_OUT_OF_MEMORY;
        }

        state->buffer = grown;

        /*
         * Only record the larger capacity when the allocation really grew.
         * Bumping buffer_length without a matching realloc() would make
         * later reads believe there is room past the end of the allocation.
         */
        state->buffer_length += GLOBUS_SGE_READ_BUFFER_SIZE;
    }

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
            ("globus_l_sge_increase_buffer() exits w/success\n"));
    return 0;
}
/* globus_l_sge_increase_buffer() */


/**
 * Simple routine to check whether the inode number of the file at
 * state->path differs from the one recorded in state->file_inode.
 * If so we assume the file has been rotated.
 *
 * @param state
 *     Log state structure; only state->path and state->file_inode are read.
 *
 * @return
 *     1 if the inode at state->path differs from state->file_inode,
 *     0 if it matches or if the path could not be stat()ed. In the latter
 *     case there is no inode to compare, so no rotation is reported and the
 *     caller can simply check again later.
 */

static
int
globus_l_sge_check_rotated(globus_l_sge_logfile_state_t * state)
{
    struct stat                         info;

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO, ("globus_l_sge_check_rotated() invoked.\n"));

    /*
     * The stat buffer is only meaningful when stat() succeeds; comparing
     * st_ino after a failure would read uninitialised memory.
     */
    if (stat(state->path, &info) != 0)
    {
        SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
                ("unable to stat %s (errno=%d), cannot check for rotation\n",
                 state->path, errno));
        return 0;
    }

    if (info.st_ino != state->file_inode)
    {
        SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO, ("file has been rotated().\n"));
        return 1;
    }

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO, ("globus_l_sge_check_rotated() exit.\n"));
    return 0;
}
/* globus_l_sge_check_rotated */


/**
 * Parse any whole (newline-terminated) events from the read buffer,
 * generate state update messages and deliver them to the main process.
 *
 * It is also used to grab the first timestamped entry in the reporting file
 * when file rotation is activated (rjp Jan.2008): if state->file_timestamp
 * is 0 it is set from the first line carrying a valid timestamp, and when
 * state->need_timestamp is set parsing stops right after that line.
 *
 * The format of the reporting file is indicated in the SGE documentation.
 * Lines are ':'-separated; the fields used here are:
 *   - fields[0]          timestamp (seconds since the epoch)
 *   - fields[1]          record type ("acct" or "job_log" are handled)
 *   - acct:    fields[7] job number, fields[13] "failed", fields[14] exit
 *                        status, fields[37] task number (when present)
 *   - job_log: fields[3] event name, fields[4] job number, fields[5] task
 *                        number
 *
 * Malformed lines, comments, lines with fewer than 14 fields and accounting
 * lines whose numeric fields cannot be parsed are skipped. Every processed
 * line, skipped or not, is consumed from the buffer by advancing
 * state->buffer_point and reducing state->buffer_valid.
 *
 * @param state
 *     Log state structure. The buffer contents are modified in place
 *     (separators are replaced by NUL terminators).
 *
 * @return
 *     SEG_SGE_FOUND_FILE_TIMESTAMP if parsing stopped because the file
 *     timestamp was found, SEG_SGE_SKIP_LINE if at least one line was
 *     skipped for being older than state->start_timestamp (and the previous
 *     case did not apply afterwards), 0 otherwise.
 */
static
int
globus_l_sge_parse_events(
    globus_l_sge_logfile_state_t *      state)
{
    char *                              eol;
    char *                              rp;
    struct tm                           tm;
    time_t                              stamp;
    char **                             fields = NULL;
    size_t                              nfields;
    time_t                              when;
    int                                 rc;
    int                                 status;

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
            ("globus_l_sge_parse_events() called\n"));

    status = 0;

    /* Find the next newline */
    while ((status != SEG_SGE_FOUND_FILE_TIMESTAMP) &&
           (eol = memchr(state->buffer + state->buffer_point,
                   '\n',
                   state->buffer_valid)) != NULL)
    {
        /* Replace the EOL character with a NULL terminator. */
        *eol = '\0';

        SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
                ("parsing line %s\n", state->buffer + state->buffer_point));

        rc = globus_l_sge_split_into_fields(state, &fields, &nfields);

        /* If split_into_fields fails, ignore the line.*/
        if (rc != GLOBUS_SUCCESS)
        {
            SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
                    ("Failed to parse line %s\n",
                     state->buffer + state->buffer_point));
            goto free_fields;
        }

        /*
         * Throughout this function "strstr(s, p) == s" is used as a
         * "s starts with p" test.
         */

        /* If the first character is a '#', ignore the line. */
        if (strstr(fields[0], "#") == fields[0])
        {
            SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
                    ("Line '%s' is a comment, skipping.\n",
                     state->buffer + state->buffer_point));
            goto free_fields;
        }

        /* If the number of fields is < 14, ignore the line. */
        /* This is a safety check -- fields[0..13] may be accessed freely
         * after this point; anything beyond needs its own check. */
        if (nfields < 14)
        {
            SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
                    ("too few fields, freeing and getting next line\n"));
            goto free_fields;
        }

        /* Extract the timestamp from the first field. */
        /* (rp is a pointer to the symbol immediately following the timestamp.) */
        rp = strptime(fields[0], "%s", &tm);

        if (rp == NULL || (*rp) != '\0')
        {
            SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
                    ("Unable to extract timestamp from first field in line '%s'\n",
                     state->buffer + state->buffer_point));
            goto free_fields;
        }
        stamp = mktime(&tm);
        if (stamp == (time_t) -1)
        {
            SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
                    ("mktime generated invalid timestamp\n"));
            goto free_fields;
        }

        /* for getting file-timestamp only  rjp Jan.2008 */
        if (state->file_timestamp == 0)
        {
            state->file_timestamp = stamp;
            SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
                    ("  Setting the file timestamp to %ld\n",
                     (long) state->file_timestamp));
            if (state->need_timestamp)
            {
                status = SEG_SGE_FOUND_FILE_TIMESTAMP; /* will kick out of loop */
                goto free_fields;
            }
        }

        when = mktime(&state->start_timestamp);

        if (stamp < when)
        {
            /* Skip messages which are before our start timestamp */
            SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
                    ("Skipping entry as timestamp %ld is before checkpoint %ld\n",
                     (long) stamp, (long) when));
            status = SEG_SGE_SKIP_LINE;
            goto free_fields;
        }

        /* Batch accounting: resources consumed by the job  */
        if (strstr(fields[1], "acct") == fields[1])
        {
            char *                      job_id;
            int                         failed;
            int                         exit_status;

            /* From the SGE 'reporting' man page:
             *
             * failed:
             * Indicates the problem which occurred in case a job could not  be
             * started on the execution host (e.g. because the owner of the job
             * did not have a valid account on that machine).  If  Grid  Engine
             * tries  to  start a job multiple times, this may lead to multiple
             * entries in the accounting file corresponding to the same job ID.
             *
             * exit status:
             * Exit status of the job script (or Grid Engine specific status in
             * case of certain error conditions)
             */

            /*
             * The exit status lives in fields[14], one slot past what the
             * generic 14-field check above guarantees.
             */
            if (nfields < 15)
            {
                SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
                        ("too few fields in accounting record, skipping\n"));
                goto free_fields;
            }

            /* Lookup the failure code and exit status of the job. */
            if (sscanf(fields[13], "%d", &failed) != 1 ||
                sscanf(fields[14], "%d", &exit_status) != 1)
            {
                SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
                        ("Unable to parse failed/exit status ('%s'/'%s'), "
                         "skipping\n",
                         fields[13], fields[14]));
                goto free_fields;
            }

            job_id = globus_common_create_string(
                    "%s.%s",
                    fields[7], nfields > 37 ? fields[37] : "0");
            if (job_id == NULL)
            {
                SEG_SGE_DEBUG(SEG_SGE_DEBUG_ERROR,
                        ("Unable to allocate job id string\n"));
                goto free_fields;
            }

            /* Return a job failure event if the 'failed' field is non-zero. */
            if (failed != 0)
            {
                SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
                        ("New event: job %s has failed with exit status %d.\n",
                         job_id, exit_status));
                rc = globus_scheduler_event_failed(stamp, job_id, failed);
            }
            else
            {
                SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
                        ("New event: job %s has done with exit status %d.\n",
                         job_id, exit_status));
                rc = globus_scheduler_event_done(stamp, job_id, exit_status);
            }
            free(job_id);
        }
        else if (strstr(fields[1], "job_log") == fields[1])
        {
            /* Job state change: classify the event name first. */
            const int                   is_pending =
                (strstr(fields[3], "pending") == fields[3]);
            const int                   is_delivered = !is_pending &&
                (strstr(fields[3], "delivered") == fields[3]);
            const int                   is_deleted =
                !is_pending && !is_delivered &&
                (strstr(fields[3], "deleted") == fields[3]);

            if (is_pending || is_delivered || is_deleted)
            {
                char *                  job_id;

                job_id = globus_common_create_string(
                        "%s.%s", fields[4], fields[5]);
                if (job_id == NULL)
                {
                    SEG_SGE_DEBUG(SEG_SGE_DEBUG_ERROR,
                            ("Unable to allocate job id string\n"));
                    goto free_fields;
                }

                if (is_pending)
                {
                    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
                            ("New event: job %s now pending at t=%ld\n",
                             job_id, (long) stamp));
                    rc = globus_scheduler_event_pending(stamp, job_id);
                }
                else if (is_delivered)
                {
                    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
                            ("New event: job %s now active at t=%ld\n",
                             job_id, (long) stamp));
                    rc = globus_scheduler_event_active(stamp, job_id);
                }
                else
                {
                    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
                            ("New event: job %s now completed at t=%ld\n",
                             job_id, (long) stamp));
                    rc = globus_scheduler_event_done(stamp, job_id, 0);
                }
                free(job_id);
            }
        }

free_fields:
        if (fields != NULL)
        {
            SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
                    ("freeing fields\n"));
            globus_libc_free(fields);
            fields = NULL;
        }

        /* Consume the line (including its terminator) from the buffer. */
        state->buffer_valid -= eol + 1 - state->buffer - state->buffer_point;
        state->buffer_point = eol + 1 - state->buffer;
    }

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
            ("globus_l_sge_parse_events() exits\n"));
    return status;
}
/* globus_l_sge_parse_events() */

/**
 * Routine added for handling file rotation (rjp Jan.2008).
 *
 * Opens the file named by state->path and reads it through the shared read
 * buffer until globus_l_sge_parse_events() has stored the timestamp of the
 * first timestamped entry in state->file_timestamp, or the end of the file
 * is reached.
 *
 * @param state
 *     Log state structure. Any file already open in state->fp is closed
 *     first; state->fp is NULL again on return. The read buffer is emptied
 *     (buffer_point and buffer_valid reset to 0) before and after the scan
 *     and may be enlarged if a line does not fit. state->need_timestamp is
 *     set for the duration of the scan and cleared afterwards.
 *
 * @return
 *     0 if state->file_timestamp was set, -1 if the file could not be
 *     opened or no timestamp could be found in it.
 */
static
int
globus_l_sge_get_file_timestamp(globus_l_sge_logfile_state_t* state)
{
    globus_bool_t    eof_hit = GLOBUS_FALSE;
    int              max_to_read;
    size_t           got;
    int              rc;

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO, ("globus_l_sge_get_file_timestamp() invoked.\n"));

    if (state->fp != NULL)
    {
        fclose(state->fp);
        state->fp = NULL;
    }

    state->fp = fopen(state->path, "r");

    if (state->fp == NULL)
    {
        SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
                ("   unable to open file name = %s\n", state->path));
        goto error;
    }

    /* start with an empty buffer */
    state->buffer_point = 0;
    state->buffer_valid = 0;
    state->need_timestamp = GLOBUS_TRUE;
    state->file_timestamp = 0;

    while (state->file_timestamp == 0 && !eof_hit)
    {
        /* Calculate how much data will fit within the read-buffer. */
        max_to_read = state->buffer_length - state->buffer_valid
                - state->buffer_point;

        if (max_to_read <= 0)
        {
            /*
             * No room left: either no buffer has been allocated yet or the
             * buffer holds a partial line longer than its capacity. A
             * zero-length fread() would make no progress and would not look
             * like EOF either, so the loop would never end. Grow the buffer
             * (which is only valid when it is exactly full) or give up.
             */
            if (state->buffer_valid != state->buffer_length ||
                globus_l_sge_increase_buffer(state) != 0)
            {
                SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
                        ("     No room in read buffer, giving up.\n"));
                break;
            }
            max_to_read = state->buffer_length - state->buffer_valid
                    - state->buffer_point;
            if (max_to_read <= 0)
            {
                break;
            }
        }

        SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
                ("Reading a maximum of %d bytes from SGE reporting file\n",
                 max_to_read));

        /* Actually perform the read. */
        got = fread(state->buffer + state->buffer_point +
                state->buffer_valid, 1, (size_t) max_to_read, state->fp);

        SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
                ("Read %d bytes\n", (int) got));

        if (got < (size_t) max_to_read)
        {
            /* Reached the end of the file or some other problem - either way assume EOF */
            eof_hit = GLOBUS_TRUE;
        }

        state->buffer_valid += got;

        /*
         * Try to find the file timestamp inside the buffer. The return
         * value is not needed: success is detected through
         * state->file_timestamp in the loop condition.
         */
        rc = globus_l_sge_parse_events(state);

        SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
                ("     Cleaning buffer of parsed events.\n"));
        rc = globus_l_sge_clean_buffer(state);
        (void) rc;
    }

    if (state->fp != NULL)
    {
        fclose(state->fp);
        state->fp = NULL;
    }

    /* End with an empty buffer */
    state->buffer_point = 0;
    state->buffer_valid = 0;
    state->need_timestamp = GLOBUS_FALSE;

    if (state->file_timestamp == 0)
    {
        SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
                (" Could not get timestamp from file "));
        return -1;
    }

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
            ("globus_l_sge_get_file_timestamp() exit.\n"));
    return 0;

error:
    SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
            ("Get Timestamp Problem opening file %s\n", state->path));
    return -1;
}
/* globus_l_sge_get_file_timestamp */


/**
 * Split the NUL-terminated line at state->buffer + state->buffer_point into
 * ':'-separated fields, in place.
 *
 * @param state
 *     Log state structure. The string pointed to by
 *     state-\>buffer + state-\>buffer_point is modified: every ':' is
 *     replaced by a NUL terminator.
 * @param fields
 *     Modified to point to a newly allocated array of char * pointers which
 *     point to the start of each field within the state buffer block. Only
 *     the array itself is allocated; the caller releases it (and must not
 *     free the individual entries). Set to NULL on failure.
 * @param nfields
 *     Modified value pointed to by this will contain the number of fields in
 *     the @a fields array after completion (0 on failure).
 *
 * @return
 *     0 on success, SEG_SGE_ERROR_OUT_OF_MEMORY if the array cannot be
 *     allocated.
 */
static
int
globus_l_sge_split_into_fields(
    globus_l_sge_logfile_state_t *      state,
    char ***                            fields,
    size_t *                            nfields)
{
    size_t                              i = 0;
    size_t                              cnt = 1;
    char *                              tmp;
    int                                 rc;

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO, ("globus_l_sge_split_into_fields()\n"));

    *fields = NULL;
    *nfields = 0;

    tmp = state->buffer + state->buffer_point;

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE, ("splitting %s\n", tmp));

    /* First pass: one field, plus one more for every separator. */
    for (; *tmp != '\0'; tmp++)
    {
        if (*tmp == ':')
        {
            cnt++;
        }
    }
    /* size_t has no portable pre-C99 conversion; print it as unsigned long. */
    SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE, ("%lu fields\n", (unsigned long) cnt));

    *fields = globus_libc_calloc(cnt, sizeof(**fields));

    if (*fields == NULL)
    {
        rc = SEG_SGE_ERROR_OUT_OF_MEMORY;
        goto error;
    }
    *nfields = cnt;

    /* Second pass: record field starts and terminate each field. */
    tmp = state->buffer + state->buffer_point;

    (*fields)[i++] = tmp;

    while (*tmp != '\0' && i < cnt)
    {
        if (*tmp == ':')
        {
            (*fields)[i++] = tmp + 1;
            *tmp = '\0';
        }
        tmp++;
    }

#   if BUILD_DEBUG
    {
        for (i = 0; i < cnt; i++)
        {
            SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE, ("field[%lu]=%s\n",
                        (unsigned long) i, (*fields)[i]));
        }
    }
#   endif

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
            ("globus_l_sge_split_into_fields(): exit success\n"));

    return 0;

error:
    SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
            ("globus_l_sge_split_into_fields(): exit failure: %d\n", rc));
    return rc;
}
/* globus_l_sge_split_into_fields() */