File: R_zmq_poll.r

package info (click to toggle)
r-cran-pbdzmq 0.3.13%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 856 kB
  • sloc: ansic: 737; sh: 93; pascal: 30; cpp: 6; makefile: 4
file content (159 lines) | stat: -rw-r--r-- 4,410 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#' Poll Functions
#' 
#' Poll functions
#' 
#' \code{zmq.poll()} initializes ZMQ poll items given ZMQ \code{socket}s
#' and ZMQ poll \code{type}s. Both \code{socket} and \code{type} are
#' vectors of the same length, where \code{socket} contains socket pointers
#' and \code{type} contains the corresponding poll types.
#' See \code{\link{ZMQ.PO}()} for the possible values of
#' \code{type}. ZMQ defines several poll types and utilizes
#' them to poll multiple sockets.
#' 
#' \code{zmq.poll.free()} frees ZMQ poll structure memory internally.
#'
#' \code{zmq.poll.length()} obtains the total number of ZMQ poll items.
#'
#' \code{zmq.poll.get.revents()} obtains the revent types of the ZMQ poll
#' item at the given index.
#' 
#' @param socket 
#' a vector of ZMQ sockets
#' @param type 
#' a vector of poll types corresponding to the \code{socket} argument
#' @param timeout
#' timeout for poll, see ZeroMQ manual for details
#' @param index
#' an index of ZMQ poll items to obtain revents
#' @param MC 
#' a message control, see \code{\link{ZMQ.MC}()} for details
#' 
#' @return \code{zmq.poll()} returns a ZMQ code and an errno,
#' see ZeroMQ manual for details. No error/warning/interrupt is handled in
#' this \code{R} function, but some error/warning/interrupt may be caught by
#' the \code{C} function \code{zmq_poll()}.
#' @return \code{zmq.poll.length()} returns the total number of poll items
#' @return \code{zmq.poll.get.revents()} returns the revent type
#' 
#' @author Wei-Chen Chen \email{wccsnow@@gmail.com}.
#' 
#' @references ZeroMQ/4.1.0 API Reference:
#' \url{http://api.zeromq.org/4-1:_start}
#' 
#' Programming with Big Data in R Website: \url{https://pbdr.org/}
#' 
#' @examples
#' \dontrun{
#' ### Using poll pattern.
#' ### See demo/mspoller.r for details.
#'
#' ### Run the following in the background or in another window.
#' SHELL> Rscript wuserver.r &
#' SHELL> Rscript taskvent.r &
#' SHELL> Rscript mspoller.r
#'
#' ### mspoller.r contains the following.
#' library(pbdZMQ, quietly = TRUE)
#' 
#' ### Initial.
#' context <- zmq.ctx.new()
#' receiver <- zmq.socket(context, ZMQ.ST()$PULL)
#' zmq.connect(receiver, "tcp://localhost:5557")
#' subscriber <- zmq.socket(context, ZMQ.ST()$SUB)
#' zmq.connect(subscriber, "tcp://localhost:5556")
#' zmq.setsockopt(subscriber, ZMQ.SO()$SUBSCRIBE, "20993")
#' 
#' ### Process messages from both sockets.
#' cat("Press Ctrl+C or Esc to stop mspoller.\n")
#' i.rec <- 0
#' i.sub <- 0
#' while(TRUE){
#'   ### Set poller.
#'   zmq.poll(c(receiver, subscriber),
#'            c(ZMQ.PO()$POLLIN, ZMQ.PO()$POLLIN))
#' 
#'   ### Check receiver.
#'   if(bitwAnd(zmq.poll.get.revents(1), ZMQ.PO()$POLLIN)){
#'     ret <- zmq.recv(receiver)
#'     if(ret$len != -1){
#'       cat("task ventilator:", ret$buf, "at", i.rec, "\n")
#'       i.rec <- i.rec + 1
#'     }
#'   }
#' 
#'   ### Check subscriber.
#'   if(bitwAnd(zmq.poll.get.revents(2), ZMQ.PO()$POLLIN)){
#'     ret <- zmq.recv(subscriber)
#'     if(ret$len != -1){
#'       cat("weather update:", ret$buf, "at", i.sub, "\n")
#'       i.sub <- i.sub + 1
#'     }
#'   }
#' 
#'   if(i.rec >= 5 & i.sub >= 5){
#'     break
#'   }
#' 
#'   Sys.sleep(runif(1, 0.5, 1))
#' }
#' 
#' ### Finish.
#' zmq.poll.free()
#' zmq.close(receiver)
#' zmq.close(subscriber)
#' zmq.ctx.destroy(context)
#' }
#' 
#' @keywords programming
#' @seealso \code{\link{zmq.recv}()}, \code{\link{zmq.send}()}.
#' @rdname b3_poll
#' @name Poll Functions
NULL



