1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
|
#' @title AsyncQueue
#' @description An AsyncQueue client: sends a set of asynchronous requests
#' in buckets, pausing between buckets
#' @export
#' @family async
#' @template r6
#' @examples \dontrun{
#' # Using sleep
#' reqlist <- list(
#'   HttpRequest$new(url = "https://httpbin.org/get")$get(),
#'   HttpRequest$new(url = "https://httpbin.org/post")$post(),
#'   HttpRequest$new(url = "https://httpbin.org/put")$put(),
#'   HttpRequest$new(url = "https://httpbin.org/delete")$delete(),
#'   HttpRequest$new(url = "https://httpbin.org/get?g=5")$get(),
#'   HttpRequest$new(
#'     url = "https://httpbin.org/post")$post(body = list(y = 9)),
#'   HttpRequest$new(
#'     url = "https://httpbin.org/get")$get(query = list(hello = "world")),
#'   HttpRequest$new(url = "https://ropensci.org")$get(),
#'   HttpRequest$new(url = "https://ropensci.org/about")$get(),
#'   HttpRequest$new(url = "https://ropensci.org/packages")$get(),
#'   HttpRequest$new(url = "https://ropensci.org/community")$get(),
#'   HttpRequest$new(url = "https://ropensci.org/blog")$get(),
#'   HttpRequest$new(url = "https://ropensci.org/careers")$get()
#' )
#' out <- AsyncQueue$new(.list = reqlist, bucket_size = 5, sleep = 3)
#' out
#' out$bucket_size # bucket size
#' out$requests() # list requests
#' out$request() # make requests
#' out$responses() # list responses
#'
#' # Using requests per minute
#' if (interactive()) {
#' x <- "https://raw.githubusercontent.com/ropensci/roregistry/gh-pages/registry.json"
#' z <- HttpClient$new(x)$get()
#' urls <- jsonlite::fromJSON(z$parse("UTF-8"))$packages$url
#' repos <- Filter(length, regmatches(urls, gregexpr("ropensci/[A-Za-z]+", urls)))
#' repos <- unlist(repos)
#' auth <- list(Authorization = paste("token", Sys.getenv('GITHUB_PAT')))
#' reqs <- lapply(repos[1:50], function(w) {
#'   HttpRequest$new(paste0("https://api.github.com/repos/", w), headers = auth)$get()
#' })
#'
#' out <- AsyncQueue$new(.list = reqs, req_per_min = 30)
#' out
#' out$bucket_size
#' out$requests()
#' out$request()
#' out$responses()
#' }}
AsyncQueue <- R6::R6Class(
  'AsyncQueue',
  inherit = AsyncVaried,
  public = list(
    #' @field bucket_size (integer) number of requests to send at once
    bucket_size = 5,
    #' @field sleep (integer) number of seconds to sleep between each bucket
    sleep = NULL,
    #' @field req_per_min (integer) requests per minute
    req_per_min = NULL,

    #' @description print method for AsyncQueue objects
    #' @param x self
    #' @param ... ignored
    print = function(x, ...) {
      super$print()
      cat(paste0(" bucket_size: ", self$bucket_size), sep = "\n")
      cat(paste0(" sleep: ", self$sleep), sep = "\n")
      cat(paste0(" req_per_min: ", self$req_per_min), sep = "\n")
      invisible(self)
    },

    #' @description Create a new `AsyncQueue` object
    #' @param ...,.list Any number of objects of class [HttpRequest()],
    #' must supply inputs to one of these parameters, but not both
    #' @param bucket_size (integer) number of requests to send at once.
    #' default: 5. See Details.
    #' @param sleep (integer) seconds to sleep between buckets.
    #' default: NULL (not set)
    #' @param req_per_min (integer) maximum number of requests per minute.
    #' if `NULL` (default), it's ignored
    #' @details Must set exactly one of `sleep` or `req_per_min`. If you set
    #' `req_per_min`, the `bucket_size` you pass is ignored: `bucket_size`
    #' is set to `req_per_min` when `$new()` is called, and `$request()`
    #' waits until at least 60 seconds have passed since a bucket was
    #' started before sending the next one.
    #' @return A new `AsyncQueue` object
    initialize = function(..., .list = list(), bucket_size = 5,
                          sleep = NULL, req_per_min = NULL) {
      super$initialize(..., .list = .list)
      self$bucket_size <- bucket_size
      self$sleep <- sleep
      self$req_per_min <- req_per_min
      if (!xor(!is.null(self$sleep), !is.null(self$req_per_min))) {
        stop("must set either sleep or req_per_min to non-NULL integer")
      }

      # TRUE only for a single, non-missing number
      is_number <- function(z) {
        is.numeric(z) && length(z) == 1L && !is.na(z)
      }

      # Fail at construction time rather than part way through sending:
      # a bad `sleep` would otherwise only error in Sys.sleep() after the
      # first bucket has already been requested.
      if (!is.null(self$sleep) &&
          !(is_number(self$sleep) && self$sleep >= 0)) {
        stop("'sleep' must be a single non-negative number", call. = FALSE)
      }
      if (!is.null(self$req_per_min)) {
        if (!(is_number(self$req_per_min) && self$req_per_min > 0)) {
          stop("'req_per_min' must be a single positive number",
            call. = FALSE)
        }
        # one bucket per minute, each holding `req_per_min` requests
        self$bucket_size <- self$req_per_min
      }
      # A bucket size of zero makes ceiling(i / 0) = Inf for every request,
      # which silently puts all requests into a single bucket.
      if (!(is_number(self$bucket_size) && self$bucket_size > 0)) {
        stop("'bucket_size' must be a single positive number", call. = FALSE)
      }
      private$fill_buckets()
    },

    #' @description Execute asynchronous requests
    #' @details Responses are appended to any responses already stored, so
    #' calling this method again sends all requests again and stores the
    #' new responses after the earlier ones.
    #' @return nothing, responses stored inside object, though will print
    #' messages if you choose verbose output
    request = function() {
      if (!is.null(self$sleep)) private$request_sleep()
      if (!is.null(self$req_per_min)) private$request_rate()
      invisible(NULL)
    },

    #' @description List responses
    #' @return a list of `HttpResponse` objects, empty list before
    #' requests made
    responses = function() {
      super$output %||% list()
    },

    #' @description parse content
    #' @param encoding (character) the encoding to use in parsing.
    #' default:"UTF-8"
    #' @return character vector, empty character vector before
    #' requests made
    parse = function(encoding = "UTF-8") {
      vapply(super$output, function(z) z$parse(encoding = encoding), "")
    },

    #' @description Get HTTP status codes for each response
    #' @return numeric vector, empty numeric vector before requests made
    status_code = function() {
      vapply(super$output, function(z) z$status_code, 1)
    },

    #' @description List HTTP status objects
    #' @return a list of `http_code` objects, empty list before requests made
    status = function() {
      lapply(super$output, function(z) z$status_http())
    },

    #' @description Get raw content for each response
    #' @return raw list, empty list before requests made
    content = function() {
      lapply(super$output, function(z) z$content)
    },

    #' @description curl request times
    #' @return list of named numeric vectors, empty list before requests made
    times = function() {
      lapply(super$output, function(z) z$times)
    }
  ),

  private = list(
    print_string = "<crul async queue>",
    # requests split into groups of at most `bucket_size`, in input order
    buckets = list(),

    # Split the stored requests into consecutive buckets: request `i` goes
    # into bucket number ceiling(i / bucket_size). Leaves `buckets` as an
    # empty list when there are no requests.
    fill_buckets = function() {
      x <- super$requests()
      if (length(x) > 0) {
        private$buckets <- split(x, ceiling(seq_along(x) / self$bucket_size))
      }
    },

    # Send each bucket in turn, sleeping `self$sleep` seconds between
    # buckets (no sleep after the last one).
    request_sleep = function() {
      n_buckets <- length(private$buckets)
      for (i in seq_len(n_buckets)) {
        super$output <- c(super$output,
          super$async_request(private$buckets[[i]]))
        if (i < n_buckets) Sys.sleep(self$sleep)
      }
    },

    # Send each bucket in turn, waiting so that at least 60 seconds pass
    # between the start of one bucket and the start of the next (no wait
    # after the last one).
    request_rate = function() {
      n_buckets <- length(private$buckets)
      for (i in seq_len(n_buckets)) {
        started <- Sys.time()
        super$output <- c(super$output,
          super$async_request(private$buckets[[i]]))
        if (i < n_buckets) {
          # Measure explicitly in seconds: subtracting two POSIXct values
          # picks its units automatically and switches to minutes once the
          # difference reaches a minute, which would make a slow bucket
          # look like it took ~1 "second" and trigger a needless sleep.
          elapsed <- as.numeric(
            difftime(Sys.time(), started, units = "secs")
          )
          if (elapsed < 60) Sys.sleep(60 - elapsed)
        }
      }
    }
  )
)
|