1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
|
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
#include <aws/io/async_stream.h>
#include <aws/common/clock.h>
#include <aws/io/future.h>
#include <aws/testing/async_stream_tester.h>
#include <aws/testing/aws_test_harness.h>
#include <aws/testing/stream_tester.h>
/* One second expressed in nanoseconds (AWS_TIMESTAMP_NANOS cast to uint64_t) */
#define ONE_SEC_IN_NS ((uint64_t)AWS_TIMESTAMP_NANOS)
/* Ten seconds, in nanoseconds: the timeout the tests in this file pass to aws_future_bool_wait() */
#define MAX_TIMEOUT_NS (10 * ONE_SEC_IN_NS)
/**
 * Common implementation for async_input_stream_fill_completes_on_XYZ() tests.
 *
 * Builds a tester stream over the 9 bytes "123456789" using the caller's options,
 * then calls aws_async_input_stream_read_to_fill() twice with a 5-byte buffer:
 *  - the 1st call must fill the buffer with "12345" and report "not EOF"
 *  - the 2nd call must produce the remaining "6789" and report EOF
 *
 * @param alloc   Allocator used for library init, the stream and the buffer.
 * @param options Tester options supplied by the individual test.
 *                options->base.source_bytes is overwritten by this function.
 * @return 0 if every check passed. The ASSERT_XYZ() macros from the test harness
 *         bail out of the function early when a check fails.
 */
static int s_test_async_input_stream_read_to_fill(
    struct aws_allocator *alloc,
    struct aws_async_input_stream_tester_options *options) {

    aws_io_library_init(alloc);

    options->base.source_bytes = aws_byte_cursor_from_c_str("123456789");
    struct aws_async_input_stream *async_stream = aws_async_input_stream_new_tester(alloc, options);
    ASSERT_NOT_NULL(async_stream);

    /* read into slightly short buffer */
    struct aws_byte_buf buf;
    /* don't go on to read into a buffer that failed to initialize */
    ASSERT_SUCCESS(aws_byte_buf_init(&buf, alloc, 5));

    struct aws_future_bool *read_future = aws_async_input_stream_read_to_fill(async_stream, &buf);
    ASSERT_TRUE(aws_future_bool_wait(read_future, MAX_TIMEOUT_NS));
    ASSERT_INT_EQUALS(0, aws_future_bool_get_error(read_future));
    ASSERT_BIN_ARRAYS_EQUALS("12345", 5, buf.buffer, buf.len);
    bool eof = aws_future_bool_get_result(read_future);
    ASSERT_FALSE(eof);
    aws_future_bool_release(read_future);

    /* read the rest (reuse the same buffer, so empty it first) */
    buf.len = 0;
    read_future = aws_async_input_stream_read_to_fill(async_stream, &buf);
    ASSERT_TRUE(aws_future_bool_wait(read_future, MAX_TIMEOUT_NS));
    ASSERT_INT_EQUALS(0, aws_future_bool_get_error(read_future));
    ASSERT_BIN_ARRAYS_EQUALS("6789", 4, buf.buffer, buf.len);
    eof = aws_future_bool_get_result(read_future);
    ASSERT_TRUE(eof);
    aws_future_bool_release(read_future);

    /* cleanup */
    aws_byte_buf_clean_up(&buf);
    aws_async_input_stream_release(async_stream);
    aws_io_library_clean_up();
    return 0;
}
/* aws_async_input_stream_read_to_fill() test:
 * every underlying read is configured to complete on another thread. */
AWS_TEST_CASE(async_input_stream_fill_completes_on_thread, s_test_async_input_stream_fill_completes_on_thread)
static int s_test_async_input_stream_fill_completes_on_thread(struct aws_allocator *alloc, void *ctx) {
    (void)ctx;

    /* limit each underlying read to a single byte */
    struct aws_async_input_stream_tester_options tester_options = {
        .base = {.max_bytes_per_read = 1},
        .completion_strategy = AWS_ASYNC_READ_COMPLETES_ON_ANOTHER_THREAD,
    };

    const int result = s_test_async_input_stream_read_to_fill(alloc, &tester_options);
    return result;
}
/* aws_async_input_stream_read_to_fill() test:
 * every underlying read is configured to complete immediately. */
AWS_TEST_CASE(async_input_stream_fill_completes_immediately, s_test_async_input_stream_fill_completes_immediately)
static int s_test_async_input_stream_fill_completes_immediately(struct aws_allocator *alloc, void *ctx) {
    (void)ctx;

    /* limit each underlying read to a single byte */
    struct aws_async_input_stream_tester_options tester_options = {
        .base = {.max_bytes_per_read = 1},
        .completion_strategy = AWS_ASYNC_READ_COMPLETES_IMMEDIATELY,
    };

    const int result = s_test_async_input_stream_read_to_fill(alloc, &tester_options);
    return result;
}
/* aws_async_input_stream_read_to_fill() test:
 * the thread on which each underlying read completes is chosen at random. */