#' @rdname b3_poll
#' @export
zmq.poll <- function(socket, type, timeout = -1L, MC = ZMQ.MC()){
  # Poll a set of sockets: validate the arguments, call zmq.poll.free(),
  # then hand everything to the C routine "R_zmq_poll". The value returned
  # by that routine is passed back invisibly.
  #
  # socket:  vector of sockets, one per poll item.
  # type:    vector of poll types, same length as socket.
  # timeout: coerced to integer and forwarded to the C routine.
  # MC:      message control; only its check.eintr element is read here.

  # Each socket must be paired with exactly one poll type.
  n.socket <- length(socket)
  n.type <- length(type)
  if(n.socket != n.type){
    stop("socket and type are of different length.")
  }

  # Coerce to integer and reject anything outside 1..7.
  # NOTE(review): presumably 1..7 covers the bitwise combinations of the
  # ZMQ.PO() flags -- confirm against ZMQ.PO().
  type <- as.integer(type)
  is.valid <- type %in% 1:7
  if(any(!is.valid)){
    stop("type should be integers in 1 to 7.")
  }

  # Free the ZMQ poll structure memory before issuing the new poll call.
  zmq.poll.free()

  # These are evaluated only after the free call, in this order.
  timeout.int <- as.integer(timeout)
  check.eintr <- as.logical(MC$check.eintr)
  poll.ret <- .Call("R_zmq_poll", socket, type, timeout.int, check.eintr,
                    PACKAGE = "pbdZMQ")
  invisible(poll.ret)
}


#' @rdname b3_poll
#' @export
zmq.poll.free <- function(){
  # Invoke the C routine "R_zmq_poll_free" (documented above as freeing the
  # ZMQ poll structure memory) and pass its result back invisibly.
  invisible(.Call("R_zmq_poll_free", PACKAGE = "pbdZMQ"))
}

#' @rdname b3_poll
#' @export
zmq.poll.length <- function(){
  # Ask the C routine "R_zmq_poll_length" for the number of poll items and
  # pass the answer back invisibly.
  invisible(.Call("R_zmq_poll_length", PACKAGE = "pbdZMQ"))
}

#' @rdname b3_poll
#' @export
zmq.poll.get.revents <- function(index = 1L){
  # Return (invisibly) the revents of the poll item at the 1-based `index`,
  # as reported by the C routine "R_zmq_poll_get_revents".
  #
  # index: a positive whole number; only its first element is used.
  #
  # Raises an error when the first element of `index` is not a finite
  # number >= 1 that fits in an R integer.

  # Validate the first element only: a single value is what reaches the C
  # routine, and a condition of length != 1 (or NA) inside `if` would
  # otherwise fail with an unrelated message.
  idx <- index[1L]
  if(!is.numeric(idx) || !is.finite(idx) || idx < 1 ||
     idx > .Machine$integer.max){
    stop("index is a positive integer.")
  }

  # The C routine receives index - 1 (presumably a 0-based offset).
  # NOTE(review): the index is not checked against zmq.poll.length() here;
  # confirm that the C routine guards against out-of-range access.
  revents <- .Call("R_zmq_poll_get_revents", as.integer(idx - 1),
                   PACKAGE = "pbdZMQ")
  invisible(revents)
}