File: dataset-write.R

package info (click to toggle)
apache-arrow 23.0.1-1
  • links: PTS
  • area: main
  • in suites: sid
  • size: 76,220 kB
  • sloc: cpp: 654,608; python: 70,522; ruby: 45,964; ansic: 18,742; sh: 7,365; makefile: 669; javascript: 125; xml: 41
file content (423 lines) | stat: -rw-r--r-- 16,792 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

#' Write a dataset
#'
#' This function allows you to write a dataset. By writing to more efficient
#' binary storage formats, and by specifying relevant partitioning, you can
#' make it much faster to read and query.
#'
#' @param dataset [Dataset], [RecordBatch], [Table], `arrow_dplyr_query`, or
#' `data.frame`. If an `arrow_dplyr_query`, the query will be evaluated and
#' the result will be written. This means that you can `select()`, `filter()`, `mutate()`,
#' etc. to transform the data before it is written if you need to.
#' @param path string path, URI, or `SubTreeFileSystem` referencing a directory
#' to write to (directory will be created if it does not exist)
#' @param format a string identifier of the file format. Default is to use
#' "parquet" (see [FileFormat])
#' @param partitioning `Partitioning` or a character vector of columns to
#' use as partition keys (to be written as path segments). Default is to
#' use the current `group_by()` columns.
#' @param basename_template string template for the names of files to be written.
#' Must contain `"{i}"`, which will be replaced with an autoincremented
#' integer to generate basenames of datafiles. For example, `"part-{i}.arrow"`
#' will yield `"part-0.arrow", ...`.
#' If not specified, it defaults to `"part-{i}.<default extension>"`.
#' @param hive_style logical: write partition segments as Hive-style
#' (`key1=value1/key2=value2/file.ext`) or as just bare values. Default is `TRUE`.
#' @param existing_data_behavior The behavior to use when there is already data
#' in the destination directory.  Must be one of "overwrite", "error", or
#' "delete_matching".
#' - "overwrite" (the default) then any new files created will overwrite
#'   existing files
#' - "error" then the operation will fail if the destination directory is not
#'   empty
#' - "delete_matching" then the writer will delete any existing partitions
#'   if data is going to be written to those partitions and will leave alone
#'   partitions which data is not written to.
#' @param max_partitions maximum number of partitions any batch may be
#' written into. Default is 1024L.
#' @param max_open_files maximum number of files that can be left opened
#' during a write operation. If greater than 0 then this will limit the
#' maximum number of files that can be left open. If an attempt is made to open
#' too many files then the least recently used file will be closed.
#' If this setting is set too low you may end up fragmenting your data
#' into many small files. The default is 900, which also allows some number of
#' files to be open by the scanner before hitting the default Linux limit of 1024.
#' @param max_rows_per_file maximum number of rows per file.
#' If greater than 0 then this will limit how many rows are placed in any single file.
#' Default is 0L.
#' @param min_rows_per_group write the row groups to the disk when this number of
#' rows has accumulated. Default is 0L.
#' @param max_rows_per_group maximum rows allowed in a single
#' group and when this number of rows is exceeded, it is split and the next set
#' of rows is written to the next group. This value must be set such that it is
#' greater than `min_rows_per_group`. Default is 1024 * 1024.
#' @param create_directory whether to create the directories written into.
#' Requires appropriate permissions on the storage backend. If set to FALSE,
#' directories are assumed to be already present if writing on a classic
#' hierarchical filesystem. Default is TRUE
#' @param ... additional format-specific arguments. For available Parquet
#' options, see [write_parquet()]. The available Feather options are:
#' - `use_legacy_format` logical: write data formatted so that Arrow libraries
#'   versions 0.14 and lower can read it. Default is `FALSE`. You can also
#'   enable this by setting the environment variable `ARROW_PRE_0_15_IPC_FORMAT=1`.
#' - `metadata_version`: A string like "V5" or the equivalent integer indicating
#'   the Arrow IPC MetadataVersion. Default (`NULL`) will use the latest version,
#'   unless the environment variable `ARROW_PRE_1_0_METADATA_VERSION=1`, in
#'   which case it will be V4.
#' - `codec`: A [Codec] which will be used to compress body buffers of written
#'   files. Default (NULL) will not compress body buffers.
#' - `null_fallback`: character to be used in place of missing values (`NA` or
#' `NULL`) when using Hive-style partitioning. See [hive_partition()].
#' @return The input `dataset`, invisibly
#' @examplesIf arrow_with_dataset() & arrow_with_parquet() & requireNamespace("dplyr", quietly = TRUE)
#' # You can write datasets partitioned by the values in a column (here: "cyl").
#' # This creates a structure of the form cyl=X/part-Z.parquet.
#' one_level_tree <- tempfile()
#' write_dataset(mtcars, one_level_tree, partitioning = "cyl")
#' list.files(one_level_tree, recursive = TRUE)
#'
#' # You can also partition by the values in multiple columns
#' # (here: "cyl" and "gear").
#' # This creates a structure of the form cyl=X/gear=Y/part-Z.parquet.
#' two_levels_tree <- tempfile()
#' write_dataset(mtcars, two_levels_tree, partitioning = c("cyl", "gear"))
#' list.files(two_levels_tree, recursive = TRUE)
#'
#' # In the two previous examples we would have:
#' # X = {4,6,8}, the number of cylinders.
#' # Y = {3,4,5}, the number of forward gears.
#' # Z = {0,1,2}, the number of saved parts, starting from 0.
#'
#' # You can obtain the same result as the previous examples using arrow with
#' # a dplyr pipeline. This will be the same as two_levels_tree above, but the
#' # output directory will be different.
#' library(dplyr)
#' two_levels_tree_2 <- tempfile()
#' mtcars |>
#'   group_by(cyl, gear) |>
#'   write_dataset(two_levels_tree_2)
#' list.files(two_levels_tree_2, recursive = TRUE)
#'
#' # And you can also turn off the Hive-style directory naming where the column
#' # name is included with the values by using `hive_style = FALSE`.
#'
#' # Write a structure X/Y/part-Z.parquet.
#' two_levels_tree_no_hive <- tempfile()
#' mtcars |>
#'   group_by(cyl, gear) |>
#'   write_dataset(two_levels_tree_no_hive, hive_style = FALSE)
#' list.files(two_levels_tree_no_hive, recursive = TRUE)
#' @export
write_dataset <- function(
  dataset,
  path,
  format = c("parquet", "feather", "arrow", "ipc", "csv", "tsv", "txt", "text"),
  partitioning = dplyr::group_vars(dataset),
  basename_template = paste0("part-{i}.", as.character(format)),
  hive_style = TRUE,
  existing_data_behavior = c("overwrite", "error", "delete_matching"),
  max_partitions = 1024L,
  max_open_files = 900L,
  max_rows_per_file = 0L,
  min_rows_per_group = 0L,
  max_rows_per_group = bitwShiftL(1, 20),
  create_directory = TRUE,
  ...
) {
  format <- match.arg(format)
  # "feather" and "ipc" are aliases for the Arrow IPC file format. Because the
  # default for `basename_template` is evaluated lazily (not until it is used
  # in plan$Write() below), it picks up this remapped value, e.g. yielding
  # "part-{i}.arrow" when the caller asked for format = "feather".
  if (format %in% c("feather", "ipc")) {
    format <- "arrow"
  }
  if (inherits(dataset, "arrow_dplyr_query")) {
    # partitioning vars need to be in the `select` schema
    dataset <- ensure_group_vars(dataset)
  } else {
    check_named_cols(dataset)
    if (inherits(dataset, "grouped_df")) {
      # Force the lazy default for `partitioning` (dplyr::group_vars(dataset))
      # NOW, before ungrouping below would make it evaluate to no groups
      force(partitioning)
      # Drop the grouping metadata before writing; we've already consumed it
      # now to construct `partitioning` and don't want it in the metadata$r
      dataset <- dplyr::ungroup(dataset)
    }
    dataset <- as_adq(dataset)
  }

  plan <- ExecPlan$create()
  on.exit(plan$.unsafe_delete())

  final_node <- plan$Build(dataset)
  if (!is.null(final_node$extras$sort %||% final_node$extras$head %||% final_node$extras$tail)) {
    # Because sorting and topK are only handled in the SinkNode (or in R!),
    # they wouldn't get picked up in the WriteNode. So let's Run this ExecPlan
    # to capture those, and then create a new plan for writing
    # TODO(ARROW-15681): do sorting in WriteNode in C++
    dataset <- as_adq(plan$Run(final_node))
    plan <- ExecPlan$create()
    final_node <- plan$Build(dataset)
  }

  if (!inherits(partitioning, "Partitioning")) {
    # `partitioning` is a character vector of column names; build a
    # Partitioning object from the subset of the output schema it names
    partition_schema <- final_node$schema[partitioning]
    if (isTRUE(hive_style)) {
      partitioning <- HivePartitioning$create(
        partition_schema,
        null_fallback = list(...)$null_fallback
      )
    } else {
      partitioning <- DirectoryPartitioning$create(partition_schema)
    }
  }

  path_and_fs <- get_path_and_filesystem(path)

  dots <- list(...)
  if (format %in% c("txt", "text") && !any(c("delimiter", "delim") %in% names(dots))) {
    stop("A delimiter must be given for a txt format.")
  }
  if (format == "tsv" && any(c("delimiter", "delim") %in% names(dots))) {
    stop("Can't set a delimiter for the tsv format.")
  }

  output_schema <- final_node$schema
  # This is a workaround because CsvFileFormat$create defaults the delimiter to ","
  if (format == "tsv") {
    options <- FileWriteOptions$create(
      format,
      column_names = names(output_schema),
      delimiter = "\t",
      ...
    )
  } else {
    options <- FileWriteOptions$create(
      format,
      column_names = names(output_schema),
      ...
    )
  }

  # TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R
  # and encapsulate this logic better
  # Convert the string option into the 0-based index the C++ enum expects
  existing_data_behavior_opts <- c("delete_matching", "overwrite", "error")
  existing_data_behavior <- match(match.arg(existing_data_behavior), existing_data_behavior_opts) - 1L

  # If the caller capped rows per file but left the row-group size at its
  # default, shrink the group size to fit (a row group cannot span files)
  if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) {
    max_rows_per_group <- max_rows_per_file
  }

  validate_positive_int_value(max_partitions)
  validate_positive_int_value(max_open_files)
  # Validate max_rows_per_file too, consistent with the other numeric limits
  validate_positive_int_value(max_rows_per_file)
  validate_positive_int_value(min_rows_per_group)
  validate_positive_int_value(max_rows_per_group)

  plan$Write(
    final_node,
    options,
    path_and_fs$fs,
    path_and_fs$path,
    partitioning,
    basename_template,
    existing_data_behavior,
    max_partitions,
    max_open_files,
    max_rows_per_file,
    min_rows_per_group,
    max_rows_per_group,
    create_directory
  )
}

