File: s3_parallel_input_stream.c

package info (click to toggle)
aws-crt-python 0.28.4+dfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 78,428 kB
  • sloc: ansic: 437,955; python: 27,657; makefile: 5,855; sh: 4,289; ruby: 208; java: 82; perl: 73; cpp: 25; xml: 11
file content (244 lines) | stat: -rw-r--r-- 8,938 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

#include "aws/s3/private/s3_parallel_input_stream.h"

#include <aws/common/file.h>
#include <aws/common/task_scheduler.h>

#include <aws/io/event_loop.h>
#include <aws/io/future.h>
#include <aws/io/stream.h>

#include <errno.h>
#include <inttypes.h>

AWS_STATIC_STRING_FROM_LITERAL(s_readonly_bytes_mode, "rb");

/**
 * Zero-reference callback registered with the stream's ref count.
 *
 * The ref count invokes its callback with a `void *`, while the vtable's destroy
 * function takes a `struct aws_parallel_input_stream *`. Calling a function through
 * a pointer of an incompatible type is undefined behavior, so this correctly typed
 * trampoline forwards to the vtable instead of casting the function pointer.
 */
static void s_parallel_input_stream_on_zero_ref(void *user_data) {
    struct aws_parallel_input_stream *stream = user_data;
    stream->vtable->destroy(stream);
}

/**
 * Initialize the base part of a parallel input stream.
 *
 * @param stream Base struct to initialize; it is zeroed first.
 * @param alloc  Allocator stored on the stream and used to create the shutdown future.
 * @param vtable Implementation function table; its destroy function is invoked with
 *               `stream` when the reference count drops to zero.
 * @param impl   Implementation pointer stored on the stream.
 */
void aws_parallel_input_stream_init_base(
    struct aws_parallel_input_stream *stream,
    struct aws_allocator *alloc,
    const struct aws_parallel_input_stream_vtable *vtable,
    void *impl) {

    AWS_ZERO_STRUCT(*stream);
    stream->alloc = alloc;
    stream->vtable = vtable;
    stream->impl = impl;
    stream->shutdown_future = aws_future_void_new(alloc);
    aws_ref_count_init(&stream->ref_count, stream, s_parallel_input_stream_on_zero_ref);
}

/**
 * Take an additional reference on the stream.
 * A NULL stream is accepted and simply returned as NULL.
 *
 * @return The same pointer that was passed in.
 */
struct aws_parallel_input_stream *aws_parallel_input_stream_acquire(struct aws_parallel_input_stream *stream) {
    if (stream == NULL) {
        return NULL;
    }

    aws_ref_count_acquire(&stream->ref_count);
    return stream;
}

/**
 * Drop one reference on the stream. A NULL stream is a no-op.
 *
 * @return Always NULL, so callers can write `stream = aws_parallel_input_stream_release(stream);`.
 */
struct aws_parallel_input_stream *aws_parallel_input_stream_release(struct aws_parallel_input_stream *stream) {
    if (!stream) {
        return NULL;
    }

    aws_ref_count_release(&stream->ref_count);
    return NULL;
}

/**
 * Query the total length of the stream through the implementation's vtable.
 *
 * @param stream     Stream to query.
 * @param out_length Passed through to the implementation's get_length.
 * @return The implementation's return value, or the result of raising
 *         AWS_ERROR_UNSUPPORTED_OPERATION when the vtable has no get_length.
 */
int aws_parallel_input_stream_get_length(struct aws_parallel_input_stream *stream, int64_t *out_length) {
    if (!stream->vtable->get_length) {
        /* Implementation does not provide a length query. */
        return aws_raise_error(AWS_ERROR_UNSUPPORTED_OPERATION);
    }

    return stream->vtable->get_length(stream, out_length);
}

/**
 * Start a read from the stream into `dest`.
 *
 * @param stream     Stream to read from.
 * @param offset     Offset forwarded to the implementation's read.
 * @param max_length Upper bound on bytes to read, forwarded to the implementation.
 * @param dest       Destination buffer; must have free space (len < capacity).
 * @return A future for the read. If `dest` is already full, the returned future is
 *         completed with AWS_ERROR_SHORT_BUFFER and the implementation is not called.
 */
