1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423
|
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#' Write a dataset
#'
#' This function allows you to write a dataset. By writing to more efficient
#' binary storage formats, and by specifying relevant partitioning, you can
#' make it much faster to read and query.
#'
#' @param dataset [Dataset], [RecordBatch], [Table], `arrow_dplyr_query`, or
#' `data.frame`. If an `arrow_dplyr_query`, the query will be evaluated and
#' the result will be written. This means that you can `select()`, `filter()`, `mutate()`,
#' etc. to transform the data before it is written if you need to.
#' @param path string path, URI, or `SubTreeFileSystem` referencing a directory
#' to write to (directory will be created if it does not exist)
#' @param format a string identifier of the file format. Default is to use
#' "parquet" (see [FileFormat])
#' @param partitioning `Partitioning` or a character vector of columns to
#' use as partition keys (to be written as path segments). Default is to
#' use the current `group_by()` columns.
#' @param basename_template string template for the names of files to be written.
#' Must contain `"{i}"`, which will be replaced with an autoincremented
#' integer to generate basenames of datafiles. For example, `"part-{i}.arrow"`
#' will yield `"part-0.arrow", ...`.
#' If not specified, it defaults to `"part-{i}.<default extension>"`.
#' @param hive_style logical: write partition segments as Hive-style
#' (`key1=value1/key2=value2/file.ext`) or as just bare values. Default is `TRUE`.
#' @param existing_data_behavior The behavior to use when there is already data
#' in the destination directory. Must be one of "overwrite", "error", or
#' "delete_matching".
#' - "overwrite" (the default) then any new files created will overwrite
#' existing files
#' - "error" then the operation will fail if the destination directory is not
#' empty
#' - "delete_matching" then the writer will delete any existing partitions
#' if data is going to be written to those partitions and will leave alone
#' partitions which data is not written to.
#' @param max_partitions maximum number of partitions any batch may be
#' written into. Default is 1024L.
#' @param max_open_files maximum number of files that can be left opened
#' during a write operation. If greater than 0 then this will limit the
#' maximum number of files that can be left open. If an attempt is made to open
#' too many files then the least recently used file will be closed.
#' If this setting is set too low you may end up fragmenting your data
#' into many small files. The default is 900 which also allows some # of files to be
#' open by the scanner before hitting the default Linux limit of 1024.
#' @param max_rows_per_file maximum number of rows per file.
#' If greater than 0 then this will limit how many rows are placed in any single file.
#' Default is 0L.
#' @param min_rows_per_group write the row groups to the disk when this number of
#' rows have accumulated. Default is 0L.
#' @param max_rows_per_group maximum rows allowed in a single
#' group and when this number of rows is exceeded, it is split and the next set
#' of rows is written to the next group. This value must be set such that it is
#' greater than `min_rows_per_group`. Default is 1024 * 1024.
#' @param create_directory whether to create the directories written into.
#' Requires appropriate permissions on the storage backend. If set to FALSE,
#' directories are assumed to be already present if writing on a classic
#' hierarchical filesystem. Default is TRUE
#' @param ... additional format-specific arguments. For available Parquet
#' options, see [write_parquet()]. The available Feather options are:
#' - `use_legacy_format` logical: write data formatted so that Arrow libraries
#' versions 0.14 and lower can read it. Default is `FALSE`. You can also
#' enable this by setting the environment variable `ARROW_PRE_0_15_IPC_FORMAT=1`.
#' - `metadata_version`: A string like "V5" or the equivalent integer indicating
#' the Arrow IPC MetadataVersion. Default (`NULL`) will use the latest version,
#' unless the environment variable `ARROW_PRE_1_0_METADATA_VERSION=1`, in
#' which case it will be V4.
#' - `codec`: A [Codec] which will be used to compress body buffers of written
#' files. Default (NULL) will not compress body buffers.
#' - `null_fallback`: character to be used in place of missing values (`NA` or
#' `NULL`) when using Hive-style partitioning. See [hive_partition()].
#' @return The input `dataset`, invisibly
#' @examplesIf arrow_with_dataset() & arrow_with_parquet() & requireNamespace("dplyr", quietly = TRUE)
#' # You can write datasets partitioned by the values in a column (here: "cyl").
#' # This creates a structure of the form cyl=X/part-Z.parquet.
#' one_level_tree <- tempfile()
#' write_dataset(mtcars, one_level_tree, partitioning = "cyl")
#' list.files(one_level_tree, recursive = TRUE)
#'
#' # You can also partition by the values in multiple columns
#' # (here: "cyl" and "gear").
#' # This creates a structure of the form cyl=X/gear=Y/part-Z.parquet.
#' two_levels_tree <- tempfile()
#' write_dataset(mtcars, two_levels_tree, partitioning = c("cyl", "gear"))
#' list.files(two_levels_tree, recursive = TRUE)
#'
#' # In the two previous examples we would have:
#' # X = {4,6,8}, the number of cylinders.
#' # Y = {3,4,5}, the number of forward gears.
#' # Z = {0,1,2}, the number of saved parts, starting from 0.
#'
#' # You can obtain the same result as as the previous examples using arrow with
#' # a dplyr pipeline. This will be the same as two_levels_tree above, but the
#' # output directory will be different.
#' library(dplyr)
#' two_levels_tree_2 <- tempfile()
#' mtcars |>
#' group_by(cyl, gear) |>
#' write_dataset(two_levels_tree_2)
#' list.files(two_levels_tree_2, recursive = TRUE)
#'
#' # And you can also turn off the Hive-style directory naming where the column
#' # name is included with the values by using `hive_style = FALSE`.
#'
#' # Write a structure X/Y/part-Z.parquet.
#' two_levels_tree_no_hive <- tempfile()
#' mtcars |>
#' group_by(cyl, gear) |>
#' write_dataset(two_levels_tree_no_hive, hive_style = FALSE)
#' list.files(two_levels_tree_no_hive, recursive = TRUE)
#' @export
write_dataset <- function(
dataset,
path,
format = c("parquet", "feather", "arrow", "ipc", "csv", "tsv", "txt", "text"),
partitioning = dplyr::group_vars(dataset),
basename_template = paste0("part-{i}.", as.character(format)),
hive_style = TRUE,
existing_data_behavior = c("overwrite", "error", "delete_matching"),
max_partitions = 1024L,
max_open_files = 900L,
max_rows_per_file = 0L,
min_rows_per_group = 0L,
max_rows_per_group = bitwShiftL(1, 20),
create_directory = TRUE,
...
) {
format <- match.arg(format)
if (format %in% c("feather", "ipc")) {
format <- "arrow"
}
if (inherits(dataset, "arrow_dplyr_query")) {
# partitioning vars need to be in the `select` schema
dataset <- ensure_group_vars(dataset)
} else {
check_named_cols(dataset)
if (inherits(dataset, "grouped_df")) {
force(partitioning)
# Drop the grouping metadata before writing; we've already consumed it
# now to construct `partitioning` and don't want it in the metadata$r
dataset <- dplyr::ungroup(dataset)
}
dataset <- as_adq(dataset)
}
plan <- ExecPlan$create()
on.exit(plan$.unsafe_delete())
final_node <- plan$Build(dataset)
if (!is.null(final_node$extras$sort %||% final_node$extras$head %||% final_node$extras$tail)) {
# Because sorting and topK are only handled in the SinkNode (or in R!),
# they wouldn't get picked up in the WriteNode. So let's Run this ExecPlan
# to capture those, and then create a new plan for writing
# TODO(ARROW-15681): do sorting in WriteNode in C++
dataset <- as_adq(plan$Run(final_node))
plan <- ExecPlan$create()
final_node <- plan$Build(dataset)
}
if (!inherits(partitioning, "Partitioning")) {
partition_schema <- final_node$schema[partitioning]
if (isTRUE(hive_style)) {
partitioning <- HivePartitioning$create(
partition_schema,
null_fallback = list(...)$null_fallback
)
} else {
partitioning <- DirectoryPartitioning$create(partition_schema)
}
}
path_and_fs <- get_path_and_filesystem(path)
dots <- list(...)
if (format %in% c("txt", "text") && !any(c("delimiter", "delim") %in% names(dots))) {
stop("A delimiter must be given for a txt format.")
}
if (format == "tsv" && any(c("delimiter", "delim") %in% names(dots))) {
stop("Can't set a delimiter for the tsv format.")
}
output_schema <- final_node$schema
# This is a workaround because CsvFileFormat$create defaults the delimiter to ","
if (format == "tsv") {
options <- FileWriteOptions$create(
format,
column_names = names(output_schema),
delimiter = "\t",
...
)
} else {
options <- FileWriteOptions$create(
format,
column_names = names(output_schema),
...
)
}
# TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R
# and encapsulate this logic better
existing_data_behavior_opts <- c("delete_matching", "overwrite", "error")
existing_data_behavior <- match(match.arg(existing_data_behavior), existing_data_behavior_opts) - 1L
if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) {
max_rows_per_group <- max_rows_per_file
}
validate_positive_int_value(max_partitions)
validate_positive_int_value(max_open_files)
validate_positive_int_value(min_rows_per_group)
validate_positive_int_value(max_rows_per_group)
plan$Write(
final_node,
options,
path_and_fs$fs,
path_and_fs$path,
partitioning,
basename_template,
existing_data_behavior,
max_partitions,
max_open_files,
max_rows_per_file,
min_rows_per_group,
max_rows_per_group,
create_directory
)
}
#' Write a dataset into partitioned flat files.
#'
#' The `write_*_dataset()` are a family of wrappers around [write_dataset] to allow for easy switching
#' between functions for writing datasets.
#'
#' @inheritParams write_dataset
#' @param col_names Whether to write an initial header line with column names.
#' @param batch_size Maximum number of rows processed at a time. Default is 1024L.
#' @param delim Delimiter used to separate values. Defaults to `","` for `write_delim_dataset()` and
#' `write_csv_dataset()`, and `"\t` for `write_tsv_dataset()`. Cannot be changed for `write_tsv_dataset()`.
#' @param na a character vector of strings to interpret as missing values. Quotes are not allowed in this string.
#' The default is an empty string `""`.
#' @param eol the end of line character to use for ending rows. The default is `"\n"`.
#' @param quote How to handle fields which contain characters that need to be quoted.
#' - `needed` - Enclose all strings and binary values in quotes which need them, because their CSV rendering can
#' contain quotes itself (the default)
#' - `all` - Enclose all valid values in quotes. Nulls are not quoted. May cause readers to
#' interpret all values as strings if schema is inferred.
#' - `none` - Do not enclose any values in quotes. Prevents values from containing quotes ("),
#' cell delimiters (,) or line endings (\\r, \\n), (following RFC4180). If values
#' contain these characters, an error is caused when attempting to write.
#' @return The input `dataset`, invisibly.
#'
#' @seealso [write_dataset()]
#' @export
write_delim_dataset <- function(
dataset,
path,
partitioning = dplyr::group_vars(dataset),
basename_template = "part-{i}.txt",
hive_style = TRUE,
existing_data_behavior = c("overwrite", "error", "delete_matching"),
max_partitions = 1024L,
max_open_files = 900L,
max_rows_per_file = 0L,
min_rows_per_group = 0L,
max_rows_per_group = bitwShiftL(1, 20),
col_names = TRUE,
batch_size = 1024L,
delim = ",",
na = "",
eol = "\n",
quote = c("needed", "all", "none")
) {
if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) {
max_rows_per_group <- max_rows_per_file
}
quoting_style_arrow_opts <- c("Needed", "AllValid", "None")
quote <- match(match.arg(quote), c("needed", "all", "none"))
quote <- quoting_style_arrow_opts[quote]
write_dataset(
dataset = dataset,
path = path,
format = "txt",
partitioning = partitioning,
basename_template = basename_template,
hive_style = hive_style,
existing_data_behavior = existing_data_behavior,
max_partitions = max_partitions,
max_open_files = max_open_files,
max_rows_per_file = max_rows_per_file,
min_rows_per_group = min_rows_per_group,
max_rows_per_group = max_rows_per_group,
include_header = col_names,
batch_size = batch_size,
delimiter = delim,
null_string = na,
eol = eol,
quoting_style = quote
)
}
#' @rdname write_delim_dataset
#' @export
write_csv_dataset <- function(
dataset,
path,
partitioning = dplyr::group_vars(dataset),
basename_template = "part-{i}.csv",
hive_style = TRUE,
existing_data_behavior = c("overwrite", "error", "delete_matching"),
max_partitions = 1024L,
max_open_files = 900L,
max_rows_per_file = 0L,
min_rows_per_group = 0L,
max_rows_per_group = bitwShiftL(1, 20),
col_names = TRUE,
batch_size = 1024L,
delim = ",",
na = "",
eol = "\n",
quote = c("needed", "all", "none")
) {
if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) {
max_rows_per_group <- max_rows_per_file
}
quoting_style_arrow_opts <- c("Needed", "AllValid", "None")
quote <- match(match.arg(quote), c("needed", "all", "none"))
quote <- quoting_style_arrow_opts[quote]
write_dataset(
dataset = dataset,
path = path,
format = "csv",
partitioning = partitioning,
basename_template = basename_template,
hive_style = hive_style,
existing_data_behavior = existing_data_behavior,
max_partitions = max_partitions,
max_open_files = max_open_files,
max_rows_per_file = max_rows_per_file,
min_rows_per_group = min_rows_per_group,
max_rows_per_group = max_rows_per_group,
include_header = col_names,
batch_size = batch_size,
delimiter = delim,
null_string = na,
eol = eol,
quoting_style = quote
)
}
#' @rdname write_delim_dataset
#' @export
write_tsv_dataset <- function(
dataset,
path,
partitioning = dplyr::group_vars(dataset),
basename_template = "part-{i}.tsv",
hive_style = TRUE,
existing_data_behavior = c("overwrite", "error", "delete_matching"),
max_partitions = 1024L,
max_open_files = 900L,
max_rows_per_file = 0L,
min_rows_per_group = 0L,
max_rows_per_group = bitwShiftL(1, 20),
col_names = TRUE,
batch_size = 1024L,
na = "",
eol = "\n",
quote = c("needed", "all", "none")
) {
if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) {
max_rows_per_group <- max_rows_per_file
}
quoting_style_arrow_opts <- c("Needed", "AllValid", "None")
quote <- match(match.arg(quote), c("needed", "all", "none"))
quote <- quoting_style_arrow_opts[quote]
write_dataset(
dataset = dataset,
path = path,
format = "tsv",
partitioning = partitioning,
basename_template = basename_template,
hive_style = hive_style,
existing_data_behavior = existing_data_behavior,
max_partitions = max_partitions,
max_open_files = max_open_files,
max_rows_per_file = max_rows_per_file,
min_rows_per_group = min_rows_per_group,
max_rows_per_group = max_rows_per_group,
include_header = col_names,
batch_size = batch_size,
null_string = na,
eol = eol,
quoting_style = quote
)
}
validate_positive_int_value <- function(value, msg) {
if (!is_integerish(value, n = 1) || is.na(value) || value < 0) {
abort(paste(substitute(value), "must be a positive, non-missing integer"))
}
}
|