1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
|
#' Message Functions
#'
#' Functions for sending and receiving R objects as ZeroMQ messages.
#'
#' \code{zmq.msg.send()} sends an R message.
#'
#' \code{zmq.msg.recv()} receives an R message.
#'
#' @param rmsg
#' an R message
#' @param socket
#' a ZMQ socket
#' @param flags
#' a flag controlling how the message is sent or received (see \code{ZMQ.SR()})
#' @param serialize
#' logical; if \code{TRUE}, \code{rmsg} is serialized before it is sent
#' @param unserialize
#' logical; if \code{TRUE}, the received message is unserialized when it is a raw vector
#' @param serialversion
#' NULL or numeric; the workspace format version to use when serializing.
#' NULL specifies the current default version. The only other supported
#' values are 2 and 3
#'
#' @return \code{zmq.msg.send()} returns 0 if successful, otherwise returns -1
#' and sets \code{errno} to \code{EFAULT}.
#'
#' \code{zmq.msg.recv()} returns the message if successful, otherwise returns
#' -1 and sets \code{errno} to \code{EFAULT}.
#'
#' @author Wei-Chen Chen \email{wccsnow@@gmail.com}.
#'
#' @references ZeroMQ/4.1.0 API Reference:
#' \url{http://api.zeromq.org/4-1:_start}
#'
#' Programming with Big Data in R Website: \url{https://pbdr.org/}
#'
#' @examples
#' \dontrun{
#' ### Using request-reply pattern.
#'
#' ### At the server, run the following in the background or in another window.
#' library(pbdZMQ, quietly = TRUE)
#'
#' context <- zmq.ctx.new()
#' responder <- zmq.socket(context, ZMQ.ST()$REP)
#' zmq.bind(responder, "tcp://*:5555")
#' buf <- zmq.msg.recv(responder)
#' set.seed(1234)
#' ret <- rnorm(5)
#' print(ret)
#' zmq.msg.send(ret, responder)
#' zmq.close(responder)
#' zmq.ctx.destroy(context)
#'
#'
#' ### At the client, run the following in the foreground.
#' library(pbdZMQ, quietly = TRUE)
#'
#' context <- zmq.ctx.new()
#' requester <- zmq.socket(context, ZMQ.ST()$REQ)
#' zmq.connect(requester, "tcp://localhost:5555")
#' zmq.msg.send(NULL, requester)
#' ret <- zmq.msg.recv(requester)
#' print(ret)
#' zmq.close(requester)
#' zmq.ctx.destroy(context)
#' }
#'
#' @keywords programming
#' @seealso \code{\link{zmq.send}()}, \code{\link{zmq.recv}()}.
#' @rdname a2_message
#' @name Message Function
NULL
# Internal helper (not exported): invokes the native routine
# "R_zmq_msg_init" from the pbdZMQ shared library with no arguments and
# returns whatever that routine returns.
# NOTE(review): presumably the result is a handle to a freshly initialised
# ZeroMQ message structure -- confirm against the C sources.
zmq.msg.init <- function(){
  msg.t <- .Call("R_zmq_msg_init", PACKAGE = "pbdZMQ")
  msg.t
}
# Internal helper (not exported): passes `msg.t` to the native routine
# "R_zmq_msg_close" from the pbdZMQ shared library and returns the
# routine's result unchanged.
# NOTE(review): `msg.t` is presumably the object produced by
# zmq.msg.init() -- confirm against the C sources.
zmq.msg.close <- function(msg.t){
  status <- .Call("R_zmq_msg_close", msg.t, PACKAGE = "pbdZMQ")
  status
}
#' @rdname a2_message
#' @export
zmq.msg.send <- function(rmsg, socket, flags = ZMQ.SR()$BLOCK,
                         serialize = TRUE, serialversion = NULL){
  # Decide what actually goes on the wire: the serialized form of `rmsg`
  # (an in-memory raw vector, since the connection argument is NULL) when
  # `serialize` is TRUE, otherwise `rmsg` exactly as supplied.
  # The logical argument `serialize` does not mask the function of the same
  # name, because R skips non-function bindings when resolving a call.
  payload <- if(serialize){
    serialize(rmsg, NULL, version = serialversion)
  } else{
    rmsg
  }

  # Hand the payload to the native sender; `flags` is coerced to integer
  # before crossing into C. The native return value is passed back
  # invisibly so it does not auto-print at top level.
  invisible(
    .Call("R_zmq_msg_send", payload, socket, as.integer(flags),
          PACKAGE = "pbdZMQ")
  )
}
#' @rdname a2_message
#' @export
zmq.msg.recv <- function(socket, flags = ZMQ.SR()$BLOCK,
                         unserialize = TRUE){
  # Fetch one message through the native receiver; `flags` is coerced to
  # integer before crossing into C.
  received <- .Call("R_zmq_msg_recv", socket, as.integer(flags),
                    PACKAGE = "pbdZMQ")

  # Only a raw vector can be unserialized. Anything else the native call
  # returns is passed through untouched, as is a raw vector when the
  # caller asked for `unserialize = FALSE`.
  if(unserialize && is.raw(received)){
    return(unserialize(received))
  }

  received
}
|