File: stream.R

package info (click to toggle)
r-cran-rtweet 2.0.0%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 18,068 kB
  • sloc: sh: 13; makefile: 2
file content (273 lines) | stat: -rw-r--r-- 8,807 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
#' Collect a live stream of Twitter data
#'
#' `r lifecycle::badge("deprecated")`
#' @description
#' Streams public statuses to a file via one of the following four methods:
#'
#' 1. Sampling a small random sample of all publicly available tweets
#' 2. Filtering via a search-like query (up to 400 keywords)
#' 3. Tracking via vector of user ids (up to 5000 user_ids)
#' 4. Location via geo coordinates (1-360 degree location boxes)
#'
#' Learn more in `vignette("stream", package = "rtweet")`
#'
#' @inheritParams lookup_users
#' @param q Query used to select and customize streaming collection
#'   method.  There are four possible methods:
#'
#'   1. The default, `q = ""`, returns a small random sample of all
#'      publicly available Twitter statuses.
#'   2. To filter by keyword, provide a comma separated character string with
#'      the desired phrase(s) and keyword(s).
#'   3. Track users by providing a comma separated list of user IDs or
#'      screen names.
#'   4. Use four latitude/longitude bounding box points to stream by geo
#'      location. This must be provided via a vector of length 4, e.g.,
#'      `c(-125, 26, -65, 49)`.
#' @param timeout Integer specifying number of seconds to stream tweets for.
#'   Stream indefinitely with `timeout = Inf`.
#'
#'   The stream can be interrupted at any time, and `file_name` will still be
#'   valid file.
#' @param file_name Character with name of file. If not specified,
#'   will write to a temporary file `stream_tweets*.json`.
#' @param append If `TRUE`, will append to the end of `file_name`; if
#'   `FALSE`, will overwrite.
#' @param verbose If `TRUE`, display a progress bar.
#' @param parse Use `FALSE` to opt-out of parsing the tweets.
#' @param ... Other arguments passed in to query parameters.
#' @seealso [filtered_stream()], [`rtweet-deprecated`]
#' @references
#' They were removed from the website.
#' @return A tibble with one row per tweet
#' @export
#' @references
#' The webpages describing how it used to work were removed.
stream_tweets <- function(q = "",
                          timeout = 30,
                          parse = TRUE,
                          token = NULL,
                          file_name = NULL,
                          verbose = TRUE,
                          append = TRUE,
                          ...) {
  lifecycle::deprecate_stop("1.1.0", "stream_tweets()", "filtered_stream()",
                            details = "The streaming endpoint it used does no longer work.")
  if (is.null(file_name)) {
    file_name <- tempfile(pattern = "stream_tweets", fileext = ".json")
    inform(paste0("Writing to '", file_name, "'"))
  }
  output <- file(file_name)

  prep <- stream_prep(token, q, ...)
  stream <- curl::curl(prep$url, handle = prep$handle)

  quiet_interrupt(download_from_stream(stream, output,
    timeout = timeout,
    verbose = verbose
  ))

  if (parse) {
    return(parse_stream(file(file_name)))
  } else {
    return(invisible(NULL))
  }
}

download_from_stream <- function(stream, output, append = TRUE, timeout = 10, verbose = TRUE) {
  if (!timeout) {
    timeout <- Inf
  }
  stopifnot(is.numeric(timeout), timeout > 0)
  stop_time <- Sys.time() + timeout

  n_seen <- 0
  if (verbose) {
    pb <- progress::progress_bar$new(
      total = NA,
      show_after = 0,
      format = "Streaming tweets: :n tweets written / :bytes / :rate / :elapsedfull"
    )
  }

  open(stream, "rb")
  withr::defer(close(stream), current_env(), priority = "first")

  open(output, if (append) "ab" else "b")
  withr::defer(close(output), current_env(), priority = "last")

  lines <- list(lines = character(), fragment = "")
  while (isIncomplete(stream) && Sys.time() < stop_time) {
    buf <- readBin(stream, raw(), 64 * 1024)
    if (length(buf) == 0) {
      if (verbose()) {
        pb$tick()
      }
      Sys.sleep(0.25)
      next
    }

    text <- rawToChar(buf)
    lines <- whole_lines(text, lines$fragment)

    # only keep tweets
    # TODO: process more messages from
    # https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/overview
    json <- lapply(lines$lines, jsonlite::fromJSON)
    is_tweet <- vapply(json, function(x) has_name(x, "created_at"), logical(1))

    n_seen <- n_seen + sum(is_tweet)
    if (verbose) {
      pb$tick(length(buf), tokens = list(n = n_seen))
    }

    writeLines(lines$lines[is_tweet], output, useBytes = TRUE)
  }

  if (verbose) {
    cat("\n")
  }

  invisible()
}

