File: bundle.py

package info (click to toggle)
git-pw 2.0.0-2
  • links: PTS
  • area: main
  • in suites: bookworm, bullseye, forky, sid, trixie
  • size: 480 kB
  • sloc: python: 1,972; makefile: 4
file content (319 lines) | stat: -rw-r--r-- 8,986 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
"""
Bundle subcommands.
"""

import logging
import sys

import click

from git_pw import api
from git_pw import utils

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Column headers for the 'list' command. Order matters: list_cmd builds each
# row positionally and picks values by their index in this tuple.
_list_headers = ('ID', 'Name', 'Owner', 'Public')
# Sort keys offered through the 'list' command's pagination options.
# NOTE(review): the '-' prefix presumably requests descending order - confirm.
_sort_fields = ('id', '-id', 'name', '-name')


def _get_bundle(bundle_id):
    """Look up a single bundle from either a numeric ID or a search string.

    A purely numeric identifier is fetched directly. Anything else is used as
    a search query, because bundle names carry no uniqueness constraint; the
    lookup only succeeds when exactly one bundle matches. With no match, or
    with more than one, an error is logged and the process exits with
    status 1.
    """
    if not bundle_id.isdigit():
        matches = api.index('bundles', [('q', bundle_id)])
        count = len(matches)

        if count == 0:
            LOG.error('No matching bundle found: %s', bundle_id)
            sys.exit(1)

        if count > 1:
            LOG.error('More than one bundle found: %s', bundle_id)
            sys.exit(1)

        return matches[0]

    return api.detail('bundles', bundle_id)


@click.command(
    name='apply',
    context_settings={'ignore_unknown_options': True},
)
@click.argument('bundle_id')
@click.argument('args', nargs=-1, type=click.UNPROCESSED)
def apply_cmd(bundle_id, args):
    """Apply bundle.

    Apply a bundle locally using the 'git-am' command. Any additional ARGS
    provided will be passed to the 'git-am' command.
    """
    LOG.debug('Applying bundle: id=%s', bundle_id)

    # Resolve the bundle, download what its 'mbox' entry points at and hand
    # the result to git-am together with the extra arguments.
    mbox_ref = _get_bundle(bundle_id)['mbox']
    utils.git_am(api.download(mbox_ref), args)


@click.command(name='download')
@click.argument('bundle_id')
@click.argument('output', type=click.File('wb'), required=False)
def download_cmd(bundle_id, output):
    """Download bundle in mbox format.

    Download a bundle but do not apply it. ``OUTPUT`` is optional and can be an
    output path or ``-`` to output to ``stdout``. If ``OUTPUT`` is not
    provided, the output path will be automatically chosen.
    """
    LOG.debug('Downloading bundle: id=%s', bundle_id)

    bundle = _get_bundle(bundle_id)

    # 'output' is the optional file object opened by click (None when the
    # argument was omitted). The download's return value is only reported
    # when it is truthy.
    path = api.download(bundle['mbox'], output=output)

    if path:
        LOG.info('Downloaded bundle to %s', path)


def _show_bundle(bundle, fmt):
    """Print a bundle's properties and patches as a two-column table.

    The rows are handed to ``utils.echo`` under the headers 'Property' and
    'Value', using the output format ``fmt``.
    """
    rows = [
        ('ID', bundle.get('id')),
        ('Name', bundle.get('name')),
        ('URL', bundle.get('web_url')),
        ('Owner', bundle.get('owner').get('username')),
        ('Project', bundle.get('project').get('name')),
        ('Public', bundle.get('public')),
    ]

    # Only the first patch row carries the 'Patches' label; the rows that
    # follow leave the property column blank.
    for position, patch in enumerate(bundle.get('patches')):
        label = '' if position else 'Patches'
        summary = '%-4d %s' % (patch.get('id'), patch.get('name'))
        rows.append((label, summary))

    utils.echo(rows, ['Property', 'Value'], fmt=fmt)


@click.command(name='show')
@utils.format_options
@click.argument('bundle_id')
def show_cmd(fmt, bundle_id):
    """Show information about bundle.

    Retrieve Patchwork metadata for a bundle.
    """
    LOG.debug('Showing bundle: id=%s', bundle_id)

    # Look the bundle up by ID or name, then render it in the chosen format.
    _show_bundle(_get_bundle(bundle_id), fmt)


@click.command(name='list')
@click.option(
    '--owner',
    'owners',
    metavar='OWNER',
    multiple=True,
    help=(
        'Show only bundles with these owners. Should be an email, '
        'name or ID. Private bundles of other users will not be shown.'
    ),
)
@utils.pagination_options(sort_fields=_sort_fields, default_sort='name')
@utils.format_options(headers=_list_headers)
@click.argument('name', required=False)
@api.validate_multiple_filter_support
def list_cmd(owners, limit, page, sort, fmt, headers, name):
    """List bundles.

    List bundles on the Patchwork instance.
    """
    LOG.debug('List bundles: owners=%s, limit=%r, page=%r, sort=%r',
              ','.join(owners), limit, page, sort)

    filters = []

    for owner in owners:
        # we support server-side filtering by username (but not email) in 1.1
        by_username = api.version() >= (1, 1) and '@' not in owner
        if by_username or owner.isdigit():
            filters.append(('owner', owner))
        else:
            filters.extend(api.retrieve_filter_ids('users', 'owner', owner))

    filters += [
        ('q', name),
        ('page', page),
        ('per_page', limit),
        ('order', sort),
    ]

    bundles = api.index('bundles', filters)

    # Build one row per bundle, keeping only the columns whose header was
    # requested. Values are listed in the same order as _list_headers.
    rows = []

    for bundle in bundles:
        record = (
            bundle.get('id'),
            utils.trim(bundle.get('name') or ''),
            bundle.get('owner').get('username'),
            'yes' if bundle.get('public') else 'no',
        )

        rows.append([
            value
            for header, value in zip(_list_headers, record)
            if header in headers
        ])

    utils.echo_via_pager(rows, headers, fmt=fmt)


