#' Streaming JSON input/output
#'
#' The \code{stream_in} and \code{stream_out} functions implement line-by-line processing
#' of JSON data over a \code{\link{connection}}, such as a socket, url, file or pipe. JSON
#' streaming requires the \href{http://ndjson.org}{ndjson} format, which slightly differs
#' from \code{\link{fromJSON}} and \code{\link{toJSON}}, see details.
#'
#' Because parsing huge JSON strings is difficult and inefficient, JSON streaming is done
#' using \strong{lines of minified JSON records}, a.k.a. \href{http://ndjson.org}{ndjson}.
#' This is pretty standard: JSON databases such as \href{https://github.com/datproject/dat}{dat}
#' or MongoDB use the same format to import/export datasets. Note that this means that the
#' total stream combined is not valid JSON itself; only the individual lines are. Also note
#' that because line-breaks are used as separators, prettified JSON is not permitted: the
#' JSON lines \emph{must} be minified. In this respect, the format is a bit different from
#' \code{\link{fromJSON}} and \code{\link{toJSON}} where all lines are part of a single JSON
#' structure with optional line breaks.
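#'
#' For illustration, the first two rows of \code{iris} in ndjson form are two
#' separate, minified JSON objects on two lines (roughly as shown below), rather
#' than one JSON array as produced by \code{\link{toJSON}}:
#'
#' \preformatted{
#' {"Sepal.Length":5.1,"Sepal.Width":3.5,"Petal.Length":1.4,"Petal.Width":0.2,"Species":"setosa"}
#' {"Sepal.Length":4.9,"Sepal.Width":3,"Petal.Length":1.4,"Petal.Width":0.2,"Species":"setosa"}
#' }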
#'
#' The \code{handler} is a callback function which is called for each page (batch) of
#' JSON data with exactly one argument (usually a data frame with \code{pagesize} rows).
#' If \code{handler} is missing or \code{NULL}, a default handler is used which stores all
#' intermediate pages of data, and at the very end binds all pages together into one single
#' data frame that is returned by \code{stream_in}. When a custom \code{handler} function
#' is specified, \code{stream_in} does not store any intermediate results and always returns
#' \code{NULL}. It is then up to the \code{handler} to process or store data pages.
#' A \code{handler} function that does not store intermediate results in memory (for
#' example by writing output to another connection) results in a pipeline that can process an
#' unlimited amount of data. See example.
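#'
#' A minimal sketch of such a constant-memory pipeline (the file name
#' \code{"big.ndjson"} is hypothetical) counts records without accumulating them:
#'
#' \preformatted{
#' total <- 0
#' stream_in(file("big.ndjson"), handler = function(df){
#'   # each page holds at most 'pagesize' records; nothing is kept afterwards
#'   total <<- total + nrow(df)
#' }, pagesize = 1000)
#' }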
#'
#' Note that a vector of JSON strings already in R can be parsed with \code{stream_in} by
#' creating a connection to it with \code{\link{textConnection}}.
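#'
#' For example (a minimal sketch):
#'
#' \preformatted{
#' json <- c('{"name":"jeroen","x":1}', '{"name":"mariette","x":2}')
#' mydf <- stream_in(textConnection(json))
#' }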
#'
#' If a connection is not opened yet, \code{stream_in} and \code{stream_out}
#' will automatically open and later close the connection. Because R destroys connections
#' when they are closed, they cannot be reused. To use a single connection for multiple
#' calls to \code{stream_in} or \code{stream_out}, it needs to be opened
#' beforehand. See example.
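#'
#' For instance, a sketch of reusing one output connection across multiple calls
#' (opened beforehand so it is not closed and destroyed in between):
#'
#' \preformatted{
#' tmp <- tempfile()
#' con <- file(tmp, open = "wb")
#' stream_out(iris[1:3,], con)
#' stream_out(iris[4:6,], con)
#' close(con)
#' stream_in(file(tmp))
#' }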
#'
#' @param con a \code{\link{connection}} object. If the connection is not open,
#' \code{stream_in} and \code{stream_out} will automatically open
#' and later close (and destroy) the connection. See details.
#' @param handler a custom function that is called on each page of JSON data. If not specified,
#' the default handler stores all pages and binds them into a single data frame that will be
#' returned by \code{stream_in}. See details.
#' @param x object to be streamed out. Currently only data frames are supported.
#' @param pagesize number of lines to read/write from/to the connection per iteration.
#' @param verbose print some information on what is going on.
#' @param ... arguments for \code{\link{fromJSON}} and \code{\link{toJSON}} that
#' control JSON formatting/parsing where applicable. Use with caution.
#' @name stream_in, stream_out
#' @export stream_in stream_out
#' @rdname stream_in
#' @references MongoDB export format: \url{https://docs.mongodb.com/manual/reference/program/mongoexport/}
#' @references Documentation for the JSON Lines text file format: \url{https://jsonlines.org/}
#' @seealso \code{\link{fromJSON}}, \code{\link{read_json}}
#' @return The \code{stream_out} function always returns \code{NULL}.
#' When no custom handler is specified, \code{stream_in} returns a data frame with all pages bound together.
#' When a custom handler function is specified, \code{stream_in} always returns \code{NULL}.
#' @examples # compare formats
#' x <- iris[1:3,]
#' toJSON(x)
#' stream_out(x)
#'
#' # Trivial example
#' mydata <- stream_in(url("http://httpbin.org/stream/100"))
#'
#' \dontrun{
#' # stream large dataset to file and back
#' library(nycflights13)
#' stream_out(flights, file(tmp <- tempfile()))
#' flights2 <- stream_in(file(tmp))
#' unlink(tmp)
#' all.equal(flights2, as.data.frame(flights))
#'
#' # stream over HTTP
#' diamonds2 <- stream_in(url("http://jeroen.github.io/data/diamonds.json"))
#'
#' # stream over HTTP with gzip compression
#' flights3 <- stream_in(gzcon(url("http://jeroen.github.io/data/nycflights13.json.gz")))
#' all.equal(flights3, as.data.frame(flights))
#'
#' # stream over HTTPS (HTTP+SSL) via curl
#' library(curl)
#' flights4 <- stream_in(gzcon(curl("https://jeroen.github.io/data/nycflights13.json.gz")))
#' all.equal(flights4, as.data.frame(flights))
#'
#' # or alternatively:
#' flights5 <- stream_in(gzcon(pipe("curl https://jeroen.github.io/data/nycflights13.json.gz")))
#' all.equal(flights5, as.data.frame(flights))
#'
#' # Full JSON IO stream from URL to file connection.
#' # Calculate delays for flights over 1000 miles in batches of 5k
#' library(dplyr)
#' con_in <- gzcon(url("http://jeroen.github.io/data/nycflights13.json.gz"))
#' con_out <- file(tmp <- tempfile(), open = "wb")
#' stream_in(con_in, handler = function(df){
#'   df <- dplyr::filter(df, distance > 1000)
#'   df <- dplyr::mutate(df, delta = dep_delay - arr_delay)
#'   stream_out(df, con_out, pagesize = 1000)
#' }, pagesize = 5000)
#' close(con_out)
#'
#' # stream it back in
#' mydata <- stream_in(file(tmp))
#' nrow(mydata)
#' unlink(tmp)
#'
#' # Data from http://openweathermap.org/current#bulk
#' # Each row contains a nested data frame.
#' daily14 <- stream_in(gzcon(url("http://78.46.48.103/sample/daily_14.json.gz")), pagesize=50)
#' subset(daily14, city$name == "Berlin")$data[[1]]
#'
#' # Or with dplyr:
#' library(dplyr)
#' daily14f <- flatten(daily14)
#' filter(daily14f, city.name == "Berlin")$data[[1]]
#'
#' # Stream import large data from zip file
#' tmp <- tempfile()
#' download.file("http://jsonstudio.com/wp-content/uploads/2014/02/companies.zip", tmp)
#' companies <- stream_in(unz(tmp, "companies.json"))
#' }
stream_in <- function(con, handler = NULL, pagesize = 500, verbose = TRUE, ...) {
  # Maybe also handle URLs here in future.
  if(!is(con, "connection")){
    stop("Argument 'con' must be a connection.")
  }

  # Same as mongolite
  count <- 0
  cb <- if(is.null(handler)){
    out <- new.env()
    function(x){
      if(length(x)){
        count <<- count + length(x)
        out[[as.character(count)]] <<- x
      }
    }
  } else {
    if(verbose)
      message("using a custom handler function.")
    function(x){
      handler(post_process(x, ...))
      count <<- count + length(x)
    }
  }

  if(!isOpen(con, "r")){
    if(verbose)
      message("opening ", is(con), " input connection.")

    # binary connection prevents recoding of utf8 to latin1 on windows
    open(con, "rb")
    on.exit({
      if(verbose)
        message("closing ", is(con), " input connection.")
      close(con)
    })
  }

  # Read data page by page
  repeat {
    page <- readLines(con, n = pagesize, encoding = "UTF-8")
    if(length(page)){
      cleanpage <- Filter(nchar, page)
      cb(lapply(cleanpage, parseJSON))
      if(verbose)
        cat("\r Found", count, "records...")
    }
    if(length(page) < pagesize)
      break
  }

  # Either return a big data frame, or nothing.
  if(is.null(handler)){
    if(verbose) cat("\r Imported", count, "records. Simplifying...\n")
    out <- as.list(out, sorted = FALSE)
    post_process(unlist(out[order(as.numeric(names(out)))], FALSE, FALSE), ...)
  } else {
    invisible()
  }
}

post_process <- function(x, simplifyVector = TRUE, simplifyDataFrame = simplifyVector,
                         simplifyMatrix = simplifyVector, flatten = FALSE){
  out <- simplify(x, simplifyVector = simplifyVector, simplifyDataFrame = simplifyDataFrame,
                  simplifyMatrix = simplifyMatrix, flatten = flatten)

  # We assume ndjson with objects
  if(isTRUE(simplifyDataFrame)){
    return(as.data.frame(out))
  } else {
    out
  }
}

#' @rdname stream_in
#' @param prefix string to write before each line (use \code{"\u001e"} to write rfc7464 text sequences)
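#' For example, \code{stream_out(mydf, prefix = "\u001e")} writes each record as an
#' RFC 7464 JSON text sequence (here \code{mydf} is a placeholder data frame).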
stream_out <- function(x, con = stdout(), pagesize = 500, verbose = TRUE, prefix = "", ...) {
  if(!is(con, "connection")){
    # Maybe handle URLs here in future.
    stop("Argument 'con' must be a connection.")
  }

  if(!isOpen(con, "w")){
    if(verbose) message("opening ", is(con), " output connection.")
    open(con, "wb")
    on.exit({
      if(verbose) message("closing ", is(con), " output connection.")
      close(con)
    })
  }
  invisible(apply_by_pages(x, stream_out_page, pagesize = pagesize, con = con, verbose = verbose, prefix = prefix, ...))
}

stream_out_page <- function(page, con, prefix, ...){
  # useBytes can sometimes prevent recoding of utf8 to latin1 on windows.
  # on windows there is a bug when useBytes is used with a (non binary) text connection.
  str <- enc2utf8(asJSON(page, collapse = FALSE, ...))
  if(is.character(prefix) && length(prefix) && nchar(prefix))
    str <- paste0(prefix[1], str)
  writeLines(str, con = con, useBytes = TRUE)
}