File: test_apply.py

package info (click to toggle)
python-openshift 0.13.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 332 kB
  • sloc: python: 1,824; makefile: 20; sh: 14
file content (406 lines) | stat: -rw-r--r-- 15,252 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
from openshift.dynamic.apply import merge, apply_patch

# Fixture table consumed by test_merges. Each entry is a dict with:
#   last_applied - first argument passed to merge()
#   desired      - second argument passed to merge()
#   actual       - (optional) third argument passed to merge(); test_merges
#                  substitutes last_applied when this key is absent
#   expected     - the value merge() must return for those inputs
tests = [
    # desired is identical to last_applied: the expected patch is empty.
    dict(
        last_applied = dict(
            kind="ConfigMap",
            metadata=dict(name="foo"),
            data=dict(one="1", two="2")
        ),
        desired = dict(
            kind="ConfigMap",
            metadata=dict(name="foo"),
            data=dict(one="1", two="2")
        ),
        expected = {}
    ),
    # desired adds data.three: only the new key is expected in the patch.
    dict(
        last_applied = dict(
            kind="ConfigMap",
            metadata=dict(name="foo"),
            data=dict(one="1", two="2")
        ),
        desired = dict(
            kind="ConfigMap",
            metadata=dict(name="foo"),
            data=dict(one="1", two="2", three="3")
        ),
        expected = dict(data=dict(three="3"))
    ),
    # desired drops data.two and adds data.three: the dropped key is expected as None.
    dict(
        last_applied = dict(
            kind="ConfigMap",
            metadata=dict(name="foo"),
            data=dict(one="1", two="2")
        ),
        desired = dict(
            kind="ConfigMap",
            metadata=dict(name="foo"),
            data=dict(one="1", three="3")
        ),
        expected = dict(data=dict(two=None, three="3"))
    ),
    # Same data change, and desired also omits metadata.annotations entirely:
    # the whole annotations mapping is expected as None.
    dict(
        last_applied = dict(
            kind="ConfigMap",
            metadata=dict(name="foo", annotations=dict(this="one", hello="world")),
            data=dict(one="1", two="2")
        ),
        desired = dict(
            kind="ConfigMap",
            metadata=dict(name="foo"),
            data=dict(one="1", three="3")
        ),
        expected = dict(metadata=dict(annotations=None), data=dict(two=None, three="3"))
    ),

    # Service whose actual port carries protocol='TCP', which neither last_applied
    # nor desired mention. desired equals last_applied; the expected patch lists
    # the port with protocol='TCP' included.
    dict(
        last_applied = dict(
            kind="Service",
            metadata=dict(name="foo"),
            spec=dict(ports=[dict(port=8080, name="http")])
        ),
        actual = dict(
            kind="Service",
            metadata=dict(name="foo"),
            spec=dict(ports=[dict(port=8080, protocol='TCP', name="http")])
        ),
        desired = dict(
            kind="Service",
            metadata=dict(name="foo"),
            spec=dict(ports=[dict(port=8080, name="http")])
        ),
        expected = dict(spec=dict(ports=[dict(port=8080, protocol='TCP', name="http")]))
    ),
    # desired changes the port number (8080 -> 8081): the expected port entry
    # has no protocol key.
    dict(
        last_applied = dict(
            kind="Service",
            metadata=dict(name="foo"),
            spec=dict(ports=[dict(port=8080, name="http")])
        ),
        actual = dict(
            kind="Service",
            metadata=dict(name="foo"),
            spec=dict(ports=[dict(port=8080, protocol='TCP', name="http")])
        ),
        desired = dict(
            kind="Service",
            metadata=dict(name="foo"),
            spec=dict(ports=[dict(port=8081, name="http")])
        ),
        expected = dict(spec=dict(ports=[dict(port=8081, name="http")]))
    ),
    # desired adds a second port (8443/https) ahead of the existing one: the new
    # port is expected as given, the existing one with protocol='TCP'.
    dict(
        last_applied = dict(
            kind="Service",
            metadata=dict(name="foo"),
            spec=dict(ports=[dict(port=8080, name="http")])
        ),
        actual = dict(
            kind="Service",
            metadata=dict(name="foo"),
            spec=dict(ports=[dict(port=8080, protocol='TCP', name="http")])
        ),
        desired = dict(
            kind="Service",
            metadata=dict(name="foo"),
            spec=dict(ports=[dict(port=8443, name="https"), dict(port=8080, name="http")])
        ),
        expected = dict(spec=dict(ports=[dict(port=8443, name="https"), dict(port=8080, name="http", protocol='TCP')]))
    ),
    # desired removes the 8443 port: only the 8080 port is expected, with
    # protocol='TCP'.
    dict(
        last_applied = dict(
            kind="Service",
            metadata=dict(name="foo"),
            spec=dict(ports=[dict(port=8443, name="https"), dict(port=8080, name="http")])
        ),
        actual = dict(
            kind="Service",
            metadata=dict(name="foo"),
            spec=dict(ports=[dict(port=8443, protocol='TCP', name="https"), dict(port=8080, protocol='TCP', name='http')])
        ),
        desired = dict(
            kind="Service",
            metadata=dict(name="foo"),
            spec=dict(ports=[dict(port=8080, name="http")])
        ),
        expected = dict(spec=dict(ports=[dict(port=8080, name="http", protocol='TCP')]))
    ),
    # desired removes the 8080 port and the 'madeup' field from the 8443 port:
    # the remaining port is expected with madeup=None and protocol='TCP'.
    dict(
        last_applied = dict(
            kind="Service",
            metadata=dict(name="foo"),
            spec=dict(ports=[dict(port=8443, name="https", madeup="xyz"), dict(port=8080, name="http")])
        ),
        actual = dict(
            kind="Service",
            metadata=dict(name="foo"),
            spec=dict(ports=[dict(port=8443, protocol='TCP', name="https", madeup="xyz"), dict(port=8080, protocol='TCP', name='http')])
        ),
        desired = dict(
            kind="Service",
            metadata=dict(name="foo"),
            spec=dict(ports=[dict(port=8443, name="https")])
        ),
        expected = dict(spec=dict(ports=[dict(madeup=None, port=8443, name="https", protocol='TCP')]))
    ),
    # Pod container resources: desired lowers the requests and memory limit and
    # drops limits.cpu, which is expected as cpu=None.
    dict(
        last_applied = dict(
            kind="Pod",
            metadata=dict(name="foo"),
            spec=dict(containers=[dict(name="busybox", image="busybox",
                                       resources=dict(requests=dict(cpu="100m", memory="100Mi"), limits=dict(cpu="100m", memory="100Mi")))])
        ),
        actual = dict(
            kind="Pod",
            metadata=dict(name="foo"),
            spec=dict(containers=[dict(name="busybox", image="busybox",
                                       resources=dict(requests=dict(cpu="100m", memory="100Mi"), limits=dict(cpu="100m", memory="100Mi")))])
        ),
        desired = dict(
            kind="Pod",
            metadata=dict(name="foo"),
            spec=dict(containers=[dict(name="busybox", image="busybox",
                                       resources=dict(requests=dict(cpu="50m", memory="50Mi"), limits=dict(memory="50Mi")))])
        ),
        expected=dict(spec=dict(containers=[dict(name="busybox", image="busybox",
                                                 resources=dict(requests=dict(cpu="50m", memory="50Mi"), limits=dict(cpu=None, memory="50Mi")))]))
    ),
    # Pod where desired equals last_applied, but actual has an additional
    # volumeMount and volume (default-token-xyz): the expected patch lists the
    # containers and volumes with those additional entries included.
    dict(
        desired = dict(kind='Pod',
                       spec=dict(containers=[
                           dict(name='hello',
                                volumeMounts=[dict(name="test", mountPath="/test")])
                           ],
                           volumes=[
                               dict(name="test", configMap=dict(name="test")),
                           ])),
        last_applied = dict(kind='Pod',
                       spec=dict(containers=[
                           dict(name='hello',
                                volumeMounts=[dict(name="test", mountPath="/test")])
                           ],
                           volumes=[
                               dict(name="test", configMap=dict(name="test")),
                           ])),
        actual = dict(kind='Pod',
                       spec=dict(containers=[
                           dict(name='hello',
                                volumeMounts=[dict(name="test", mountPath="/test"),
                                              dict(mountPath="/var/run/secrets/kubernetes.io/serviceaccount", name="default-token-xyz")])
                           ],
                           volumes=[
                               dict(name="test", configMap=dict(name="test")),
                               dict(name="default-token-xyz", secret=dict(secretName="default-token-xyz")),
                           ])),
        expected = dict(spec=dict(containers=[
                           dict(name='hello',
                                volumeMounts=[dict(name="test", mountPath="/test"),
                                              dict(mountPath="/var/run/secrets/kubernetes.io/serviceaccount", name="default-token-xyz")])
                           ],
                           volumes=[
                               dict(name="test", configMap=dict(name="test")),
                               dict(name="default-token-xyz", secret=dict(secretName="default-token-xyz")),
                           ])),
    ),

    # This next one is based on a real world case where definition was mostly
    # str type and everything else was mostly unicode type (don't ask me how)
    # actual also carries extra metadata fields (resourceVersion, uid, ...) and an
    # empty annotations dict; the expected patch is still empty.
    dict(
        last_applied = {
            u'kind': u'ConfigMap',
            u'data': {u'one': '1', 'three': '3', 'two': '2'},
            u'apiVersion': u'v1',
            u'metadata': {u'namespace': u'apply', u'name': u'apply-configmap'}
        },
        actual = {
            u'kind': u'ConfigMap',
            u'data': {u'one': '1', 'three': '3', 'two': '2'},
            u'apiVersion': u'v1',
            u'metadata': {u'namespace': u'apply', u'name': u'apply-configmap',
                          u'resourceVersion': '1714994',
                          u'creationTimestamp': u'2019-08-17T05:08:05Z', u'annotations': {},
                          u'selfLink': u'/api/v1/namespaces/apply/configmaps/apply-configmap',
                          u'uid': u'fed45fb0-c0ac-11e9-9d95-025000000001'}
        },
        desired = {
            'kind': u'ConfigMap',
            'data': {'one': '1', 'three': '3', 'two': '2'},
            'apiVersion': 'v1',
            'metadata': {'namespace': 'apply', 'name': 'apply-configmap'}
        },
        expected = dict()
    ),
    # apply a Deployment, then scale the Deployment (which doesn't affect last-applied)
    # then apply the Deployment again. Should un-scale the Deployment
    # desired additionally replaces the two envFrom entries with a single
    # configMapRef (config-abc), which the expected patch also carries.
    dict(
        last_applied = {
            'kind': u'Deployment',
            'spec': {
                'replicas': 1,
                'template': {
                    'spec': {
                        'containers': [
                            {
                                'name': 'this_must_exist',
                                'envFrom': [
                                    {
                                        'configMapRef': {
                                            'name': 'config-xyz'
                                        }
                                    },
                                    {
                                        'secretRef': {
                                            'name': 'config-wxy'
                                        }
                                    }
                                ]
                            }
                        ]
                    }
                }
            },
            'metadata': {
                'namespace': 'apply',
                'name': u'apply-deployment'
            }
        },
        actual = {
            'kind': u'Deployment',
            'spec': {
                'replicas': 0,
                'template': {
                    'spec': {
                        'containers': [
                            {
                                'name': 'this_must_exist',
                                'envFrom': [
                                    {
                                        'configMapRef': {
                                            'name': 'config-xyz'
                                        }
                                    },
                                    {
                                        'secretRef': {
                                            'name': 'config-wxy'
                                        }
                                    }
                                ]
                            }
                        ]
                    }
                }
            },
            'metadata': {
                'namespace': 'apply',
                'name': u'apply-deployment'
            }
        },
        desired = {
            'kind': u'Deployment',
            'spec': {
                'replicas': 1,
                'template': {
                    'spec': {
                        'containers': [
                            {
                                'name': 'this_must_exist',
                                'envFrom': [
                                    {
                                        'configMapRef': {
                                            'name': 'config-abc'
                                        }
                                    }
                                ]
                            }
                        ]
                    }
                }
            },
            'metadata': {
                'namespace': 'apply',
                'name': u'apply-deployment'
            }
        },
        expected = {
            'spec' : {
                'replicas': 1,
                'template': {
                    'spec': {
                        'containers': [
                            {
                                'name': 'this_must_exist',
                                'envFrom': [
                                    {
                                        'configMapRef': {
                                            'name': 'config-abc'
                                        }
                                    }
                                ]
                            }
                        ]
                    }
                }
            }
        }
    ),
    # Made-up kind: desired contains a nested 'another' mapping that is absent
    # from last_applied but already present in actual; the expected patch is empty.
    dict(
        last_applied = {
            'kind': 'MadeUp',
            'toplevel': {
                'original': 'entry'
            }
        },
        actual = {
            'kind': 'MadeUp',
            'toplevel': {
                'original': 'entry',
                'another': {
                    'nested': {
                         'entry': 'value'
                    }
                }
            }
        },
        desired = {
            'kind': 'MadeUp',
            'toplevel': {
                'original': 'entry',
                'another': {
                    'nested': {
                         'entry': 'value'
                    }
                }
            }
        },
        expected = {}
    )
]


