File: test_parameterized_builds.py

package info (click to toggle)
python-jenkinsapi 0.3.17-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,500 kB
  • sloc: python: 10,001; xml: 50; makefile: 31; sh: 26
file content (135 lines) | stat: -rw-r--r-- 4,151 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""
System tests for `jenkinsapi.jenkins` module.
"""

import io
import time

from jenkinsapi_tests.test_utils.random_strings import random_string
from jenkinsapi_tests.systests.job_configs import JOB_WITH_FILE
from jenkinsapi_tests.systests.job_configs import JOB_WITH_FILE_AND_PARAMS
from jenkinsapi_tests.systests.job_configs import JOB_WITH_PARAMETERS


def test_invoke_job_with_file(jenkins):
    """
    Invoke a job with a file parameter and verify the resulting artifact.

    A random payload is passed to the job as ``file.txt``; once the last
    build is no longer running, its ``file.txt`` artifact must contain
    exactly that payload (ignoring surrounding whitespace).
    """
    payload = random_string()
    upload = io.BytesIO(payload.encode("utf-8"))

    job = jenkins.create_job(f"create1_{random_string()}", JOB_WITH_FILE)

    # The job configuration is expected to declare at least one parameter.
    assert job.has_params() is True
    assert len(job.get_params_list()) > 0

    job.invoke(files={"file.txt": upload}, block=True)

    # Poll until the build reports that it has stopped running.
    build = job.get_last_build()
    while True:
        if not build.is_running():
            break
        time.sleep(0.25)

    artifacts = build.get_artifact_dict()
    assert isinstance(artifacts, dict)
    stored_text = artifacts["file.txt"].get_data().decode("utf-8")
    assert stored_text.strip() == payload


def test_invoke_job_parameterized(jenkins):
    """
    Invoke a parameterized job and verify the parameter reaches the build.

    The random value passed as parameter ``B`` must show up both in the
    ``b.txt`` artifact and in the console output of the last build.
    """
    expected = random_string()

    job = jenkins.create_job(
        f"create2_{random_string()}", JOB_WITH_PARAMETERS
    )
    job.invoke(build_params={"B": expected}, block=True)
    build = job.get_last_build()

    b_artifact = build.get_artifact_dict()["b.txt"]
    b_text = b_artifact.get_data().decode("UTF-8", "replace")
    assert b_text.strip() == expected

    console = build.get_console()
    assert expected in console


def test_parameterized_job_build_queuing(jenkins):
    """
    Accept multiple builds of parameterized jobs with unique parameters.

    Three builds are queued, each with its own random value for parameter
    ``B``. The test checks that the most recently submitted parameters are
    reported as queued, then lets the builds run and verifies that the
    last build produced a ``b.txt`` artifact and console output containing
    the last submitted value.
    """
    job_name = "create_%s" % random_string()
    job = jenkins.create_job(job_name, JOB_WITH_PARAMETERS)

    # Latest Jenkins schedules builds to run right away, so remove all
    # executors from the built-in node to be able to inspect the queue.
    master = jenkins.nodes["Built-In Node"]
    num_executors = master.get_num_executors()
    master.set_num_executors(0)

    # Always give the executors back, even when an invoke call or the
    # queue assertion fails; otherwise the node would stay at zero
    # executors and no later build on this Jenkins could ever start.
    try:
        for _ in range(3):
            param_B = random_string()
            params = {"B": param_B}
            job.invoke(build_params=params)

        # ``params`` / ``param_B`` intentionally refer to the last
        # submitted build from here on.
        assert job.has_queued_build(params) is True
    finally:
        master.set_num_executors(num_executors)

    # Wait for the last submitted build to leave the queue ...
    while job.has_queued_build(params):
        time.sleep(0.25)

    # ... and then for the most recent build to finish running.
    build = job.get_last_build()
    while build.is_running():
        time.sleep(0.25)

    artifacts = build.get_artifact_dict()
    assert isinstance(artifacts, dict) is True
    artB = artifacts["b.txt"]
    assert artB.get_data().decode("utf-8").strip() == param_B

    assert param_B in build.get_console()


def test_parameterized_multiple_builds_get_the_same_queue_item(jenkins):
    """
    Multiple attempts to run the same parameterized
    build will get the same queue item.

    Three builds with distinct random ``B`` values are queued; the last
    parameter set is then submitted a second time and the returned queue
    item must compare equal to the one from the first submission.
    """
    job_name = "create_%s" % random_string()
    job = jenkins.create_job(job_name, JOB_WITH_PARAMETERS)

    # Latest Jenkins schedules builds to run right away, so remove all
    # executors from the built-in node to be able to inspect the queue.
    master = jenkins.nodes["Built-In Node"]
    num_executors = master.get_num_executors()
    master.set_num_executors(0)

    # Restore the executor count no matter how the checks below end, so a
    # failure here cannot leave the node unable to run builds.
    try:
        for _ in range(3):
            params = {"B": random_string()}
            qq0 = job.invoke(build_params=params)

        # Re-submit the last parameter set; it should map to the same
        # queue item as the previous submission.
        qq1 = job.invoke(build_params=params)
        assert qq0 == qq1
    finally:
        master.set_num_executors(num_executors)


def test_invoke_job_with_file_and_params(jenkins):
    """
    Invoke a job with both a file parameter and a build parameter.

    The uploaded payload must come back as the ``file.txt`` artifact and
    the value given for parameter ``B`` as the ``file1.txt`` artifact of
    the build obtained from the returned queue item.
    """
    file_payload = random_string()
    param_value = random_string()
    upload = io.BytesIO(file_payload.encode("utf-8"))

    job = jenkins.create_job(
        f"create_{random_string()}", JOB_WITH_FILE_AND_PARAMS
    )

    # The job configuration is expected to declare at least one parameter.
    assert job.has_params() is True
    assert len(job.get_params_list()) > 0

    queue_item = job.invoke(
        build_params={"B": param_value},
        files={"file.txt": upload},
        block=True,
    )

    artifacts = queue_item.get_build().get_artifact_dict()
    assert isinstance(artifacts, dict)

    file_text = artifacts["file.txt"].get_data().decode("utf-8")
    assert file_text.strip() == file_payload

    param_text = artifacts["file1.txt"].get_data().decode("utf-8")
    assert param_text.strip() == param_value