@click.command(name='create')
@click.option('--public/--private', default=False,
              help='Allow other users to view this bundle. If private, only '
              'you will be able to see this bundle.')
@click.argument('name')
@click.argument('patch_ids', type=click.INT, nargs=-1, required=True)
@api.validate_minimum_version(
    (1, 2), 'Creating bundles is only supported from API version 1.2',
)
@utils.format_options
def create_cmd(name, patch_ids, public, fmt):
    """Create a bundle.

    Create a bundle with the given NAME and patches from PATCH_IDS.

    Requires API version 1.2 or greater.
    """
    LOG.debug('Create bundle: name=%s, patches=%s, public=%s',
              name, patch_ids, public)

    # Fields submitted for the new bundle; 'patch_ids' is the tuple of
    # integers collected by click from the command line.
    data = [
        ('name', name),
        ('patches', patch_ids),
        ('public', public),
    ]

    bundle = api.create('bundles', data)

    # Display whatever api.create() returned for the new bundle.
    _show_bundle(bundle, fmt)


@click.command(name='update')
@click.option('--name', help='New name for the bundle.')
@click.option('--patch', 'patch_ids', type=click.INT, multiple=True,
              help='Set the bundle to contain the specified patch(es), '
              'replacing any patches currently in the bundle.')
@click.option('--public/--private', default=None,
              help='Allow other users to view this bundle. If private, only '
              'you will be able to see this bundle.')
@click.argument('bundle_id')
@api.validate_minimum_version(
    (1, 2), 'Updating bundles is only supported from API version 1.2',
)
@utils.format_options
def update_cmd(bundle_id, name, patch_ids, public, fmt):
    """Update a bundle.

    Update bundle BUNDLE_ID. If PATCH_IDs are specified, this will overwrite
    all patches in the bundle. Use 'bundle add' and 'bundle remove' to add or
    remove patches.

    Requires API version 1.2 or greater.
    """
    LOG.debug(
        'Updating bundle: id=%s, name=%s, patches=%s, public=%s',
        bundle_id, name, patch_ids, public,
    )

    # Only submit the fields the user actually supplied; options left unset
    # arrive here as None.
    data = [
        (key, value)
        for key, value in (('name', name), ('public', public))
        if value is not None
    ]

    if patch_ids:  # special case patches to ignore the empty set
        data.append(('patches', patch_ids))

    bundle = api.update('bundles', bundle_id, data)

    _show_bundle(bundle, fmt)


@click.command(name='delete')
@click.argument('bundle_id')
@api.validate_minimum_version(
    (1, 2),
    'Deleting bundles is only supported from API version 1.2',
)
@utils.format_options
def delete_cmd(bundle_id, fmt):
    """Delete a bundle.

    Delete bundle BUNDLE_ID.

    Requires API version 1.2 or greater.
    """
    LOG.debug('Delete bundle: id=%s', bundle_id)

    # Nothing is printed afterwards, so 'fmt' goes unused in this command.
    api.delete('bundles', bundle_id)


@click.command(name='add')
@click.argument('bundle_id')
@click.argument('patch_ids', type=click.INT, nargs=-1, required=True)
@api.validate_minimum_version(
    (1, 2), 'Modifying bundles is only supported from API version 1.2',
)
@utils.format_options
def add_cmd(bundle_id, patch_ids, fmt):
    """Add one or more patches to a bundle.

    Append the provided PATCH_IDS to bundle BUNDLE_ID.

    Requires API version 1.2 or greater.
    """
    LOG.debug('Add to bundle: id=%s, patches=%s', bundle_id, patch_ids)

    bundle = _get_bundle(bundle_id)

    # NOTE(review): the requested patches are placed ahead of the patches
    # already in the bundle, although the help text says "Append" - confirm
    # which ordering is intended.
    current_ids = tuple(p['id'] for p in bundle['patches'])
    data = [('patches', patch_ids + current_ids)]

    # BUNDLE_ID may have been a name rather than a numeric ID, so address the
    # update by the ID of the bundle that was actually resolved.
    bundle = api.update('bundles', str(bundle['id']), data)

    _show_bundle(bundle, fmt)


@click.command(name='remove')
@click.argument('bundle_id')
@click.argument('patch_ids', type=click.INT, nargs=-1, required=True)
@api.validate_minimum_version(
    (1, 2), 'Modifying bundles is only supported from API version 1.2',
)
@utils.format_options
def remove_cmd(bundle_id, patch_ids, fmt):
    """Remove one or more patches from a bundle.

    Remove the provided PATCH_IDS from bundle BUNDLE_ID.

    Requires API version 1.2 or greater.
    """
    LOG.debug('Remove from bundle: id=%s, patches=%s', bundle_id, patch_ids)

    bundle = _get_bundle(bundle_id)

    # Keep every patch currently in the bundle that was not named for removal.
    patches = [p['id'] for p in bundle['patches'] if p['id'] not in patch_ids]
    if not patches:
        # Removing everything is refused; the user is pointed at deletion.
        LOG.error(
            'Bundles cannot be empty. Consider deleting the bundle instead'
        )
        sys.exit(1)

    data = [('patches', tuple(patches))]

    # BUNDLE_ID may have been a name rather than a numeric ID, so address the
    # update by the ID of the bundle that was actually resolved.
    bundle = api.update('bundles', str(bundle['id']), data)

    _show_bundle(bundle, fmt)