File: s3_client_impl.h

package info (click to toggle)
aws-crt-python 0.20.4%2Bdfsg-1~bpo12%2B1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm-backports
  • size: 72,656 kB
  • sloc: ansic: 381,805; python: 23,008; makefile: 6,251; sh: 4,536; cpp: 699; ruby: 208; java: 77; perl: 73; javascript: 46; xml: 11
file content (503 lines) | stat: -rw-r--r-- 17,980 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
#ifndef AWS_S3_CLIENT_IMPL_H
#define AWS_S3_CLIENT_IMPL_H

/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

#include "aws/s3/s3_client.h"

#include <aws/common/atomics.h>
#include <aws/common/byte_buf.h>
#include <aws/common/hash_table.h>
#include <aws/common/linked_list.h>
#include <aws/common/mutex.h>
#include <aws/common/ref_count.h>
#include <aws/common/task_scheduler.h>
#include <aws/http/connection.h>
#include <aws/http/connection_manager.h>
#include <aws/http/proxy.h>

/* TODO automate this value in the future to prevent it from becoming out-of-sync. */
#define AWS_S3_CLIENT_VERSION "0.1.x"

struct aws_http_connection;
struct aws_http_connection_manager;
struct aws_host_resolver;
struct aws_s3_endpoint;

enum aws_s3_connection_finish_code {
    AWS_S3_CONNECTION_FINISH_CODE_SUCCESS,
    AWS_S3_CONNECTION_FINISH_CODE_FAILED,
    AWS_S3_CONNECTION_FINISH_CODE_RETRY,
};

/* Callback for the owner of the endpoint when the endpoint has completely cleaned up. */
typedef void(aws_s3_endpoint_shutdown_fn)(void *user_data);

struct aws_s3_endpoint_options {
    /* URL of the host that this endpoint refers to. */
    struct aws_string *host_name;

    /* Callback for when this endpoint completely shuts down. */
    aws_s3_endpoint_shutdown_fn *shutdown_callback;

    /* Bootstrap of the client to be used for spawning a connection manager. */
    struct aws_client_bootstrap *client_bootstrap;

    /* TLS connection options to be used for the connection manager. */
    const struct aws_tls_connection_options *tls_connection_options;

    /* DNS TTL to use for addresses for this endpoint. */
    size_t dns_host_address_ttl_seconds;

    /* Client that owns this endpoint */
    struct aws_s3_client *client;

    /* Maximum number of connections that can be spawned for this endpoint. */
    uint32_t max_connections;

    /* HTTP port override. If zero, determine port based on TLS context */
    uint32_t port;

    /**
     * Optional.
     * Proxy configuration for http connection.
     */
    struct aws_http_proxy_config *proxy_config;

    /**
     * Optional.
     * Configuration for fetching proxy configuration from environment.
     * By Default proxy_ev_settings.aws_http_proxy_env_var_type is set to AWS_HPEV_ENABLE which means read proxy
     * configuration from environment.
     * Only works when proxy_config is not set. If both are set, configuration from proxy_config is used.
     */
    struct proxy_env_var_settings *proxy_ev_settings;

    /**
     * Optional.
     * If set to 0, default value is used.
     */
    uint32_t connect_timeout_ms;

    /**
     * Optional.
     * Set keepalive to periodically transmit messages for detecting a disconnected peer.
     */
    struct aws_s3_tcp_keep_alive_options *tcp_keep_alive_options;

    /**
     * Optional.
     * Configuration options for connection monitoring.
     * If the transfer speed falls below the specified minimum_throughput_bytes_per_second, the operation is aborted.
     */
    struct aws_http_connection_monitoring_options *monitoring_options;
};

/* global vtable, only used when mocking for tests */
struct aws_s3_endpoint_system_vtable {
    void (*acquire)(struct aws_s3_endpoint *endpoint, bool already_holding_lock);
    void (*release)(struct aws_s3_endpoint *endpoint);
};

struct aws_s3_endpoint {
    struct {
        /* This is NOT an atomic ref-count.
         * The endpoint lives in hashtable: `aws_s3_client.synced_data.endpoints`
         * This ref-count can only be touched while holding client's lock */
        size_t ref_count;
    } client_synced_data;

    /* What allocator was used to create this endpoint. */
    struct aws_allocator *allocator;

    /* URL of the host that this endpoint refers to. */
    struct aws_string *host_name;

    /* Connection manager that manages all connections to this endpoint. */
    struct aws_http_connection_manager *http_connection_manager;

