1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
|
#!/usr/bin/env python3
import datetime
import os
import random
import string
import sys
import time
import warnings
from typing import Any
import boto3
import requests
POLLING_DELAY_IN_SECOND = 5
MAX_UPLOAD_WAIT_IN_SECOND = 600
# NB: This is the curated top devices from AWS. We could create our own device
# pool if we want to
DEFAULT_DEVICE_POOL_ARN = (
"arn:aws:devicefarm:us-west-2::devicepool:082d10e5-d7d7-48a5-ba5c-b33d66efa1f5"
)
def parse_args() -> Any:
from argparse import ArgumentParser
parser = ArgumentParser("Run iOS tests on AWS Device Farm")
parser.add_argument(
"--project-arn", type=str, required=True, help="the ARN of the project on AWS"
)
parser.add_argument(
"--app-file", type=str, required=True, help="the iOS ipa app archive"
)
parser.add_argument(
"--xctest-file", type=str, required=True, help="the XCTest suite to run"
)
parser.add_argument(
"--name-prefix",
type=str,
required=True,
help="the name prefix of this test run",
)
parser.add_argument(
"--device-pool-arn",
type=str,
default=DEFAULT_DEVICE_POOL_ARN,
help="the name of the device pool to test on",
)
return parser.parse_args()
def upload_file(
client: Any,
project_arn: str,
prefix: str,
filename: str,
filetype: str,
mime: str = "application/octet-stream",
):
"""
Upload the app file and XCTest suite to AWS
"""
r = client.create_upload(
projectArn=project_arn,
name=f"{prefix}_{os.path.basename(filename)}",
type=filetype,
contentType=mime,
)
upload_name = r["upload"]["name"]
upload_arn = r["upload"]["arn"]
upload_url = r["upload"]["url"]
with open(filename, "rb") as file_stream:
print(f"Uploading {filename} to Device Farm as {upload_name}...")
r = requests.put(upload_url, data=file_stream, headers={"content-type": mime})
if not r.ok:
raise Exception(f"Couldn't upload {filename}: {r.reason}") # noqa: TRY002
start_time = datetime.datetime.now()
# Polling AWS till the uploaded file is ready
while True:
waiting_time = datetime.datetime.now() - start_time
if waiting_time > datetime.timedelta(seconds=MAX_UPLOAD_WAIT_IN_SECOND):
raise Exception( # noqa: TRY002
f"Uploading {filename} is taking longer than {MAX_UPLOAD_WAIT_IN_SECOND} seconds, terminating..."
)
r = client.get_upload(arn=upload_arn)
status = r["upload"].get("status", "")
print(f"{filename} is in state {status} after {waiting_time}")
if status == "FAILED":
raise Exception(f"Couldn't upload {filename}: {r}") # noqa: TRY002
if status == "SUCCEEDED":
break
time.sleep(POLLING_DELAY_IN_SECOND)
return upload_arn
def main() -> None:
args = parse_args()
client = boto3.client("devicefarm")
unique_prefix = f"{args.name_prefix}-{datetime.date.today().isoformat()}-{''.join(random.sample(string.ascii_letters, 8))}"
# Upload the test app
appfile_arn = upload_file(
client=client,
project_arn=args.project_arn,
prefix=unique_prefix,
filename=args.app_file,
filetype="IOS_APP",
)
print(f"Uploaded app: {appfile_arn}")
# Upload the XCTest suite
xctest_arn = upload_file(
client=client,
project_arn=args.project_arn,
prefix=unique_prefix,
filename=args.xctest_file,
filetype="XCTEST_TEST_PACKAGE",
)
print(f"Uploaded XCTest: {xctest_arn}")
# Schedule the test
r = client.schedule_run(
projectArn=args.project_arn,
name=unique_prefix,
appArn=appfile_arn,
devicePoolArn=args.device_pool_arn,
test={"type": "XCTEST", "testPackageArn": xctest_arn},
)
run_arn = r["run"]["arn"]
start_time = datetime.datetime.now()
print(f"Run {unique_prefix} is scheduled as {run_arn}:")
state = "UNKNOWN"
result = ""
try:
while True:
r = client.get_run(arn=run_arn)
state = r["run"]["status"]
if state == "COMPLETED":
result = r["run"]["result"]
break
waiting_time = datetime.datetime.now() - start_time
print(
f"Run {unique_prefix} in state {state} after {datetime.datetime.now() - start_time}"
)
time.sleep(30)
except Exception as error:
warnings.warn(f"Failed to run {unique_prefix}: {error}")
sys.exit(1)
if not result or result == "FAILED":
print(f"Run {unique_prefix} failed, exiting...")
sys.exit(1)
if __name__ == "__main__":
main()
|