struct aws_future_bool *aws_parallel_input_stream_read(
    struct aws_parallel_input_stream *stream,
    uint64_t offset,
    size_t max_length,
    struct aws_byte_buf *dest) {

    const bool dest_is_full = (dest->len == dest->capacity);
    if (!dest_is_full) {
        struct aws_future_bool *read_future = stream->vtable->read(stream, offset, max_length, dest);
        AWS_POSTCONDITION(read_future != NULL);
        return read_future;
    }

    /* No room left in the destination: report it through an already-failed future. */
    struct aws_future_bool *failed_future = aws_future_bool_new(stream->alloc);
    aws_future_bool_set_error(failed_future, AWS_ERROR_SHORT_BUFFER);
    return failed_future;
}

/**
 * Get the stream's shutdown future.
 *
 * @return The result of acquiring the stream's shutdown future.
 */
struct aws_future_void *aws_parallel_input_stream_get_shutdown_future(struct aws_parallel_input_stream *stream) {
    struct aws_future_void *shutdown_future = stream->shutdown_future;
    return aws_future_void_acquire(shutdown_future);
}

/* Implementation state for a parallel input stream that reads from a file path. */
struct aws_parallel_input_stream_from_file_impl {
    /* Embedded base stream; the impl is recovered from a base pointer with AWS_CONTAINER_OF. */
    struct aws_parallel_input_stream base;

    /* Path of the file to read. Created from the caller's cursor and destroyed with the stream. */
    struct aws_string *file_path;
    /* Event loop group whose loops run the read tasks. Acquired at creation, released on destroy. */
    struct aws_event_loop_group *reading_elg;

    /* When true, reads try direct IO first. Cleared by the read task if direct IO is reported unsupported. */
    bool direct_io_read;
};

/**
 * Destroy a file-backed parallel input stream.
 *
 * Releases the file path and the event loop group reference, completes and releases
 * the shutdown future, then frees the implementation struct (which embeds `stream`).
 */
static void s_parallel_from_file_destroy(struct aws_parallel_input_stream *stream) {
    struct aws_parallel_input_stream_from_file_impl *file_stream =
        AWS_CONTAINER_OF(stream, struct aws_parallel_input_stream_from_file_impl, base);
    struct aws_allocator *allocator = stream->alloc;
    struct aws_future_void *shutdown_future = stream->shutdown_future;

    aws_string_destroy(file_stream->file_path);
    aws_event_loop_group_release(file_stream->reading_elg);

    /* Complete the shutdown future before freeing the stream's memory. */
    aws_future_void_set_result(shutdown_future);
    aws_future_void_release(shutdown_future);

    aws_mem_release(allocator, file_stream);
}

/* Arguments for one scheduled file read. Allocated per read and freed by the read task. */
struct read_task_impl {
    /* Stream being read. A reference is acquired when the read is scheduled and released by the task. */
    struct aws_parallel_input_stream *para_stream;

    /* Future the task completes: true when EOF was reached, false otherwise, or an error code. */
    struct aws_future_bool *end_future;
    /* Offset handed to the file read call. */
    uint64_t offset;
    /* Number of bytes requested: the smaller of the buffer's free space and the caller's max_length. */
    size_t length;
    /* Destination buffer handed to the file read call; the task does not free it. */
    struct aws_byte_buf *dest;
};

/**
 * Event-loop task that performs one file read and completes the associated future.
 *
 * When direct IO is enabled on the stream it is tried first. If it reports
 * AWS_ERROR_UNSUPPORTED_OPERATION, direct IO is disabled on the stream and the regular
 * read path is used instead; any other direct IO error fails the future.
 *
 * On success the future's result is true when fewer bytes than requested were read
 * (end of stream) and false otherwise. The task frees itself and its arguments, and
 * drops the future and stream references taken when the read was scheduled.
 *
 * @param task        The task itself; freed here.
 * @param arg         The `struct read_task_impl` describing the read; freed here.
 * @param task_status Ignored; the read is performed regardless of the status.
 */
