1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
|
#include "uwsgi.h"

#include <stdio.h>
#include <string.h>
extern struct uwsgi_server uwsgi;
/*
	Return the request pointer bound to the calling thread, i.e. the value
	stored under the uwsgi.tur_key thread-specific key (NULL if the thread
	never stored one).
*/
struct wsgi_request *threaded_current_wsgi_req() {
	struct wsgi_request *current = pthread_getspecific(uwsgi.tur_key);
	return current;
}
/*
	Return the request pointer kept in the global uwsgi structure
	(uwsgi.wsgi_req); no per-thread lookup is involved.
*/
struct wsgi_request *simple_current_wsgi_req() {
	struct wsgi_request *current = uwsgi.wsgi_req;
	return current;
}
/*
	Append a loop engine to the uwsgi.loops list.

	name: key of the engine; the pointer itself is stored (not copied), so it
	      must stay valid for as long as the list is used.
	func: entry point of the engine.

	If an engine with the same name is already in the list the call is a
	no-op and the existing entry is kept.
*/
void uwsgi_register_loop(char *name, void (*func) (void)) {
	// 'slot' always points at the link that would receive a new tail entry
	struct uwsgi_loop **slot = &uwsgi.loops;

	for (; *slot != NULL; slot = &(*slot)->next) {
		// already registered: nothing to do
		if (strcmp(name, (*slot)->name) == 0) {
			return;
		}
	}

	// 'next' is not set here: it keeps whatever uwsgi_calloc returned
	// (presumably zeroed memory, i.e. NULL -- confirm against uwsgi_calloc)
	struct uwsgi_loop *entry = uwsgi_calloc(sizeof(struct uwsgi_loop));
	entry->name = name;
	entry->loop = func;
	*slot = entry;
}
/*
	Look up a loop engine by name in the uwsgi.loops list.

	Returns the 'loop' member of the first entry whose name compares equal
	to 'name', or NULL when no entry matches.
*/
void *uwsgi_get_loop(char *name) {
	struct uwsgi_loop *entry;

	for (entry = uwsgi.loops; entry != NULL; entry = entry->next) {
		if (strcmp(name, entry->name) == 0) {
			return entry->loop;
		}
	}

	// nothing registered under that name
	return NULL;
}
/*
	Default ("simple") loop engine.

	Hands simple_loop_run to uwsgi_loop_cores_run, which runs it for the
	cores of this worker. When that call returns, the worker's
	manage_next_request flag is cleared and, if the worker asked for it
	(shutdown_sockets), all sockets are shut down.
*/
void simple_loop() {
	uwsgi_loop_cores_run(simple_loop_run);

	// sibling threads may still be inside their loop: clear the flag they test
	// so that they stop too
	uwsgi.workers[uwsgi.mywid].manage_next_request = 0;

	if (!uwsgi.workers[uwsgi.mywid].shutdown_sockets) {
		return;
	}
	uwsgi_shutdown_all_sockets();
}
/*
	Run 'func' once per core of the current worker.

	Cores 1 .. uwsgi.threads-1 each get a new pthread (created with
	uwsgi.threads_attr, thread id stored in the core's thread_id field).
	Core 0 runs 'func' directly in the calling thread, so this function
	returns only when core 0's invocation of 'func' returns.

	func: thread entry point; it receives the core id encoded in its
	      void * argument (cast from long).

	A failed pthread_create() is reported on stderr and the remaining cores
	are still started: the worker keeps serving with the threads it got.
*/
void uwsgi_loop_cores_run(void *(*func) (void *)) {
	int i;
	for (i = 1; i < uwsgi.threads; i++) {
		long core_id = i;
		// pthread_create() reports failure through its return value, not errno
		int err = pthread_create(&uwsgi.workers[uwsgi.mywid].cores[i].thread_id, &uwsgi.threads_attr, func, (void *) core_id);
		if (err != 0) {
			fprintf(stderr, "pthread_create() failed for core %d: %s\n", i, strerror(err));
		}
	}

	// core 0 is served by the calling thread itself
	long main_core = 0;
	func((void *) main_core);
}
/*
	Per-thread initialization for a core.

	core_id:  index of the core being set up.
	wsgi_req: request structure to bind to the calling thread; it is stored
	          under the uwsgi.tur_key thread-specific key.

	For every core the request is bound to the thread. For cores with
	core_id > 0 the function additionally:
	  - blocks all signals in the calling thread (SIGSEGV stays deliverable
	    when built with UWSGI_DEBUG),
	  - calls the proto_thread_fixup hook of each socket that defines one,
	  - calls the init_thread hook of each of the 256 uwsgi.p[] slots that
	    defines one.
*/
void uwsgi_setup_thread_req(long core_id, struct wsgi_request *wsgi_req) {
	pthread_setspecific(uwsgi.tur_key, (void *) wsgi_req);

	// core 0 only needs the binding above
	if (core_id <= 0) {
		return;
	}

	// mask every signal in this thread
	sigset_t blocked;
	sigfillset(&blocked);
#ifdef UWSGI_DEBUG
	sigdelset(&blocked, SIGSEGV);
#endif
	pthread_sigmask(SIG_BLOCK, &blocked, NULL);

	// per-thread socket hooks
	struct uwsgi_socket *sock;
	for (sock = uwsgi.sockets; sock != NULL; sock = sock->next) {
		if (sock->proto_thread_fixup) {
			sock->proto_thread_fixup(sock, core_id);
		}
	}

	// per-thread hooks of the uwsgi.p[] entries
	int slot;
	for (slot = 0; slot < 256; slot++) {
		if (uwsgi.p[slot]->init_thread) {
			uwsgi.p[slot]->init_thread(core_id);
		}
	}
}
/*
	Adapter for callers that hold the core id as an int: widens it to long,
	packs it into a void * and calls simple_loop_run. The return value of
	simple_loop_run is discarded.
*/
void simple_loop_run_int(int core_id) {
	simple_loop_run((void *) (long) core_id);
}
/*
	Body of the simple loop for one core.

	arg1: the core id, passed as a long cast to void *.

	Sets up an event queue watching the sockets, the loop-stop pipe and
	(when a signal socket is configured) the two signal descriptors, then
	accepts and serves requests for as long as the worker's
	manage_next_request flag is set.

	After the loop, if the worker is flagged 'destroy' and workers[0] has a
	valid pid, that process is signalled (workers[0] is presumably the
	master -- confirm). Always returns NULL.
*/
void *simple_loop_run(void *arg1) {
	long core_id = (long) arg1;
	struct wsgi_request *wsgi_req = &uwsgi.workers[uwsgi.mywid].cores[core_id].req;

	// per-thread setup is only needed when running with more than one thread
	if (uwsgi.threads > 1) {
		uwsgi_setup_thread_req(core_id, wsgi_req);
	}

	// build the event queue this core will wait on
	int main_queue = event_queue_init();
	uwsgi_add_sockets_to_queue(main_queue, core_id);
	event_queue_add_fd_read(main_queue, uwsgi.loop_stop_pipe[0]);
	if (uwsgi.signal_socket > -1) {
		event_queue_add_fd_read(main_queue, uwsgi.signal_socket);
		event_queue_add_fd_read(main_queue, uwsgi.my_signal_socket);
	}

	// request cycle: setup -> accept -> receive -> close
	while (uwsgi.workers[uwsgi.mywid].manage_next_request) {
		wsgi_req_setup(wsgi_req, core_id, NULL);

		// non-zero from accept: nothing to serve on this iteration
		if (wsgi_req_accept(main_queue, wsgi_req)) {
			continue;
		}

		if (!wsgi_req_recv(main_queue, wsgi_req)) {
			uwsgi_close_request(wsgi_req);
		}
		else {
			// receive reported a failure: destroy the request instead of closing it
			uwsgi_destroy_request(wsgi_req);
		}
	}

	// loop finished: notify workers[0] only if this worker is being destroyed
	if (!uwsgi.workers[uwsgi.mywid].destroy || uwsgi.workers[0].pid <= 0) {
		return NULL;
	}

#ifdef __APPLE__
	kill(uwsgi.workers[0].pid, SIGTERM);
#else
	if (!uwsgi.propagate_touch) {
		gracefully_kill(0);
	}
	else {
		kill(uwsgi.workers[0].pid, SIGHUP);
	}
#endif
	return NULL;
}
|