File: repository.py

package info (click to toggle)
python-bitbucket-api 0.5.0-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 128 kB
  • sloc: python: 661; makefile: 4
file content (114 lines) | stat: -rw-r--r-- 5,135 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# -*- coding: utf-8 -*-
from tempfile import NamedTemporaryFile
from zipfile import ZipFile


URLS = {
    'CREATE_REPO': 'repositories/',
    'GET_REPO': 'repositories/%(username)s/%(repo_slug)s/',
    'UPDATE_REPO': 'repositories/%(username)s/%(repo_slug)s/',
    'DELETE_REPO': 'repositories/%(username)s/%(repo_slug)s/',
    # Get archive
    'GET_ARCHIVE': 'repositories/%(username)s/%(repo_slug)s/%(format)s/master/',
}


class Repository(object):
    """ This class provide repository-related methods to Bitbucket objects."""

    def __init__(self, bitbucket):
        self.bitbucket = bitbucket
        self.bitbucket.URLS.update(URLS)

    def _get_files_in_dir(self, repo_slug=None, dir='/'):
        repo_slug = repo_slug or self.bitbucket.repo_slug or ''
        dir = dir.lstrip('/')
        url = self.bitbucket.url(
            'GET_ARCHIVE',
            username=self.bitbucket.username,
            repo_slug=repo_slug,
            format='src')
        dir_url = url + dir
        response = self.bitbucket.dispatch('GET', dir_url, auth=self.bitbucket.auth)
        if response[0] and isinstance(response[1], dict):
            repo_tree = response[1]
            url = self.bitbucket.url(
                'GET_ARCHIVE',
                username=self.bitbucket.username,
                repo_slug=repo_slug,
                format='raw')
            # Download all files in dir
            for file in repo_tree['files']:
                file_url = url + '/'.join((file['path'],))
                response = self.bitbucket.dispatch('GET', file_url, auth=self.bitbucket.auth)
                self.bitbucket.repo_tree[file['path']] = response[1]
            # recursively download in dirs
            for directory in repo_tree['directories']:
                dir_path = '/'.join((dir, directory))
                self._get_files_in_dir(repo_slug=repo_slug, dir=dir_path)

    def public(self, username=None):
        """ Returns all public repositories from an user.
            If username is not defined, tries to return own public repos.
        """
        username = username or self.bitbucket.username or ''
        url = self.bitbucket.url('GET_USER', username=username)
        response = self.bitbucket.dispatch('GET', url)
        try:
            return (response[0], response[1]['repositories'])
        except TypeError:
            pass
        return response

    def all(self):
        """ Return own repositories."""
        url = self.bitbucket.url('GET_USER', username=self.bitbucket.username)
        response = self.bitbucket.dispatch('GET', url, auth=self.bitbucket.auth)
        try:
            return (response[0], response[1]['repositories'])
        except TypeError:
            pass
        return response

    def get(self, repo_slug=None):
        """ Get a single repository on Bitbucket and return it."""
        repo_slug = repo_slug or self.bitbucket.repo_slug or ''
        url = self.bitbucket.url('GET_REPO', username=self.bitbucket.username, repo_slug=repo_slug)
        return self.bitbucket.dispatch('GET', url, auth=self.bitbucket.auth)

    def create(self, repo_name, scm='git', private=True, **kwargs):
        """ Creates a new repository on own Bitbucket account and return it."""
        url = self.bitbucket.url('CREATE_REPO')
        return self.bitbucket.dispatch('POST', url, auth=self.bitbucket.auth, name=repo_name, scm=scm, is_private=private, **kwargs)

    def update(self, repo_slug=None, **kwargs):
        """ Updates repository on own Bitbucket account and return it."""
        repo_slug = repo_slug or self.bitbucket.repo_slug or ''
        url = self.bitbucket.url('UPDATE_REPO', username=self.bitbucket.username, repo_slug=repo_slug)
        return self.bitbucket.dispatch('PUT', url, auth=self.bitbucket.auth, **kwargs)

    def delete(self, repo_slug=None):
        """ Delete a repository on own Bitbucket account.
            Please use with caution as there is NO confimation and NO undo.
        """
        repo_slug = repo_slug or self.bitbucket.repo_slug or ''
        url = self.bitbucket.url('DELETE_REPO', username=self.bitbucket.username, repo_slug=repo_slug)
        return self.bitbucket.dispatch('DELETE', url, auth=self.bitbucket.auth)

    def archive(self, repo_slug=None, format='zip', prefix=''):
        """ Get one of your repositories and compress it as an archive.
            Return the path of the archive.

            format parameter is curently not supported.
        """
        prefix = '%s'.lstrip('/') % prefix
        self._get_files_in_dir(repo_slug=repo_slug, dir='/')
        if self.bitbucket.repo_tree:
            with NamedTemporaryFile(delete=False) as archive:
                with ZipFile(archive, 'w') as zip_archive:
                    for name, file in self.bitbucket.repo_tree.items():
                        with NamedTemporaryFile(delete=False) as temp_file:
                            temp_file.write(file.encode('utf-8'))
                        zip_archive.write(temp_file.name, prefix + name)
            return (True, archive.name)
        return (False, 'Could not archive your project.')