File: pipsi.py

package info (click to toggle)
pipsi 0.9-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster, sid
  • size: 104 kB
  • sloc: python: 493; makefile: 2
file content (424 lines) | stat: -rw-r--r-- 12,847 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
import os
import sys
import shutil
import glob
from os.path import join, realpath, dirname, normpath, normcase
from operator import methodcaller
try:
    from urlparse import urlparse
except ImportError:
    from urllib.parse import urlparse

import click
from pkg_resources import Requirement

try:
    WindowsError
except NameError:
    IS_WIN = False
    BIN_DIR = 'bin'
else:
    IS_WIN = True
    BIN_DIR = 'Scripts'

FIND_SCRIPTS_SCRIPT = r'''if 1:
    import os
    import sys
    import pkg_resources
    pkg = sys.argv[1]
    prefix = sys.argv[2]
    dist = pkg_resources.get_distribution(pkg)
    if dist.has_metadata('RECORD'):
        for line in dist.get_metadata_lines('RECORD'):
            print(os.path.join(dist.location, line.split(',')[0]))
    elif dist.has_metadata('installed-files.txt'):
        for line in dist.get_metadata_lines('installed-files.txt'):
            print(os.path.join(dist.egg_info, line.split(',')[0]))
    elif dist.has_metadata('entry_points.txt'):
        try:
            from ConfigParser import SafeConfigParser
            from StringIO import StringIO
        except ImportError:
            from configparser import SafeConfigParser
            from io import StringIO
        parser = SafeConfigParser()
        parser.readfp(StringIO(
            '\n'.join(dist.get_metadata_lines('entry_points.txt'))))
        if parser.has_section('console_scripts'):
            for name, _ in parser.items('console_scripts'):
                print(os.path.join(prefix, name))
'''


# The `click` custom context settings
CONTEXT_SETTINGS = dict(
    help_option_names=['-h', '--help'],
)


def normalize_package(value):
    # Strips the version and normalizes name
    requirement = Requirement.parse(value)
    return requirement.project_name.lower()


def normalize(path):
    return normcase(normpath(realpath(path)))


def real_readlink(filename):
    try:
        target = os.readlink(filename)
    except (OSError, IOError, AttributeError):
        return None
    return normpath(realpath(join(dirname(filename), target)))


def statusoutput(argv, **kw):
    from subprocess import Popen, PIPE
    p = Popen(
        argv, stdout=PIPE, stderr=PIPE, **kw)
    output = p.communicate()[0].strip()
    if not isinstance(output, str):
        output = output.decode('utf-8', 'replace')
    return p.returncode, output


def publish_script(src, dst):
    if IS_WIN:
        # always copy new exe on windows
        shutil.copy(src, dst)
        click.echo('  Copied Executable ' + dst)
        return True
    else:
        old_target = real_readlink(dst)
        if old_target == src:
            return True
        try:
            os.remove(dst)
        except OSError:
            pass
        try:
            os.symlink(src, dst)
        except OSError:
            pass
        else:
            click.echo('  Linked script ' + dst)
            return True


def find_scripts(virtualenv, package):
    prefix = normalize(join(virtualenv, BIN_DIR, ''))

    files = statusoutput([
        join(prefix, 'python'), '-c', FIND_SCRIPTS_SCRIPT,
        package, prefix
    ])[1].splitlines()

    files = map(normalize, files)
    files = filter(
        methodcaller('startswith', prefix),
        files,
    )

    def valid(filename):
        return os.path.isfile(filename) and \
            IS_WIN or os.access(filename, os.X_OK)

    result = list(filter(valid, files))

    if IS_WIN:
        for filename in files:
            globed = glob.glob(filename + '*')
            result.extend(filter(valid, globed))
    return result


class UninstallInfo(object):

    def __init__(self, package, paths=None, installed=True):
        self.package = package
        self.paths = paths or []
        self.installed = installed

    def perform(self):
        for path in self.paths:
            try:
                os.remove(path)
            except OSError:
                shutil.rmtree(path)






