File: R_zmq_transfers.r

package info (click to toggle)
r-cran-pbdzmq 0.3.13%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 856 kB
  • sloc: ansic: 737; sh: 93; pascal: 30; cpp: 6; makefile: 4
file content (184 lines) | stat: -rw-r--r-- 5,322 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#' Transfer Functions for Files or Directories
#' 
#' High level functions calling \code{zmq.sendfile()} and \code{zmq.recvfile()}
#' to zip, transfer, and unzip small files or directories containing small
#' files.
#' 
#' @details
#' \code{zmq.senddir()} calls \code{zmq.sendfile()}, and
#' \code{zmq.recvdir()} calls \code{zmq.recvfile()}.
#' 
#' @param port 
#' A valid tcp port to be passed to \code{zmq.sendfile()} and
#' \code{zmq.recvfile()}.
#' @param endpoint
#' A ZMQ socket endpoint to be passed to \code{zmq.sendfile()} and
#' \code{zmq.recvfile()}.
#' @param infiles
#' The name (as a string) vector of the in files to be zipped and to be
#' sent away.
#' @param outfile
#' The name (as a string) of the out file to be saved on the disk.
#' If \code{outfile = NULL}, a tempfile will be used and the tempfile name
#' will be returned.
#' @param exdir
#' The name (as a string) of the output directory into which the received
#' \code{outfile} is unzipped. If \code{NULL} (default), nothing is unzipped.
#' @param verbose
#' Logical; determines if a progress bar should be shown.
#' @param flags
#' A flag for the method used by \code{zmq.sendfile()} and
#' \code{zmq.recvfile()}.
#' @param ctx
#' A ZMQ ctx. If \code{NULL} (default), the function will initialize one at
#' the beginning and destroy it after finishing the file transfer.
#' @param socket
#' A ZMQ socket based on \code{ctx}.
#' If \code{NULL} (default), the function will create one at the beginning
#' and close it after finishing file transfer.
#' 
#' 
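#' @return \code{zmq.senddir()} and \code{zmq.recvdir()} return a list
#' (invisible). Element \code{ret} holds the value returned by
#' \code{zmq.sendfile()} or \code{zmq.recvfile()}: the
#' number of bytes in the sent message if successful,
#' otherwise -1 with \code{errno} set to the error
#' value, see ZeroMQ manual for details.
#' In addition, the list holds the zipped file name, as element
#' \code{infile} for \code{zmq.senddir()} and as element \code{outfile}
#' for \code{zmq.recvdir()}.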
#' 
#' @author Wei-Chen Chen
#' 
#' @references ZeroMQ/4.1.0 API Reference:
#' \url{http://api.zeromq.org/4-1:_start}
#' 
#' Programming with Big Data in R Website: \url{https://pbdr.org/}
#' 
#' @examples
#' \dontrun{
#' ### Run the sender and receiver code in separate R sessions.
#' 
#' ### Receiver
#' library(pbdZMQ, quietly = TRUE)
#' zmq.recvdir(55555, "localhost", outfile = "./backup_2019.zip",
#'             verbose = TRUE)
#' ### or unzip to exdir
#' # zmq.recvdir(55555, "localhost", exdir = "./backup_2019", verbose = TRUE)
#' 
#' ### Sender
#' library(pbdZMQ, quietly = TRUE)
#' zmq.senddir(55555, c("./pbdZMQ/R", "./pbdZMQ/src"), verbose = TRUE)
#' }
#' 
#' @keywords programming
#' @seealso \code{\link{zmq.sendfile}()}, \code{\link{zmq.recvfile}()}.
#' @rdname b2_sendrecvdir
#' @name Transfer Functions for Files or Directories
#' @importFrom utils unzip zip
#' 
NULL



#' @rdname b2_sendrecvdir
#' @export
zmq.senddir <- function(port, infiles, verbose = FALSE,
                        flags = ZMQ.SR()$BLOCK,
                        ctx = NULL, socket = NULL)
{
  # Zip `infiles` into a temporary archive and hand that archive to
  # zmq.sendfile().
  #
  # port:    passed to address() for binding (when no socket is supplied)
  #          and forwarded to zmq.sendfile().
  # infiles: character vector of files/directories given to utils::zip().
  # verbose: when FALSE, zip is run with "-q"; also forwarded to
  #          zmq.sendfile().
  # flags:   forwarded to zmq.sendfile().
  # ctx:     ZMQ context; created here (and destroyed on exit) only when
  #          both `ctx` and `socket` are NULL.
  # socket:  ZMQ socket; when NULL a PUSH socket is created, bound to
  #          address("*", port), and closed on exit.
  #
  # Returns (invisibly) list(ret = <value of zmq.sendfile()>,
  #                          infile = <path of the temporary zip archive>).
  # The temporary archive is not removed because its path is returned.

  ctx.destroy <- FALSE
  socket.close <- FALSE

  # Release only what this call created, and do so even when binding,
  # zipping, or sending fails part-way. The flags are read when the
  # function exits, so they reflect how far the setup below got.
  on.exit({
    if (socket.close)
      zmq.close(socket)
    if (ctx.destroy)
      zmq.ctx.destroy(ctx)
  }, add = TRUE)

  if (is.null(socket))
  {
    if (is.null(ctx))
    {
      ctx <- zmq.ctx.new()
      ctx.destroy <- TRUE
    }

    socket <- zmq.socket(ctx, ZMQ.ST()$PUSH)
    socket.close <- TRUE

    zmq.bind(socket, address("*", port))
  }

  # A caller-supplied socket must be of a kind that can send.
  type <- attr(socket, "type")
  st <- ZMQ.ST()
  if (is.null(type))
    stop("unable to determine socket type")
  else if (type != st$PUSH && type != st$REQ && type != st$REP)
    stop("socket type must be one of PUSH, REQ, or REP (matching PULL, REP, and REQ respectively in zmq.recvfile())")

  if (!verbose)
    extras <- "-q"
  else
    extras <- ""

  tmp.fn <- tempfile(fileext = ".zip")
  zip(tmp.fn, infiles, extras = extras)

  # zip() only reports failure through its status code/warnings; without
  # an archive on disk there is nothing to send.
  if (!file.exists(tmp.fn))
    stop("failed to create a zip archive from 'infiles'")

  ret <- zmq.sendfile(port, tmp.fn, verbose = verbose, flags = flags,
                      forcebin = TRUE, ctx = ctx, socket = socket)

  invisible(list(ret = ret, infile = tmp.fn))
}


#' @rdname b2_sendrecvdir
#' @export
zmq.recvdir <- function(port, endpoint, outfile = NULL, exdir = NULL,
                        verbose = FALSE, flags = ZMQ.SR()$BLOCK,
                        ctx = NULL, socket = NULL)
{
  # Receive a zip archive through zmq.recvfile() and optionally unzip it.
  #
  # port:     passed to address() for connecting (when no socket is
  #           supplied) and forwarded to zmq.recvfile().
  # endpoint: combined with `port` via address() when no socket is
  #           supplied; forwarded to zmq.recvfile().
  # outfile:  path where the received archive is written; a tempfile()
  #           path is used when NULL.
  # exdir:    when not NULL, the received archive is unzipped into it.
  # verbose:  forwarded to zmq.recvfile().
  # flags:    forwarded to zmq.recvfile().
  # ctx:      ZMQ context; created here (and destroyed on exit) only when
  #           both `ctx` and `socket` are NULL.
  # socket:   ZMQ socket; when NULL a PULL socket is created, connected to
  #           address(endpoint, port), and closed on exit.
  #
  # Returns (invisibly) list(ret = <value of zmq.recvfile()>,
  #                          outfile = <path of the received archive>).

  ctx.destroy <- FALSE
  socket.close <- FALSE

  # Release only what this call created, and do so even when connecting,
  # receiving, or unzipping fails part-way. The flags are read when the
  # function exits, so they reflect how far the setup below got.
  on.exit({
    if (socket.close)
      zmq.close(socket)
    if (ctx.destroy)
      zmq.ctx.destroy(ctx)
  }, add = TRUE)

  if (is.null(socket))
  {
    if (is.null(ctx))
    {
      ctx <- zmq.ctx.new()
      ctx.destroy <- TRUE
    }

    socket <- zmq.socket(ctx, ZMQ.ST()$PULL)
    socket.close <- TRUE

    # From here on `endpoint` holds the full address built from the
    # caller's endpoint and port; that value is what zmq.recvfile() gets.
    endpoint <- address(endpoint, port)
    zmq.connect(socket, endpoint)
  }

  # A caller-supplied socket must be of a kind that can receive.
  type <- attr(socket, "type")
  st <- ZMQ.ST()
  if (is.null(type))
    stop("unable to determine socket type")
  else if (type != st$PULL && type != st$REP && type != st$REQ)
    stop("socket type must be one of PULL, REP, or REQ (matching PUSH, REQ, and REP respectively in zmq.sendfile())")

  if (is.null(outfile))
    outfile <- tempfile()

  ret <- zmq.recvfile(port, endpoint, outfile, verbose = verbose,
                      flags = flags, forcebin = TRUE,
                      ctx = ctx, socket = socket)

  if (!is.null(exdir))
    unzip(outfile, exdir = exdir)

  invisible(list(ret = ret, outfile = outfile))
}