File: test_sge_tq.c

package info (click to toggle)
gridengine 8.1.9%2Bdfsg-10
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 56,880 kB
  • sloc: ansic: 432,689; java: 87,068; cpp: 31,958; sh: 29,429; jsp: 7,757; perl: 6,336; xml: 5,828; makefile: 4,701; csh: 3,928; ruby: 2,221; tcl: 1,676; lisp: 669; yacc: 519; python: 503; lex: 361; javascript: 200
file content (246 lines) | stat: -rw-r--r-- 7,473 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
/*___INFO__MARK_BEGIN__*/
/*************************************************************************
 * 
 *  The Contents of this file are made available subject to the terms of
 *  the Sun Industry Standards Source License Version 1.2
 * 
 *  Sun Microsystems Inc., March, 2001
 * 
 * 
 *  Sun Industry Standards Source License Version 1.2
 *  =================================================
 *  The contents of this file are subject to the Sun Industry Standards
 *  Source License Version 1.2 (the "License"); You may not use this file
 *  except in compliance with the License. You may obtain a copy of the
 *  License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
 * 
 *  Software provided under this License is provided on an "AS IS" basis,
 *  WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
 *  WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
 *  MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
 *  See the License for the specific provisions governing your rights and
 *  obligations concerning the Software.
 * 
 *   The Initial Developer of the Original Code is: Sun Microsystems, Inc.
 * 
 *   Copyright: 2009 by Sun Microsystems, Inc.
 * 
 *   All Rights Reserved.
 * 
 ************************************************************************/
/*___INFO__MARK_END__*/

#include <stdlib.h>
#include <stdio.h>
#include <fnmatch.h>

#include "uti/sge_rmon.h"
#include "uti/sge_tq.h"
#include "uti/sge_err.h"
#include "uti/sge_thread_ctrl.h"
#include "uti/sge_mtutil.h"

/*
 * Producer and consumer maximum should be a multiple of 2 and 3 
 */
#define TEST_SL_MAX_CONSUMER 24 
#define TEST_SL_MAX_PRODUCER 24 

#define TEST_SL_MAX_ELEMENTS 10000

struct _test_sl_thread_cp_t {
   pthread_mutex_t mutex;
   sge_tq_queue_t *queue;
   u_long32 counter;
   dstring sequence;
   volatile bool do_terminate;
};

typedef struct _test_sl_thread_cp_t test_sl_thread_cp_t;

void *
test_thread_consumer_template(void *arg, sge_tq_type_t type, const char *type_string) {
   void *ret = NULL;
   test_sl_thread_cp_t *global = (test_sl_thread_cp_t *)arg;

   DENTER(TOP_LAYER, "test_thread_consumer_main");
   while (global->do_terminate != true) { 
      const char *string;
     
      /* consume: first element */
      sge_tq_wait_for_task(global->queue, 1, type, (void **)&string);

      if (string != NULL) {
         if (fnmatch(type_string, string, 0) != 0) {
            fprintf(stderr, "got %s from queue and not %s\n", string, type_string);
            break;
         }
      } else {
         if (sge_thread_has_shutdown_started() == false) {  
            fprintf(stderr, "got NULL from queue although thread was not terminated\n");
            break;
         }
      }

      pthread_mutex_lock(&global->mutex);
      sge_dstring_append_char(&global->sequence, 'c');
      pthread_mutex_unlock(&global->mutex);
   }
   DRETURN(ret);
}

void *
test_thread_producer_template(void *arg, sge_tq_type_t type, const char *type_string) {
   void *ret = NULL;
   test_sl_thread_cp_t *global = (test_sl_thread_cp_t *)arg;

   DENTER(TOP_LAYER, "test_thread_producer_main");
   while (global->do_terminate != true) { 
      /* produce: new element */
      sge_tq_store_notify(global->queue, type, (void*)type_string);

      /* trigger termination */
      pthread_mutex_lock(&global->mutex);
      sge_dstring_append_char(&global->sequence, 'p');
      global->counter++;
      if (global->counter > TEST_SL_MAX_ELEMENTS) {
         global->do_terminate = true;
         sge_thread_notify_all_waiting();
      }
      pthread_mutex_unlock(&global->mutex);
   }
   DRETURN(ret);
}

