1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
|
from api_lib import APITest
from .publish import DefaultSigningOptions
class TaskAPITestParallelTasks(APITest):
"""
GET /api/tasks, GET /api/tasks/:id/wait, GET /api/tasks-wait
"""
def _create_mirror(self, dist):
mirror_name = self.random_name()
mirror_desc = {'Name': mirror_name,
'ArchiveURL': 'http://repo.aptly.info/system-tests/packagecloud.io/varnishcache/varnish30/debian/',
'Distribution': dist,
'Architectures': ["amd64", "i386"],
'Components': ['main']}
mirror_desc['IgnoreSignatures'] = True
resp = self.post("/api/mirrors", json=mirror_desc)
self.check_equal(resp.status_code, 201)
resp = self.put("/api/mirrors/" + mirror_name, json=mirror_desc, params={'_async': True})
self.check_equal(resp.status_code, 202)
# check that two mirror updates are queuedd
resp2 = self.put("/api/mirrors/" + mirror_name, json=mirror_desc, params={'_async': True})
self.check_equal(resp2.status_code, 202)
return resp.json()['ID'], mirror_name
def _create_repo(self):
repo_name = self.random_name()
distribution = self.random_name()
self.check_equal(self.post("/api/repos",
json={
"Name": repo_name,
"Comment": "fun repo",
"DefaultDistribution": distribution
}).status_code, 201)
d = self.random_name()
self.check_equal(
self.upload("/api/files/" + d, "pyspi_0.6.1-1.3.dsc",
"pyspi_0.6.1-1.3.diff.gz",
"pyspi_0.6.1.orig.tar.gz").status_code, 200)
resp = self.post("/api/repos/" + repo_name + "/file/" + d, params={'_async': True})
self.check_equal(resp.status_code, 202)
return resp.json()['ID'], repo_name
def _wait_for_task(self, task_id):
uri = "/api/tasks/%d/wait" % int(task_id)
task = self.get(uri)
self.check_task(task)
def _wait_for_all_tasks(self):
resp = self.get("/api/tasks-wait")
self.check_equal(resp.status_code, 200)
def _snapshot(self, res_type, name):
uri = "/api/%s/%s/snapshots" % (res_type, name)
resp = self.post(uri, json={"Name": name}, params={'_async': True})
self.check_equal(resp.status_code, 202)
return resp.json()['ID']
def _publish(self, source_kind, name):
resp = self.post("/api/publish",
json={
"SourceKind": source_kind,
"Sources": [{"Name": name}],
"Signing": DefaultSigningOptions,
}, params={'_async': True})
self.check_equal(resp.status_code, 202)
return resp.json()['ID']
def check(self):
publish_task_ids = []
mirror_task_list = []
for mirror_dist in ['squeeze', 'wheezy']:
mirror_task_id, mirror_name = self._create_mirror(mirror_dist)
mirror_task_list.append((mirror_task_id, mirror_name))
repo_task_id, repo_name = self._create_repo()
self._wait_for_task(repo_task_id)
resp = self.delete("/api/tasks/%d" % repo_task_id)
self.check_equal(resp.status_code, 200)
resp = self.get("/api/tasks/%d" % repo_task_id)
self.check_equal(resp.status_code, 404)
repo_snap_task_id = self._snapshot('repos', repo_name)
self._wait_for_task(repo_snap_task_id)
publish_task_ids.append(self._publish('snapshot', repo_name))
for mirror_task_id, mirror_name in mirror_task_list:
self._wait_for_task(mirror_task_id)
mirror_snap_task_id = self._snapshot('mirrors', mirror_name)
self._wait_for_task(mirror_snap_task_id)
publish_task_ids.append(self._publish('snapshot', mirror_name))
self._wait_for_all_tasks()
for publish_task_id in publish_task_ids:
task = self.get("/api/tasks/%d" % publish_task_id)
self.check_task(task)
|