1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
|
import gzip
import io
import json
import os
import zipfile
from pathlib import Path
from typing import Any, Dict, List, Optional

import boto3  # type: ignore[import]
import requests
import rockset  # type: ignore[import]
# Base URL of the GitHub REST API for the pytorch/pytorch repository; the
# workflow-artifact listing URL is built from it.
PYTORCH_REPO = "https://api.github.com/repos/pytorch/pytorch"
# boto3 S3 resource handle, created once when this module is imported and
# shared by the S3 download/upload helpers below.
S3_RESOURCE = boto3.resource("s3")
def _get_request_headers() -> Dict[str, str]:
    """Build the HTTP headers used for GitHub API requests.

    The token is read from the `GITHUB_TOKEN` environment variable; a
    `KeyError` is raised if it is not set.
    """
    token = os.environ["GITHUB_TOKEN"]
    headers: Dict[str, str] = {}
    headers["Accept"] = "application/vnd.github.v3+json"
    headers["Authorization"] = f"token {token}"
    return headers
def _get_artifact_urls(prefix: str, workflow_run_id: int) -> Dict[Path, str]:
    """Get the download URLs of a workflow run's artifacts matching `prefix`.

    Lists the artifacts of the given workflow run through the GitHub API,
    following the `next` pagination links until every page has been read,
    and keeps the artifacts whose name starts with `prefix`.

    Args:
        prefix: Only artifacts whose name starts with this string are kept.
        workflow_run_id: ID of the GitHub Actions workflow run to inspect.

    Returns:
        A mapping from artifact name (as a `Path`) to its
        `archive_download_url`.
    """
    # Authenticate every page, including the first one, so that all requests
    # are made with the same credentials.
    headers = _get_request_headers()
    response = requests.get(
        f"{PYTORCH_REPO}/actions/runs/{workflow_run_id}/artifacts?per_page=100",
        headers=headers,
    )
    artifacts = response.json()["artifacts"]
    while "next" in response.links:
        response = requests.get(response.links["next"]["url"], headers=headers)
        artifacts.extend(response.json()["artifacts"])

    return {
        Path(artifact["name"]): artifact["archive_download_url"]
        for artifact in artifacts
        if artifact["name"].startswith(prefix)
    }
def _download_artifact(
    artifact_name: Path, artifact_url: str, workflow_run_attempt: int
) -> Optional[Path]:
    """Download a single GitHub Actions artifact to a local file.

    Args:
        artifact_name: Name of the artifact; also used as the path of the
            local file the archive is written to.
        artifact_url: URL the artifact archive is fetched from.
        workflow_run_attempt: The run attempt whose artifacts are wanted.

    Returns:
        The path the artifact was written to, or None if the artifact name
        carries a `runattempt<N>` atom for a different run attempt and the
        download was therefore skipped.
    """
    # [Artifact run attempt]
    # All artifacts on a workflow share a single namespace. However, we can
    # re-run a workflow and produce a new set of artifacts. To avoid name
    # collisions, `-runattempt<run #>-` is added somewhere in the artifact
    # name.
    #
    # This code parses out the run attempt number from the artifact name. If it
    # doesn't match the one specified on the command line, skip it.
    for atom in str(artifact_name).split("-"):
        if not atom.startswith("runattempt"):
            continue
        found_run_attempt = int(atom[len("runattempt") :])
        if workflow_run_attempt != found_run_attempt:
            print(
                f"Skipping {artifact_name} as it is an invalid run attempt. "
                f"Expected {workflow_run_attempt}, found {found_run_attempt}."
            )
            # Actually skip: previously the message was printed but the
            # artifact was downloaded anyway.
            return None

    print(f"Downloading {artifact_name}")
    response = requests.get(artifact_url, headers=_get_request_headers())
    with open(artifact_name, "wb") as f:
        f.write(response.content)
    return artifact_name