#' Write a dataset into partitioned flat files.
#'
#' The `write_*_dataset()` are a family of wrappers around [write_dataset] to allow for easy switching
#' between functions for writing datasets.
#'
#' @inheritParams write_dataset
#' @param col_names Whether to write an initial header line with column names.
#' @param batch_size Maximum number of rows processed at a time. Default is 1024L.
#' @param delim Delimiter used to separate values. Defaults to `","` for `write_delim_dataset()` and
#' `write_csv_dataset()`, and `"\t"` for `write_tsv_dataset()`. Cannot be changed for `write_tsv_dataset()`.
#' @param na a character vector of strings to interpret as missing values. Quotes are not allowed in this string.
#' The default is an empty string `""`.
#' @param eol the end of line character to use for ending rows. The default is `"\n"`.
#' @param quote How to handle fields which contain characters that need to be quoted.
#' - `needed` - Enclose all strings and binary values in quotes which need them, because their CSV
#'  rendering can itself contain quotes (the default)
#' - `all` -   Enclose all valid values in quotes. Nulls are not quoted. May cause readers to
#' interpret all values as strings if schema is inferred.
#' - `none` -   Do not enclose any values in quotes. Prevents values from containing quotes ("),
#' cell delimiters (,) or line endings (\\r, \\n), (following RFC4180). If values
#' contain these characters, an error is caused when attempting to write.
#' @return The input `dataset`, invisibly.
#'
#' @seealso [write_dataset()]
#' @export
write_delim_dataset <- function(
  dataset,
  path,
  partitioning = dplyr::group_vars(dataset),
  basename_template = "part-{i}.txt",
  hive_style = TRUE,
  existing_data_behavior = c("overwrite", "error", "delete_matching"),
  max_partitions = 1024L,
  max_open_files = 900L,
  max_rows_per_file = 0L,
  min_rows_per_group = 0L,
  max_rows_per_group = bitwShiftL(1, 20),
  col_names = TRUE,
  batch_size = 1024L,
  delim = ",",
  na = "",
  eol = "\n",
  quote = c("needed", "all", "none")
) {
  # When the caller set only a per-file row cap, keep the default row-group
  # size from exceeding it (a row group cannot span multiple files).
  if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) {
    max_rows_per_group <- max_rows_per_file
  }

  # Translate the user-facing quoting choice into Arrow's QuotingStyle name
  quote <- switch(match.arg(quote),
    needed = "Needed",
    all = "AllValid",
    none = "None"
  )

  # Delegate to write_dataset(), mapping readr-style argument names onto the
  # format-specific options it forwards to the CSV writer
  write_dataset(
    dataset = dataset,
    path = path,
    format = "txt",
    partitioning = partitioning,
    basename_template = basename_template,
    hive_style = hive_style,
    existing_data_behavior = existing_data_behavior,
    max_partitions = max_partitions,
    max_open_files = max_open_files,
    max_rows_per_file = max_rows_per_file,
    min_rows_per_group = min_rows_per_group,
    max_rows_per_group = max_rows_per_group,
    include_header = col_names,
    batch_size = batch_size,
    delimiter = delim,
    null_string = na,
    eol = eol,
    quoting_style = quote
  )
}

