File: fake_cipd.py

package info (click to toggle)
chromium 139.0.7258.127-2
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 6,122,156 kB
  • sloc: cpp: 35,100,771; ansic: 7,163,530; javascript: 4,103,002; python: 1,436,920; asm: 946,517; xml: 746,709; pascal: 187,653; perl: 88,691; sh: 88,436; objc: 79,953; sql: 51,488; cs: 44,583; fortran: 24,137; makefile: 22,147; tcl: 15,277; php: 13,980; yacc: 8,984; ruby: 7,485; awk: 3,720; lisp: 3,096; lex: 1,327; ada: 727; jsp: 228; sed: 36
file content (204 lines) | stat: -rw-r--r-- 6,284 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#!/usr/bin/env python3
# Copyright (c) 2018 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from string import Template

import argparse
import io
import json
import os
import re
import shutil
import sys

# Names of the ${...} variables that expand_package_name_cmd() expands when
# they appear at the end of a package name.
ARCH_VAR = 'arch'
OS_VAR = 'os'
PLATFORM_VAR = 'platform'

# Pattern for an "@Subdir <path>" line of an ensure file; group 1 is the path.
CIPD_SUBDIR_RE = '@Subdir (.*)'
# Subcommand names accepted by main() as the first command-line argument.
CIPD_DESCRIBE = 'describe'
CIPD_ENSURE = 'ensure'
CIPD_ENSURE_FILE_RESOLVE = 'ensure-file-resolve'
CIPD_EXPAND_PKG = 'expand-package-name'
CIPD_EXPORT = 'export'

# Text that describe_cmd() writes to stdout on success. ${package} is filled
# in through string.Template.
DESCRIBE_STDOUT_TEMPLATE = """\
Package:       ${package}
Instance ID:   ${package}-fake-instance-id
Registered by: user:fake-testing-support
Registered at: 2023-05-10 18:53:55.078574 +0000 UTC
Refs:
  ${package}-latest
Tags:
  git_revision:${package}-fake-git-revision
  ${package}-fake-tag:1.0
  ${package}-fake-tag:2.0
"""

# JSON document that describe_cmd() parses to look up the requested version
# among its "refs" and "tags", and writes out when -json-output is given.
# ${package} is filled in through string.Template.
DESCRIBE_JSON_TEMPLATE = """{
  "result": {
    "pin": {
      "package": "${package}",
      "instance_id": "${package}-fake-instance-id"
    },
    "registered_by": "user:fake-testing-support",
    "registered_ts": 1683744835,
    "refs": [
      {
        "ref": "${package}-latest",
        "instance_id": "${package}-fake-instance-id",
        "modified_by": "user:fake-testing-support",
        "modified_ts": 1683744835
      }
    ],
    "tags": [
      {
        "tag": "git_revision:${package}-fake-git-revision",
        "registered_by": "user:fake-testing-support",
        "registered_ts": 1683744835
      },
      {
        "tag": "${package}-fake-tag:1.0",
        "registered_by": "user:fake-testing-support",
        "registered_ts": 1683744835
      },
      {
        "tag": "${package}-fake-tag:2.0",
        "registered_by": "user:fake-testing-support",
        "registered_ts": 1683744835
      }
    ]
  }
}"""


def parse_cipd(root, contents):
    """Group the lines of a CIPD ensure file by the subdirectory they follow.

    A line matching ``CIPD_SUBDIR_RE`` (``@Subdir <path>``) opens a new
    section; every following non-empty line is recorded under that section
    until the next ``@Subdir`` line. Non-empty lines that appear before the
    first ``@Subdir`` line are ignored.

    Args:
        root: Directory the subdirectory is joined onto, one path component
            per '/'-separated piece. When falsy ('' or None) the subdirectory
            text is used verbatim as the key.
        contents: Iterable of ensure-file lines; each is stripped before use.

    Returns:
        Dict mapping each subdirectory key to the list of stripped lines that
        appeared under it, in order. Sections without any line are omitted.
    """
    tree = {}
    current_subdir = None
    for raw_line in contents:
        line = raw_line.strip()
        match = re.match(CIPD_SUBDIR_RE, line)
        if match:
            # NOTE(review): this print (and the one below) looks like leftover
            # debug output; kept so that stdout stays exactly as before.
            print('match')
            subdir = match.group(1)
            # Decide on the root *before* joining: os.path.join(None, ...)
            # raises TypeError, which made a missing root crash even though
            # the no-root case is explicitly supported.
            if root:
                current_subdir = os.path.join(root, *subdir.split('/'))
            else:
                current_subdir = subdir
        elif line and current_subdir:
            print('no match')
            tree.setdefault(current_subdir, []).append(line)
    return tree


