1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
|
#' Poll Functions
#'
#' Poll functions
#'
#' \code{zmq.poll()} initials ZMQ poll items given ZMQ \code{socket}'s
#' and ZMQ poll \code{type}'s. Both \code{socket} and \code{type} are
#' in vectors of the same length, while \code{socket} contains socket pointers
#' and \code{type} contains types of poll.
#' See \code{\link{ZMQ.PO}()} for the possible values of
#' \code{type}. ZMQ defines several poll types and utilize
#' them to poll multiple sockets.
#'
#' \code{zmq.poll.free()} frees ZMQ poll structure memory internally.
#'
#' \code{zmq.poll.length()} obtains total numbers of ZMQ poll items.
#'
#' \code{zmq.poll.get.revents()} obtains revent types from ZMQ poll item by
#' the input index.
#'
#' @param socket
#' a vector of ZMQ sockets
#' @param type
#' a vector of socket types corresponding to \code{socket} argument
#' @param timeout
#' timeout for poll, see ZeroMQ manual for details
#' @param index
#' an index of ZMQ poll items to obtain revents
#' @param MC
#' a message control, see \code{\link{ZMQ.MC}()} for details
#'
#' @return \code{zmq.poll()} returns a ZMQ code and an errno,
#' see ZeroMQ manual for details, no error/warning/interrupt in this
#' \code{R} function, but some error/warning/interrupt may catch by
#' the \code{C} function \code{zmq_poll()}.
#' @return \code{zmq.poll.length()} returns the total number of poll items
#' @return \code{zmq.poll.get.revents()} returns the revent type
#'
#' @author Wei-Chen Chen \email{wccsnow@@gmail.com}.
#'
#' @references ZeroMQ/4.1.0 API Reference:
#' \url{http://api.zeromq.org/4-1:_start}
#'
#' Programming with Big Data in R Website: \url{https://pbdr.org/}
#'
#' @examples
#' \dontrun{
#' ### Using poll pattern.
#' ### See demo/mspoller.r for details.
#'
#' ### Run next in background or the other window.
#' SHELL> Rscript wuserver.r &
#' SHELL> Rscript taskvent.r &
#' SHELL> Rscript mspoller.r
#'
#' ### The mspoller.r has next.
#' library(pbdZMQ, quietly = TRUE)
#'
#' ### Initial.
#' context <- zmq.ctx.new()
#' receiver <- zmq.socket(context, ZMQ.ST()$PULL)
#' zmq.connect(receiver, "tcp://localhost:5557")
#' subscriber <- zmq.socket(context, ZMQ.ST()$SUB)
#' zmq.connect(subscriber, "tcp://localhost:5556")
#' zmq.setsockopt(subscriber, ZMQ.SO()$SUBSCRIBE, "20993")
#'
#' ### Process messages from both sockets.
#' cat("Press Ctrl+C or Esc to stop mspoller.\n")
#' i.rec <- 0
#' i.sub <- 0
#' while(TRUE){
#' ### Set poller.
#' zmq.poll(c(receiver, subscriber),
#' c(ZMQ.PO()$POLLIN, ZMQ.PO()$POLLIN))
#'
#' ### Check receiver.
#' if(bitwAnd(zmq.poll.get.revents(1), ZMQ.PO()$POLLIN)){
#' ret <- zmq.recv(receiver)
#' if(ret$len != -1){
#' cat("task ventilator:", ret$buf, "at", i.rec, "\n")
#' i.rec <- i.rec + 1
#' }
#' }
#'
#' ### Check subscriber.
#' if(bitwAnd(zmq.poll.get.revents(2), ZMQ.PO()$POLLIN)){
#' ret <- zmq.recv(subscriber)
#' if(ret$len != -1){
#' cat("weather update:", ret$buf, "at", i.sub, "\n")
#' i.sub <- i.sub + 1
#' }
#' }
#'
#' if(i.rec >= 5 & i.sub >= 5){
#' break
#' }
#'
#' Sys.sleep(runif(1, 0.5, 1))
#' }
#'
#' ### Finish.
#' zmq.poll.free()
#' zmq.close(receiver)
#' zmq.close(subscriber)
#' zmq.ctx.destroy(context)
#' }
#'
#' @keywords programming
#' @seealso \code{\link{zmq.recv}()}, \code{\link{zmq.send}()}.
#' @rdname b3_poll
#' @name Poll Functions
NULL
#' @rdname b3_poll
#' @export
zmq.poll <- function(socket, type, timeout = -1L, MC = ZMQ.MC()){
if(length(socket) != length(type)){
stop("socket and type are of different length.")
}
type <- as.integer(type)
if(!all(type %in% 1:7)){
stop("type should be integers in 1 to 7.")
}
zmq.poll.free()
ret <- .Call("R_zmq_poll", socket, type, as.integer(timeout),
as.logical(MC$check.eintr),
PACKAGE = "pbdZMQ")
return(invisible(ret))
}
#' @rdname b3_poll
#' @export
zmq.poll.free <- function(){
ret <- .Call("R_zmq_poll_free", PACKAGE = "pbdZMQ")
invisible(ret)
}
#' @rdname b3_poll
#' @export
zmq.poll.length <- function(){
ret <- .Call("R_zmq_poll_length", PACKAGE = "pbdZMQ")
invisible(ret)
}
#' @rdname b3_poll
#' @export
zmq.poll.get.revents <- function(index = 1L){
if(index < 1){
stop("index is a positive interger.")
}
ret <- .Call("R_zmq_poll_get_revents", as.integer(index - 1)[1],
PACKAGE = "pbdZMQ")
invisible(ret)
}
|