1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
|
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0.
import awscrt.http
import awscrt.io
import enum
import sys
import threading
# Application for use with the test_appexit unit tests.
# Performs an HTTP request and shuts everything down,
# with the opportunity to exit the application at any stage.
Stage = enum.Enum('Stage', [
'Start',
'EventLoopGroupStart',
'HostResolverStart',
'ClientBootstrapStart',
'TlsConnectionOptionsStart',
'HttpClientStart',
'HttpClientConnected',
'HttpStreamStart',
'HttpStreamReceivingBody',
'HttpStreamDone',
'HttpConnectionClose',
'HttpConnectionDone',
'TlsConnectionOptionsDone',
'ClientBootstrapDone',
'HostResolverDone',
'EventLoopGroupDone',
'Done',
])
EXIT_STAGE = Stage.Done
TIMEOUT = 30.0
REQUEST_HOST = 's3.amazonaws.com'
REQUEST_PATH = '/code-sharing-aws-crt/elastigirl.png'
REQUEST_PORT = 443
def set_stage(stage):
print("Stage:", stage)
if stage.value >= EXIT_STAGE.value:
print("Exiting now")
sys.exit()
if __name__ == '__main__':
EXIT_STAGE = Stage[sys.argv[1]]
awscrt.io.init_logging(awscrt.io.LogLevel.Trace, 'stdout')
set_stage(Stage.Start)
# EventLoopGroupStart
elg = awscrt.io.EventLoopGroup()
set_stage(Stage.EventLoopGroupStart)
# HostResolverStart
resolver = awscrt.io.DefaultHostResolver(elg)
set_stage(Stage.HostResolverStart)
# ClientBootstrapStart
bootstrap = awscrt.io.ClientBootstrap(elg, resolver)
set_stage(Stage.ClientBootstrapStart)
# TlsConnectionOptionsStart
tls_ctx_opt = awscrt.io.TlsContextOptions()
tls_ctx = awscrt.io.ClientTlsContext(tls_ctx_opt)
tls_conn_opt = tls_ctx.new_connection_options()
tls_conn_opt.set_server_name(REQUEST_HOST)
set_stage(Stage.TlsConnectionOptionsStart)
# HttpClientStart
http_connection = awscrt.http.HttpClientConnection.new(
host_name=REQUEST_HOST,
port=REQUEST_PORT,
bootstrap=bootstrap,
tls_connection_options=tls_conn_opt)
set_stage(Stage.HttpClientStart)
# HttpClientConnected
try:
http_connection = http_connection.result(TIMEOUT)
except Exception:
# the internet's a flaky place and this isn't a correctness test
print("Connection failed. Exiting out early...")
set_stage(Stage.Done)
set_stage(Stage.HttpClientConnected)
# HttpStreamStart
request = awscrt.http.HttpRequest(path=REQUEST_PATH)
request.headers.add('Host', REQUEST_HOST)
receiving_body_event = threading.Event()
def on_incoming_body(http_stream, chunk):
receiving_body_event.set()
http_stream = http_connection.request(request, on_body=on_incoming_body)
http_stream.activate()
set_stage(Stage.HttpStreamStart)
# HttpStreamReceivingBody
assert receiving_body_event.wait(TIMEOUT)
set_stage(Stage.HttpStreamReceivingBody)
# HttpStreamDone
try:
status_code = http_stream.completion_future.result(TIMEOUT)
except Exception:
# the internet's a flaky place and this isn't a correctness test
print("Request failed. Continuing with cleanup...")
del http_stream
del request
set_stage(Stage.HttpStreamDone)
# HttpConnectionClose
connection_shutdown_future = http_connection.close()
set_stage(Stage.HttpConnectionClose)
# HttpConnectionDone
del http_connection
connection_shutdown_future.result(TIMEOUT)
set_stage(Stage.HttpConnectionDone)
# TlsConnectionOptionsDone
del tls_conn_opt
del tls_ctx
del tls_ctx_opt
set_stage(Stage.TlsConnectionOptionsDone)
# ClientBootstrapDone
bootstrap_shutdown_event = bootstrap.shutdown_event
del bootstrap
assert bootstrap_shutdown_event.wait(TIMEOUT)
set_stage(Stage.ClientBootstrapDone)
# HostResolverDone
del resolver
set_stage(Stage.HostResolverDone)
# EventLoopGroupDone
elg_shutdown_event = elg.shutdown_event
del elg
assert elg_shutdown_event.wait(TIMEOUT)
set_stage(Stage.EventLoopGroupDone)
# Done
set_stage(Stage.Done)
|