File: repo_manager.py

Package: errbot 6.1.7+ds-1 (area: main; suites: bullseye, sid)

import json
import logging
import os
import re
import shutil
import tarfile
from collections import namedtuple
from datetime import datetime, timedelta
from os import path
from pathlib import Path
from typing import Dict, Generator, List, Sequence, Tuple, Union
from urllib.error import HTTPError, URLError
from urllib.parse import urlparse
from urllib.request import Request, urlopen

from errbot.storage import StoreMixin
from errbot.storage.base import StoragePluginBase

from .utils import ON_WINDOWS, git_clone, git_pull

log = logging.getLogger(__name__)


def human_name_for_git_url(url):
    # try to humanize the last part of the git url as much as we can
    s = url.split(":")[-1].split("/")[-2:]
    if s[-1].endswith(".git"):
        s[-1] = s[-1][:-4]
    return str("/".join(s))
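
# Example (illustrative, not part of the original module): for a typical GitHub
# SSH or HTTPS URL, the helper keeps the "owner/repo" tail and strips ".git":
#
#   human_name_for_git_url("git@github.com:errbotio/err-example.git")
#   # -> 'errbotio/err-example'
#   human_name_for_git_url("https://github.com/errbotio/err-example")
#   # -> 'errbotio/err-example'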


INSTALLED_REPOS = "installed_repos"

REPO_INDEXES_CHECK_INTERVAL = timedelta(hours=1)

REPO_INDEX = "repo_index"
LAST_UPDATE = "last_update"

RepoEntry = namedtuple(
    "RepoEntry", "entry_name, name, python, repo, path, avatar_url, documentation"
)
FIND_WORDS_RE = re.compile(r"(\w[\w']*\w|\w)")


class RepoException(Exception):
    pass


def makeEntry(repo_name: str, plugin_name: str, json_value):
    return RepoEntry(
        entry_name=repo_name,
        name=plugin_name,
        python=json_value["python"],
        repo=json_value["repo"],
        path=json_value["path"],
        avatar_url=json_value["avatar_url"],
        documentation=json_value["documentation"],
    )
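
# For reference, the json_value passed to makeEntry is one plugin entry from the
# repo index; an assumed, illustrative shape (keys taken from makeEntry above) is:
#
#   {
#       "python": "3",
#       "repo": "https://github.com/errbotio/err-example",
#       "path": "/example.plug",
#       "avatar_url": "https://example.org/avatar.png",
#       "documentation": "A minimal example plugin",
#   }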


def tokenizeJsonEntry(json_dict):
    """
    Returns all the words in a repo entry.
    """
    search = " ".join((str(word) for word in json_dict.values()))
    return set(FIND_WORDS_RE.findall(search.lower()))
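
# Example (illustrative): tokenizing an entry whose values include
# "https://github.com/errbotio/err-example" yields a lowercase word set
# containing 'https', 'github', 'com', 'errbotio', 'err' and 'example';
# search_repos() below intersects that set with the words of the user query.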


def which(program):
    if ON_WINDOWS:
        program += ".exe"

    def is_exe(file_path):
        return os.path.isfile(file_path) and os.access(file_path, os.X_OK)

    fpath, fname = os.path.split(program)
    if fpath:
        if is_exe(program):
            return program
    else:
        for path in os.environ["PATH"].split(os.pathsep):
            exe_file = os.path.join(path, program)
            if is_exe(exe_file):
                return exe_file

    return None
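
# Example (illustrative): which("git") walks the directories in PATH and returns
# something like "/usr/bin/git" (it looks for "git.exe" on Windows), or None if
# no matching executable is found.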


def check_dependencies(req_path: Path) -> Tuple[Union[str, None], Sequence[str]]:
    """This methods returns a pair of (message, packages missing).
    Or None, [] if everything is OK.
    """
    log.debug("check dependencies of %s", req_path)
    # noinspection PyBroadException
    try:
        from pkg_resources import get_distribution

        missing_pkg = []

        if not req_path.is_file():
            log.debug("%s has no requirements.txt file", req_path)
            return None, missing_pkg

        with req_path.open() as f:
            for line in f:
                stripped = line.strip()
                # skip empty lines.
                if not stripped:
                    continue

                # noinspection PyBroadException
                try:
                    get_distribution(stripped)
                except Exception:
                    missing_pkg.append(stripped)
        if missing_pkg:
            return (
                f"You need these dependencies for {req_path}: " + ",".join(missing_pkg),
                missing_pkg,
            )
        return None, missing_pkg
    except Exception:
        log.exception("Problem checking for dependencies.")
        return (
            "You need to have setuptools installed for the dependency check of the plugins",
            [],
        )
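
# Usage sketch (illustrative, with an assumed path): check a cloned plugin's
# requirements file and report anything that is not installed.
#
#   err_msg, missing = check_dependencies(
#       Path("/srv/errbot/plugins/errbotio/err-example/requirements.txt")
#   )
#   if err_msg:
#       log.warning("%s (missing: %s)", err_msg, missing)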


class BotRepoManager(StoreMixin):
    """
    Manages the repo list and git clones/updates of the repos.
    """

    def __init__(
        self,
        storage_plugin: StoragePluginBase,
        plugin_dir: str,
        plugin_indexes: Tuple[str, ...],
    ) -> None:
        """
        Make a repo manager.
        :param storage_plugin: where the manager stores its state.
        :param plugin_dir: where on disk it will git clone the repos.
        :param plugin_indexes: a list of URLs / paths to get the json repo index from.
        """
        super().__init__()
        self.plugin_indexes = plugin_indexes
        self.storage_plugin = storage_plugin
        self.plugin_dir = plugin_dir
        self.open_storage(storage_plugin, "repomgr")

    def shutdown(self) -> None:
        self.close_storage()

    def check_for_index_update(self) -> None:
        if REPO_INDEX not in self:
            log.info("No repo index, creating it.")
            self.index_update()
            return

        if (
            datetime.fromtimestamp(self[REPO_INDEX][LAST_UPDATE])
            < datetime.now() - REPO_INDEXES_CHECK_INTERVAL
        ):
            log.info("Index is too old, update it.")
            self.index_update()

    def index_update(self) -> None:
        index = {LAST_UPDATE: datetime.now().timestamp()}
        for source in reversed(self.plugin_indexes):
            try:
                if urlparse(source).scheme in ("http", "https"):
                    req = Request(source, headers={"User-Agent": "Errbot"})
                    with urlopen(url=req, timeout=10) as request:  # nosec
                        log.debug("Update from remote source %s...", source)
                        encoding = request.headers.get_content_charset()
                        content = request.read().decode(
                            encoding if encoding else "utf-8"
                        )
                else:
                    with open(source, encoding="utf-8", mode="r") as src_file:
                        log.debug("Update from local source %s...", source)
                        content = src_file.read()
                index.update(json.loads(content))
            except (HTTPError, URLError, IOError):
                log.exception(
                    "Could not update from source %s, keep the index as it is.", source
                )
                break
        else:
            # nothing failed so ok, we can store the index.
            self[REPO_INDEX] = index
            log.debug("Stored %d repo entries.", len(index) - 1)

    def get_repo_from_index(self, repo_name: str) -> Union[List[RepoEntry], None]:
        """
        Retrieve the list of plugins for the repo_name from the index.

        :param repo_name: the name of the repo
        :return: a list of RepoEntry, or None if the repo is not in the index
        """
        plugins = self[REPO_INDEX].get(repo_name, None)
        if plugins is None:
            return None
        result = []
        for name, plugin in plugins.items():
            result.append(makeEntry(repo_name, name, plugin))
        return result

    def search_repos(self, query: str) -> Generator[RepoEntry, None, None]:
        """
        A simple search feature: keywords are case-insensitive, and an entry matches
        as soon as any keyword appears in any of its fields.

        :param query: a string query
        :return: an iterator of RepoEntry
        """
        # first see if we are up to date.
        self.check_for_index_update()
        if REPO_INDEX not in self:
            log.error("No index.")
            return
        query_work_set = set(FIND_WORDS_RE.findall(query.lower()))
        for repo_name, plugins in self[REPO_INDEX].items():
            if repo_name == LAST_UPDATE:
                continue
            for plugin_name, plugin in plugins.items():
                if query_work_set.intersection(tokenizeJsonEntry(plugin)):
                    yield makeEntry(repo_name, plugin_name, plugin)
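
    # Example (illustrative): search_repos("pomodoro timer") yields every
    # RepoEntry from the index whose fields contain the word "pomodoro" or
    # the word "timer", regardless of case.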

    def get_installed_plugin_repos(self) -> Dict[str, str]:
        return self.get(INSTALLED_REPOS, {})

    def add_plugin_repo(self, name: str, url: str) -> None:
        with self.mutable(INSTALLED_REPOS, {}) as repos:
            repos[name] = url

    def set_plugin_repos(self, repos: Dict[str, str]) -> None:
        """Used externally."""
        self[INSTALLED_REPOS] = repos

    def get_all_repos_paths(self) -> List[str]:
        return [
            os.path.join(self.plugin_dir, d)
            for d in self.get(INSTALLED_REPOS, {}).keys()
        ]

    def install_repo(self, repo: str) -> str:
        """
        Install the repository from repo

        :param repo:
            The url, git url or path on disk of a repository. It can point to either a git repo
            or a .tar.gz of a plugin.
        :returns:
            The path on disk where the repo has been installed.
        :raises: :class:`~RepoException` if an error occurred.
        """
        self.check_for_index_update()

        human_name = None
        # try to find if we have something with that name in our index
        if repo in self[REPO_INDEX]:
            human_name = repo
            repo_url = next(iter(self[REPO_INDEX][repo].values()))["repo"]
        elif not repo.endswith("tar.gz"):
            # This is a repo url, make up a plugin definition for it
            human_name = human_name_for_git_url(repo)
            repo_url = repo
        else:
            repo_url = repo

        # TODO: Update download path of plugin.
        if repo_url.endswith("tar.gz"):
            fo = urlopen(repo_url)  # nosec
            tar = tarfile.open(fileobj=fo, mode="r:gz")
            tar.extractall(path=self.plugin_dir)
            s = repo_url.split(":")[-1].split("/")[-1]
            human_name = s[: -len(".tar.gz")]
        else:
            human_name = human_name or human_name_for_git_url(repo_url)
            try:
                git_clone(repo_url, os.path.join(self.plugin_dir, human_name))
            except Exception as exception:  # dulwich errors are all based on exceptions.Exception
                raise RepoException(
                    f"Could not load this plugin: \n\n{repo_url}\n\n---\n\n{exception}"
                )

        self.add_plugin_repo(human_name, repo_url)
        return os.path.join(self.plugin_dir, human_name)
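
    # Usage sketch (illustrative, with assumed values): a repo can be installed
    # either by its index name or by a direct git URL.
    #
    #   manager.install_repo("errbotio/err-example")                    # looked up in the index
    #   manager.install_repo("https://github.com/you/your-plugin.git")  # cloned directly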

    def update_repos(self, repos) -> Generator[Tuple[str, int, str], None, None]:
        """
        Git-pulls the specified repos on disk.
        Yields tuples of (repo path on disk, status: 0 on success / 1 on failure, feedback message).
        """
        # protect against updating repos outside of what we know is installed
        names = set(self.get_installed_plugin_repos().keys()).intersection(set(repos))

        for d in (path.join(self.plugin_dir, name) for name in names):
            success = 1
            try:
                git_pull(d)
                feedback = "Pulled remote"
                success = 0
            except Exception as exception:
                feedback = f"Error pulling remote {exception}"

            dep_err, missing_pkgs = check_dependencies(Path(d) / "requirements.txt")
            if dep_err:
                feedback += dep_err + "\n"
            yield d, success, feedback
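
    # Example (illustrative): a yielded tuple looks like
    #   ("/srv/errbot/plugins/errbotio/err-example", 0, "Pulled remote")
    # where the middle value is 0 on success and 1 on failure.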

    def update_all_repos(self) -> Generator[Tuple[str, int, str], None, None]:
        return self.update_repos(self.get_installed_plugin_repos().keys())

    def uninstall_repo(self, name: str) -> None:
        repo_path = path.join(self.plugin_dir, name)
        # ignore errors because the DB can be desync'ed from the file tree.
        shutil.rmtree(repo_path, ignore_errors=True)
        repos = self.get_installed_plugin_repos()
        del repos[name]
        self.set_plugin_repos(repos)
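

# Usage sketch (illustrative; names, paths and the index URL are assumptions, not
# errbot defaults): the bot core normally builds the manager from its configured
# storage backend, plugin directory and index URLs, roughly like:
#
#   manager = BotRepoManager(
#       storage_plugin=some_storage_plugin,      # any StoragePluginBase instance
#       plugin_dir="/srv/errbot/plugins",
#       plugin_indexes=("https://example.org/repos.json",),
#   )
#   manager.check_for_index_update()
#   for entry in manager.search_repos("example"):
#       print(entry.entry_name, entry.name, entry.repo)
#   manager.shutdown()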