File: R_zmq_transfer.r

package info (click to toggle)
r-cran-pbdzmq 0.3.13%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 856 kB
  • sloc: ansic: 737; sh: 93; pascal: 30; cpp: 6; makefile: 4
file content (188 lines) | stat: -rw-r--r-- 5,613 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
#' File Transfer Functions
#' 
#' High level functions calling \code{zmq_send()} and \code{zmq_recv()}
#' to transfer a file in 200 KiB chunks.
#' 
#' @details
#' If no socket is passed, then by default \code{zmq.sendfile()} binds a
#' \code{ZMQ_PUSH} socket, and \code{zmq.recvfile()} connects to this with a
#' \code{ZMQ_PULL} socket. On the other hand, a PUSH/PULL, REQ/REP, or REP/REQ
#' socket pairing may be passed. In that case, the socket should already be
#' connected to the desired endpoint. Be careful not to pass the wrong socket
#' combination (e.g., do not do REQ/REQ), as this can put the processes in an
#' un-recoverable state.
#' 
#' @param port 
#' A valid tcp port.
#' @param endpoint
#' A ZMQ socket endpoint.
#' @param filename
#' The name (as a string) of the in/out files. The in and out file names
#' can be different.
#' @param verbose
#' Logical; determines if a progress bar should be shown.
#' @param flags
#' A flag for the method used by \code{zmq_sendfile} and
#' \code{zmq_recvfile}
#' @param forcebin
#' Force to read/send/recv/write in binary form. Typically for a Windows
#' system, text (ASCII) and binary files are processed differently.
#' If \code{TRUE}, "r+b" and "w+b" will be enforced in the C code.
#' This option is mainly for Windows.
#' @param ctx
#' A ZMQ context. If \code{NULL} (default), the function will initialize one
#' at the beginning and destroy it after finishing the file transfer.
#' @param socket
#' A ZMQ socket based on \code{ctx}.
#' If \code{NULL} (default), the function will create one at the beginning
#' and close it after finishing file transfer.
#' 
#' 
#' @return \code{zmq.sendfile()} and \code{zmq.recvfile()} invisibly return
#' the number of bytes in the sent message if successful; otherwise they
#' invisibly return -1 and set \code{errno} to the error value. See the
#' ZeroMQ manual for details.
#' 
#' @author Drew Schmidt and Christian Heckendorf
#' 
#' @references ZeroMQ/4.1.0 API Reference:
#' \url{http://api.zeromq.org/4-1:_start}
#' 
#' Programming with Big Data in R Website: \url{https://pbdr.org/}
#' 
#' @examples
#' \dontrun{
#' ### Run the sender and receiver code in separate R sessions.
#' 
#' # Receiver
#' library(pbdZMQ, quietly = TRUE)
#' zmq.recvfile(55555, "localhost", "/tmp/outfile", verbose=TRUE)
#' 
#' # Sender
#' library(pbdZMQ, quietly = TRUE)
#' zmq.sendfile(55555, "/tmp/infile", verbose=TRUE)
#' }
#' 
#' @keywords programming
#' @seealso \code{\link{zmq.msg.send}()}, \code{\link{zmq.msg.recv}()}.
#' @rdname b1_sendrecvfile
#' @name File Transfer Functions
NULL



# -----------------------------------------------------------------------------
# Send
# -----------------------------------------------------------------------------

