1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
|
#' File Transfer Functions
#'
#' High level functions calling \code{zmq_send()} and \code{zmq_recv()}
#' to transfer a file in 200 KiB chunks.
#'
#' @details
#' If no socket is passed, then by default \code{zmq.sendfile()} binds a
#' \code{ZMQ_PUSH} socket, and \code{zmq.recvfile()} connects to this with a
#' \code{ZMQ_PULL} socket. On the other hand, a PUSH/PULL, REQ/REP, or REP/REQ
#' socket pairing may be passed. In that case, the socket should already be
#' connected to the desired endpoint. Be careful not to pass the wrong socket
#' combination (e.g., do not do REQ/REQ), as this can put the processes in an
#' un-recoverable state.
#'
#' @param port
#' A valid tcp port.
#' @param endpoint
#' A ZMQ socket endpoint.
#' @param filename
#' The name (as a string) of the in/out files. The in and out file names
#' can be different.
#' @param verbose
#' Logical; determines if a progress bar should be shown.
#' @param flags
#' A flag for the method used by \code{zmq_sendfile} and
#' \code{zmq_recvfile}
#' @param forcebin
#' Force to read/send/recv/write in binary form. Typically for a Windows
#' system, text (ASCII) and binary files are processed differently.
#' If \code{TRUE}, "r+b" and "w+b" will be enforced in the C code.
#' This option is mainly for Windows.
#' @param ctx
#' A ZMQ ctx. If \code{NULL} (default), the function will initialize one at
#' the beginning and destroy it after finishing the file transfer.
#' @param socket
#' A ZMQ socket based on \code{ctx}.
#' If \code{NULL} (default), the function will create one at the beginning
#' and close it after finishing file transfer.
#'
#'
#' @return \code{zmq.sendfile()} and \code{zmq.recvfile()} return the
#' number of bytes (invisible) in the sent message if successful,
#' otherwise they return -1 (invisible) and set \code{errno} to the error
#' value, see ZeroMQ manual for details.
#'
#' @author Drew Schmidt and Christian Heckendorf
#'
#' @references ZeroMQ/4.1.0 API Reference:
#' \url{http://api.zeromq.org/4-1:_start}
#'
#' Programming with Big Data in R Website: \url{https://pbdr.org/}
#'
#' @examples
#' \dontrun{
#' ### Run the sender and receiver code in separate R sessions.
#'
#' # Receiver
#' library(pbdZMQ, quietly = TRUE)
#' zmq.recvfile(55555, "localhost", "/tmp/outfile", verbose=TRUE)
#'
#' # Sender
#' library(pbdZMQ, quietly = TRUE)
#' zmq.sendfile(55555, "/tmp/infile", verbose=TRUE)
#' }
#'
#' @keywords programming
#' @seealso \code{\link{zmq.msg.send}()}, \code{\link{zmq.msg.recv}()}.
#' @rdname b1_sendrecvfile
#' @name File Transfer Functions
NULL
# -----------------------------------------------------------------------------
# Send
# -----------------------------------------------------------------------------
#' @rdname b1_sendrecvfile
#' @export
zmq.sendfile <- function(port, filename, verbose=FALSE,
    flags = ZMQ.SR()$BLOCK, forcebin = FALSE, ctx = NULL, socket = NULL)
{
  # Sends `filename` over a ZMQ socket. The file size is exchanged first as a
  # small handshake message, then the file itself is handed to the compiled
  # routine "R_zmq_send_file". When `socket` is NULL a PUSH socket is created
  # (on `ctx`, or on a fresh context if `ctx` is NULL too) and bound to `port`.
  # Returns the value of the compiled routine, invisibly.

  # Ownership flags: only resources created in this call are released by it.
  # A caller-supplied socket (and its context) is never closed here.
  ctx.destroy <- FALSE
  socket.close <- FALSE

  # Release owned resources on every exit path. Without this, any error raised
  # after the socket is bound (missing file, bad socket type, failed transfer)
  # would leave the socket and context open. The handler reads the flags at
  # exit time, so it only touches what was actually created.
  on.exit({
    if (socket.close)
      zmq.close(socket)
    if (ctx.destroy)
      zmq.ctx.destroy(ctx)
  }, add = TRUE)

  if (is.null(socket))
  {
    if (is.null(ctx))
    {
      ctx <- zmq.ctx.new()
      ctx.destroy <- TRUE
    }

    socket <- zmq.socket(ctx, ZMQ.ST()$PUSH)
    socket.close <- TRUE
    endpoint <- address("*", port)
    zmq.bind(socket, endpoint)
  }

  # The socket type is read from the "type" attribute of the socket object.
  st <- ZMQ.ST()
  type <- attr(socket, "type")
  if (is.null(type))
    stop("unable to determine socket type")
  else if (type != st$PUSH && type != st$REQ && type != st$REP)
    stop("socket type must be one of PUSH, REQ, or REP (matching PULL, REP, and REQ respectively in zmq.recvfile())")

  # Only an existing, non-directory path is accepted.
  fi <- file.info(filename)
  if (!is.na(fi$isdir) && !fi$isdir)
    filesize <- as.double(fi$size)
  else
    stop(paste("File does not exist:", filename))

  # Size handshake. A REP socket first receives a message from the peer and
  # then sends the size; a REQ socket sends the size and then receives a
  # message back. A PUSH socket just sends the size.
  if (type == st$REP)
    receive.socket(socket)
  send.socket(socket, filesize)
  if (type == st$REQ)
    receive.socket(socket)

  ret <- .Call("R_zmq_send_file", socket, filename, as.integer(verbose),
    filesize, as.integer(flags), as.integer(forcebin), type, PACKAGE="pbdZMQ")

  invisible(ret)
}
# -----------------------------------------------------------------------------
# Receive
# -----------------------------------------------------------------------------
#' @rdname b1_sendrecvfile
#' @export
zmq.recvfile <- function(port, endpoint, filename, verbose=FALSE,
    flags = ZMQ.SR()$BLOCK, forcebin = FALSE, ctx = NULL, socket = NULL)
{
  # Receives a file over a ZMQ socket and writes it to `filename`. The file
  # size is received first as a small handshake message, then the transfer is
  # handed to the compiled routine "R_zmq_recv_file". When `socket` is NULL a
  # PULL socket is created (on `ctx`, or on a fresh context if `ctx` is NULL
  # too) and connected to `endpoint`:`port`.
  # Returns the value of the compiled routine, invisibly.

  # Ownership flags: only resources created in this call are released by it.
  # A caller-supplied socket (and its context) is never closed here.
  ctx.destroy <- FALSE
  socket.close <- FALSE

  # Release owned resources on every exit path. Without this, any error raised
  # after the socket is created (failed connect, bad socket type, failed
  # transfer) would leave the socket and context open. The handler reads the
  # flags at exit time, so it only touches what was actually created.
  on.exit({
    if (socket.close)
      zmq.close(socket)
    if (ctx.destroy)
      zmq.ctx.destroy(ctx)
  }, add = TRUE)

  if (is.null(socket))
  {
    if (is.null(ctx))
    {
      ctx <- zmq.ctx.new()
      ctx.destroy <- TRUE
    }

    socket <- zmq.socket(ctx, ZMQ.ST()$PULL)
    socket.close <- TRUE
    endpoint <- address(endpoint, port)
    zmq.connect(socket, endpoint)
  }

  # The socket type is read from the "type" attribute of the socket object.
  st <- ZMQ.ST()
  type <- attr(socket, "type")
  if (is.null(type))
    stop("unable to determine socket type")
  else if (type != st$PULL && type != st$REP && type != st$REQ)
    stop("socket type must be one of PULL, REP, or REQ (matching PUSH, REQ, and REP respectively in zmq.sendfile())")

  # Size handshake, mirroring zmq.sendfile(). A REQ socket first sends an
  # empty message and then receives the size; a REP socket receives the size
  # and then sends an empty message back. A PULL socket just receives it.
  if (type == st$REQ)
    send.socket(socket, NULL)
  filesize <- receive.socket(socket)
  if (type == st$REP)
    send.socket(socket, NULL)

  ret <- .Call("R_zmq_recv_file", socket, filename, as.integer(verbose),
    filesize, as.integer(flags), as.integer(forcebin), type, PACKAGE="pbdZMQ")

  invisible(ret)
}
|