File: stream_in.Rd

% Generated by roxygen2: do not edit by hand
% Please edit documentation in R/stream.R
\name{stream_in, stream_out}
\alias{stream_in, stream_out}
\alias{stream_in}
\alias{stream_out}
\title{Streaming JSON input/output}
\usage{
stream_in(con, handler = NULL, pagesize = 500, verbose = TRUE, ...)

stream_out(x, con = stdout(), pagesize = 500, verbose = TRUE, prefix = "", ...)
}
\arguments{
\item{con}{a \link{connection} object. If the connection is not open,
\code{stream_in} and \code{stream_out} will automatically open
and later close (and destroy) the connection. See details.}

\item{handler}{a custom function that is called on each page of JSON data. If not specified,
the default handler stores all pages and binds them into a single data frame that will be
returned by \code{stream_in}. See details.}

\item{pagesize}{number of lines to read/write from/to the connection per iteration.}

\item{verbose}{print progress information while reading or writing.}

\item{...}{arguments for \code{\link[=fromJSON]{fromJSON()}} and \code{\link[=toJSON]{toJSON()}} that
control JSON formatting/parsing where applicable. Use with caution.}

\item{x}{object to be streamed out. Currently only data frames are supported.}

\item{prefix}{string to write before each line (use \code{"\\u001e"} to write RFC 7464 text sequences)}
}
\value{
The \code{stream_out} function always returns \code{NULL}.
When no custom handler is specified, \code{stream_in} returns a data frame with all pages bound together.
When a custom handler function is specified, \code{stream_in} always returns \code{NULL}.
}
\description{
The \code{stream_in} and \code{stream_out} functions implement line-by-line processing
of JSON data over a \link{connection}, such as a socket, URL, file, or pipe. JSON
streaming requires the \href{https://ndjson.org}{ndjson} format, which slightly differs
from \code{\link[=fromJSON]{fromJSON()}} and \code{\link[=toJSON]{toJSON()}}, see details.
}
\details{
Because parsing huge JSON strings is difficult and inefficient, JSON streaming is done
using \strong{lines of minified JSON records}, a.k.a. \href{https://ndjson.org}{ndjson}.
This is pretty standard: JSON databases such as MongoDB use the same format to
import/export datasets. Note that this means the stream as a whole is not valid
JSON; only the individual lines are. Also note
that because line-breaks are used as separators, prettified JSON is not permitted: the
JSON lines \emph{must} be minified. In this respect, the format is a bit different from
\code{\link[=fromJSON]{fromJSON()}} and \code{\link[=toJSON]{toJSON()}} where all lines are part of a single JSON
structure with optional line breaks.
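
For illustration only, ndjson input for a hypothetical three-row, two-column data
set would consist of one self-contained, minified JSON object per line:

\preformatted{
{"x":1,"name":"a"}
{"x":2,"name":"b"}
{"x":3,"name":"c"}
}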

The \code{handler} is a callback function which is called for each page (batch) of
JSON data with exactly one argument (usually a data frame with \code{pagesize} rows).
If \code{handler} is missing or \code{NULL}, a default handler is used which stores all
intermediate pages of data, and at the very end binds all pages together into one single
data frame that is returned by \code{stream_in}. When a custom \code{handler} function
is specified, \code{stream_in} does not store any intermediate results and always returns
\code{NULL}. It is then up to the \code{handler} to process or store data pages.
A \code{handler} function that does not store intermediate results in memory (for
example by writing output to another connection) results in a pipeline that can process an
unlimited amount of data. See example.
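
As a minimal sketch (assuming \code{con} is a connection to some ndjson source),
a handler that merely counts the rows it receives could look like:

\preformatted{
total <- 0
stream_in(con, handler = function(df){
  # 'df' holds at most 'pagesize' parsed records per call
  total <<- total + nrow(df)
}, pagesize = 1000)
}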

Note that a vector of JSON strings already in R can be parsed with \code{stream_in} by
creating a connection to it with \code{\link[=textConnection]{textConnection()}}.
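
For example, with a small illustrative character vector of ndjson records:

\preformatted{
json <- c('{"x":1}', '{"x":2}', '{"x":3}')
df <- stream_in(textConnection(json))
}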

If a connection is not yet open, \code{stream_in} and \code{stream_out}
will automatically open and later close the connection. Because R destroys connections
when they are closed, they cannot be reused. To use a single connection for multiple
calls to \code{stream_in} or \code{stream_out}, it needs to be opened
beforehand. See example.
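
A minimal sketch of reusing one explicitly opened connection across calls
(writing to a temporary file, chosen here purely for illustration):

\preformatted{
tmp <- tempfile()
con <- file(tmp, open = "w")
stream_out(iris[1:3, ], con)   # connection stays open
stream_out(iris[4:6, ], con)   # so it can be written to again
close(con)
stream_in(file(tmp))
}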
}
\examples{
# compare formats
x <- iris[1:3,]
toJSON(x)
stream_out(x)

# Trivial example
mydata <- stream_in(url("https://jeroen.github.io/data/iris.json"))

\dontrun{
# stream large dataset to file and back
library(nycflights13)
stream_out(flights, file(tmp <- tempfile()))
flights2 <- stream_in(file(tmp))
unlink(tmp)
all.equal(flights2, as.data.frame(flights))

# stream over HTTP
diamonds2 <- stream_in(url("https://jeroen.github.io/data/diamonds.json"))

# stream over HTTP with gzip compression
flights3 <- stream_in(gzcon(url("https://jeroen.github.io/data/nycflights13.json.gz")))
all.equal(flights3, as.data.frame(flights))

# stream over HTTPS (HTTP+SSL) via curl
library(curl)
flights4 <- stream_in(gzcon(curl("https://jeroen.github.io/data/nycflights13.json.gz")))
all.equal(flights4, as.data.frame(flights))

# or alternatively:
flights5 <- stream_in(gzcon(pipe("curl https://jeroen.github.io/data/nycflights13.json.gz")))
all.equal(flights5, as.data.frame(flights))

# Full JSON IO stream from URL to file connection.
# Calculate delays for flights over 1000 miles in batches of 5k
library(dplyr)
con_in <- gzcon(url("https://jeroen.github.io/data/nycflights13.json.gz"))
con_out <- file(tmp <- tempfile(), open = "wb")
stream_in(con_in, handler = function(df){
  df <- dplyr::filter(df, distance > 1000)
  df <- dplyr::mutate(df, delta = dep_delay - arr_delay)
  stream_out(df, con_out, pagesize = 1000)
}, pagesize = 5000)
close(con_out)

# stream it back in
mydata <- stream_in(file(tmp))
nrow(mydata)
unlink(tmp)

# Data from http://openweathermap.org/current#bulk
# Each row contains a nested data frame.
daily14 <- stream_in(gzcon(url("http://78.46.48.103/sample/daily_14.json.gz")), pagesize = 50)
subset(daily14, city$name == "Berlin")$data[[1]]

# Or with dplyr:
library(dplyr)
daily14f <- flatten(daily14)
filter(daily14f, city.name == "Berlin")$data[[1]]

# Stream import large data from zip file
tmp <- tempfile()
download.file("http://jsonstudio.com/wp-content/uploads/2014/02/companies.zip", tmp)
companies <- stream_in(unz(tmp, "companies.json"))
}
}
\references{
MongoDB export format: \url{https://www.mongodb.com/docs/database-tools/mongoexport/}

Documentation for the JSON Lines text file format: \url{https://jsonlines.org/}
}
\seealso{
\code{\link[=fromJSON]{fromJSON()}}, \code{\link[=read_json]{read_json()}}
}