#' @rdname write_delim_dataset
#' @export
write_csv_dataset <- function(
  dataset,
  path,
  partitioning = dplyr::group_vars(dataset),
  basename_template = "part-{i}.csv",
  hive_style = TRUE,
  existing_data_behavior = c("overwrite", "error", "delete_matching"),
  max_partitions = 1024L,
  max_open_files = 900L,
  max_rows_per_file = 0L,
  min_rows_per_group = 0L,
  max_rows_per_group = bitwShiftL(1, 20),
  col_names = TRUE,
  batch_size = 1024L,
  delim = ",",
  na = "",
  eol = "\n",
  quote = c("needed", "all", "none")
) {
  # A row group must fit inside a file: if only max_rows_per_file was
  # supplied, lower the (defaulted) row-group cap to match it.
  if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) {
    max_rows_per_group <- max_rows_per_file
  }

  # Map the user-facing quoting choice to Arrow's QuotingStyle identifier
  arrow_quoting_styles <- c(needed = "Needed", all = "AllValid", none = "None")
  quote <- arrow_quoting_styles[[match.arg(quote)]]

  # Thin wrapper over write_dataset() with format fixed to "csv" and
  # readr-style argument names translated to the CSV writer's options
  write_dataset(
    dataset = dataset,
    path = path,
    format = "csv",
    partitioning = partitioning,
    basename_template = basename_template,
    hive_style = hive_style,
    existing_data_behavior = existing_data_behavior,
    max_partitions = max_partitions,
    max_open_files = max_open_files,
    max_rows_per_file = max_rows_per_file,
    min_rows_per_group = min_rows_per_group,
    max_rows_per_group = max_rows_per_group,
    include_header = col_names,
    batch_size = batch_size,
    delimiter = delim,
    null_string = na,
    eol = eol,
    quoting_style = quote
  )
}

