1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
|
/**
 * MessageQueueSource:
 *
 * This is a #GSource which wraps a #GAsyncQueue and is dispatched whenever a
 * message can be pulled off the queue. Messages can be enqueued from any
 * thread.
 *
 * The callbacks dispatched by a #MessageQueueSource have type
 * #MessageQueueSourceFunc.
 *
 * #MessageQueueSource supports adding a #GCancellable child source which will
 * additionally dispatch if a provided #GCancellable is cancelled.
 */
typedef struct {
/* Base source instance. It is the first member because the GSourceFuncs
 * implementations in this file cast between GSource * and
 * MessageQueueSource *. */
GSource parent;
/* Queue the messages are popped from. The source holds its own reference,
 * taken in message_queue_source_new() and dropped in the finalize function. */
GAsyncQueue *queue; /* owned */
/* Optional function used by the dispatch function to free a popped message
 * when no callback is set on the source; may be NULL. */
GDestroyNotify destroy_message;
} MessageQueueSource;
/**
 * MessageQueueSourceFunc:
 * @message: (transfer full) (nullable): message pulled off the queue; the
 *   callback takes ownership of it and is responsible for freeing it
 * @user_data: user data provided to g_source_set_callback()
 *
 * Callback function type for #MessageQueueSource.
 *
 * Returns: the value to return from the source's dispatch function; following
 *   the usual #GSource convention, %TRUE keeps the source attached so further
 *   messages are handled, and %FALSE removes it
 */
typedef gboolean (*MessageQueueSourceFunc) (gpointer message,
gpointer user_data);
/* GSourceFuncs.prepare implementation.
 *
 * Reports the source as ready to dispatch when at least one message is
 * waiting on the wrapped queue. @timeout_ is not modified. */
static gboolean
message_queue_source_prepare (GSource *source,
                              gint    *timeout_)
{
  MessageQueueSource *self = (MessageQueueSource *) source;
  gint n_pending = g_async_queue_length (self->queue);

  if (n_pending > 0)
    return TRUE;

  return FALSE;
}
/* GSourceFuncs.dispatch implementation.
 *
 * Tries to pop a single message off the queue without blocking, then:
 *  - if a message was popped and a callback is set, hands the message to the
 *    callback (which takes ownership) and returns the callback's result;
 *  - if a message was popped but no callback is set, frees the message with
 *    the destroy function (when one was provided) and returns TRUE;
 *  - if nothing was popped, returns TRUE.
 *
 * Returning TRUE keeps the source around to handle the next message. */
static gboolean
message_queue_source_dispatch (GSource     *source,
                               GSourceFunc  callback,
                               gpointer     user_data)
{
  MessageQueueSource *self = (MessageQueueSource *) source;
  MessageQueueSourceFunc handler = (MessageQueueSourceFunc) callback;
  gpointer item = g_async_queue_try_pop (self->queue);

  /* Normal path: a message is available and somebody wants it. */
  if (item != NULL && handler != NULL)
    return handler (item, user_data);

  /* A message with no consumer is dropped here. */
  if (item != NULL && self->destroy_message != NULL)
    self->destroy_message (item);

  return TRUE;
}
/* GSourceFuncs.finalize implementation.
 *
 * Drops the reference on the queue that was taken when the source was
 * created. */
static void
message_queue_source_finalize (GSource *source)
{
  GAsyncQueue *owned_queue = ((MessageQueueSource *) source)->queue;

  g_async_queue_unref (owned_queue);
}
/* Adapter installed as the closure_callback of this source type.
 *
 * @message: message popped off the queue; it is passed to the closure as a
 *   G_TYPE_POINTER value, and the function invoked by the closure is
 *   responsible for freeing it
 * @user_data: the #GClosure to invoke
 *
 * Invokes the closure with @message as its single parameter and returns the
 * boolean result produced by the closure. */
static gboolean
message_queue_source_closure_callback (gpointer message,
                                       gpointer user_data)
{
  GClosure *closure = user_data;
  GValue param_value = G_VALUE_INIT;
  GValue result_value = G_VALUE_INIT;
  gboolean retval;

  /* The invoked function is responsible for freeing @message. */
  g_value_init (&result_value, G_TYPE_BOOLEAN);
  g_value_init (&param_value, G_TYPE_POINTER);
  g_value_set_pointer (&param_value, message);

  g_closure_invoke (closure, &result_value, 1, &param_value, NULL);
  retval = g_value_get_boolean (&result_value);

  /* Unsetting a G_TYPE_POINTER value does not free the pointed-to message. */
  g_value_unset (&param_value);
  g_value_unset (&result_value);

  return retval;
}
static GSourceFuncs message_queue_source_funcs =
{
message_queue_source_prepare,
NULL, /* check */
message_queue_source_dispatch,
message_queue_source_finalize,
(GSourceFunc) message_queue_source_closure_callback,
NULL,
};
/**
 * message_queue_source_new:
 * @queue: the queue to pull messages from; the source takes its own reference
 * @destroy_message: (nullable): function to free a message, or %NULL
 * @cancellable: (nullable): a #GCancellable, or %NULL
 *
 * Create a new #MessageQueueSource, a type of #GSource which dispatches for
 * each message queued to @queue. The source is initially named
 * "MessageQueueSource".
 *
 * If a callback function of type #MessageQueueSourceFunc is connected to the
 * returned #GSource using g_source_set_callback(), it is invoked once per
 * message with the message as its first argument, and it is responsible for
 * freeing that message. If no callback is set, each message is popped when
 * the source is dispatched and freed with @destroy_message (when that is
 * non-%NULL).
 *
 * If @cancellable is non-%NULL, a cancellable source for it is added as a
 * child of the new source.
 *
 * Returns: (transfer full): a new #MessageQueueSource
 */
GSource *
message_queue_source_new (GAsyncQueue    *queue,
                          GDestroyNotify  destroy_message,
                          GCancellable   *cancellable)
{
  GSource *base;             /* same object as @self, viewed as a GSource */
  MessageQueueSource *self;  /* same object as @base, viewed as our subtype */
  GSource *cancel_child;

  g_return_val_if_fail (queue != NULL, NULL);
  g_return_val_if_fail (cancellable == NULL ||
                        G_IS_CANCELLABLE (cancellable), NULL);

  base = g_source_new (&message_queue_source_funcs,
                       sizeof (MessageQueueSource));

  /* Default name; callers may replace it with something more specific. */
  g_source_set_name (base, "MessageQueueSource");

  self = (MessageQueueSource *) base;
  self->queue = g_async_queue_ref (queue);
  self->destroy_message = destroy_message;

  /* Nothing more to set up when cancellation is not requested. */
  if (cancellable == NULL)
    return base;

  /* Attach the cancellable as a child source. The parent keeps the child
   * alive, so the local reference is released straight away. */
  cancel_child = g_cancellable_source_new (cancellable);
  g_source_set_dummy_callback (cancel_child);
  g_source_add_child_source (base, cancel_child);
  g_source_unref (cancel_child);

  return base;
}
|