1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
|
#' Send Receive Functions
#'
#' Send and receive functions
#'
#' \code{zmq.send()} is a high level R function calling ZMQ C API
#' \code{zmq_send()} sending \code{buf} out.
#'
#' \code{zmq.recv()} is a high level R function calling ZMQ C API
#' \code{zmq_recv()} receiving buffers of length \code{len} according to the
#' \code{buf.type}.
#'
#' \code{flags} see \code{\link{ZMQ.SR}()} for detail options of send and
#' receive functions.
#'
#' \code{buf.type} currently supports \code{char} and \code{raw} which are both
#' in R object format.
#'
#' @param socket
#' a ZMQ socket
#' @param buf
#' a buffer to be sent
#' @param len
#' a length of buffer to be received, default 1024 bytes
#' @param flags
#' a flag for the method using by \code{zmq_send} and
#' \code{zmq_recv}
#' @param buf.type
#' buffer type to be received
#'
#' @return \code{zmq.send()} returns number of bytes (invisible) in the sent
#' message if successful, otherwise returns -1 (invisible) and sets
#' \code{errno} to the error value, see ZeroMQ manual for details.
#'
#' \code{zmq.recv()} returns a list (\code{ret}) containing the received buffer
#' \code{ret$buf} and the length of received buffer (\code{ret$len} which is
#' less or equal to the input \code{len}) if successful, otherwise returns -1
#' and sets \code{errno} to the error value, see ZeroMQ manual for details.
#'
#' @author Wei-Chen Chen \email{wccsnow@@gmail.com}.
#'
#' @references ZeroMQ/4.1.0 API Reference:
#' \url{http://api.zeromq.org/4-1:_start}
#'
#' Programming with Big Data in R Website: \url{https://pbdr.org/}
#'
#' @examples
#' \dontrun{
#' ### Using request-reply pattern.
#'
#' ### At the server, run next in background or the other window.
#' library(pbdZMQ, quietly = TRUE)
#'
#' context <- zmq.ctx.new()
#' responder <- zmq.socket(context, ZMQ.ST()$REP)
#' zmq.bind(responder, "tcp://*:5555")
#' for(i.res in 1:5){
#' buf <- zmq.recv(responder, 10L)
#' cat(buf$buf, "\n")
#' Sys.sleep(0.5)
#' zmq.send(responder, "World")
#' }
#' zmq.close(responder)
#' zmq.ctx.destroy(context)
#'
#'
#' ### At a client, run next in foreground.
#' library(pbdZMQ, quietly = TRUE)
#'
#' context <- zmq.ctx.new()
#' requester <- zmq.socket(context, ZMQ.ST()$REQ)
#' zmq.connect(requester, "tcp://localhost:5555")
#' for(i.req in 1:5){
#' cat("Sending Hello ", i.req, "\n")
#' zmq.send(requester, "Hello")
#' buf <- zmq.recv(requester, 10L)
#' cat("Received World ", i.req, "\n")
#' }
#' zmq.close(requester)
#' zmq.ctx.destroy(context)
#' }
#'
#' @keywords programming
#' @seealso \code{\link{zmq.msg.send}()}, \code{\link{zmq.msg.recv}()}.
#' @rdname b0_sendrecv
#' @name Send Receive Functions
NULL
#' @rdname b0_sendrecv
#' @export
zmq.send <- function(socket, buf, flags = ZMQ.SR()$BLOCK){
if(is.character(buf)){
ret <- zmq.send.char(socket, buf, nchar(buf), flags = flags)
} else if(is.raw(buf)){
ret <- zmq.send.raw(socket, buf, length(buf), flags = flags)
} else{
stop("buf type should be char or raw.")
}
invisible(ret)
}
zmq.send.char <- function(socket, buf, len, flags = ZMQ.SR()$BLOCK){
ret <- .Call("R_zmq_send_char", socket, buf, as.integer(len),
as.integer(flags), PACKAGE = "pbdZMQ")
invisible(ret)
}
zmq.send.raw <- function(socket, buf, len, flags = ZMQ.SR()$BLOCK){
ret <- .Call("R_zmq_send_raw", socket, buf, as.integer(len),
as.integer(flags), PACKAGE = "pbdZMQ")
invisible(ret)
}
#' @rdname b0_sendrecv
#' @export
zmq.recv <- function(socket, len = 1024L, flags = ZMQ.SR()$BLOCK,
buf.type = c("char", "raw")){
if(buf.type[1] == "char"){
ret <- zmq.recv.char(socket, len, flags = flags)
} else if(buf.type[1] == "raw"){
ret <- zmq.recv.raw(socket, len, flags = flags)
} else{
stop("buf type should be char or raw.")
}
invisible(ret)
}
zmq.recv.char <- function(socket, len, flags = ZMQ.SR()$BLOCK){
ret <- .Call("R_zmq_recv_char", socket, as.integer(len), as.integer(flags),
PACKAGE = "pbdZMQ")
invisible(ret)
}
zmq.recv.raw <- function(socket, len, flags = ZMQ.SR()$BLOCK){
ret <- .Call("R_zmq_recv_raw", socket, as.integer(len), as.integer(flags),
PACKAGE = "pbdZMQ")
invisible(ret)
}
|