def download_s3_artifacts(
    prefix: str, workflow_run_id: int, workflow_run_attempt: int
) -> List[Path]:
    """Download a workflow run attempt's artifacts from the `gha-artifacts` bucket.

    Every object whose key starts with
    `pytorch/pytorch/<run id>/<run attempt>/artifact/<prefix>` is written to
    a local file named after the last component of its key.

    Args:
        prefix: Artifact name prefix to match.
        workflow_run_id: ID of the workflow run.
        workflow_run_attempt: Attempt number of the workflow run.

    Returns:
        The local paths of the downloaded files. If nothing matched, a
        GitHub Actions `::warning` line is printed and the list is empty.
    """
    bucket = S3_RESOURCE.Bucket("gha-artifacts")
    objs = bucket.objects.filter(
        Prefix=f"pytorch/pytorch/{workflow_run_id}/{workflow_run_attempt}/artifact/{prefix}"
    )

    paths = []
    for obj in objs:
        # Keep only the file name; the key's directory part is dropped.
        p = Path(Path(obj.key).name)
        print(f"Downloading {p}")
        with open(p, "wb") as f:
            f.write(obj.get()["Body"].read())
        paths.append(p)

    if not paths:
        print(
            "::warning title=s3 artifacts not found::"
            "Didn't find any test reports in s3, there might be a bug!"
        )
    return paths


def download_gha_artifacts(
    prefix: str, workflow_run_id: int, workflow_run_attempt: int
) -> List[Path]:
    """Download a workflow run's GitHub Actions artifacts matching `prefix`.

    Artifacts whose name identifies a different run attempt than
    `workflow_run_attempt` are skipped and do not appear in the result.

    Args:
        prefix: Artifact name prefix to match.
        workflow_run_id: ID of the workflow run.
        workflow_run_attempt: Attempt number of the workflow run.

    Returns:
        The local paths of the artifacts that were actually downloaded.
    """
    artifact_urls = _get_artifact_urls(prefix, workflow_run_id)
    paths = []
    for name, url in artifact_urls.items():
        downloaded = _download_artifact(Path(name), url, workflow_run_attempt)
        # None means the artifact belonged to another run attempt.
        if downloaded is not None:
            paths.append(downloaded)
    return paths
def upload_to_rockset(collection: str, docs: List[Any]) -> None:
    """Add `docs` to the named Rockset collection.

    The API key is read from the `ROCKSET_API_KEY` environment variable.

    Args:
        collection: Name of the Rockset collection to write to.
        docs: Documents to add to the collection.
    """
    print(f"Writing {len(docs)} documents to Rockset")
    rockset_client = rockset.Client(
        api_server="api.rs2.usw2.rockset.com",
        api_key=os.environ["ROCKSET_API_KEY"],
    )
    target_collection = rockset_client.Collection.retrieve(collection)
    target_collection.add_docs(docs)
    print("Done!")
def upload_to_s3(
    workflow_run_id: int,
    workflow_run_attempt: int,
    collection: str,
    docs: List[Dict[str, Any]],
) -> None:
    """Upload `docs` to the `ossci-raw-job-status` bucket as gzipped JSON lines.

    Each document is serialized as one JSON value followed by a newline; the
    result is gzip-compressed and stored under the key
    `<collection>/<workflow_run_id>/<workflow_run_attempt>`.

    Args:
        workflow_run_id: ID of the workflow run the documents belong to.
        workflow_run_attempt: Attempt number of that workflow run.
        collection: First component of the S3 object key.
        docs: JSON-serializable documents to upload.
    """
    print(f"Writing {len(docs)} documents to S3")

    # One JSON document per line, each line newline-terminated.
    payload = "".join(json.dumps(doc) + "\n" for doc in docs)

    object_key = f"{collection}/{workflow_run_id}/{workflow_run_attempt}"
    s3_object = S3_RESOURCE.Object("ossci-raw-job-status", object_key)
    s3_object.put(
        Body=gzip.compress(payload.encode()),
        ContentEncoding="gzip",
        ContentType="application/json",
    )
    print("Done!")
def unzip(p: Path) -> None:
    """Unzip the provided zipfile to a similarly-named sibling directory.

    Looks like: /tmp/test-reports.zip -> /tmp/unzipped-test-reports/

    Args:
        p: Path to an existing zip archive.

    Raises:
        AssertionError: If `p` is not an existing regular file.
        zipfile.BadZipFile: If `p` is not a valid zip archive.
    """
    assert p.is_file(), f"{p} is not a file"
    unzipped_dir = p.with_name("unzipped-" + p.stem)
    print(f"Extracting {p} to {unzipped_dir}")

    # Named `archive` rather than `zip` so the builtin is not shadowed.
    with zipfile.ZipFile(p, "r") as archive:
        archive.extractall(unzipped_dir)
|