File: try_server.py

package info (click to toggle)
mercurial 7.0.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 44,824 kB
  • sloc: python: 206,444; ansic: 56,415; tcl: 3,715; sh: 1,797; lisp: 1,483; cpp: 864; makefile: 752; javascript: 649; xml: 36
file content (99 lines) | stat: -rw-r--r-- 2,474 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# try_server.py - Interact with Try server
#
# Copyright 2019 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

# no-check-code because Python 3 native.

import base64
import json
import os
import subprocess
import tempfile

from .aws import AWSConnection

LAMBDA_FUNCTION = "ci-try-server-upload"


def trigger_try(c: AWSConnection, rev="."):
    """Trigger a new Try run."""
    lambda_client = c.session.client("lambda")

    cset, bundle = generate_bundle(rev=rev)

    payload = {
        "bundle": base64.b64encode(bundle).decode("utf-8"),
        "node": cset["node"],
        "branch": cset["branch"],
        "user": cset["user"],
        "message": cset["desc"],
    }

    print("resolved revision:")
    print("node: %s" % cset["node"])
    print("branch: %s" % cset["branch"])
    print("user: %s" % cset["user"])
    print("desc: %s" % cset["desc"].splitlines()[0])
    print()

    print("sending to Try...")
    res = lambda_client.invoke(
        FunctionName=LAMBDA_FUNCTION,
        InvocationType="RequestResponse",
        Payload=json.dumps(payload).encode("utf-8"),
    )

    body = json.load(res["Payload"])
    for message in body:
        print("remote: %s" % message)


def generate_bundle(rev="."):
    """Generate a bundle suitable for use by the Try service.

    Returns a tuple of revision metadata and raw Mercurial bundle data.
    """
    # `hg bundle` doesn't support streaming to stdout. So we use a temporary
    # file.
    path = None
    try:
        fd, path = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
        os.close(fd)

        args = [
            "hg",
            "bundle",
            "--type",
            "gzip-v2",
            "--base",
            "public()",
            "--rev",
            rev,
            path,
        ]

        print("generating bundle...")
        subprocess.run(args, check=True)

        with open(path, "rb") as fh:
            bundle_data = fh.read()

    finally:
        if path:
            os.unlink(path)

    args = [
        "hg",
        "log",
        "-r",
        rev,
        # We have to upload as JSON, so it won't matter if we emit binary
        # since we need to normalize to UTF-8.
        "-T",
        "json",
    ]
    res = subprocess.run(args, check=True, capture_output=True)
    return json.loads(res.stdout)[0], bundle_data