1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
|
/*
* Copyright 2015 MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "mongoc-array-private.h"
#include "mongoc-thread-private.h"
#include "sync-queue.h"
struct _sync_queue_t {
mongoc_array_t array;
mongoc_cond_t cond;
mongoc_mutex_t mutex;
};
sync_queue_t *
q_new ()
{
sync_queue_t *q = (sync_queue_t *)bson_malloc (sizeof(sync_queue_t));
_mongoc_array_init (&q->array, sizeof(void *));
mongoc_cond_init (&q->cond);
mongoc_mutex_init (&q->mutex);
return q;
}
void
q_put (sync_queue_t *q, void *item)
{
mongoc_mutex_lock (&q->mutex);
_mongoc_array_append_val (&q->array, item);
mongoc_cond_signal (&q->cond);
mongoc_mutex_unlock (&q->mutex);
}
/* call holding the lock */
static void *
_get (sync_queue_t *q)
{
void **data;
void *item = NULL;
size_t i;
if (q->array.len) {
data = (void **)q->array.data;
item = data[0];
/* shift the queue left */
q->array.len--;
for (i = 0; i < q->array.len; i++) {
data[i] = data[i + 1];
}
}
return item;
}
void *
q_get (sync_queue_t *q, int64_t timeout_msec)
{
void *item = NULL;
int64_t deadline;
mongoc_mutex_lock (&q->mutex);
if (timeout_msec) {
deadline = bson_get_monotonic_time () + timeout_msec * 1000;
while (!q->array.len && bson_get_monotonic_time () <= deadline) {
mongoc_cond_timedwait (&q->cond, &q->mutex, timeout_msec);
}
} else {
/* no deadline */
while (!q->array.len) {
mongoc_cond_wait (&q->cond, &q->mutex);
}
}
item = _get (q);
mongoc_mutex_unlock (&q->mutex);
return item;
}
void *
q_get_nowait (sync_queue_t *q)
{
void *item;
mongoc_mutex_lock (&q->mutex);
item = _get (q);
mongoc_mutex_unlock (&q->mutex);
return item;
}
void
q_destroy (sync_queue_t *q)
{
_mongoc_array_destroy (&q->array);
mongoc_cond_destroy (&q->cond);
mongoc_mutex_destroy (&q->mutex);
bson_free (q);
}
|