from __future__ import annotations
import argparse
import csv
import hashlib
import json
import os
import re
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, Dict
from tools.stats.upload_stats_lib import (
    download_s3_artifacts,
    unzip,
    upload_to_dynamodb,
)

ARTIFACTS = [
    "test-reports",
]
ARTIFACT_REGEX = re.compile(
    r"test-reports-test-(?P<name>[\w\-]+)-\d+-\d+-(?P<runner>[\w\.-]+)_(?P<job>\d+).zip"
)
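# Illustration only (hypothetical filename, not taken from a real run): an artifact
# named "test-reports-test-dynamo-1-1-linux.4xlarge_1234567890.zip" would match
# ARTIFACT_REGEX with name="dynamo", runner="linux.4xlarge", and job="1234567890".
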
def get_perf_stats(
    repo: str,
    workflow_run_id: int,
    workflow_run_attempt: int,
    head_branch: str,
    match_filename: str,
) -> list[dict[str, Any]]:
    match_filename_regex = re.compile(match_filename)
    perf_stats = []
    with TemporaryDirectory() as temp_dir:
        print("Using temporary directory:", temp_dir)
        os.chdir(temp_dir)

        for artifact in ARTIFACTS:
            artifact_paths = download_s3_artifacts(
                artifact, workflow_run_id, workflow_run_attempt
            )

            # Unzip to get perf stats csv files
            for path in artifact_paths:
                m = ARTIFACT_REGEX.match(str(path))
                if not m:
                    print(f"Test report {path} has an invalid name. Skipping")
                    continue

                test_name = m.group("name")
                runner = m.group("runner")
                job_id = m.group("job")

                # Extract all files
                unzip(path)

                for csv_file in Path(".").glob("**/*.csv"):
                    filename = os.path.splitext(os.path.basename(csv_file))[0]
                    if not re.match(match_filename_regex, filename):
                        continue
                    print(f"Processing {filename} from {path}")

                    with open(csv_file) as csvfile:
                        reader = csv.DictReader(csvfile, delimiter=",")
                        for row in reader:
                            row.update(
                                {
                                    "workflow_id": workflow_run_id,  # type: ignore[dict-item]
                                    "run_attempt": workflow_run_attempt,  # type: ignore[dict-item]
                                    "test_name": test_name,
                                    "runner": runner,
                                    "job_id": job_id,
                                    "filename": filename,
                                    "head_branch": head_branch,
                                }
                            )
                            perf_stats.append(row)

                    # Done processing the file, removing it
                    os.remove(csv_file)

    return perf_stats


def generate_partition_key(repo: str, doc: Dict[str, Any]) -> str:
    """
    Generate a unique partition key for the document on DynamoDB
    """
    workflow_id = doc["workflow_id"]
    job_id = doc["job_id"]
    test_name = doc["test_name"]
    filename = doc["filename"]

    hash_content = hashlib.md5(json.dumps(doc).encode("utf-8")).hexdigest()
    return f"{repo}/{workflow_id}/{job_id}/{test_name}/{filename}/{hash_content}"


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Upload dynamo perf stats from S3 to DynamoDB"
    )
    parser.add_argument(
        "--workflow-run-id",
        type=int,
        required=True,
        help="id of the workflow to get perf stats from",
    )
    parser.add_argument(
        "--workflow-run-attempt",
        type=int,
        required=True,
        help="which retry of the workflow this is",
    )
    parser.add_argument(
        "--repo",
        type=str,
        required=True,
        help="which GitHub repo this workflow run belongs to",
    )
    parser.add_argument(
        "--head-branch",
        type=str,
        required=True,
        help="head branch of the workflow",
    )
    parser.add_argument(
        "--dynamodb-table",
        type=str,
        required=True,
        help="the name of the DynamoDB table to store the stats",
    )
    parser.add_argument(
        "--match-filename",
        type=str,
        default="",
        help="the regex to filter the list of CSV files containing the records to upload",
    )
    args = parser.parse_args()

    perf_stats = get_perf_stats(
        args.repo,
        args.workflow_run_id,
        args.workflow_run_attempt,
        args.head_branch,
        args.match_filename,
    )
    upload_to_dynamodb(
        dynamodb_table=args.dynamodb_table,
        repo=args.repo,
        docs=perf_stats,
        generate_partition_key=generate_partition_key,
    )
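
# Sketch of an invocation, assuming the script lives at tools/stats/ and runs from
# the repo root; the file name, repo, table name, IDs, and filter below are
# illustrative placeholders, not values taken from this source:
#
#   python3 tools/stats/upload_dynamo_perf_stats.py \
#       --workflow-run-id 123456789 \
#       --workflow-run-attempt 1 \
#       --repo <owner>/<repo> \
#       --head-branch main \
#       --dynamodb-table <dynamodb-table-name> \
#       --match-filename "^inductor_"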