#' @rdname b1_sendrecvfile
#' @export
zmq.sendfile <- function(port, filename, verbose=FALSE,
  flags = ZMQ.SR()$BLOCK, forcebin = FALSE, ctx = NULL, socket = NULL)
{
  # Send the file `filename` over a ZMQ socket.
  #
  # If `socket` is NULL, a PUSH socket is created (on `ctx`, or on a freshly
  # created context when `ctx` is also NULL) and bound to `port` on "*".
  # A caller-supplied socket is used as-is: `port` is not used and the socket
  # is left open on return.
  #
  # Returns, invisibly, the value produced by the C routine R_zmq_send_file.
  # Stops if `filename` does not exist or is a directory, or if the socket
  # type is missing or not one of PUSH, REQ, REP.
  
  # Validate the file before any ZMQ resource is created, so that a bad path
  # fails fast without binding the port.
  fi <- file.info(filename)
  if (is.na(fi$isdir) || fi$isdir)
    stop(paste("File does not exist:", filename))
  filesize <- as.double(fi$size)
  
  # Track which resources this call created. The handler reads the flags at
  # exit time, so it releases exactly what exists even if an error occurs
  # part-way through the setup or the transfer.
  own.ctx <- FALSE
  own.socket <- FALSE
  on.exit({
    # Close the socket first, then destroy its context (terminating a
    # context blocks while its sockets remain open).
    if (own.socket)
      zmq.close(socket)
    if (own.ctx)
      zmq.ctx.destroy(ctx)
  }, add = TRUE)
  
  if (is.null(socket))
  {
    if (is.null(ctx))
    {
      ctx <- zmq.ctx.new()
      own.ctx <- TRUE
    }
    
    socket <- zmq.socket(ctx, ZMQ.ST()$PUSH)
    own.socket <- TRUE
    
    zmq.bind(socket, address("*", port))
  }
  
  type <- attr(socket, "type")
  if (is.null(type))
    stop("unable to determine socket type")
  
  st <- ZMQ.ST()
  if (!(type %in% c(st$PUSH, st$REQ, st$REP)))
    stop("socket type must be one of PUSH, REQ, or REP (matching PULL, REP, and REQ respectively in zmq.recvfile())")
  
  # Announce the file size to the receiver. A REP socket first receives a
  # message before sending the size; a REQ socket sends the size and then
  # waits for a reply (presumably to respect the REQ/REP send/receive
  # alternation -- zmq.recvfile() performs the mirror-image exchange).
  if (type == st$REP)
    receive.socket(socket)
  send.socket(socket, filesize)
  if (type == st$REQ)
    receive.socket(socket)
  
  ret <- .Call("R_zmq_send_file", socket, filename, as.integer(verbose),
    filesize, as.integer(flags), as.integer(forcebin), type, PACKAGE="pbdZMQ")
  
  invisible(ret)
}



# -----------------------------------------------------------------------------
# Receive
# -----------------------------------------------------------------------------

#' @rdname b1_sendrecvfile
#' @export
zmq.recvfile <- function(port, endpoint, filename, verbose=FALSE,
  flags = ZMQ.SR()$BLOCK, forcebin = FALSE, ctx = NULL, socket = NULL)
{
  # Receive a file over a ZMQ socket and write it to `filename`.
  #
  # If `socket` is NULL, a PULL socket is created (on `ctx`, or on a freshly
  # created context when `ctx` is also NULL) and connected to the address
  # built from `endpoint` and `port`. A caller-supplied socket is used as-is:
  # `port` and `endpoint` are not used and the socket is left open on return.
  #
  # Returns, invisibly, the value produced by the C routine R_zmq_recv_file.
  # Stops if the socket type is missing or not one of PULL, REP, REQ.
  
  # Track which resources this call created. The handler reads the flags at
  # exit time, so it releases exactly what exists even if an error occurs
  # part-way through the setup or the transfer.
  own.ctx <- FALSE
  own.socket <- FALSE
  on.exit({
    # Close the socket first, then destroy its context (terminating a
    # context blocks while its sockets remain open).
    if (own.socket)
      zmq.close(socket)
    if (own.ctx)
      zmq.ctx.destroy(ctx)
  }, add = TRUE)
  
  if (is.null(socket))
  {
    if (is.null(ctx))
    {
      ctx <- zmq.ctx.new()
      own.ctx <- TRUE
    }
    
    socket <- zmq.socket(ctx, ZMQ.ST()$PULL)
    own.socket <- TRUE
    
    zmq.connect(socket, address(endpoint, port))
  }
  
  type <- attr(socket, "type")
  if (is.null(type))
    stop("unable to determine socket type")
  
  st <- ZMQ.ST()
  if (!(type %in% c(st$PULL, st$REP, st$REQ)))
    stop("socket type must be one of PULL, REP, or REQ (matching PUSH, REQ, and REP respectively in zmq.sendfile())")
  
  # Obtain the file size from the sender. A REQ socket first sends an empty
  # message before receiving the size; a REP socket receives the size and
  # then sends an empty reply (presumably to respect the REQ/REP send/receive
  # alternation -- zmq.sendfile() performs the mirror-image exchange).
  if (type == st$REQ)
    send.socket(socket, NULL)
  filesize <- receive.socket(socket)
  if (type == st$REP)
    send.socket(socket, NULL)
  
  ret <- .Call("R_zmq_recv_file", socket, filename, as.integer(verbose),
    filesize, as.integer(flags), as.integer(forcebin), type, PACKAGE="pbdZMQ")
  
  invisible(ret)
}