quiet_interrupt <- function(code) {
  tryCatch(code, interrupt = function(e) NULL)
}

whole_lines <- function(text, fragment = "") {
  lines <- strsplit(text, "\r\n")[[1]]
  lines[[1]] <- paste0(fragment, lines[[1]])

  n <- length(lines)
  complete <- grepl("\r\n$", text)
  if (!complete) {
    fragment <- lines[[n]]
    lines <- lines[-n]
  } else {
    fragment <- ""
  }

  # Drop empty keep-alive lines
  lines <- lines[lines != ""]

  list(lines = lines, fragment = fragment)
}

stream_prep <- function(token, q = "", ..., filter_level = "none") {
  token <- check_token(token)
  stopifnot(is.atomic(q) && !is.null(q) || inherits(q, "coords"))

  if (identical(q, "")) {
    path <- "1.1/statuses/sample.json"
    params <- NULL
  } else {
    path <- "1.1/statuses/filter.json"
    params <- stream_params(q, ..., filter_level = filter_level)
  }
  # Detect if q is too long in character size (using double of premium limit)
  # https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/introduction
  if (any(nchar(params) > 2048)) {
    warning("Detected a long parameter, this query will probably fail.", call. = FALSE)
  }

  url <- httr::modify_url("https://stream.twitter.com",
    path = path,
    query = params
  )

  handle <- curl::new_handle()
  curl::handle_setheaders(handle, .list = token$sign("GET", url)$headers)

  list(url = url, handle = handle)
}

stream_params <- function(stream, ...) {
  if (inherits(stream, "coords")) {
    params <- list(locations = paste(stream$box, collapse = ","))
  } else if ((length(stream) %% 4 == 0) && is.numeric(stream)) {
    params <- list(locations = paste(stream, collapse = ","))
  } else if (is_user_id(stream)) {
    params <- list(follow = stream, ...)
  } else {
    params <- list(track = stream, ...)
  }

  params
}


#' Parser of stream
#'
#' Converts Twitter stream data (JSON file) into parsed data frame.
#'  `r lifecycle::badge("deprecated")`
#' @param path Character, name of JSON file with data collected by
#'   [stream_tweets()].
#' @param ... Unused, keeping it for back compatibility.
#' @export
#' @seealso [stream_tweets()], [`rtweet-deprecated`]
#' @examples
#' \dontrun{
#' stream_tweets(timeout = 1, file_name = "stream.json", parse = FALSE)
#' parse_stream("stream.json")
#' }
parse_stream <- function(path, ...) {
  if (...length() > 0) {
    lifecycle::deprecate_soft("1.0.0", "parse_steam(...)",
                              details = c("Parameters ignored"))
  }

  if (!is(path, "file")) {
    path <- file(path)
  }

  tweets <- jsonlite::stream_in(path, pagesize = 1, verbose = FALSE)
  tweets <- tweet(tweets)
  tweets$created_at <- format_date(tweets$created_at)

  tweets0 <- tweet(NULL)[0, ]
  if (length(tweets) != 0) {
    tweets <- tweets[, colnames(tweets0)]
  }


  users <- user(NULL)[0, ]
  if (has_name_(tweets, "user") && length(tweets$user) != 0 && all(lengths(tweets$user) != 0)) {
    users <- do.call(rbind, tweets[["user"]])[, order(colnames(users))]
  }

  tweets <- tweets[!colnames(tweets) %in% "user"]
  users <- tibble::as_tibble(users)
  tweets <- tibble::as_tibble(tweets)

  out <- structure(tweets, users = users)
  class(out) <- c("tweets", class(out))
  out
}


# Deprecated -----------------------------------------------------------------
#' A more robust version of stream_tweets
#'
#' @description
#' `r lifecycle::badge("deprecated")`
#' Please use [stream_tweets()] instead.
#'
#' @param dir Name of directory in which json files should be written.
#'   The default, NULL, will create a timestamped "stream" folder in the
#'   current working directory. If a dir name is provided that does not
#'   already exist, one will be created.
#' @param append Logical indicating whether to append or overwrite
#'   file_name if the file already exists. Defaults to FALSE, meaning
#'   this function will overwrite the preexisting file_name (in other
#'   words, it will delete any old file with the same name as
#'   file_name) meaning the data will be added as new lines to file if
#'   pre-existing.
#' @return Returns data as expected using original search_tweets
#'   function.
#' @export
#' @keywords internal
#' @seealso [`rtweet-deprecated`]
stream_tweets2 <- function(..., dir = NULL, append = FALSE) {
  lifecycle::deprecate_stop("1.0.0", "stream_tweets2()","stream_tweets()")
}