static void s_s3_parallel_from_file_read_task(struct aws_task *task, void *arg, enum aws_task_status task_status) {
    (void)task_status;

    struct read_task_impl *job = arg;
    struct aws_parallel_input_stream *stream = job->para_stream;
    struct aws_parallel_input_stream_from_file_impl *file_stream =
        AWS_CONTAINER_OF(stream, struct aws_parallel_input_stream_from_file_impl, base);
    struct aws_future_bool *completion = job->end_future;

    size_t bytes_read = 0;
    int error_code = AWS_ERROR_SUCCESS;
    /* Cleared once direct IO has either succeeded or failed for good. */
    bool need_regular_read = true;

    if (file_stream->direct_io_read) {
        /* Try direct IO. */
        if (!aws_file_path_read_from_offset_direct_io(
                file_stream->file_path, job->offset, job->length, job->dest, &bytes_read)) {
            /* Succeed. */
            need_regular_read = false;
        } else {
            const int direct_io_error = aws_last_error();
            if (direct_io_error != AWS_ERROR_UNSUPPORTED_OPERATION) {
                error_code = direct_io_error;
                need_regular_read = false;
            } else {
                /* Direct IO not supported, fallback to normal read */
                AWS_LOGF_WARN(
                    AWS_LS_S3_GENERAL,
                    "Direct IO not supported, fallback to normal read. File path: %s",
                    aws_string_c_str(file_stream->file_path));
                /* Remember the outcome so later reads skip the direct IO attempt. */
                file_stream->direct_io_read = false;
                aws_reset_error();
            }
        }
    }

    if (need_regular_read &&
        aws_file_path_read_from_offset(file_stream->file_path, job->offset, job->length, job->dest, &bytes_read)) {
        error_code = aws_last_error();
    }

    if (error_code == AWS_ERROR_SUCCESS) {
        /* Reading fewer bytes than requested without an error means the end of the stream was hit. */
        /* The requested length never exceeds the free space in the buffer. */
        const bool hit_eof = bytes_read < job->length;
        aws_future_bool_set_result(completion, hit_eof);
    } else {
        aws_future_bool_set_error(completion, error_code);
    }

    aws_future_bool_release(completion);
    aws_mem_release(stream->alloc, task);
    aws_mem_release(stream->alloc, job);
    /* Release the stream last: the allocator above is read from it. */
    aws_parallel_input_stream_release(stream);
}

/**
 * vtable `read` implementation for the file-backed stream.
 *
 * Schedules a task on the next loop of the stream's event loop group that reads up to
 * min(free space in `dest`, `max_length`) bytes starting at `offset`.
 *
 * File-local (static): it is only reachable through the vtable, like the other `s_` helpers.
 *
 * @param stream     File-backed stream to read from.
 * @param offset     Offset to start reading at.
 * @param max_length Upper bound on the number of bytes to read.
 * @param dest       Destination buffer; must have a non-NULL backing buffer when a read is scheduled.
 * @return A future completed by the read task. If the computed length is 0 the future is
 *         completed immediately with `false` and no task is scheduled.
 */