AWS_TEST_CASE(async_input_stream_fill_completes_randomly, s_test_async_input_stream_fill_completes_randomly)
static int s_test_async_input_stream_fill_completes_randomly(struct aws_allocator *alloc, void *ctx) {
    (void)ctx;

    /* limit each underlying read to a single byte */
    struct aws_async_input_stream_tester_options tester_options = {
        .base = {.max_bytes_per_read = 1},
        .completion_strategy = AWS_ASYNC_READ_COMPLETES_ON_RANDOM_THREAD,
    };

    const int result = s_test_async_input_stream_read_to_fill(alloc, &tester_options);
    return result;
}
/* Test aws_async_input_stream_read_to_fill()
 * Ensure that it works when it takes one more read to realize we're at EOF.
 *
 * The source holds exactly 9 bytes and the buffer has capacity 9, so:
 *  - the 1st fill must return all 9 bytes and report "not EOF"
 *  - the 2nd fill must return 0 bytes and report EOF */
AWS_TEST_CASE(async_input_stream_fill_eof_requires_extra_read, s_test_async_input_stream_fill_eof_requires_extra_read)
static int s_test_async_input_stream_fill_eof_requires_extra_read(struct aws_allocator *alloc, void *ctx) {
    (void)ctx;
    aws_io_library_init(alloc);

    struct aws_async_input_stream_tester_options options = {
        .base =
            {
                .source_bytes = aws_byte_cursor_from_c_str("123456789"),
                .eof_requires_extra_read = true,
            },
    };
    struct aws_async_input_stream *async_stream = aws_async_input_stream_new_tester(alloc, &options);
    ASSERT_NOT_NULL(async_stream);

    /* read into buffer of the exact length. we shouldn't realize it's at EOF yet */
    struct aws_byte_buf buf;
    /* don't go on to read into a buffer that failed to initialize */
    ASSERT_SUCCESS(aws_byte_buf_init(&buf, alloc, 9));

    struct aws_future_bool *read_future = aws_async_input_stream_read_to_fill(async_stream, &buf);
    ASSERT_TRUE(aws_future_bool_wait(read_future, MAX_TIMEOUT_NS));
    ASSERT_INT_EQUALS(0, aws_future_bool_get_error(read_future));
    ASSERT_BIN_ARRAYS_EQUALS("123456789", 9, buf.buffer, buf.len);
    bool eof = aws_future_bool_get_result(read_future);
    ASSERT_FALSE(eof);
    aws_future_bool_release(read_future);

    /* read again, get no data, but learn it's at EOF */
    buf.len = 0;
    read_future = aws_async_input_stream_read_to_fill(async_stream, &buf);
    ASSERT_TRUE(aws_future_bool_wait(read_future, MAX_TIMEOUT_NS));
    ASSERT_INT_EQUALS(0, aws_future_bool_get_error(read_future));
    ASSERT_UINT_EQUALS(0, buf.len);
    eof = aws_future_bool_get_result(read_future);
    ASSERT_TRUE(eof);
    aws_future_bool_release(read_future);

    /* cleanup */
    aws_byte_buf_clean_up(&buf);
    aws_async_input_stream_release(async_stream);
    aws_io_library_clean_up();
    return 0;
}
/* Test aws_async_input_stream_read_to_fill()
 * Ensure that it reports errors from an underlying read() call.
 *
 * The tester stream is configured to hand out 1 byte per read and to fail its
 * 2nd read, so the fill must finish with the injected error code. */
AWS_TEST_CASE(async_input_stream_fill_reports_error, s_test_async_input_stream_fill_reports_error)
static int s_test_async_input_stream_fill_reports_error(struct aws_allocator *alloc, void *ctx) {
    (void)ctx;

    /* single definition of the fake error, so the injected and expected values can't drift apart */
    enum { INJECTED_ERROR_CODE = 999 };

    aws_io_library_init(alloc);

    struct aws_async_input_stream_tester_options options = {
        .base =
            {
                .source_bytes = aws_byte_cursor_from_c_str("123456789"),
                .max_bytes_per_read = 1,
                .fail_on_nth_read = 2,
                .fail_with_error_code = INJECTED_ERROR_CODE,
            },
    };
    struct aws_async_input_stream *async_stream = aws_async_input_stream_new_tester(alloc, &options);
    ASSERT_NOT_NULL(async_stream);

    /* read into buffer */
    struct aws_byte_buf buf;
    /* don't go on to read into a buffer that failed to initialize */
    ASSERT_SUCCESS(aws_byte_buf_init(&buf, alloc, 512));

    struct aws_future_bool *read_future = aws_async_input_stream_read_to_fill(async_stream, &buf);
    ASSERT_TRUE(aws_future_bool_wait(read_future, MAX_TIMEOUT_NS));
    ASSERT_INT_EQUALS(INJECTED_ERROR_CODE, aws_future_bool_get_error(read_future));
    aws_future_bool_release(read_future);

    /* cleanup */
    aws_byte_buf_clean_up(&buf);
    aws_async_input_stream_release(async_stream);
    aws_io_library_clean_up();
    return 0;
}
|