File: rzmq_wrapper.r

package info (click to toggle)
r-cran-pbdzmq 0.3.13%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 856 kB
  • sloc: ansic: 737; sh: 93; pascal: 30; cpp: 6; makefile: 4
file content (138 lines) | stat: -rw-r--r-- 3,593 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#' All Wrapper Functions for rzmq
#' 
#' Wrapper functions for backwards compatibility with rzmq.  See vignette
#' for examples.
#' 
#' @details
#' \code{send.socket()}/\code{receive.socket()} send/receive messages over
#' a socket.  These are simple wrappers around \code{zmq.msg.send()} and
#' \code{zmq.msg.receive()}, respectively.
#' 
#' \code{init.context()} creates a new ZeroMQ context.  A useful wrapper
#' around \code{zmq.ctx.new()} which handles freeing memory for you, i.e.
#' \code{zmq.ctx.destroy()} will automatically be called for you.
#' 
#' \code{init.socket()} creates a ZeroMQ socket; serves as a high-level
#' binding for \code{zmq.socket()}, including handling freeing memory
#' automatically.  See also \code{ZMQ.ST()}.
#' 
#' \code{bind.socket()}:  see \code{zmq.bind()}.
#' 
#' \code{connect.socket()}:  see \code{zmq.connect()}
#' 
#' @param socket
#' A ZMQ socket.
#' @param data
#' An R object.
#' @param send.more
#' Logical; will more messages be sent?
#' @param serialize,unserialize
#' Logical; determines if serialize/unserialize should be called
#' on the sent/received data.
#' @param serialversion
#' NULL or numeric; the workspace format version to use when serializing.
#' NULL specifies the current default version. The only other supported
#' values are 2 and 3.
#' @param dont.wait
#' Logical; determines if reception is blocking.
#' @param context
#' A ZMQ context.
#' @param socket.type
#' The type of ZMQ socket as a string, of the form "ZMQ_type".  Valid 'type'
#' values are PAIR, PUB, SUB, REQ, REP, DEALER, PULL, PUSH, XPUB, XSUB, and
#' STERAM.
#' @param address
#' A valid address.  See details.
#' 
#' @author Wei-Chen Chen \email{wccsnow@@gmail.com}.
#' 
#' @references ZeroMQ/4.1.0 API Reference:
#' \url{http://api.zeromq.org/4-1:_start}
#' 
#' Programming with Big Data in R Website: \url{https://pbdr.org/}
#' 
#' @keywords rzmq
#' @rdname xx_rzmq_wrapper
#' @name Wrapper Functions for rzmq
NULL



#' @rdname xx_rzmq_wrapper
#' @export
send.socket <- function(socket, data, send.more = FALSE, serialize = TRUE,
                        serialversion = NULL){
  if(send.more){
    flags <- ZMQ.SR()$SNDMORE
  } else{
    flags <- ZMQ.SR()$BLOCK
  }
  zmq.msg.send(data, socket, flags = flags, serialize = serialize,
               serialversion = serialversion)
}



#' @rdname xx_rzmq_wrapper
#' @export
receive.socket <- function(socket, unserialize = TRUE, dont.wait = FALSE){
  if(dont.wait){
    flags <- ZMQ.SR()$DONTWAIT
  } else{
    flags <- ZMQ.SR()$BLOCK
  }
  zmq.msg.recv(socket, flags = flags, unserialize = unserialize)
}



#' @rdname xx_rzmq_wrapper
#' @export
init.context <- function(){
  try.zmq.ctx.destroy <- function(ctx){
    invisible(suppressWarnings(zmq.ctx.destroy(ctx)))
  }
  
  ctx <- zmq.ctx.new()  
  reg.finalizer(ctx, try.zmq.ctx.destroy, onexit = TRUE)
  ctx 
}



#' @rdname xx_rzmq_wrapper
#' @export
init.socket <- function(context, socket.type){
  try.zmqt.close <- function(socket){
    invisible(suppressWarnings(zmq.close(socket)))
  }
  
  socket.type <- sub(".*_", "", socket.type)
  id <- which(names(ZMQ.ST()) == socket.type)
  if(length(id) != 1){
    stop("socket.type is not found.")
  } else{
    socket.type <- ZMQ.ST()[[id]]
  }
  
  socket <- zmq.socket(context, type = socket.type)
  reg.finalizer(socket, try.zmqt.close, onexit = TRUE)
  socket
}



#' @rdname xx_rzmq_wrapper
#' @export
bind.socket <- function(socket, address){
  zmq.bind(socket, address)
}



#' @rdname xx_rzmq_wrapper
#' @export
connect.socket <- function(socket, address){
  zmq.connect(socket, address)
}