1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
|
#' All Wrapper Functions for rzmq
#'
#' Wrapper functions for backwards compatibility with rzmq. See vignette
#' for examples.
#'
#' @details
#' \code{send.socket()}/\code{receive.socket()} send/receive messages over
#' a socket. These are simple wrappers around \code{zmq.msg.send()} and
#' \code{zmq.msg.receive()}, respectively.
#'
#' \code{init.context()} creates a new ZeroMQ context. A useful wrapper
#' around \code{zmq.ctx.new()} which handles freeing memory for you, i.e.
#' \code{zmq.ctx.destroy()} will automatically be called for you.
#'
#' \code{init.socket()} creates a ZeroMQ socket; serves as a high-level
#' binding for \code{zmq.socket()}, including handling freeing memory
#' automatically. See also \code{ZMQ.ST()}.
#'
#' \code{bind.socket()}: see \code{zmq.bind()}.
#'
#' \code{connect.socket()}: see \code{zmq.connect()}
#'
#' @param socket
#' A ZMQ socket.
#' @param data
#' An R object.
#' @param send.more
#' Logical; will more messages be sent?
#' @param serialize,unserialize
#' Logical; determines if serialize/unserialize should be called
#' on the sent/received data.
#' @param serialversion
#' NULL or numeric; the workspace format version to use when serializing.
#' NULL specifies the current default version. The only other supported
#' values are 2 and 3.
#' @param dont.wait
#' Logical; determines if reception is blocking.
#' @param context
#' A ZMQ context.
#' @param socket.type
#' The type of ZMQ socket as a string, of the form "ZMQ_type". Valid 'type'
#' values are PAIR, PUB, SUB, REQ, REP, DEALER, PULL, PUSH, XPUB, XSUB, and
#' STERAM.
#' @param address
#' A valid address. See details.
#'
#' @author Wei-Chen Chen \email{wccsnow@@gmail.com}.
#'
#' @references ZeroMQ/4.1.0 API Reference:
#' \url{http://api.zeromq.org/4-1:_start}
#'
#' Programming with Big Data in R Website: \url{https://pbdr.org/}
#'
#' @keywords rzmq
#' @rdname xx_rzmq_wrapper
#' @name Wrapper Functions for rzmq
NULL
#' @rdname xx_rzmq_wrapper
#' @export
send.socket <- function(socket, data, send.more = FALSE, serialize = TRUE,
serialversion = NULL){
if(send.more){
flags <- ZMQ.SR()$SNDMORE
} else{
flags <- ZMQ.SR()$BLOCK
}
zmq.msg.send(data, socket, flags = flags, serialize = serialize,
serialversion = serialversion)
}
#' @rdname xx_rzmq_wrapper
#' @export
receive.socket <- function(socket, unserialize = TRUE, dont.wait = FALSE){
if(dont.wait){
flags <- ZMQ.SR()$DONTWAIT
} else{
flags <- ZMQ.SR()$BLOCK
}
zmq.msg.recv(socket, flags = flags, unserialize = unserialize)
}
#' @rdname xx_rzmq_wrapper
#' @export
init.context <- function(){
try.zmq.ctx.destroy <- function(ctx){
invisible(suppressWarnings(zmq.ctx.destroy(ctx)))
}
ctx <- zmq.ctx.new()
reg.finalizer(ctx, try.zmq.ctx.destroy, onexit = TRUE)
ctx
}
#' @rdname xx_rzmq_wrapper
#' @export
init.socket <- function(context, socket.type){
try.zmqt.close <- function(socket){
invisible(suppressWarnings(zmq.close(socket)))
}
socket.type <- sub(".*_", "", socket.type)
id <- which(names(ZMQ.ST()) == socket.type)
if(length(id) != 1){
stop("socket.type is not found.")
} else{
socket.type <- ZMQ.ST()[[id]]
}
socket <- zmq.socket(context, type = socket.type)
reg.finalizer(socket, try.zmqt.close, onexit = TRUE)
socket
}
#' @rdname xx_rzmq_wrapper
#' @export
bind.socket <- function(socket, address){
zmq.bind(socket, address)
}
#' @rdname xx_rzmq_wrapper
#' @export
connect.socket <- function(socket, address){
zmq.connect(socket, address)
}
|