File: aur.py

package info (click to toggle)
python-nvchecker 2.16-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 736 kB
  • sloc: python: 4,801; makefile: 25
file content (109 lines) | stat: -rw-r--r-- 2,986 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# MIT licensed
# Copyright (c) 2013-2020,2024 lilydjwg <lilydjwg@gmail.com>, et al.

from datetime import datetime, timezone
import asyncio
from typing import Iterable, Dict, List, Tuple, Any, Optional

from nvchecker.api import (
  session, GetVersionError, VersionResult, RichResult,
  Entry, BaseWorker, RawResult,
)

AUR_URL = 'https://aur.archlinux.org/rpc/'

class AurResults:
  '''Cache of AUR RPC ``info`` records shared between batches.

  ``cache`` maps an AUR package name to the record returned by the RPC
  endpoint, or to ``None`` when the endpoint did not return that name.
  '''
  cache: Dict[str, Optional[Dict[str, Any]]]

  def __init__(self) -> None:
    self.cache = {}

  async def get_multiple(
    self,
    aurnames: Iterable[str],
  ) -> Dict[str, Optional[Dict[str, Any]]]:
    '''Return the AUR record (or ``None``) for each name in *aurnames*.

    Names already present in the cache are answered from it; the rest are
    fetched with a single ``info`` request and cached. No request is made
    when every name is already cached.
    '''
    # Materialise once: the names are walked several times below and the
    # caller may pass a one-shot iterator.
    names = list(aurnames)
    cache = self.cache
    # dict.fromkeys drops duplicates while keeping the caller's order.
    missing = [name for name in dict.fromkeys(names) if name not in cache]

    if missing:
      params = [('v', '5'), ('type', 'info')]
      params.extend(('arg[]', name) for name in missing)
      res = await session.get(AUR_URL, params=params)
      data = res.json()
      new_results = {r['Name']: r for r in data['results']}

      cache.update(new_results)
      # Only names that were actually requested and not returned are marked
      # as absent. Names served from the cache must keep their record, and
      # setdefault never overwrites an entry stored in the meantime.
      for name in missing:
        if name not in new_results:
          cache.setdefault(name, None)

    return {name: cache[name] for name in names
            if name in cache}

class Worker(BaseWorker):
  '''Worker that resolves its tasks against the AUR in fixed-size batches.'''

  # Upper bound on names per request, see
  # https://wiki.archlinux.org/index.php/Aurweb_RPC_interface#Limitations
  batch_size = 100

  async def run(self) -> None:
    '''Slice the tasks into batches and process all batches concurrently.'''
    all_tasks = self.tasks
    step = self.batch_size
    # One AurResults instance is shared by every batch of this run.
    shared = AurResults()

    pending = [
      self._run_batch(all_tasks[start : start + step], shared)
      for start in range(0, len(all_tasks), step)
    ]

    await asyncio.gather(*pending)

  async def _run_batch(
    self,
    batch: List[Tuple[str, Entry]],
    aur_results: AurResults,
  ) -> None:
    '''Look up one batch and push a RawResult per entry onto the result queue.'''
    entries: Dict[str, Entry] = dict(self.tasks)

    # The semaphore is held for the lookup and for queueing its results.
    async with self.task_sem:
      versions = await _run_batch_impl(batch, aur_results)
      for key, found in versions.items():
        await self.result_q.put(RawResult(key, found, entries[key]))

async def _run_batch_impl(
  batch: List[Tuple[str, Entry]],
  aur_results: AurResults,
) -> Dict[str, VersionResult]:
  '''Resolve the version of every entry in *batch* from the AUR.

  Each entry is looked up under its ``aur`` option, falling back to the
  entry name. The returned dict is keyed by entry name and holds either a
  RichResult (version plus AUR package page URL) or a GetVersionError when
  no record was found for the package.

  Options read from each entry's configuration:

  * ``aur``: AUR package name to query instead of the entry name.
  * ``use_last_modified``: append ``-YYYYmmddHHMMSS`` (UTC) built from the
    record's ``LastModified`` field.
  * ``strip_release``: drop everything after the last ``-`` in the version.
  '''
  aurnames = {conf.get('aur', name) for name, conf in batch}
  results = await aur_results.get_multiple(aurnames)

  ret: Dict[str, VersionResult] = {}

  for name, conf in batch:
    aurname = conf.get('aur', name)
    result = results.get(aurname)

    if result is None:
      ret[name] = GetVersionError('AUR upstream not found')
      continue

    version = result['Version']
    if conf.get('use_last_modified', False):
      # LastModified is interpreted as a POSIX timestamp, rendered in UTC.
      dt = datetime.fromtimestamp(result['LastModified'], timezone.utc)
      version += '-' + dt.strftime('%Y%m%d%H%M%S')
    # NOTE: this runs after the timestamp was appended, so with both options
    # set it is the timestamp suffix that gets stripped.
    if conf.get('strip_release', False) and '-' in version:
      version = version.rsplit('-', 1)[0]

    ret[name] = RichResult(
      version = version,
      # Link to the package that was actually queried; the entry name may
      # differ from the AUR package name when the ``aur`` option is set.
      url = f'https://aur.archlinux.org/packages/{aurname}',
    )

  return ret