1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
|
% Generated by roxygen2: do not edit by hand
% Please edit documentation in R/stream.R
\name{stream_in, stream_out}
\alias{stream_in, stream_out}
\alias{stream_in}
\alias{stream_out}
\title{Streaming JSON input/output}
\usage{
stream_in(con, handler = NULL, pagesize = 500, verbose = TRUE, ...)
stream_out(x, con = stdout(), pagesize = 500, verbose = TRUE, prefix = "", ...)
}
\arguments{
\item{con}{a \link{connection} object. If the connection is not open,
\code{stream_in} and \code{stream_out} will automatically open
and later close (and destroy) the connection. See details.}
\item{handler}{a custom function that is called on each page of JSON data. If not specified,
the default handler stores all pages and binds them into a single data frame that will be
returned by \code{stream_in}. See details.}
\item{pagesize}{number of lines to read/write from/to the connection per iteration.}
\item{verbose}{print some information on what is going on.}
\item{...}{arguments for \code{\link[=fromJSON]{fromJSON()}} and \code{\link[=toJSON]{toJSON()}} that
control JSON formatting/parsing where applicable. Use with caution.}
\item{x}{object to be streamed out. Currently only data frames are supported.}
\item{prefix}{string to write before each line (use \code{"\\u001e"} to write rfc7464 text sequences)}
}
\value{
The \code{stream_out} function always returns \code{NULL}.
When no custom handler is specified, \code{stream_in} returns a data frame of all pages binded together.
When a custom handler function is specified, \code{stream_in} always returns \code{NULL}.
}
\description{
The \code{stream_in} and \code{stream_out} functions implement line-by-line processing
of JSON data over a \link{connection}, such as a socket, url, file or pipe. JSON
streaming requires the \href{https://ndjson.org}{ndjson} format, which slightly differs
from \code{\link[=fromJSON]{fromJSON()}} and \code{\link[=toJSON]{toJSON()}}, see details.
}
\details{
Because parsing huge JSON strings is difficult and inefficient, JSON streaming is done
using \strong{lines of minified JSON records}, a.k.a. \href{https://ndjson.org}{ndjson}.
This is pretty standard: JSON databases such as MongoDB use the same format to
import/export datasets. Note that this means that the
total stream combined is not valid JSON itself; only the individual lines are. Also note
that because line-breaks are used as separators, prettified JSON is not permitted: the
JSON lines \emph{must} be minified. In this respect, the format is a bit different from
\code{\link[=fromJSON]{fromJSON()}} and \code{\link[=toJSON]{toJSON()}} where all lines are part of a single JSON
structure with optional line breaks.
The \code{handler} is a callback function which is called for each page (batch) of
JSON data with exactly one argument (usually a data frame with \code{pagesize} rows).
If \code{handler} is missing or \code{NULL}, a default handler is used which stores all
intermediate pages of data, and at the very end binds all pages together into one single
data frame that is returned by \code{stream_in}. When a custom \code{handler} function
is specified, \code{stream_in} does not store any intermediate results and always returns
\code{NULL}. It is then up to the \code{handler} to process or store data pages.
A \code{handler} function that does not store intermediate results in memory (for
example by writing output to another connection) results in a pipeline that can process an
unlimited amount of data. See example.
Note that a vector of JSON strings already in R can parsed with \code{stream_in} by
creating a connection to it with \code{\link[=textConnection]{textConnection()}}.
If a connection is not opened yet, \code{stream_in} and \code{stream_out}
will automatically open and later close the connection. Because R destroys connections
when they are closed, they cannot be reused. To use a single connection for multiple
calls to \code{stream_in} or \code{stream_out}, it needs to be opened
beforehand. See example.
}
\examples{
# compare formats
x <- iris[1:3,]
toJSON(x)
stream_out(x)
# Trivial example
mydata <- stream_in(url("https://jeroen.github.io/data/iris.json"))
\dontrun{
#stream large dataset to file and back
library(nycflights13)
stream_out(flights, file(tmp <- tempfile()))
flights2 <- stream_in(file(tmp))
unlink(tmp)
all.equal(flights2, as.data.frame(flights))
# stream over HTTP
diamonds2 <- stream_in(url("https://jeroen.github.io/data/diamonds.json"))
# stream over HTTP with gzip compression
flights3 <- stream_in(gzcon(url("https://jeroen.github.io/data/nycflights13.json.gz")))
all.equal(flights3, as.data.frame(flights))
# stream over HTTPS (HTTP+SSL) via curl
library(curl)
flights4 <- stream_in(gzcon(curl("https://jeroen.github.io/data/nycflights13.json.gz")))
all.equal(flights4, as.data.frame(flights))
# or alternatively:
flights5 <- stream_in(gzcon(pipe("curl https://jeroen.github.io/data/nycflights13.json.gz")))
all.equal(flights5, as.data.frame(flights))
# Full JSON IO stream from URL to file connection.
# Calculate delays for flights over 1000 miles in batches of 5k
library(dplyr)
con_in <- gzcon(url("https://jeroen.github.io/data/nycflights13.json.gz"))
con_out <- file(tmp <- tempfile(), open = "wb")
stream_in(con_in, handler = function(df){
df <- dplyr::filter(df, distance > 1000)
df <- dplyr::mutate(df, delta = dep_delay - arr_delay)
stream_out(df, con_out, pagesize = 1000)
}, pagesize = 5000)
close(con_out)
# stream it back in
mydata <- stream_in(file(tmp))
nrow(mydata)
unlink(tmp)
# Data from http://openweathermap.org/current#bulk
# Each row contains a nested data frame.
daily14 <- stream_in(gzcon(url("http://78.46.48.103/sample/daily_14.json.gz")), pagesize=50)
subset(daily14, city$name == "Berlin")$data[[1]]
# Or with dplyr:
library(dplyr)
daily14f <- flatten(daily14)
filter(daily14f, city.name == "Berlin")$data[[1]]
# Stream import large data from zip file
tmp <- tempfile()
download.file("http://jsonstudio.com/wp-content/uploads/2014/02/companies.zip", tmp)
companies <- stream_in(unz(tmp, "companies.json"))
}
}
\references{
MongoDB export format: \url{https://www.mongodb.com/docs/database-tools/mongoexport/}
Documentation for the JSON Lines text file format: \url{https://jsonlines.org/}
}
\seealso{
\code{\link[=fromJSON]{fromJSON()}}, \code{\link[=read_json]{read_json()}}
}
|