void *
test_thread_consumer_type1(void *arg) {
   return test_thread_consumer_template(arg, SGE_TQ_TYPE1, "type_1");
}

void *
test_thread_consumer_type2(void *arg) {
   return test_thread_consumer_template(arg, SGE_TQ_TYPE2, "type_2");
}

void *
test_thread_consumer_unknown(void *arg) {
   return test_thread_consumer_template(arg, SGE_TQ_UNKNOWN, "type_?");
}

void *
test_thread_producer_type1(void *arg) {
   return test_thread_producer_template(arg, SGE_TQ_TYPE1, "type_1");
}

void *
test_thread_producer_type2(void *arg) {
   return test_thread_producer_template(arg, SGE_TQ_TYPE2, "type_2");
}

/*
 * Scenario: Producer - Consumer
 * - TEST_SL_MAX_CONSUMER consumer threads will be created
 * - TEST_SL_MAX_PRODUCER producer threads will be created
 * - consumer threads wait for an element in a global list
 * - producer threads put an element into the list
 * - consumer und producer append a c or p letter into a global string
 * - the producer creating the 1000th element triggers termination of threads
 * - global string contains execution sequence of p and c threads
 */
bool
test_mt_consumer_producer(void) {
   bool ret = true;
   test_sl_thread_cp_t global;
   const char *string;
   int i;
   char last = '\0';
   int switches = 0;

   DENTER(TOP_LAYER, "test_mt_consumer_producer");

   /* create a list */
   memset(&global, 0, sizeof(test_sl_thread_cp_t));
   global.do_terminate = false;
   global.counter = 0;
   pthread_mutex_init(&global.mutex, NULL);
   ret = sge_tq_create(&global.queue);

   /* spawn threads */
   if (ret) {
      pthread_t consumer[TEST_SL_MAX_CONSUMER];
      pthread_t producer[TEST_SL_MAX_PRODUCER];
      int i;

      for (i = 0; i < TEST_SL_MAX_CONSUMER; i++) {
         if (i < TEST_SL_MAX_CONSUMER / 3) {
            pthread_create(&(consumer[i]), NULL, test_thread_consumer_type1, &global);
         } else if (i < TEST_SL_MAX_CONSUMER * 2 / 3) {
            pthread_create(&(consumer[i]), NULL, test_thread_consumer_type2, &global);
         } else {
            pthread_create(&(consumer[i]), NULL, test_thread_consumer_unknown, &global);
         }
      }
      for (i = 0; i < TEST_SL_MAX_PRODUCER; i++) {
         if (i < TEST_SL_MAX_PRODUCER / 2) {
            pthread_create(&(producer[i]), NULL, test_thread_producer_type1, &global);
         } else {
            pthread_create(&(producer[i]), NULL, test_thread_producer_type2, &global);
         }
      }

      for (i = 0; i < TEST_SL_MAX_CONSUMER; i++) {
         pthread_join(consumer[i], NULL);
      }
      for (i = 0; i < TEST_SL_MAX_PRODUCER; i++) {
         pthread_join(producer[i], NULL);
      }
   }

   string = sge_dstring_get_string(&global.sequence);
   for (i = 0; i < strlen(string); i++) {
      if (last != string[i]) {
         switches++;
      }
      last = string[i];
   }


   /* following does not work in slow or virtual hosts */
#if 0
   {
      int max_possible_switches = TEST_SL_MAX_PRODUCER * TEST_SL_MAX_ELEMENTS / 2;
      int min = max_possible_switches / 33;
      int max = max_possible_switches - max_possible_switches / 33;

      /* 
       * check thread type execution sequence:
       * for this test we are happy if it is in min/max range 
       * plus/minus ~3%
       */
      if (switches < min || switches > max) {
         fprintf(stderr, "switch of thread type is out of range in %s(). "
                 "got %d but expected something in range [%d;%d]", 
                 SGE_FUNC, switches, min, max);
      }
   }
#endif

   /* cleanup */
   pthread_mutex_destroy(&global.mutex);
   ret &= sge_tq_destroy(&global.queue);

   DRETURN(ret);
}

int main(int argc, char *argv[]) {
   bool ret = true;

   DENTER_MAIN(TOP_LAYER, "test_sl");

   sge_err_init();

   ret &= test_mt_consumer_producer();

   DRETURN(ret == true ? 0 : 1);
}