def test_merges():
    """Check merge() against every fixture in the module-level ``tests`` table.

    For each case, ``merge(last_applied, desired, actual)`` must equal the
    case's ``expected`` value. A case that omits ``actual`` reuses its
    ``last_applied`` value in that position.
    """
    for index, case in enumerate(tests):
        last_applied = case['last_applied']
        # Fall back to last_applied when the case has no separate 'actual'.
        actual = case.get('actual', last_applied)
        patch = merge(last_applied, case['desired'], actual)
        # Name the failing fixture: a bare assert inside a loop does not say
        # which entry of the table broke.
        assert patch == case['expected'], (
            "tests[%d]: merge() returned %r, expected %r"
            % (index, patch, case['expected'])
        )


def test_apply_patch():
    """Exercise apply_patch() on a ConfigMap carrying a last-applied annotation.

    The live object stores a JSON document under the
    ``kubectl.kubernetes.io/last-applied-configuration`` annotation, next to
    the ``this`` and ``hello`` annotations. The desired object has no
    annotations, drops ``data.two`` and adds ``data.three``.

    apply_patch(live, desired) must return the pair ``(live, patch)`` where
    the patch rewrites the last-applied annotation, sets ``this`` and
    ``hello`` to None, and sets ``data`` to ``two=None, three="3"``.
    """
    last_applied_key = 'kubectl.kubernetes.io/last-applied-configuration'
    previous_config = (
        '{"data":{"one":"1","two":"2"},"kind":"ConfigMap",'
        '"metadata":{"annotations":{"hello":"world","this":"one"},"name":"foo"}}'
    )
    updated_config = '{"data":{"one":"1","three":"3"},"kind":"ConfigMap","metadata":{"name":"foo"}}'

    live = {
        'kind': "ConfigMap",
        'metadata': {
            'name': "foo",
            'annotations': {
                last_applied_key: previous_config,
                'this': 'one',
                'hello': 'world',
            },
        },
        'data': {'one': "1", 'two': "2"},
    }
    wanted = {
        'kind': "ConfigMap",
        'metadata': {'name': "foo"},
        'data': {'one': "1", 'three': "3"},
    }
    patch = {
        'metadata': {
            'annotations': {
                last_applied_key: updated_config,
                'this': None,
                'hello': None,
            },
        },
        'data': {'two': None, 'three': "3"},
    }

    outcome = apply_patch(live, wanted)
    assert outcome == (live, patch)