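# S3 throughput benchmark built on aws-crt-python: it downloads the same object
# many times in parallel through the CRT S3 client and reports peak/average
# throughput in Gbps and the time to first byte.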
from awscrt.s3 import S3Client, S3RequestType
from awscrt.io import ClientBootstrap, DefaultHostResolver, EventLoopGroup
from awscrt.auth import AwsCredentialsProvider
from awscrt.http import HttpHeaders, HttpRequest
import time
# import os
import sys
import threading
GBPS = 1000 * 1000 * 1000  # bits per gigabit, used to convert bytes/sec into Gbps

class CrtLazyReadStream(object):
    # File-backed request body stream: the underlying file is opened lazily on
    # the first read and re-opened if it has been closed, so the stream can be
    # read again on retries. Every read is reported to the shared Statistics.
    def __init__(self, filename, pattern, statistics, length=0):
        self._filename = filename
        self.length = length
        self._stream = None
        self._pattern = pattern
        self._statistic = statistics

    def _available_stream(self):
        # Ensure self._stream is an open file handle.
        if self._stream is None or self._stream.closed:
            self._stream = open(self._filename, self._pattern)

    def read(self, length):
        self._available_stream()
        data = self._stream.read(length)
        read_len = len(data)
        self._statistic.record_read(read_len)
        if read_len == 0:
            # EOF: close the handle; a later read will re-open the file.
            self._stream.close()
        return data

    def readinto1(self, m):
        # Read into memoryview m.
        self._available_stream()
        read_len = self._stream.readinto1(m)
        self._statistic.record_read(read_len)
        if read_len == 0:
            self._stream.close()
        return read_len

    def seek(self, offset, whence):
        self._available_stream()
        return self._stream.seek(offset, whence)

    def close(self):
        # Closing is deferred so the stream can be reused across retries.
        pass
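# Note: CrtLazyReadStream is only used by the commented-out upload path below,
# where it serves as the body stream of the PUT request.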

class Statistics(object):
    # Thread-safe throughput tracker: samples the bytes read roughly once per
    # second and keeps the peak rate, a smoothed running average, and the time
    # to first byte.
    def __init__(self):
        self._lock = threading.Lock()
        self.end_time = 0
        self._bytes_peak = 0
        self._bytes_avg = 0
        self._bytes_read = 0
        self._bytes_sampled = 0
        self.sec_first_byte = 0
        self.start_time = time.time()
        self.last_sample_time = time.time()

    def record_read(self, size):
        with self._lock:
            self._bytes_read += size
            if self.sec_first_byte == 0:
                self.sec_first_byte = time.time() - self.start_time
            time_now = time.time()
            if time_now - self.last_sample_time > 1:
                bytes_this_second = (self._bytes_read - self._bytes_sampled) / (time_now - self.last_sample_time)
                self._bytes_sampled = self._bytes_read
                # Exponential moving average with a 0.5 smoothing factor.
                self._bytes_avg = (self._bytes_avg + bytes_this_second) * 0.5
                if self._bytes_peak < bytes_this_second:
                    self._bytes_peak = bytes_this_second
                self.last_sample_time = time_now

    def bytes_peak(self):
        # Peak throughput observed, in gigabits per second.
        return (self._bytes_peak * 8) / GBPS

    def bytes_avg(self):
        # Smoothed average throughput, in gigabits per second.
        return (self._bytes_avg * 8) / GBPS
# Configurations
region = "us-west-2"
bucket_name = "aws-crt-python-s3-testing-bucket"
object_name = "/0_10GB.txt"
file_name = "." + object_name
object_real_name = "/0_10GB"
suffix = ".txt"
repeat_times = 1           # number of sequential benchmark rounds
bunch_size = 160           # concurrent GET requests per round
writing_disk = True        # only read by the commented-out disk-write path in on_body
request_type = "download"
# Initialization
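# Eighteen event-loop threads drive the CRT's network I/O; credentials are
# resolved through the default provider chain (environment, config files, IMDS).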
event_loop_group = EventLoopGroup(18)
host_resolver = DefaultHostResolver(event_loop_group)
bootstrap = ClientBootstrap(event_loop_group, host_resolver)
credential_provider = AwsCredentialsProvider.new_default_chain(bootstrap)
s3_client = S3Client(
    bootstrap=bootstrap,
    region=region,
    credential_provider=credential_provider,
    throughput_target_gbps=100)
t_statistic = Statistics()
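# GET request for the benchmark object; for GET_OBJECT meta requests the CRT
# client splits the transfer into parallel ranged part downloads internally.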
headers = HttpHeaders([("host", bucket_name + ".s3." + region + ".amazonaws.com")])
request = HttpRequest("GET", object_name, headers)
# file_stats = os.stat(file_name)
# data_len = file_stats.st_size
# data_stream = CrtLazyReadStream(file_name, "r+b", t_statistic, data_len)
# upload_headers = HttpHeaders([("host", bucket_name + ".s3." + region + ".amazonaws.com"),
# ("Content-Type", "text/plain"), ("Content-Length", str(data_len))])
# upload_request = HttpRequest("PUT", "/put_object_test_py_10MB.txt", upload_headers, data_stream)
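# Sketch only (the upload path is not exercised by this script): the PUT request
# above would be submitted the same way as the GETs below, e.g.
#   upload = s3_client.make_request(
#       request=upload_request,
#       type=S3RequestType.PUT_OBJECT,
#       on_done=on_done)
#   upload.finished_future.result(100000)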

def on_body(offset, chunk, **kwargs):
    # Invoked for every received body chunk; only statistics are recorded here.
    t_statistic.record_read(len(chunk))
    # if writing_disk:
    #     if not os.path.exists(file_name):
    #         open(file_name, 'a').close()
    #     with open(file_name, 'rb+') as f:
    #         # seems like the seek here may screw up the file.
    #         f.seek(offset)
    #         f.write(chunk)

completed_connections = 0

def on_done(**kwargs):
    global completed_connections
    completed_connections += 1
    print("Finished connection {}".format(completed_connections))
    sys.stdout.flush()

def print_statistic(statistic):
    print("Gbps peak:", statistic.bytes_peak())
    print("Gbps avg:", statistic.bytes_avg())
# init_logging(LogLevel.Trace, "trace_log.txt")
start_time = time.time()
completed = repeat_times * bunch_size
for i in range(0, repeat_times):
    futures = []
    s3_requests = []
    for j in range(0, bunch_size):
        s3_requests.append(s3_client.make_request(
            request=request,
            type=S3RequestType.GET_OBJECT,
            on_body=on_body,
            on_done=on_done))
        futures.append(s3_requests[j].finished_future)
    for j in futures:
        try:
            j.result(100000)
        except Exception as e:
            # A failed request is reported and excluded from the completed count.
            print("Request failed:", e)
            completed = completed - 1
end_time = time.time()
print_statistic(t_statistic)
print("total time:", end_time - start_time)
print("completed/all:", completed, repeat_times * bunch_size)
print("latency:", t_statistic.sec_first_byte)