    /* Client that owns this endpoint */
    struct aws_s3_client *client;
};

/* Represents one connection on a particular VIP. */
struct aws_s3_connection {
    /* Endpoint that this connection is connected to. */
    struct aws_s3_endpoint *endpoint;

    /* The underlying, currently in-use HTTP connection. */
    struct aws_http_connection *http_connection;

    /* Request currently being processed on this connection. */
    struct aws_s3_request *request;

    /* Current retry token for the request. If it has never been retried, this will be NULL. */
    struct aws_retry_token *retry_token;
};

struct aws_s3_client_vtable {

    struct aws_s3_meta_request *(
        *meta_request_factory)(struct aws_s3_client *client, const struct aws_s3_meta_request_options *options);

    void (*create_connection_for_request)(struct aws_s3_client *client, struct aws_s3_request *request);

    void (*acquire_http_connection)(
        struct aws_http_connection_manager *conn_manager,
        aws_http_connection_manager_on_connection_setup_fn *on_connection_acquired_callback,
        void *user_data);

    size_t (*get_host_address_count)(
        struct aws_host_resolver *host_resolver,
        const struct aws_string *host_name,
        uint32_t flags);

    void (*schedule_process_work_synced)(struct aws_s3_client *client);

    void (*process_work)(struct aws_s3_client *client);

    void (*endpoint_shutdown_callback)(struct aws_s3_client *client);

    void (*finish_destroy)(struct aws_s3_client *client);

    struct aws_parallel_input_stream *(
        *parallel_input_stream_new_from_file)(struct aws_allocator *allocator, struct aws_byte_cursor file_name);
};

struct aws_s3_upload_part_timeout_stats {
    bool stop_timeout;

    /* Total number of successful upload requests */
    uint64_t num_successful_upload_requests;

    /* Stats for the request time of first 10 succeed requests */
    struct {
        uint64_t sum_ns;
        uint64_t num_samples;
    } initial_request_time;

    /* Track the timeout rate. */
    struct {
        uint64_t num_completed;
        uint64_t num_failed;
    } timeout_rate_tracking;

    /* Stats for the response to first byte time of tracked succeed requests */
    struct {
        uint64_t sum_ns;
        uint64_t num_samples;
    } response_to_first_byte_time;
};

/* Represents the state of the S3 client. */
struct aws_s3_client {
    struct aws_allocator *allocator;

    struct aws_s3_buffer_pool *buffer_pool;

    struct aws_s3_client_vtable *vtable;

    struct aws_ref_count ref_count;

    /* Client bootstrap for setting up connection managers. */
    struct aws_client_bootstrap *client_bootstrap;

    /* Event loop on the client bootstrap ELG for processing work/dispatching requests. */
    struct aws_event_loop *process_work_event_loop;

    /* Event loop group for streaming request bodies back to the user. */
    struct aws_event_loop_group *body_streaming_elg;

    /* Region of the S3 bucket. */
    struct aws_string *region;

    /* Size of parts for files when doing gets or puts.  This exists on the client as configurable option that is passed
     * to meta requests for use. */
    const size_t part_size;

    /* Size of parts for files when doing gets or puts.  This exists on the client as configurable option that is passed
     * to meta requests for use. */
    const uint64_t max_part_size;

    /* The size threshold in bytes for when to use multipart uploads for a AWS_S3_META_REQUEST_TYPE_PUT_OBJECT meta
     * request. Uploads over this size will automatically use a multipart upload strategy, while uploads smaller or
     * equal to this threshold will use a single request to upload the whole object. If not set, `part_size` will be
     * used as threshold. */
    const uint64_t multipart_upload_threshold;

    /* TLS Options to be used for each connection. */
    struct aws_tls_connection_options *tls_connection_options;

    /* Cached signing config. Can be NULL if no signing config was specified. */
    struct aws_cached_signing_config_aws *cached_signing_config;

    /* The auth provider for S3 Express. */
    aws_s3express_provider_factory_fn *s3express_provider_factory;
    void *factory_user_data;
    struct aws_s3express_credentials_provider *s3express_provider;

    /* Throughput target in Gbps that we are trying to reach. */
    const double throughput_target_gbps;

    /* The calculated ideal number of VIP's based on throughput target and throughput per vip. */
    const uint32_t ideal_vip_count;

    /**
     * For multi-part upload, content-md5 will be calculated if the AWS_MR_CONTENT_MD5_ENABLED is specified
     *     or initial request has content-md5 header.
     * For single-part upload, if the content-md5 header is specified, it will remain unchanged. If the header is not
     *     specified, and this is set to AWS_MR_CONTENT_MD5_ENABLED, it will be calculated. */
    const enum aws_s3_meta_request_compute_content_md5 compute_content_md5;

