File: find_workflow_runs.py

package info (click to toggle)
llvmlite 0.46.0-0.1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 2,140 kB
  • sloc: python: 13,605; cpp: 3,192; makefile: 185; sh: 168
file content (168 lines) | stat: -rwxr-xr-x 5,904 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#!/usr/bin/env python
"""Find successful llvmlite workflow run IDs for a given git tag."""

import argparse
import json
import os
import subprocess
import sys


def find_workflow_run(workflow_name, tag, repo, token):
    """
    Find the latest successful workflow run for a given workflow and tag.

    Args:
        workflow_name: Name of the workflow file (e.g., 'llvmlite_conda_builder.yml')
        tag: Git tag to search for
        repo: Repository in format 'owner/repo' (e.g., 'numba/llvmlite')
        token: GitHub token for API authentication

    Returns:
        Run ID as a string, or None if not found
    """
    print(f"Search for successful run of {workflow_name} on tag {tag} in {repo}", file=sys.stderr)

    env = os.environ.copy()
    env['GH_TOKEN'] = token

    try:
        result = subprocess.run(
            [
                "gh", "api",
                "-H", "Accept: application/vnd.github+json",
                "-H", "X-GitHub-Api-Version: 2022-11-28",
                f"/repos/{repo}/actions/workflows/{workflow_name}/runs",
                "--jq", f'.workflow_runs[] | select(.head_branch == "{tag}" and .conclusion == "success") | .id'
            ],
            capture_output=True,
            text=True,
            check=True,
            env=env
        )

        run_ids = result.stdout.strip().split('\n')
        if run_ids and run_ids[0]:
            run_id = run_ids[0]
            print(f"Found run ID {run_id} for {workflow_name}", file=sys.stderr)
            return run_id

        print(f"ERROR: No successful run found for {workflow_name} on tag {tag}", file=sys.stderr)
        return None

    except subprocess.CalledProcessError as e:
        print(f"ERROR: Failed to query GitHub API: {e}", file=sys.stderr)
        print(f"stderr: {e.stderr}", file=sys.stderr)
        return None


def load_workflow_config(config_path):
    """
    Load workflow groups from JSON config.

    Args:
        config_path: Path to JSON file containing workflow groups under 'workflows' key

    Returns:
        Dict mapping group names to lists of workflow filenames
    """
    try:
        with open(config_path, 'r') as f:
            config = json.load(f)
    except FileNotFoundError:
        print(f"ERROR: Config file not found: {config_path}", file=sys.stderr)
        sys.exit(1)
    except json.JSONDecodeError as e:
        print(f"ERROR: Invalid JSON in config file: {e}", file=sys.stderr)
        sys.exit(1)

    try:
        workflow_groups = config['workflows']
    except KeyError as e:
        print(f"ERROR: Missing required key in config file: {e}", file=sys.stderr)
        sys.exit(1)

    # Validate that all groups are lists
    for group_name, workflows in workflow_groups.items():
        if not isinstance(workflows, list):
            print(f"ERROR: Workflow group '{group_name}' must be a list", file=sys.stderr)
            sys.exit(1)

    return workflow_groups


def main():
    parser = argparse.ArgumentParser(
        description='Find successful workflow run IDs for a given tag'
    )
    parser.add_argument(
        '--tag',
        default=os.environ.get('TAG', ''),
        help='Git tag to search for (default: TAG env var)'
    )
    parser.add_argument(
        '--repo',
        default=os.environ.get('GITHUB_REPOSITORY', ''),
        help='Repository in format owner/repo (default: GITHUB_REPOSITORY env var)'
    )
    parser.add_argument(
        '--token',
        default=os.environ.get('GH_TOKEN', os.environ.get('GITHUB_TOKEN', '')),
        help='GitHub token for API authentication (default: GH_TOKEN or GITHUB_TOKEN env var)'
    )
    parser.add_argument(
        '--config',
        required=True,
        help='Path to JSON config file containing workflow groups'
    )

    args = parser.parse_args()

    if not args.tag:
        parser.error("TAG is required (via --tag or TAG env var)")
    if not args.repo:
        parser.error("Repository is required (via --repo or GITHUB_REPOSITORY env var)")
    if not args.token:
        parser.error("GitHub token is required (via --token or GH_TOKEN/GITHUB_TOKEN env var)")

    # Load workflow groups from config file
    # Returns dict: {group_name: [workflow_file1.yml, workflow_file2.yml, ...]}
    grouped_workflow_names = load_workflow_config(args.config)

    # Will store found run IDs: {group_name: [run_id1, run_id2, ...]}
    found_run_ids_by_group = {}

    # For each workflow group, query GitHub API to find successful run IDs
    # matching the specified tag
    for group_name, workflow_files in grouped_workflow_names.items():
        print(f"\n=== finding {group_name} workflow runs ===", file=sys.stderr)
        run_ids = []
        for workflow_file in workflow_files:
            run_id = find_workflow_run(workflow_file, args.tag, args.repo, args.token)
            if run_id:
                run_ids.append(run_id)
            else:
                print(f"WARNING: Could not find run ID for {workflow_file}", file=sys.stderr)
        found_run_ids_by_group[group_name] = run_ids

    # Verify all workflows were found
    total_expected = sum(len(workflows) for workflows in grouped_workflow_names.values())
    total_found = sum(len(run_ids) for run_ids in found_run_ids_by_group.values())

    if total_found != total_expected:
        print(f"\nERROR: Not all workflow runs were found ({total_found}/{total_expected})", file=sys.stderr)
        sys.exit(1)

    print("\n=== found workflow run IDs ===", file=sys.stderr)
    for group_name, run_ids in found_run_ids_by_group.items():
        print(f"{group_name}: {' '.join(run_ids)}", file=sys.stderr)

    # Output run IDs in shell variable format for GitHub Actions
    # eg. wheel_run_ids=run_id1 run_id2 ...
    for group_name, run_ids in found_run_ids_by_group.items():
        print(f"{group_name}_run_ids={' '.join(run_ids)}")


if __name__ == "__main__":
    main()