1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
|
#include "ruby.h"
#include "ruby/thread.h"
#include "rubyspec.h"
#include <math.h>
#include <errno.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#if defined(_WIN32)
#include "ruby/win32.h"
#define read rb_w32_read
#define write rb_w32_write
#define pipe rb_w32_pipe
#endif
#ifndef _WIN32
#include <pthread.h>
#endif
#ifdef __cplusplus
extern "C" {
#endif
/* Returns Qtrue when rb_thread_alone() yields a non-zero value, Qfalse otherwise. */
static VALUE thread_spec_rb_thread_alone(VALUE self) {
  if (rb_thread_alone()) {
    return Qtrue;
  }
  return Qfalse;
}
/*
 * Blocking half of the rb_thread_call_without_gvl spec; it is unblocked by
 * unblock_gvl_func() writing a byte to the other end of the pipe.
 *
 * data points to the read descriptor. Reads one byte, retrying while the
 * read fails with EINTR, closes the descriptor, and returns Qtrue only if
 * exactly one byte was read and that byte is 'A'; otherwise returns Qfalse.
 */
static void* blocking_gvl_func(void* data) {
  const int read_fd = *(int *)data;
  char byte = ' ';
  ssize_t nread;

  for (;;) {
    nread = read(read_fd, &byte, 1);
    if (nread != -1 || errno != EINTR) {
      break;
    }
  }
  close(read_fd);

  if (nread == 1 && byte == 'A') {
    return (void*)Qtrue;
  }
  return (void*)Qfalse;
}
/*
 * Unblock callback paired with blocking_gvl_func().
 *
 * data points to the write descriptor of the pipe. Writes the single byte
 * 'A', retrying while the write fails with EINTR, then closes the descriptor.
 */
static void unblock_gvl_func(void *data) {
  const int write_fd = *(int *)data;
  const char byte = 'A';
  ssize_t nwritten;

  for (;;) {
    nwritten = write(write_fd, &byte, 1);
    if (nwritten != -1 || errno != EINTR) {
      break;
    }
  }
  close(write_fd);
}
/*
 * Creates a pipe and runs blocking_gvl_func() on its read end through
 * rb_thread_call_without_gvl(), registering unblock_gvl_func() on the write
 * end as the unblock function.
 *
 * Returns true if the thread is interrupted, i.e. the value produced by
 * blocking_gvl_func(). Raises RuntimeError if the pipe cannot be created.
 */
static VALUE thread_spec_rb_thread_call_without_gvl(VALUE self) {
  int pipe_fds[2];

  if (pipe(pipe_fds) == -1) {
    rb_raise(rb_eRuntimeError, "could not create pipe");
  }

  /* The callbacks receive pointers into pipe_fds and close the descriptors themselves. */
  return (VALUE)rb_thread_call_without_gvl(blocking_gvl_func, &pipe_fds[0],
                                           unblock_gvl_func, &pipe_fds[1]);
}
/*
 * Blocking function used with RUBY_UBF_IO. This is unblocked by a signal
 * interrupting the read.
 *
 * data carries the read descriptor itself, packed into the pointer value.
 * Performs a single one-byte read and returns Qtrue only when that read
 * fails with EINTR; every other outcome returns Qfalse.
 */
static void* blocking_gvl_func_for_udf_io(void *data) {
  const int read_fd = (int)(size_t)data;
  char byte;
  const ssize_t nread = read(read_fd, &byte, 1);

  if (nread != -1) {
    return (void*)Qfalse;
  }
  return (errno == EINTR) ? (void*)Qtrue : (void*)Qfalse;
}
/*
 * Creates a pipe and runs blocking_gvl_func_for_udf_io() on its read end
 * through rb_thread_call_without_gvl(), with RUBY_UBF_IO as the unblock
 * function. Both pipe descriptors are closed here once the call returns.
 *
 * Returns true if the thread is interrupted, i.e. the value produced by the
 * blocking function. Raises RuntimeError if the pipe cannot be created.
 */
static VALUE thread_spec_rb_thread_call_without_gvl_with_ubf_io(VALUE self) {
  int pipe_fds[2];
  VALUE interrupted;

  if (pipe(pipe_fds) == -1) {
    rb_raise(rb_eRuntimeError, "could not create pipe");
  }

  /* The read descriptor travels by value inside the data pointer. */
  interrupted = (VALUE)rb_thread_call_without_gvl(blocking_gvl_func_for_udf_io,
                                                  (void*)(size_t)pipe_fds[0],
                                                  RUBY_UBF_IO, 0);

  close(pipe_fds[0]);
  close(pipe_fds[1]);

  return interrupted;
}
/* Returns the result of rb_thread_current() unchanged. */
static VALUE thread_spec_rb_thread_current(VALUE self) {
  VALUE current = rb_thread_current();
  return current;
}
/*
 * Converts sym to an ID with SYM2ID and returns what
 * rb_thread_local_aref() yields for thr and that ID.
 */
static VALUE thread_spec_rb_thread_local_aref(VALUE self, VALUE thr, VALUE sym) {
  const ID key = SYM2ID(sym);
  return rb_thread_local_aref(thr, key);
}
/*
 * Converts sym to an ID with SYM2ID and passes thr, that ID and value to
 * rb_thread_local_aset(), returning its result.
 */
static VALUE thread_spec_rb_thread_local_aset(VALUE self, VALUE thr, VALUE sym, VALUE value) {
  const ID key = SYM2ID(sym);
  return rb_thread_local_aset(thr, key, value);
}
/* Calls rb_thread_wakeup() on thr and returns its result. */
static VALUE thread_spec_rb_thread_wakeup(VALUE self, VALUE thr) {
  VALUE woken = rb_thread_wakeup(thr);
  return woken;
}
/*
 * Builds a struct timeval from two Ruby integers and passes it to
 * rb_thread_wait_for().
 *
 * s  - converted with NUM2INT and stored in tv_sec.
 * ms - converted with NUM2INT and stored in tv_usec (so, despite the
 *      parameter name, it lands in the microseconds field).
 *
 * Always returns Qnil.
 */
static VALUE thread_spec_rb_thread_wait_for(VALUE self, VALUE s, VALUE ms) {
  const int seconds = NUM2INT(s);
  const int micros = NUM2INT(ms);
  struct timeval interval;

  interval.tv_sec = seconds;
  interval.tv_usec = micros;
  rb_thread_wait_for(interval);

  return Qnil;
}
VALUE thread_spec_call_proc(void *arg_ptr) {
VALUE arg_array = (VALUE)arg_ptr;
VALUE arg = rb_ary_pop(arg_array);
VALUE proc = rb_ary_pop(arg_array);
return rb_funcall(proc, rb_intern("call"), 1, arg);
}
/*
 * Packs proc and arg (in that order) into a fresh array and starts a thread
 * via rb_thread_create() with thread_spec_call_proc() as its body.
 *
 * Returns whatever rb_thread_create() returns.
 */
static VALUE thread_spec_rb_thread_create(VALUE self, VALUE proc, VALUE arg) {
  VALUE packed = rb_ary_new();

  rb_ary_push(packed, proc);
  rb_ary_push(packed, arg);

  /* NOTE(review): the array reaches the new thread only as an untyped
   * pointer; confirm it stays reachable for the GC until the body runs. */
  return rb_thread_create(thread_spec_call_proc, (void*)packed);
}
/* Returns Qtrue when ruby_native_thread_p() is non-zero on the calling thread, else Qfalse. */
static VALUE thread_spec_ruby_native_thread_p(VALUE self) {
  return ruby_native_thread_p() ? Qtrue : Qfalse;
}
#ifndef _WIN32
/* Addressable Qfalse/Qtrue so a thread start routine can return one by pointer. */
static VALUE false_result = Qfalse;
static VALUE true_result = Qtrue;

/*
 * pthread start routine: returns the address of true_result when
 * ruby_native_thread_p() is non-zero on this thread, otherwise the address
 * of false_result. args is unused.
 */
static void *new_thread_check(void *args) {
  VALUE *answer = &false_result;

  if (ruby_native_thread_p()) {
    answer = &true_result;
  }
  return answer;
}
#endif
/*
 * Reports what ruby_native_thread_p() returns when called from a thread
 * created directly with pthread_create().
 *
 * On non-Windows builds a pthread running new_thread_check() is started and
 * joined, and the VALUE it points back to (Qtrue or Qfalse) is returned.
 * Raises RuntimeError if the thread cannot be created or joined, instead of
 * joining an uninitialized pthread_t or reporting a result that was never
 * produced. On Windows this always returns Qfalse.
 */
static VALUE thread_spec_ruby_native_thread_p_new_thread(VALUE self) {
#ifndef _WIN32
  pthread_t t;
  void *result = NULL;
  int err;

  /* pthread functions report failure through their return value, not errno. */
  err = pthread_create(&t, NULL, new_thread_check, NULL);
  if (err != 0) {
    rb_raise(rb_eRuntimeError, "could not create thread");
  }

  err = pthread_join(t, &result);
  if (err != 0 || result == NULL) {
    rb_raise(rb_eRuntimeError, "could not join thread");
  }

  return *(VALUE *)result;
#else
  return Qfalse;
#endif
}
/*
 * Extension entry point: defines the CApiThreadSpecs class under Object and
 * registers each thread_spec_* wrapper as an instance method with the arity
 * given in the last argument.
 */
void Init_thread_spec(void) {
  VALUE klass = rb_define_class("CApiThreadSpecs", rb_cObject);

  /* Thread state and GVL release. */
  rb_define_method(klass, "rb_thread_alone", thread_spec_rb_thread_alone, 0);
  rb_define_method(klass, "rb_thread_call_without_gvl", thread_spec_rb_thread_call_without_gvl, 0);
  rb_define_method(klass, "rb_thread_call_without_gvl_with_ubf_io", thread_spec_rb_thread_call_without_gvl_with_ubf_io, 0);
  rb_define_method(klass, "rb_thread_current", thread_spec_rb_thread_current, 0);

  /* Thread-local storage. */
  rb_define_method(klass, "rb_thread_local_aref", thread_spec_rb_thread_local_aref, 2);
  rb_define_method(klass, "rb_thread_local_aset", thread_spec_rb_thread_local_aset, 3);

  /* Scheduling and creation. */
  rb_define_method(klass, "rb_thread_wakeup", thread_spec_rb_thread_wakeup, 1);
  rb_define_method(klass, "rb_thread_wait_for", thread_spec_rb_thread_wait_for, 2);
  rb_define_method(klass, "rb_thread_create", thread_spec_rb_thread_create, 2);

  /* Native thread detection. */
  rb_define_method(klass, "ruby_native_thread_p", thread_spec_ruby_native_thread_p, 0);
  rb_define_method(klass, "ruby_native_thread_p_new_thread", thread_spec_ruby_native_thread_p_new_thread, 0);
}
#ifdef __cplusplus
}
#endif
|