    /* Hard limit on max connections set through the client config. */
    const uint32_t max_active_connections_override;

    struct aws_atomic_var max_allowed_connections;

    /* Retry strategy used for scheduling request retries. */
    struct aws_retry_strategy *retry_strategy;

    /**
     * Optional.
     * Proxy configuration for http connection.
     */
    struct aws_http_proxy_config *proxy_config;

    /**
     * Optional.
     * Configuration for fetching proxy configuration from environment.
     * By Default proxy_ev_settings.aws_http_proxy_env_var_type is set to AWS_HPEV_ENABLE which means read proxy
     * configuration from environment.
     * Only works when proxy_config is not set. If both are set, configuration from proxy_config is used.
     */
    struct proxy_env_var_settings *proxy_ev_settings;

    /**
     * Optional.
     * If set to 0, default value is used.
     */
    uint32_t connect_timeout_ms;

    /**
     * Optional.
     * Set keepalive to periodically transmit messages for detecting a disconnected peer.
     */
    struct aws_s3_tcp_keep_alive_options *tcp_keep_alive_options;

    /**
     * Configuration options for connection monitoring.
     * If the transfer speed falls below the specified minimum_throughput_bytes_per_second, the operation is aborted.
     * If user passes in NULL, default values are used.
     */
    struct aws_http_connection_monitoring_options monitoring_options;

    /* tls options from proxy environment settings. */
    struct aws_tls_connection_options *proxy_ev_tls_options;

    /* Shutdown callbacks to notify when the client is completely cleaned up. */
    aws_s3_client_shutdown_complete_callback_fn *shutdown_callback;
    void *shutdown_callback_user_data;

    /* Whether read backpressure (aka flow-control window) is being applied. */
    const bool enable_read_backpressure;

    /* The starting size of each meta request's flow-control window, in bytes.
     * Ignored unless `enable_read_backpressure` is true. */
    const size_t initial_read_window;

    /**
     * Timeout in ms for upload request for request after sending to the response first byte received.
     */
    struct aws_atomic_var upload_timeout_ms;

    struct {
        /* Number of overall requests currently being processed by the client. */
        struct aws_atomic_var num_requests_in_flight;

        /* Number of requests being sent/received over network. */
        struct aws_atomic_var num_requests_network_io[AWS_S3_META_REQUEST_TYPE_MAX];

        /* Number of requests sitting in their meta request priority queue, waiting to be streamed. */
        struct aws_atomic_var num_requests_stream_queued_waiting;

        /* Number of requests currently scheduled to be streamed the response body or are actively being streamed. */
        struct aws_atomic_var num_requests_streaming_response;
    } stats;

    struct {
        struct aws_mutex lock;

        /* Hash table of endpoints that are in-use by the client.
         * Key: aws_string of endpoint hostname. Value: aws_s3_endpoint */
        struct aws_hash_table endpoints;

        /* How many requests failed to be prepared. */
        uint32_t num_failed_prepare_requests;

        /* Meta requests that need added in the work event loop.
         * List contains aws_s3_meta_request_work */
        struct aws_linked_list pending_meta_request_work;

        /* aws_s3_request that are prepared and ready to be put in the threaded_data request queue. */
        struct aws_linked_list prepared_requests;

        /* Task for processing requests from meta requests on connections. */
        struct aws_task process_work_task;

        /* Task for trimming buffer bool. */
        struct aws_task trim_buffer_pool_task;

        /* Number of endpoints currently allocated. Used during clean up to know how many endpoints are still in
         * memory.*/
        uint32_t num_endpoints_allocated;

        /* Whether or not the client has started cleaning up all of its resources */
        uint32_t active : 1;

        /* True if the start_destroy function is still executing, which blocks shutdown from completing. */
        uint32_t start_destroy_executing : 1;

        /* Whether or not work processing is currently scheduled. */
        uint32_t process_work_task_scheduled : 1;

        /* Whether or not work process is currently in progress. */
        uint32_t process_work_task_in_progress : 1;

        /* Whether or not the body streaming ELG is allocated. If the body streaming ELG is NULL, but this is true, the
         * shutdown callback has not yet been called.*/
        uint32_t body_streaming_elg_allocated : 1;

        /* Whether or not a S3 Express provider is active with the client.*/
        uint32_t s3express_provider_active : 1;