class Repo(object):

    def __init__(self, home, bin_dir):
        self.home = home
        self.bin_dir = bin_dir

    def resolve_package(self, spec, python=None):
        url = urlparse(spec)
        if url.netloc == 'file':
            location = url.path
        elif url.netloc != '':
            if not url.fragment.startswith('egg='):
                raise click.UsageError('When installing from URLs you need '
                                       'to add an egg at the end.  For '
                                       'instance git+https://.../#egg=Foo')
            return url.fragment[4:], [spec]
        elif os.path.isdir(spec):
            location = spec
        else:
            return spec, [spec]

        error, name = statusoutput(
            [python or sys.executable, 'setup.py', '--name'],
            cwd=location)
        if error:
            raise click.UsageError('%s does not appear to be a local '
                                   'Python package.' % spec)

        return name, [location]

    def get_package_path(self, package):
        return join(self.home, normalize_package(package))

    def find_installed_executables(self, path):
        prefix = join(realpath(normpath(path)), '')
        try:
            for filename in os.listdir(self.bin_dir):
                exe = os.path.join(self.bin_dir, filename)
                target = real_readlink(exe)
                if target is None:
                    continue
                if target.startswith(prefix):
                    yield exe
        except OSError:
            pass

    def link_scripts(self, scripts):
        rv = []
        for script in scripts:
            script_dst = os.path.join(
                self.bin_dir, os.path.basename(script))
            if publish_script(script, script_dst):
                rv.append((script, script_dst))

        return rv

    def install(self, package, python=None, editable=False):
        package, install_args = self.resolve_package(package, python)

        venv_path = self.get_package_path(package)
        if os.path.isdir(venv_path):
            click.echo('%s is already installed' % package)
            return

        if not os.path.exists(self.bin_dir):
            os.makedirs(self.bin_dir)

        from subprocess import Popen

        def _cleanup():
            try:
                shutil.rmtree(venv_path)
            except (OSError, IOError):
                pass
            return False

        # Install virtualenv, use the pipsi used python version by default
        args = ['virtualenv', '-p', python or sys.executable, venv_path]

        try:
            if Popen(args).wait() != 0:
                click.echo('Failed to create virtualenv.  Aborting.')
                return _cleanup()

            args = [os.path.join(venv_path, BIN_DIR, 'pip'), 'install']
            if editable:
                args.append('--editable')

            if Popen(args + install_args).wait() != 0:
                click.echo('Failed to pip install.  Aborting.')
                return _cleanup()
        except Exception:
            _cleanup()
            raise

        # Find all the scripts
        scripts = find_scripts(venv_path, package)

        # And link them
        linked_scripts = self.link_scripts(scripts)

        # We did not link any, rollback.
        if not linked_scripts:
            click.echo('Did not find any scripts.  Uninstalling.')
            return _cleanup()
        return True

    def uninstall(self, package):
        path = self.get_package_path(package)
        if not os.path.isdir(path):
            return UninstallInfo(package, installed=False)
        paths = [path]
        paths.extend(self.find_installed_executables(path))
        return UninstallInfo(package, paths)

    def upgrade(self, package, editable=False):
        package, install_args = self.resolve_package(package)

        venv_path = self.get_package_path(package)
        if not os.path.isdir(venv_path):
            click.echo('%s is not installed' % package)
            return

        from subprocess import Popen

        old_scripts = set(find_scripts(venv_path, package))

        args = [os.path.join(venv_path, BIN_DIR, 'pip'), 'install',
                '--upgrade']
        if editable:
            args.append('--editable')

        if Popen(args + install_args).wait() != 0:
            click.echo('Failed to upgrade through pip.  Aborting.')
            return

        scripts = find_scripts(venv_path, package)
        linked_scripts = self.link_scripts(scripts)
        to_delete = old_scripts - set(x[0] for x in linked_scripts)

        for script_src, script_link in linked_scripts:
            if script_src in to_delete:
                try:
                    click.echo('  Removing old script %s' % script_src)
                    os.remove(script_link)
                except (IOError, OSError):
                    pass

    def list_everything(self):
        venvs = {}
        python = '/Scripts/python.exe' if IS_WIN else '/bin/python'
        for venv in os.listdir(self.home):
            venv_path = os.path.join(self.home, venv)
            if os.path.isdir(venv_path) and \
               os.path.isfile(venv_path + python):
                venvs[venv] = []

        def _find_venv(target):
            for venv in venvs:
                if target.startswith(join(self.home, venv, '')):
                    return venv

        for script in os.listdir(self.bin_dir):
            exe = os.path.join(self.bin_dir, script)
            target = real_readlink(exe)
            if target is None:
                continue
            venv = _find_venv(target)
            if venv is not None:
                venvs[venv].append(script)

        return sorted(venvs.items())


