File: async-queue.R

package info (click to toggle)
r-cran-crul 1.3%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 1,620 kB
  • sloc: sh: 13; makefile: 2
file content (177 lines) | stat: -rw-r--r-- 6,486 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
#' @title AsyncQueue
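#' @description An AsyncQueue client: sends asynchronous HTTP requests in
#' buckets, pausing between buckets either for a fixed number of seconds
#' (`sleep`) or so that at most `req_per_min` requests are sent per minute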
#' @export
#' @family async
#' @template r6
#' @examples \dontrun{
#' # Using sleep
#' reqlist <- list(
#'   HttpRequest$new(url = "https://httpbin.org/get")$get(),
#'   HttpRequest$new(url = "https://httpbin.org/post")$post(),
#'   HttpRequest$new(url = "https://httpbin.org/put")$put(),
#'   HttpRequest$new(url = "https://httpbin.org/delete")$delete(),
#'   HttpRequest$new(url = "https://httpbin.org/get?g=5")$get(),
#'   HttpRequest$new(
#'     url = "https://httpbin.org/post")$post(body = list(y = 9)),
#'   HttpRequest$new(
#'     url = "https://httpbin.org/get")$get(query = list(hello = "world")),
#'   HttpRequest$new(url = "https://ropensci.org")$get(),
#'   HttpRequest$new(url = "https://ropensci.org/about")$get(),
#'   HttpRequest$new(url = "https://ropensci.org/packages")$get(),
#'   HttpRequest$new(url = "https://ropensci.org/community")$get(),
#'   HttpRequest$new(url = "https://ropensci.org/blog")$get(),
#'   HttpRequest$new(url = "https://ropensci.org/careers")$get()
#' )
#' out <- AsyncQueue$new(.list = reqlist, bucket_size = 5, sleep = 3)
#' out
#' out$bucket_size # bucket size
#' out$requests() # list requests
#' out$request() # make requests
#' out$responses() # list responses
#' 
#' # Using requests per minute
#' if (interactive()) {
#' x="https://raw.githubusercontent.com/ropensci/roregistry/gh-pages/registry.json"
#' z <- HttpClient$new(x)$get()
#' urls <- jsonlite::fromJSON(z$parse("UTF-8"))$packages$url
#' repos = Filter(length, regmatches(urls, gregexpr("ropensci/[A-Za-z]+", urls)))
#' repos = unlist(repos)
#' auth <- list(Authorization = paste("token", Sys.getenv('GITHUB_PAT')))
#' reqs <- lapply(repos[1:50], function(w) {
#'   HttpRequest$new(paste0("https://api.github.com/repos/", w), headers = auth)$get()
#' })
#' 
#' out <- AsyncQueue$new(.list = reqs, req_per_min = 30)
#' out
#' out$bucket_size
#' out$requests()
#' out$request()
#' out$responses()
#' }}
AsyncQueue <- R6::R6Class(
  'AsyncQueue',
  inherit = AsyncVaried,
  public = list(
    #' @field bucket_size (integer) number of requests to send at once
    bucket_size = 5,
    #' @field sleep (integer) number of seconds to sleep between each bucket
    sleep = NULL,
    #' @field req_per_min (integer) requests per minute
    req_per_min = NULL,

    #' @description print method for AsyncQueue objects
    #' @param x self
    #' @param ... ignored
    print = function(x, ...) {
      super$print()
      cat(paste0("  bucket_size: ", self$bucket_size), sep = "\n")
      cat(paste0("  sleep: ", self$sleep), sep = "\n")
      cat(paste0("  req_per_min: ", self$req_per_min), sep = "\n")
      invisible(self)
    },

    #' @description Create a new `AsyncQueue` object
    #' @param ...,.list Any number of objects of class [HttpRequest()],
    #' must supply inputs to one of these parameters, but not both
    #' @param bucket_size (integer) number of requests to send at once.
    #' default: 5. See Details.
    #' @param sleep (integer) seconds to sleep between buckets.
    #' default: NULL (not set)
    #' @param req_per_min (integer) maximum number of requests per minute.
    #' if `NULL` (default), it's ignored
    #' @details Must set exactly one of `sleep` or `req_per_min`; an error
    #' is raised if both or neither are given. If you set `req_per_min`,
    #' the supplied `bucket_size` is overridden and set to `req_per_min`
    #' when `$new()` is called, and at most one bucket is started per minute
    #' @return A new `AsyncQueue` object
    initialize = function(..., .list = list(), bucket_size = 5,
      sleep = NULL, req_per_min = NULL) {

      super$initialize(..., .list = .list)
      self$bucket_size <- bucket_size
      self$sleep <- sleep
      self$req_per_min <- req_per_min
      # exactly one of the two throttling modes must be chosen
      if (!xor(!is.null(self$sleep), !is.null(self$req_per_min)))
        stop("must set either sleep or req_per_min to non-NULL integer")
      if (!is.null(self$req_per_min)) {
        # in rate mode a bucket holds one minute's worth of requests
        self$bucket_size <- self$req_per_min
      }
      private$fill_buckets()
    },

    #' @description Execute asynchronous requests
    #' @details Requests are sent one bucket at a time. Responses are
    #' appended to those already stored, so calling this method more than
    #' once accumulates the responses of every call.
    #' @return nothing, responses stored inside object, though will print
    #' messages if you choose verbose output
    request = function() {
      if (!is.null(self$sleep)) private$request_sleep()
      if (!is.null(self$req_per_min)) private$request_rate()
    },

    #' @description List responses
    #' @return a list of `HttpResponse` objects, empty list before
    #' requests made
    responses = function() {
      super$output %||% list()
    },

    #' @description parse content
    #' @param encoding (character) the encoding to use in parsing.
    #' default:"UTF-8"
    #' @return character vector, empty character vector before
    #' requests made
    parse = function(encoding = "UTF-8") {
      vapply(super$output, function(z) z$parse(encoding = encoding), "")
    },

    #' @description Get HTTP status codes for each response
    #' @return numeric vector, empty numeric vector before requests made
    status_code = function() {
      vapply(super$output, function(z) z$status_code, 1)
    },

    #' @description List HTTP status objects
    #' @return a list of `http_code` objects, empty list before requests made
    status = function() {
      lapply(super$output, function(z) z$status_http())
    },

    #' @description Get raw content for each response
    #' @return raw list, empty list before requests made
    content = function() {
      lapply(super$output, function(z) z$content)
    },

    #' @description curl request times
    #' @return list of named numeric vectors, empty list before requests made
    times = function() {
      lapply(super$output, function(z) z$times)
    }
  ),

  private = list(
    print_string = "<crul async queue>",
    # list of request lists; each element is sent as one asynchronous batch
    buckets = list(),

    # Partition the stored requests into consecutive groups of at most
    # `self$bucket_size` requests. Leaves `buckets` untouched when there
    # are no requests.
    fill_buckets = function() {
      x <- super$requests()
      if (length(x) > 0) {
        private$buckets <- split(x, ceiling(seq_along(x)/self$bucket_size))
      }
    },

    # Send each bucket in turn, sleeping `self$sleep` seconds between
    # buckets (no sleep after the last one).
    request_sleep = function() {
      for (i in seq_along(private$buckets)) {
        super$output <- c(super$output, super$async_request(private$buckets[[i]]))
        if (i < length(private$buckets)) Sys.sleep(self$sleep)
      }
    },

    # Send each bucket in turn, starting at most one bucket per minute:
    # after a bucket finishes, wait out whatever remains of the 60 seconds
    # since that bucket started (no wait after the last one).
    request_rate = function() {
      for (i in seq_along(private$buckets)) {
        start <- Sys.time()
        super$output <- c(super$output, super$async_request(private$buckets[[i]]))
        if (i < length(private$buckets)) {
          # Force seconds: subtracting two POSIXct values picks units
          # automatically (mins/hours for longer spans), which would make
          # the comparison against 60 below meaningless.
          elapsed <- as.numeric(difftime(Sys.time(), start, units = "secs"))
          if (elapsed < 60) Sys.sleep(60 - elapsed)
        }
      }
    }
  )
)