        /* True if client has been flagged to finish destroying itself. Used to catch double-destroy bugs.*/
        uint32_t finish_destroy : 1;

        struct aws_s3_upload_part_timeout_stats upload_part_stats;
    } synced_data;

    struct {
        /* Queue of prepared aws_s3_request that are waiting to be assigned to connections. */
        struct aws_linked_list request_queue;

        /* Client list of ongoing aws_s3_meta_requests. */
        struct aws_linked_list meta_requests;

        /* Number of requests in the request_queue linked_list. */
        uint32_t request_queue_size;

        /* Number of requests currently being prepared. */
        uint32_t num_requests_being_prepared;

        /* Whether or not work processing is currently scheduled. */
        uint32_t trim_buffer_pool_task_scheduled : 1;
    } threaded_data;
};

struct aws_s3_meta_request_resume_token {
    struct aws_allocator *allocator;
    struct aws_ref_count ref_count;

    enum aws_s3_meta_request_type type;

    /* Note: since pause currently only supports upload, this structure only has
        upload specific fields. Extending it to support other types is left as
        exercise for future. */
    struct aws_string *multipart_upload_id;
    size_t part_size;
    size_t total_num_parts;

    /* Note: this field is used only when s3 tells us that upload id no longer
    exists, and if this indicates that all parts have already been uploaded,
    request is completed instead of failing it.*/
    size_t num_parts_completed;
};

void aws_s3_client_notify_connection_finished(
    struct aws_s3_client *client,
    struct aws_s3_connection *connection,
    int error_code,
    enum aws_s3_connection_finish_code finish_code);

AWS_EXTERN_C_BEGIN

AWS_S3_API
struct aws_s3_meta_request_resume_token *aws_s3_meta_request_resume_token_new(struct aws_allocator *allocator);

AWS_S3_API
void aws_s3_set_dns_ttl(size_t ttl);

AWS_S3_API
uint32_t aws_s3_client_get_max_requests_prepare(struct aws_s3_client *client);

AWS_S3_API
uint32_t aws_s3_client_get_max_active_connections(
    struct aws_s3_client *client,
    struct aws_s3_meta_request *meta_request);

AWS_S3_API
uint32_t aws_s3_client_get_max_requests_in_flight(struct aws_s3_client *client);

AWS_S3_API
uint32_t aws_s3_client_queue_requests_threaded(
    struct aws_s3_client *client,
    struct aws_linked_list *request_list,
    bool queue_front);

AWS_S3_API
struct aws_s3_request *aws_s3_client_dequeue_request_threaded(struct aws_s3_client *client);

AWS_S3_API
void aws_s3_client_schedule_process_work(struct aws_s3_client *client);

AWS_S3_API
void aws_s3_client_update_meta_requests_threaded(struct aws_s3_client *client);

AWS_S3_API
void aws_s3_client_update_connections_threaded(struct aws_s3_client *client);

AWS_S3_API
struct aws_s3_endpoint *aws_s3_endpoint_new(
    struct aws_allocator *allocator,
    const struct aws_s3_endpoint_options *options);

AWS_S3_API void aws_s3_client_lock_synced_data(struct aws_s3_client *client);

AWS_S3_API
void aws_s3_client_unlock_synced_data(struct aws_s3_client *client);

/* Used for mocking */
AWS_S3_API
void aws_s3_endpoint_set_system_vtable(const struct aws_s3_endpoint_system_vtable *vtable);

/* Increment the endpoint's ref-count.
 * If `already_holding_lock` is false, then this call will briefly take hold of the client's lock */
struct aws_s3_endpoint *aws_s3_endpoint_acquire(struct aws_s3_endpoint *endpoint, bool already_holding_lock);

/* Decrement the endpoint's ref-count.
 * You MUST NOT call this while the client's lock is held.
 * (this call briefly holds the client's lock and may remove the endpoint
 * from the client's hashtable) */
void aws_s3_endpoint_release(struct aws_s3_endpoint *endpoint);

AWS_S3_API
extern const uint32_t g_max_num_connections_per_vip;

AWS_S3_API
extern const uint32_t g_num_conns_per_vip_meta_request_look_up[];

AWS_S3_API
extern const size_t g_expect_timeout_offset_ms;

AWS_S3_API
void aws_s3_client_update_upload_part_timeout(
    struct aws_s3_client *client,
    struct aws_s3_request *finished_upload_part_request,
    int finished_error_code);

AWS_EXTERN_C_END

#endif /* AWS_S3_CLIENT_IMPL_H */