1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
|
"""
GitLab API: https://docs.gitlab.com/ce/api/pipeline_schedules.html
"""
import pytest
import responses
from gitlab.v4.objects import ProjectPipelineSchedulePipeline
# Canned pipeline record shared at module level; the list-pipelines mock in
# this file returns it as the single element of its JSON array.
pipeline_content = {
    "id": 48,
    "iid": 13,
    "project_id": 29,
    "status": "pending",
    "source": "scheduled",
    "ref": "new-pipeline",
    "sha": "eb94b618fb5865b26e80fdd8ae531b7a63ad851a",
    "web_url": "https://example.com/foo/bar/pipelines/48",
    "created_at": "2016-08-12T10:06:04.561Z",
    "updated_at": "2016-08-12T10:09:56.223Z",
}
@pytest.fixture
def resp_create_pipeline_schedule():
    """Yield a ``responses`` mock that answers the schedule-creation POST.

    A POST to ``/api/v4/projects/1/pipeline_schedules`` on localhost is
    registered with a canned schedule payload (id 14, "Build packages")
    and HTTP status 200. The active ``RequestsMock`` is yielded so the
    registration stays in effect for the duration of the test.
    """
    schedule_owner = {
        "name": "Administrator",
        "username": "root",
        "id": 1,
        "state": "active",
        "avatar_url": "http://www.gravatar.com/avatar/e64c7d89f26bd1972efa854d13d7dd61?s=80&d=identicon",
        "web_url": "https://gitlab.example.com/root",
    }
    schedule_payload = {
        "id": 14,
        "description": "Build packages",
        "ref": "main",
        "cron": "0 1 * * 5",
        "cron_timezone": "UTC",
        "next_run_at": "2017-05-26T01:00:00.000Z",
        "active": True,
        "created_at": "2017-05-19T13:43:08.169Z",
        "updated_at": "2017-05-19T13:43:08.169Z",
        "last_pipeline": None,
        "owner": schedule_owner,
    }
    create_url = "http://localhost/api/v4/projects/1/pipeline_schedules"

    with responses.RequestsMock() as mocked:
        mocked.add(
            method=responses.POST,
            url=create_url,
            json=schedule_payload,
            content_type="application/json",
            status=200,
        )
        yield mocked
@pytest.fixture
def resp_play_pipeline_schedule(created_content):
    """Yield a ``responses`` mock that answers the schedule "play" POST.

    A POST to ``/api/v4/projects/1/pipeline_schedules/1/play`` on localhost
    is registered with ``created_content`` as the JSON body and HTTP status
    201. ``created_content`` is requested as a fixture and is not defined in
    this section of the file.
    """
    play_url = "http://localhost/api/v4/projects/1/pipeline_schedules/1/play"

    with responses.RequestsMock() as mocked:
        mocked.add(
            method=responses.POST,
            url=play_url,
            json=created_content,
            content_type="application/json",
            status=201,
        )
        yield mocked
@pytest.fixture
def resp_list_schedule_pipelines():
    """Yield a ``responses`` mock that answers the schedule-pipelines GET.

    A GET to ``/api/v4/projects/1/pipeline_schedules/1/pipelines`` on
    localhost is registered with a one-element JSON array holding the
    module-level ``pipeline_content`` record, and HTTP status 200.
    """
    pipelines_url = (
        "http://localhost/api/v4/projects/1/pipeline_schedules/1/pipelines"
    )
    pipelines_payload = [pipeline_content]

    with responses.RequestsMock() as mocked:
        mocked.add(
            method=responses.GET,
            url=pipelines_url,
            json=pipelines_payload,
            content_type="application/json",
            status=200,
        )
        yield mocked
def test_create_project_pipeline_schedule(project, resp_create_pipeline_schedule):
    """Check that a created schedule carries the requested description and cron."""
    expected_description = "Build packages"
    expected_cron = "0 1 * * 5"
    creation_attrs = {
        "ref": "main",
        "description": expected_description,
        "cron": expected_cron,
    }

    created = project.pipelineschedules.create(creation_attrs)

    assert created is not None
    assert created.description == expected_description
    assert created.cron == expected_cron
def test_play_project_pipeline_schedule(schedule, resp_play_pipeline_schedule):
    """Check that playing a schedule returns a body whose message is "201 Created"."""
    response_body = schedule.play()

    assert response_body is not None
    # Verify the key exists first so a missing key fails as an assertion,
    # not as a KeyError on the lookup below.
    assert "message" in response_body
    assert response_body["message"] == "201 Created"
def test_list_project_pipeline_schedule_pipelines(
    schedule, resp_list_schedule_pipelines
):
    """Check that listing a schedule's pipelines yields typed pipeline objects."""
    listed = schedule.pipelines.list()
    assert isinstance(listed, list)

    first_pipeline = listed[0]
    assert isinstance(first_pipeline, ProjectPipelineSchedulePipeline)
    assert first_pipeline.source == "scheduled"
|