File: threaded.c

package info (click to toggle)
openhpi 2.14.1-1.2
  • links: PTS
  • area: main
  • in suites: wheezy
  • size: 20,380 kB
  • sloc: ansic: 187,087; cpp: 32,188; sh: 10,415; makefile: 4,467; perl: 1,529
file content (278 lines) | stat: -rw-r--r-- 9,350 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
/*      -*- linux-c -*-
 *
 * (C) Copyright IBM Corp. 2005-2006
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  This
 * file and program are licensed under a BSD style license.  See
 * the Copying file included with the OpenHPI distribution for
 * full licensing terms.
 *
 * Author(s):
 *      Renier Morales <renier@openhpi.org>
 *
 */

#include <oh_threaded.h>
#include <oh_config.h>
#include <oh_plugin.h>
#include <oh_hotswap.h>
#include <oh_error.h>

#define OH_DISCOVERY_THREAD_SLEEP_TIME 180 * G_USEC_PER_SEC
#define OH_EVTGET_THREAD_SLEEP_TIME 3 * G_USEC_PER_SEC

GCond *oh_evtget_thread_wait = NULL;
GThread *oh_evtget_thread = NULL;
GError *oh_evtget_thread_error = NULL;
GMutex *oh_evtget_thread_mutex = NULL;
GStaticMutex oh_wake_evtget_mutex = G_STATIC_MUTEX_INIT;

GThread *oh_evtpop_thread = NULL;
GError *oh_evtpop_thread_error = NULL;
GMutex *oh_evtpop_thread_mutex = NULL;

GThread *oh_discovery_thread = NULL;
GError *oh_discovery_thread_error = NULL;
GMutex *oh_discovery_thread_mutex = NULL;
GCond *oh_discovery_thread_wait = NULL;
GStaticMutex oh_wake_discovery_mutex = G_STATIC_MUTEX_INIT;

static int oh_discovery_init(void)
{
        /* Nothing to do here...for now */
        return 0;
}

static int oh_discovery_final(void)
{
        g_mutex_free(oh_discovery_thread_mutex);
        g_cond_free(oh_discovery_thread_wait);

        return 0;
}

static int oh_event_final(void)
{
        /*g_async_queue_unref(oh_process_q);*/
        g_mutex_free(oh_evtget_thread_mutex);
        g_cond_free(oh_evtget_thread_wait);
        g_mutex_free(oh_evtpop_thread_mutex);

        return 0;
}

static gpointer oh_discovery_thread_loop(gpointer data)
{
        GTimeVal time;
        SaErrorT error = SA_OK;

        g_mutex_lock(oh_discovery_thread_mutex);
        while (1) {
                dbg("Doing threaded discovery on all handlers");
                error = oh_discovery();
                if (error) {
                        dbg("Got error on threaded discovery return.");
                }

                /* Let oh_wake_discovery_thread know this thread is done */
                g_cond_broadcast(oh_discovery_thread_wait);
                g_get_current_time(&time);
                g_time_val_add(&time, OH_DISCOVERY_THREAD_SLEEP_TIME);
                /* Go to sleep; let oh_wake_discovery_thread take the mutex */
                dbg("Going to sleep");
                if (g_cond_timed_wait(oh_discovery_thread_wait,
                                      oh_discovery_thread_mutex, &time))
                        dbg("SIGNALED: Got signal from saHpiDiscover()");
                else
                        dbg("TIMEDOUT: Woke up, am doing discovery again");
        }
        g_mutex_unlock(oh_discovery_thread_mutex);
        g_thread_exit(0);

        return data;
}

static gpointer oh_evtpop_thread_loop(gpointer data)
{
        SaErrorT error = SA_OK;

        g_mutex_lock(oh_evtpop_thread_mutex);
        while(1) {
                dbg("Thread processing events");
                error = oh_process_events();
                if (error != SA_OK) err("Error on processing of events.");
        }
        g_mutex_unlock(oh_evtpop_thread_mutex);
        g_thread_exit(0);

        return data;
}

static gpointer oh_evtget_thread_loop(gpointer data)
{
        GTimeVal time;
        SaErrorT error = SA_OK;
        static int first_loop = 1;

        g_mutex_lock(oh_evtget_thread_mutex);
        while (1) {
                /* Give the discovery time to start first -> FIXME */
                if (first_loop) {
                        struct timespec sleepytime =
                                { .tv_sec = 0, .tv_nsec = 500000000};
                        first_loop = 0;
                        nanosleep(&sleepytime, NULL);
                }

                dbg("Thread Harvesting events");
                error = oh_harvest_events();
                if (error != SA_OK) err("Error on harvest of events.");

                /* Let oh_wake_evtget_thread know this thread is done */
                g_cond_broadcast(oh_evtget_thread_wait);
                g_get_current_time(&time);
                g_time_val_add(&time, OH_EVTGET_THREAD_SLEEP_TIME);
                dbg("Going to sleep");
                if (g_cond_timed_wait(oh_evtget_thread_wait, oh_evtget_thread_mutex, &time))
                        dbg("SIGNALED: Got signal from plugin");
                else
                        dbg("TIMEDOUT: Woke up, am looping again");
        }
        g_mutex_unlock(oh_evtget_thread_mutex);
        g_thread_exit(0);

        return data;
}

