1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297
|
/*
* ws protocol handler plugin for "lws-minimal" demonstrating multithread
*
* Written in 2010-2019 by Andy Green <andy@warmcat.com>
*
* This file is made available under the Creative Commons CC0 1.0
* Universal Public Domain Dedication.
*/
#if !defined (LWS_PLUGIN_STATIC)
#define LWS_DLL
#define LWS_INTERNAL
#include <libwebsockets.h>
#endif
#include <string.h>
/* one of these created for each message in the ringbuffer */
struct msg {
void *payload; /* is malloc'd */
size_t len;
};
/*
* One of these is created for each client connecting to us.
*
* It is ONLY read or written from the lws service thread context.
*/
struct per_session_data__minimal {
struct per_session_data__minimal *pss_list;
struct lws *wsi;
uint32_t tail;
};
/* one of these is created for each vhost our protocol is used with */
struct per_vhost_data__minimal {
struct lws_context *context;
struct lws_vhost *vhost;
const struct lws_protocols *protocol;
struct per_session_data__minimal *pss_list; /* linked-list of live pss*/
pthread_t pthread_spam[2];
pthread_mutex_t lock_ring; /* serialize access to the ring buffer */
struct lws_ring *ring; /* {lock_ring} ringbuffer holding unsent content */
const char *config;
char finished;
};
#if defined(WIN32)
static void usleep(unsigned long l) { Sleep(l / 1000); }
#endif
/*
* This runs under both lws service and "spam threads" contexts.
* Access is serialized by vhd->lock_ring.
*/
static void
__minimal_destroy_message(void *_msg)
{
struct msg *msg = _msg;
free(msg->payload);
msg->payload = NULL;
msg->len = 0;
}
/*
* This runs under the "spam thread" thread context only.
*
* We spawn two threads that generate messages with this.
*
*/
static void *
thread_spam(void *d)
{
struct per_vhost_data__minimal *vhd =
(struct per_vhost_data__minimal *)d;
struct msg amsg;
int len = 128, index = 1, n, whoami = 0;
for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++)
if (pthread_equal(pthread_self(), vhd->pthread_spam[n]))
whoami = n + 1;
do {
/* don't generate output if nobody connected */
if (!vhd->pss_list)
goto wait;
pthread_mutex_lock(&vhd->lock_ring); /* --------- ring lock { */
/* only create if space in ringbuffer */
n = (int)lws_ring_get_count_free_elements(vhd->ring);
if (!n) {
lwsl_user("dropping!\n");
goto wait_unlock;
}
amsg.payload = malloc((unsigned int)(LWS_PRE + len));
if (!amsg.payload) {
lwsl_user("OOM: dropping\n");
goto wait_unlock;
}
n = lws_snprintf((char *)amsg.payload + LWS_PRE, (unsigned int)len,
"%s: tid: %d, msg: %d", vhd->config,
whoami, index++);
amsg.len = (unsigned int)n;
n = (int)lws_ring_insert(vhd->ring, &amsg, 1);
if (n != 1) {
__minimal_destroy_message(&amsg);
lwsl_user("dropping!\n");
} else
/*
* This will cause a LWS_CALLBACK_EVENT_WAIT_CANCELLED
* in the lws service thread context.
*/
lws_cancel_service(vhd->context);
wait_unlock:
pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */
wait:
usleep(100000);
} while (!vhd->finished);
lwsl_notice("thread_spam %d exiting\n", whoami);
pthread_exit(NULL);
return NULL;
}
/* this runs under the lws service thread context only */
static int
callback_minimal(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{
struct per_session_data__minimal *pss =
(struct per_session_data__minimal *)user;
struct per_vhost_data__minimal *vhd =
(struct per_vhost_data__minimal *)
lws_protocol_vh_priv_get(lws_get_vhost(wsi),
lws_get_protocol(wsi));
const struct lws_protocol_vhost_options *pvo;
const struct msg *pmsg;
void *retval;
int n, m, r = 0;
switch (reason) {
case LWS_CALLBACK_PROTOCOL_INIT:
/* create our per-vhost struct */
vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
lws_get_protocol(wsi),
sizeof(struct per_vhost_data__minimal));
if (!vhd)
return 1;
pthread_mutex_init(&vhd->lock_ring, NULL);
/* recover the pointer to the globals struct */
pvo = lws_pvo_search(
(const struct lws_protocol_vhost_options *)in,
"config");
if (!pvo || !pvo->value) {
lwsl_err("%s: Can't find \"config\" pvo\n", __func__);
return 1;
}
vhd->config = pvo->value;
vhd->context = lws_get_context(wsi);
vhd->protocol = lws_get_protocol(wsi);
vhd->vhost = lws_get_vhost(wsi);
vhd->ring = lws_ring_create(sizeof(struct msg), 8,
__minimal_destroy_message);
if (!vhd->ring) {
lwsl_err("%s: failed to create ring\n", __func__);
return 1;
}
/* start the content-creating threads */
for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++)
if (pthread_create(&vhd->pthread_spam[n], NULL,
thread_spam, vhd)) {
lwsl_err("thread creation failed\n");
r = 1;
goto init_fail;
}
break;
case LWS_CALLBACK_PROTOCOL_DESTROY:
init_fail:
vhd->finished = 1;
for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++)
if (vhd->pthread_spam[n])
pthread_join(vhd->pthread_spam[n], &retval);
if (vhd->ring)
lws_ring_destroy(vhd->ring);
pthread_mutex_destroy(&vhd->lock_ring);
break;
case LWS_CALLBACK_ESTABLISHED:
/* add ourselves to the list of live pss held in the vhd */
lws_ll_fwd_insert(pss, pss_list, vhd->pss_list);
pss->tail = lws_ring_get_oldest_tail(vhd->ring);
pss->wsi = wsi;
break;
case LWS_CALLBACK_CLOSED:
/* remove our closing pss from the list of live pss */
lws_ll_fwd_remove(struct per_session_data__minimal, pss_list,
pss, vhd->pss_list);
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
pthread_mutex_lock(&vhd->lock_ring); /* --------- ring lock { */
pmsg = lws_ring_get_element(vhd->ring, &pss->tail);
if (!pmsg) {
pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */
break;
}
/* notice we allowed for LWS_PRE in the payload already */
m = lws_write(wsi, ((unsigned char *)pmsg->payload) + LWS_PRE,
pmsg->len, LWS_WRITE_TEXT);
if (m < (int)pmsg->len) {
pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */
lwsl_err("ERROR %d writing to ws socket\n", m);
return -1;
}
lws_ring_consume_and_update_oldest_tail(
vhd->ring, /* lws_ring object */
struct per_session_data__minimal, /* type of objects with tails */
&pss->tail, /* tail of guy doing the consuming */
1, /* number of payload objects being consumed */
vhd->pss_list, /* head of list of objects with tails */
tail, /* member name of tail in objects with tails */
pss_list /* member name of next object in objects with tails */
);
/* more to do? */
if (lws_ring_get_element(vhd->ring, &pss->tail))
/* come back as soon as we can write more */
lws_callback_on_writable(pss->wsi);
pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */
break;
case LWS_CALLBACK_RECEIVE:
break;
case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
if (!vhd)
break;
/*
* When the "spam" threads add a message to the ringbuffer,
* they create this event in the lws service thread context
* using lws_cancel_service().
*
* We respond by scheduling a writable callback for all
* connected clients.
*/
lws_start_foreach_llp(struct per_session_data__minimal **,
ppss, vhd->pss_list) {
lws_callback_on_writable((*ppss)->wsi);
} lws_end_foreach_llp(ppss, pss_list);
break;
default:
break;
}
return r;
}
#define LWS_PLUGIN_PROTOCOL_MINIMAL \
{ \
"lws-minimal", \
callback_minimal, \
sizeof(struct per_session_data__minimal), \
128, \
0, NULL, 0 \
}
|