def expand_package_name_cmd(package_name):
    """Return package_name with a trailing ${arch}/${os}/${platform} expanded.

    The last '/'-separated component is inspected first: if it contains '='
    the result is '' unless that component is exactly
    '${platform=fake-platform-ok}', which is treated as '${platform}'.
    A name ending in one of the known variables then has that variable
    replaced by '<var>-expanded-test-only'; any other name is returned as is.
    """
    prefix, slash, last_component = package_name.rpartition("/")
    if "=" in last_component:
        # Any use of var equality should return empty for testing.
        if last_component != "${platform=fake-platform-ok}":
            return ""
        package_name = prefix + slash + "${platform}"

    for var_name in (ARCH_VAR, OS_VAR, PLATFORM_VAR):
        placeholder = "${" + var_name + "}"
        if package_name.endswith(placeholder):
            package_name = package_name.replace(
                placeholder, var_name + "-expanded-test-only")
    return package_name


def ensure_file_resolve():
    """Fake the 'ensure-file-resolve' subcommand.

    Reads -ensure-file (required) and -json-output from the command line,
    parses the ensure file with an empty root, and writes a JSON document to
    the -json-output path. The document maps each subdirectory, under the
    "result" key, to a list of entries holding the expanded package name and
    a pin whose instance id is '<name>-fake-resolved-id'.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('-ensure-file', required=True)
    arg_parser.add_argument('-json-output')
    options = arg_parser.parse_known_args()[0]

    with io.open(options.ensure_file, 'r', encoding='utf-8') as ensure_file:
        packages_by_subdir = parse_cipd("", ensure_file.readlines())

    def _resolve(package_line):
        # Only the first space-separated token (the package name) is used.
        name = expand_package_name_cmd(package_line.split(" ")[0])
        return {
            "package": name,
            "pin": {
                "package": name,
                "instance_id": name + "-fake-resolved-id",
            },
        }

    resolved = {
        "result": {
            subdir: [_resolve(entry) for entry in entries]
            for subdir, entries in packages_by_subdir.items()
        }
    }
    with io.open(options.json_output, 'w', encoding='utf-8') as out:
        out.write(json.dumps(resolved, indent=4))


def describe_cmd(package_name):
    """Fake the 'describe' subcommand for package_name.

    Reads -version (required) and -json-output from the command line. When
    the version equals one of the refs or tags in the canned description,
    the description is written as JSON to -json-output (if given), the text
    form is written to stdout and 0 is returned. Otherwise an error line is
    written to stdout and 1 is returned.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('-json-output')
    arg_parser.add_argument('-version', required=True)
    options = arg_parser.parse_known_args()[0]

    substitutions = {'package': package_name}
    description = json.loads(
        Template(DESCRIBE_JSON_TEMPLATE).substitute(substitutions))
    stdout_text = Template(DESCRIBE_STDOUT_TEMPLATE).substitute(substitutions)

    # A version is known when it names either a tag or a ref.
    result = description['result']
    known_versions = [entry['tag'] for entry in result['tags']]
    known_versions += [entry['ref'] for entry in result['refs']]
    if options.version not in known_versions:
        sys.stdout.write('Error: no such ref.\n')
        return 1

    if options.json_output:
        with io.open(options.json_output, 'w', encoding='utf-8') as out:
            out.write(json.dumps(description, indent=4))
    sys.stdout.write(stdout_text)
    return 0


def main():
    """Dispatch on the subcommand given as the first command-line argument.

    'expand-package-name', 'describe' and 'ensure-file-resolve' are handed to
    their dedicated helpers. The remaining subcommands ('ensure' and
    'export') parse the file given by -ensure-file and, for every
    subdirectory under -root, write a '_cipd' file listing that
    subdirectory's lines; the ensure file itself is then copied to
    '<root>/_cipd'.

    Returns:
        The exit status: 0, or whatever the dispatched helper returns.
    """
    argv = sys.argv
    command = argv[1]
    known_commands = (
        CIPD_DESCRIBE,
        CIPD_ENSURE,
        CIPD_ENSURE_FILE_RESOLVE,
        CIPD_EXPAND_PKG,
        CIPD_EXPORT,
    )
    assert command in known_commands

    if command == CIPD_EXPAND_PKG:
        # Exactly one argument (the package name) must follow the command.
        assert len(argv) == 3
        sys.stdout.write(expand_package_name_cmd(argv[2]))
        return 0
    if command == CIPD_DESCRIBE:
        # The package name must follow the command; flags may come after it.
        assert len(argv) >= 3
        return describe_cmd(argv[2])
    if command == CIPD_ENSURE_FILE_RESOLVE:
        return ensure_file_resolve()

    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('-ensure-file')
    arg_parser.add_argument('-root')
    options = arg_parser.parse_known_args()[0]

    with io.open(options.ensure_file, 'r', encoding='utf-8') as ensure_file:
        packages_by_subdir = parse_cipd(options.root, ensure_file.readlines())

    # "Install" each subdirectory by recording its package lines in _cipd.
    for subdir, package_lines in packages_by_subdir.items():
        if not os.path.exists(subdir):
            os.makedirs(subdir)
        marker_path = os.path.join(subdir, '_cipd')
        with io.open(marker_path, 'w', encoding='utf-8') as marker:
            marker.write('\n'.join(package_lines))

    # Keep a copy of the ensure file we were given at the top of the root.
    root_marker = os.path.join(options.root, '_cipd')
    shutil.copy(options.ensure_file, root_marker)

    return 0


# When run as a script, use main()'s return value as the process exit status.
if __name__ == '__main__':
    sys.exit(main())