int oh_threaded_init()
{
        int error = 0;

        dbg("Attempting to init event");
        if (!g_thread_supported()) {
                dbg("Initializing thread support");
                g_thread_init(NULL);
        } else {
                dbg("Already supporting threads");
        }

        error = oh_event_init();
        if (oh_discovery_init() || error) error = 1;

        return error;
}

int oh_threaded_start()
{
        dbg("Starting discovery thread");
        oh_discovery_thread_wait = g_cond_new();
        oh_discovery_thread_mutex = g_mutex_new();
        oh_discovery_thread = g_thread_create(oh_discovery_thread_loop,
                                NULL, FALSE,
                                &oh_discovery_thread_error);

        dbg("Starting event threads");
        oh_evtget_thread_wait = g_cond_new();
        oh_evtget_thread_mutex = g_mutex_new();
        oh_evtget_thread = g_thread_create(oh_evtget_thread_loop,
                                NULL, FALSE, &oh_evtget_thread_error);
        oh_evtpop_thread_mutex = g_mutex_new();
        oh_evtpop_thread = g_thread_create(oh_evtpop_thread_loop,
                                NULL, FALSE, &oh_evtpop_thread_error);

        return 0;
}

int oh_threaded_final()
{
        oh_discovery_final();
        oh_event_final();

        return 0;
}

/**
 * oh_wake_discovery_thread
 * @wait: Says whether we should wait for the discovery thread
 * to do one round through the plugin instances. Otherwise, we
 * just knock on the discovery thread's door and return quickly.
 *
 * If wait is true, the discovery thread is woken up
 * and we wait until it does a round throughout the
 * plugin instances. If the thread is already running,
 * we will wait for it until it completes the round.
 *
 * Returns: void
 **/
void oh_wake_discovery_thread(SaHpiBoolT wait)
{
        if (!wait) { /* If not waiting, just signal the thread and go. */
                g_cond_broadcast(oh_discovery_thread_wait);
                return;
        }

        g_static_mutex_lock(&oh_wake_discovery_mutex);
        if (g_mutex_trylock(oh_discovery_thread_mutex)) {
                /* The thread was asleep; wake it up. */
                dbg("Going to wait for discovery thread to loop once.");
                g_cond_broadcast(oh_discovery_thread_wait);
                g_cond_wait(oh_discovery_thread_wait,
                                oh_discovery_thread_mutex);
                dbg("Got signal from discovery"
                    " thread being done. Giving lock back");
                g_mutex_unlock(oh_discovery_thread_mutex);
        } else {
                /* Thread was already up. Wait until it completes */
                dbg("Waiting for discovery thread...");
                g_mutex_lock(oh_discovery_thread_mutex);
                dbg("...Done waiting for discovery thread.");
                g_mutex_unlock(oh_discovery_thread_mutex);
        }
        g_static_mutex_unlock(&oh_wake_discovery_mutex);

        return;
}

/**
 * oh_wake_event_thread
 * @wait: Says whether we should wait for the event thread
 * to do one round through the plugin instances. Otherwise, we
 * just knock on the event thread's door and return quickly.
 *
 * If wait is true, the event thread is woken up
 * and we wait until it does a round throughout the
 * plugin instances. If the thread is already running,
 * we will wait for it until it completes the round.
 *
 * Returns: void
 **/
void oh_wake_event_thread(SaHpiBoolT wait)
{
        if (!wait) { /* If not waiting, just signal the thread and go. */
                g_cond_broadcast(oh_evtget_thread_wait);
                return;
        }

        g_static_mutex_lock(&oh_wake_evtget_mutex);
        if (g_mutex_trylock(oh_evtget_thread_mutex)) {
                /* The thread was asleep; wake it up. */
                dbg("Going to wait for event thread to loop once.");
                g_cond_broadcast(oh_evtget_thread_wait);
                g_cond_wait(oh_evtget_thread_wait,
                            oh_evtget_thread_mutex);
                dbg("Got signal from event"
                    " thread being done. Giving lock back");
                g_mutex_unlock(oh_evtget_thread_mutex);
        } else {
                /* Thread was already up. Wait until it completes */
                dbg("Waiting for event thread...");
                g_mutex_lock(oh_evtget_thread_mutex);
                dbg("...Done waiting for event thread.");
                g_mutex_unlock(oh_evtget_thread_mutex);
        }
        g_static_mutex_unlock(&oh_wake_evtget_mutex);

        return;
}