static struct aws_future_bool *s_parallel_from_file_read(
    struct aws_parallel_input_stream *stream,
    uint64_t offset,
    size_t max_length,
    struct aws_byte_buf *dest) {

    struct aws_future_bool *future = aws_future_bool_new(stream->alloc);
    struct aws_parallel_input_stream_from_file_impl *impl =
        AWS_CONTAINER_OF(stream, struct aws_parallel_input_stream_from_file_impl, base);

    /* Calculate how much we can read based on available buffer space and max_length */
    size_t available_space = dest->capacity - dest->len;
    size_t length = aws_min_size(available_space, max_length);

    if (length == 0) {
        /* Nothing to read. Complete the read with success. */
        aws_future_bool_set_result(future, false);
        return future;
    }

    struct read_task_impl *read_task = aws_mem_calloc(impl->base.alloc, 1, sizeof(struct read_task_impl));

    AWS_LOGF_TRACE(AWS_LS_S3_GENERAL, "id=%p: Read %zu bytes from offset %" PRIu64 "", (void *)stream, length, offset);

    /* Initialize for one read */
    read_task->dest = dest;

    AWS_FATAL_ASSERT(dest->buffer);
    read_task->offset = offset;
    read_task->length = length;
    /* The task holds its own references to the future and the stream until it finishes. */
    read_task->end_future = aws_future_bool_acquire(future);
    read_task->para_stream = aws_parallel_input_stream_acquire(&impl->base);

    struct aws_event_loop *loop = aws_event_loop_group_get_next_loop(impl->reading_elg);
    struct aws_task *task = aws_mem_calloc(impl->base.alloc, 1, sizeof(struct aws_task));
    aws_task_init(task, s_s3_parallel_from_file_read_task, read_task, "s3_parallel_read_task");
    aws_event_loop_schedule_task_now(loop, task);
    return future;
}

/**
 * vtable `get_length` implementation for the file-backed stream.
 *
 * Opens the stream's file read-only in binary mode, queries its length and closes it again.
 *
 * File-local (static): it is only reachable through the vtable, like the other `s_` helpers.
 *
 * @param stream File-backed stream whose file is measured.
 * @param length Passed through to aws_file_get_length.
 * @return AWS_OP_ERR if the file cannot be opened, otherwise the return value of
 *         aws_file_get_length.
 */
static int s_parallel_from_file_get_length(struct aws_parallel_input_stream *stream, int64_t *length) {
    struct aws_parallel_input_stream_from_file_impl *impl =
        AWS_CONTAINER_OF(stream, struct aws_parallel_input_stream_from_file_impl, base);

    FILE *file = aws_fopen_safe(impl->file_path, s_readonly_bytes_mode);
    if (file == NULL) {
        return AWS_OP_ERR;
    }

    int ret_val = aws_file_get_length(file, length);
    /* The stream is read-only, so the fclose result carries no write error to report. */
    fclose(file);
    return ret_val;
}

static struct aws_parallel_input_stream_vtable s_parallel_input_stream_from_file_vtable = {
    .destroy = s_parallel_from_file_destroy,
    .read = s_parallel_from_file_read,
    .get_length = s_parallel_from_file_get_length,
};

/**
 * Create a parallel input stream that reads from the file at `file_name`.
 *
 * @param allocator      Allocator used for the stream and its file path copy.
 * @param file_name      Path of the file; copied into the stream.
 * @param reading_elg    Event loop group on which read tasks run; a reference is acquired.
 * @param direct_io_read Whether reads should try direct IO first.
 * @return The new stream, or NULL with an error raised from errno when the path does not exist.
 */
struct aws_parallel_input_stream *aws_parallel_input_stream_new_from_file(
    struct aws_allocator *allocator,
    struct aws_byte_cursor file_name,
    struct aws_event_loop_group *reading_elg,
    bool direct_io_read) {

    struct aws_parallel_input_stream_from_file_impl *file_stream =
        aws_mem_calloc(allocator, 1, sizeof(struct aws_parallel_input_stream_from_file_impl));
    struct aws_parallel_input_stream *base = &file_stream->base;

    aws_parallel_input_stream_init_base(base, allocator, &s_parallel_input_stream_from_file_vtable, file_stream);
    file_stream->file_path = aws_string_new_from_cursor(allocator, &file_name);
    file_stream->reading_elg = aws_event_loop_group_acquire(reading_elg);
    file_stream->direct_io_read = direct_io_read;

    if (aws_path_exists(file_stream->file_path)) {
        return base;
    }

    /* The path does not exist: raise the error from errno, then tear the stream down. */
    aws_translate_and_raise_io_error(errno);
    s_parallel_from_file_destroy(base);
    return NULL;
}