@click.group(context_settings=CONTEXT_SETTINGS)
@click.option(
    '--home', type=click.Path(),envvar='PIPSI_HOME',
    default=os.path.expanduser('~/.local/venvs'),
    help='The folder that contains the virtualenvs.')
@click.option(
    '--bin-dir', type=click.Path(),
    envvar='PIPSI_BIN_DIR',
    default=os.path.expanduser('~/.local/bin'),
    help='The path where the scripts are symlinked to.')

@click.version_option(
    message='%(prog)s, version %(version)s, python ' + str(sys.executable))
@click.pass_context
def cli(ctx, home, bin_dir):
    """pipsi is a tool that uses virtualenv and pip to install shell
    tools that are separated from each other.
    """
    ctx.obj = Repo(home, bin_dir)


@cli.command()
@click.argument('package')
@click.option('--python', default=None,
              help='The python interpreter to use.')
@click.option('--editable', '-e', is_flag=True,
              help='Enable editable installation.  This only works for '
                   'locally installed packages.')
@click.pass_obj
def install(repo, package, python, editable):
    """Installs scripts from a Python package.

    Given a package this will install all the scripts and their dependencies
    of the given Python package into a new virtualenv and symlinks the
    discovered scripts into BIN_DIR (defaults to ~/.local/bin).
    """
    if repo.install(package, python, editable):
        click.echo('Done.')
    else:
        sys.exit(1)



@cli.command()
@click.argument('package')
@click.option('--editable', '-e', is_flag=True,
              help='Enable editable installation.  This only works for '
                   'locally installed packages.')
@click.pass_obj
def upgrade(repo, package, editable):
    """Upgrades an already installed package."""
    if repo.upgrade(package, editable):
        click.echo('Done.')
    else:
        sys.exit(1)


@cli.command(short_help='Uninstalls scripts of a package.')
@click.argument('package')
@click.option('--yes', is_flag=True, help='Skips all prompts.')
@click.pass_obj
def uninstall(repo, package, yes):
    """Uninstalls all scripts of a Python package and cleans up the
    virtualenv.
    """
    uinfo = repo.uninstall(package)
    if not uinfo.installed:
        click.echo('%s is not installed' % package)
    else:
        click.echo('The following paths will be removed:')
        for path in uinfo.paths:
            click.echo('  %s' % click.format_filename(path))
        click.echo()
        if yes or click.confirm('Do you want to uninstall %s?' % package):
            uinfo.perform()
            click.echo('Done!')
        else:
            click.echo('Aborted!')
            sys.exit(1)


@cli.command('list')
@click.pass_obj
def list_cmd(repo):
    """Lists all scripts installed through pipsi."""
    click.echo('Packages and scripts installed through pipsi:')
    for venv, scripts in repo.list_everything():
        if not scripts:
            continue
        click.echo('  Package "%s":' % venv)
        for script in scripts:
            click.echo('    ' + script)

if __name__ == '__main__':
    cli()