1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
|
/* we do not modify RSTRING pointers here */
#include "kgio.h"
#include "my_fileno.h"
#include "nonblock.h"
static VALUE sym_wait_writable;
struct wr_args {
VALUE io;
VALUE buf;
const char *ptr;
long len;
int fd;
int flags;
};
static void prepare_write(struct wr_args *a, VALUE io, VALUE str)
{
a->buf = (TYPE(str) == T_STRING) ? str : rb_obj_as_string(str);
a->ptr = RSTRING_PTR(a->buf);
a->len = RSTRING_LEN(a->buf);
a->io = io;
a->fd = my_fileno(io);
}
static int write_check(struct wr_args *a, long n, const char *msg, int io_wait)
{
if (a->len == n) {
done:
a->buf = Qnil;
} else if (n < 0) {
if (errno == EINTR) {
a->fd = my_fileno(a->io);
return -1;
}
if (errno == EAGAIN) {
long written = RSTRING_LEN(a->buf) - a->len;
if (io_wait) {
(void)kgio_call_wait_writable(a->io);
/* buf may be modified in other thread/fiber */
a->len = RSTRING_LEN(a->buf) - written;
if (a->len <= 0)
goto done;
a->ptr = RSTRING_PTR(a->buf) + written;
return -1;
} else if (written > 0) {
a->buf = MY_STR_SUBSEQ(a->buf, written, a->len);
} else {
a->buf = sym_wait_writable;
}
return 0;
}
kgio_wr_sys_fail(msg);
} else {
assert(n >= 0 && n < a->len && "write/send syscall broken?");
a->ptr += n;
a->len -= n;
return -1;
}
return 0;
}
static VALUE my_write(VALUE io, VALUE str, int io_wait)
{
struct wr_args a;
long n;
prepare_write(&a, io, str);
set_nonblocking(a.fd);
retry:
n = (long)write(a.fd, a.ptr, a.len);
if (write_check(&a, n, "write", io_wait) != 0)
goto retry;
if (TYPE(a.buf) != T_SYMBOL)
kgio_autopush_write(io);
return a.buf;
}
/*
* call-seq:
*
* io.kgio_write(str) -> nil
*
* Returns nil when the write completes.
*
* This may block and call any method defined to +kgio_wait_writable+
* for the class.
*/
static VALUE kgio_write(VALUE io, VALUE str)
{
return my_write(io, str, 1);
}
/*
* call-seq:
*
* io.kgio_trywrite(str) -> nil, String or :wait_writable
*
* Returns nil if the write was completed in full.
*
* Returns a String containing the unwritten portion if EAGAIN
* was encountered, but some portion was successfully written.
*
* Returns :wait_writable if EAGAIN is encountered and nothing
* was written.
*/
static VALUE kgio_trywrite(VALUE io, VALUE str)
{
return my_write(io, str, 0);
}
#ifdef USE_MSG_DONTWAIT
/*
* This method behaves like Kgio::PipeMethods#kgio_write, except
* it will use send(2) with the MSG_DONTWAIT flag on sockets to
* avoid unnecessary calls to fcntl(2).
*/
static VALUE my_send(VALUE io, VALUE str, int io_wait)
{
struct wr_args a;
long n;
prepare_write(&a, io, str);
retry:
n = (long)send(a.fd, a.ptr, a.len, MSG_DONTWAIT);
if (write_check(&a, n, "send", io_wait) != 0)
goto retry;
if (TYPE(a.buf) != T_SYMBOL)
kgio_autopush_send(io);
return a.buf;
}
/*
* This method may be optimized on some systems (e.g. GNU/Linux) to use
* MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
* Otherwise this is the same as Kgio::PipeMethods#kgio_write
*/
static VALUE kgio_send(VALUE io, VALUE str)
{
return my_send(io, str, 1);
}
/*
* This method may be optimized on some systems (e.g. GNU/Linux) to use
* MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
* Otherwise this is the same as Kgio::PipeMethods#kgio_trywrite
*/
static VALUE kgio_trysend(VALUE io, VALUE str)
{
return my_send(io, str, 0);
}
#else /* ! USE_MSG_DONTWAIT */
# define kgio_send kgio_write
# define kgio_trysend kgio_trywrite
#endif /* ! USE_MSG_DONTWAIT */
#if defined(KGIO_WITHOUT_GVL)
# include "blocking_io_region.h"
#ifdef MSG_DONTWAIT /* Linux only */
# define MY_MSG_DONTWAIT (MSG_DONTWAIT)
#else
# define MY_MSG_DONTWAIT (0)
#endif
static VALUE nogvl_send(void *ptr)
{
struct wr_args *a = ptr;
return (VALUE)send(a->fd, a->ptr, a->len, a->flags);
}
/*
* call-seq:
*
* io.kgio_syssend(str, flags) -> nil, String or :wait_writable
*
* Returns nil if the write was completed in full.
*
* Returns a String containing the unwritten portion if EAGAIN
* was encountered, but some portion was successfully written.
*
* Returns :wait_writable if EAGAIN is encountered and nothing
* was written.
*
* This method is only available on Ruby 1.9.3 or later.
*/
static VALUE kgio_syssend(VALUE io, VALUE str, VALUE flags)
{
struct wr_args a;
long n;
a.flags = NUM2INT(flags);
prepare_write(&a, io, str);
if (a.flags & MY_MSG_DONTWAIT) {
do {
n = (long)send(a.fd, a.ptr, a.len, a.flags);
} while (write_check(&a, n, "send", 0) != 0);
} else {
do {
n = (long)rb_thread_io_blocking_region(
nogvl_send, &a, a.fd);
} while (write_check(&a, n, "send", 0) != 0);
}
return a.buf;
}
#endif /* HAVE_RB_THREAD_IO_BLOCKING_REGION */
/*
* call-seq:
*
* Kgio.trywrite(io, str) -> nil, String or :wait_writable
*
* Returns nil if the write was completed in full.
*
* Returns a String containing the unwritten portion if EAGAIN
* was encountered, but some portion was successfully written.
*
* Returns :wait_writable if EAGAIN is encountered and nothing
* was written.
*
* Maybe used in place of PipeMethods#kgio_trywrite for non-Kgio objects
*/
static VALUE s_trywrite(VALUE mod, VALUE io, VALUE str)
{
return my_write(io, str, 0);
}
void init_kgio_write(void)
{
VALUE mPipeMethods, mSocketMethods;
VALUE mKgio = rb_define_module("Kgio");
sym_wait_writable = ID2SYM(rb_intern("wait_writable"));
rb_define_singleton_method(mKgio, "trywrite", s_trywrite, 2);
/*
* Document-module: Kgio::PipeMethods
*
* This module may be used used to create classes that respond to
* various Kgio methods for reading and writing. This is included
* in Kgio::Pipe by default.
*/
mPipeMethods = rb_define_module_under(mKgio, "PipeMethods");
rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1);
rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1);
/*
* Document-module: Kgio::SocketMethods
*
* This method behaves like Kgio::PipeMethods, but contains
* optimizations for sockets on certain operating systems
* (e.g. GNU/Linux).
*/
mSocketMethods = rb_define_module_under(mKgio, "SocketMethods");
rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1);
rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1);
#if defined(KGIO_WITHOUT_GVL)
rb_define_method(mSocketMethods, "kgio_syssend", kgio_syssend, 2);
#endif
}
|