1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
|
% Generated by roxygen2: do not edit by hand
% Please edit documentation in R/R_zmq_poll.r
\name{Poll Functions}
\alias{Poll Functions}
\alias{zmq.poll}
\alias{zmq.poll.free}
\alias{zmq.poll.length}
\alias{zmq.poll.get.revents}
\title{Poll Functions}
\usage{
zmq.poll(socket, type, timeout = -1L, MC = ZMQ.MC())
zmq.poll.free()
zmq.poll.length()
zmq.poll.get.revents(index = 1L)
}
\arguments{
\item{socket}{a vector of ZMQ sockets}
\item{type}{a vector of poll types corresponding to the sockets in the \code{socket} argument}
\item{timeout}{timeout for poll, see ZeroMQ manual for details}
\item{MC}{a message control, see \code{\link{ZMQ.MC}()} for details}
\item{index}{the index of the ZMQ poll item whose revents is to be obtained}
}
\value{
\code{zmq.poll()} returns a ZMQ code and an errno;
see the ZeroMQ manual for details. No error/warning/interrupt is raised in this
\code{R} function, but some error/warning/interrupt may be caught by
the \code{C} function \code{zmq_poll()}.
\code{zmq.poll.length()} returns the total number of poll items.
\code{zmq.poll.get.revents()} returns the revent type.
}
\description{
Poll functions
}
\details{
\code{zmq.poll()} initializes ZMQ poll items given ZMQ \code{socket}'s
and ZMQ poll \code{type}'s. Both \code{socket} and \code{type} are
vectors of the same length, where \code{socket} contains socket pointers
and \code{type} contains types of poll.
See \code{\link{ZMQ.PO}()} for the possible values of
\code{type}. ZMQ defines several poll types and utilizes
them to poll multiple sockets.
\code{zmq.poll.free()} frees ZMQ poll structure memory internally.
\code{zmq.poll.length()} obtains the total number of ZMQ poll items.
\code{zmq.poll.get.revents()} obtains the revent type of the ZMQ poll item at
the input index.
}
\examples{
\dontrun{
### Using poll pattern.
### See demo/mspoller.r for details.
### Run the following in the background or in another window.
SHELL> Rscript wuserver.r &
SHELL> Rscript taskvent.r &
SHELL> Rscript mspoller.r
### mspoller.r contains the following.
library(pbdZMQ, quietly = TRUE)

### Initialize the context and the two sockets that will be polled.
context <- zmq.ctx.new()
### PULL socket connected to port 5557 (reported as task ventilator below).
receiver <- zmq.socket(context, ZMQ.ST()$PULL)
zmq.connect(receiver, "tcp://localhost:5557")
### SUB socket connected to port 5556 (reported as weather update below).
subscriber <- zmq.socket(context, ZMQ.ST()$SUB)
zmq.connect(subscriber, "tcp://localhost:5556")
zmq.setsockopt(subscriber, ZMQ.SO()$SUBSCRIBE, "20993")

### Process messages from both sockets.
cat("Press Ctrl+C or Esc to stop mspoller.\n")
i.rec <- 0
i.sub <- 0
while(TRUE){
  ### Set poller: one POLLIN poll item per socket, in the same order.
  zmq.poll(c(receiver, subscriber),
           c(ZMQ.PO()$POLLIN, ZMQ.PO()$POLLIN))

  ### Check receiver (poll item 1): test the POLLIN bit of its revents.
  if(bitwAnd(zmq.poll.get.revents(1), ZMQ.PO()$POLLIN)){
    ret <- zmq.recv(receiver)
    ### Messages with a length of -1 are not counted or printed.
    if(ret$len != -1){
      cat("task ventilator:", ret$buf, "at", i.rec, "\n")
      i.rec <- i.rec + 1
    }
  }

  ### Check subscriber (poll item 2): test the POLLIN bit of its revents.
  if(bitwAnd(zmq.poll.get.revents(2), ZMQ.PO()$POLLIN)){
    ret <- zmq.recv(subscriber)
    if(ret$len != -1){
      cat("weather update:", ret$buf, "at", i.sub, "\n")
      i.sub <- i.sub + 1
    }
  }

  ### Stop once at least five messages were handled on each socket.
  ### Both counters are scalars, so the scalar operator && is used.
  if(i.rec >= 5 && i.sub >= 5){
    break
  }

  ### Pause for a random 0.5 to 1 second before polling again.
  Sys.sleep(runif(1, 0.5, 1))
}

### Finish: free the poll items, close both sockets, destroy the context.
zmq.poll.free()
zmq.close(receiver)
zmq.close(subscriber)
zmq.ctx.destroy(context)
}
}
\references{
ZeroMQ/4.1.0 API Reference:
\url{https://libzmq.readthedocs.io/en/zeromq4-1/}
Programming with Big Data in R Website: \url{https://pbdr.org/}
}
\seealso{
\code{\link{zmq.recv}()}, \code{\link{zmq.send}()}.
}
\author{
Wei-Chen Chen \email{wccsnow@gmail.com}.
}
\keyword{programming}
|