#' @rdname write_delim_dataset
#' @export
write_tsv_dataset <- function(
  dataset,
  path,
  partitioning = dplyr::group_vars(dataset),
  basename_template = "part-{i}.tsv",
  hive_style = TRUE,
  existing_data_behavior = c("overwrite", "error", "delete_matching"),
  max_partitions = 1024L,
  max_open_files = 900L,
  max_rows_per_file = 0L,
  min_rows_per_group = 0L,
  max_rows_per_group = bitwShiftL(1, 20),
  col_names = TRUE,
  batch_size = 1024L,
  na = "",
  eol = "\n",
  quote = c("needed", "all", "none")
) {
  # Cap the default row-group size at the per-file row limit when only the
  # latter was set by the caller; a row group cannot straddle two files.
  if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) {
    max_rows_per_group <- max_rows_per_file
  }

  # Resolve the quoting choice into the corresponding Arrow QuotingStyle name
  quote <- switch(match.arg(quote),
    needed = "Needed",
    all = "AllValid",
    none = "None"
  )

  # Note: no `delim` argument here; write_dataset() hard-codes "\t" for the
  # "tsv" format, so forwarding a delimiter would be rejected
  write_dataset(
    dataset = dataset,
    path = path,
    format = "tsv",
    partitioning = partitioning,
    basename_template = basename_template,
    hive_style = hive_style,
    existing_data_behavior = existing_data_behavior,
    max_partitions = max_partitions,
    max_open_files = max_open_files,
    max_rows_per_file = max_rows_per_file,
    min_rows_per_group = min_rows_per_group,
    max_rows_per_group = max_rows_per_group,
    include_header = col_names,
    batch_size = batch_size,
    null_string = na,
    eol = eol,
    quoting_style = quote
  )
}

# Validate that `value` is a single, non-missing, non-negative integer-ish
# value (0 is accepted: the write options treat 0 as "no limit").
#
# @param value the value to check; its unevaluated expression (e.g. the
#   caller's argument name such as `max_partitions`) is used in the default
#   error message via substitute()
# @param msg optional custom error message; previously this parameter was
#   declared but silently ignored, so caller-supplied messages never appeared
validate_positive_int_value <- function(value, msg = NULL) {
  if (!is_integerish(value, n = 1) || is.na(value) || value < 0) {
    abort(msg %||% paste(substitute(value), "must be a positive, non-missing integer"))
  }
}