/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */
#include "aws/s3/private/s3_parallel_input_stream.h"
#include <aws/common/file.h>
#include <aws/io/future.h>
#include <aws/io/stream.h>
#include <errno.h>
void aws_parallel_input_stream_init_base(
    struct aws_parallel_input_stream *stream,
    struct aws_allocator *alloc,
    const struct aws_parallel_input_stream_vtable *vtable,
    void *impl) {

    AWS_ZERO_STRUCT(*stream);
    stream->alloc = alloc;
    stream->vtable = vtable;
    stream->impl = impl;
    aws_ref_count_init(&stream->ref_count, stream, (aws_simple_completion_callback *)vtable->destroy);
}
struct aws_parallel_input_stream *aws_parallel_input_stream_acquire(struct aws_parallel_input_stream *stream) {
    if (stream != NULL) {
        aws_ref_count_acquire(&stream->ref_count);
    }
    return stream;
}

struct aws_parallel_input_stream *aws_parallel_input_stream_release(struct aws_parallel_input_stream *stream) {
    if (stream != NULL) {
        aws_ref_count_release(&stream->ref_count);
    }
    return NULL;
}
struct aws_future_bool *aws_parallel_input_stream_read(
    struct aws_parallel_input_stream *stream,
    uint64_t offset,
    struct aws_byte_buf *dest) {

    /* Ensure the buffer has space available */
    if (dest->len == dest->capacity) {
        struct aws_future_bool *future = aws_future_bool_new(stream->alloc);
        aws_future_bool_set_error(future, AWS_ERROR_SHORT_BUFFER);
        return future;
    }
    struct aws_future_bool *future = stream->vtable->read(stream, offset, dest);
    return future;
}
struct aws_parallel_input_stream_from_file_impl {
    struct aws_parallel_input_stream base;
    struct aws_string *file_path;
};

static void s_para_from_file_destroy(struct aws_parallel_input_stream *stream) {
    struct aws_parallel_input_stream_from_file_impl *impl = stream->impl;
    aws_string_destroy(impl->file_path);
    aws_mem_release(stream->alloc, impl);
}
static struct aws_future_bool *s_para_from_file_read(
    struct aws_parallel_input_stream *stream,
    uint64_t offset,
    struct aws_byte_buf *dest) {

    struct aws_future_bool *future = aws_future_bool_new(stream->alloc);
    struct aws_parallel_input_stream_from_file_impl *impl = stream->impl;
    bool success = false;
    struct aws_input_stream *file_stream = NULL;
    struct aws_stream_status status = {
        .is_end_of_stream = false,
        .is_valid = true,
    };

    /* Open a fresh stream for each read so that concurrent reads do not share a file position. */
    file_stream = aws_input_stream_new_from_file(stream->alloc, aws_string_c_str(impl->file_path));
    if (!file_stream) {
        goto done;
    }

    if (aws_input_stream_seek(file_stream, offset, AWS_SSB_BEGIN)) {
        goto done;
    }

    /* Keep reading until the buffer is full or the end of the stream is reached.
     * Note that we must read() after seek() to determine whether we're at EOF; seek() alone won't report it. */
    while ((dest->len < dest->capacity) && !status.is_end_of_stream) {
        /* Read from stream */
        if (aws_input_stream_read(file_stream, dest) != AWS_OP_SUCCESS) {
            goto done;
        }

        /* Check if stream is done */
        if (aws_input_stream_get_status(file_stream, &status) != AWS_OP_SUCCESS) {
            goto done;
        }
    }
    success = true;

done:
    /* The future's result is true when the read reached the end of the file. */
    if (success) {
        aws_future_bool_set_result(future, status.is_end_of_stream);
    } else {
        aws_future_bool_set_error(future, aws_last_error());
    }

    aws_input_stream_release(file_stream);
    return future;
}
static struct aws_parallel_input_stream_vtable s_parallel_input_stream_from_file_vtable = {
    .destroy = s_para_from_file_destroy,
    .read = s_para_from_file_read,
};
struct aws_parallel_input_stream *aws_parallel_input_stream_new_from_file(
    struct aws_allocator *allocator,
    struct aws_byte_cursor file_name) {

    struct aws_parallel_input_stream_from_file_impl *impl =
        aws_mem_calloc(allocator, 1, sizeof(struct aws_parallel_input_stream_from_file_impl));
    aws_parallel_input_stream_init_base(&impl->base, allocator, &s_parallel_input_stream_from_file_vtable, impl);
    impl->file_path = aws_string_new_from_cursor(allocator, &file_name);
    if (!aws_path_exists(impl->file_path)) {
        /* If the file path does not exist, raise an error translated from errno. */
        aws_translate_and_raise_io_error(errno);
        goto error;
    }
    return &impl->base;

error:
    s_para_from_file_destroy(&impl->base);
    return NULL;
}