File: vine_python_hungry.py

package info (click to toggle)
cctools 1%3A7.14.5-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 36,956 kB
  • sloc: ansic: 114,614; python: 29,532; cpp: 20,313; sh: 13,675; perl: 4,056; xml: 3,688; makefile: 1,436
file content (144 lines) | stat: -rw-r--r-- 3,338 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#! /usr/bin/env python

import ndcctools.taskvine as vine
import signal
import sys


def timeout(signum, frame):
    """Signal handler: report that the test overran its time budget and exit.

    The arguments are the standard signal-handler pair (signal number and
    current stack frame); neither is inspected. The process terminates with
    exit status 1.
    """
    print("hungry test did not finish in time")
    raise SystemExit(1)


# Arm a watchdog: if the script is still running after ten minutes, SIGALRM
# fires and the `timeout` handler ends the test with a failure status.
signal.signal(signal.SIGALRM, timeout)
signal.alarm(10 * 60)


# Shell command run by every task submitted in this test.
command = "sleep 120"


# port=0 is passed so that no fixed port number is hard-coded here
# (presumably the manager then picks a free one -- confirm against the API).
m = vine.Manager(port=0)

# With nothing submitted, hungry() is expected to report exactly the tuned
# "hungry-minimum" value.
for minimum in (11, 2):
    m.tune("hungry-minimum", minimum)
    assert m.hungry() == minimum

# Each task submitted (no worker is connected yet) is expected to lower the
# reported value by one, from the minimum of 2 down to 0.
t_1 = vine.Task(command)
i_1 = m.submit(t_1)
assert m.hungry() == 1

t_2 = vine.Task(command)
i_2 = m.submit(t_2)
assert m.hungry() == 0

# Resources advertised by the single worker; the expected hungry() values
# checked later in this script are computed from these numbers.
worker_cores = 12
worker_memory = 1200
worker_disk = 1200

# Factory configured to keep exactly one "local" worker attached to the manager.
workers = vine.Factory("local", manager=m)
for option, value in (
    ("max_workers", 1),
    ("min_workers", 1),
    ("cores", worker_cores),
    ("disk", worker_disk),
    ("memory", worker_memory),
):
    setattr(workers, option, value)

def _wait_for_running(at_least):
    """Keep calling m.wait(1) until at least `at_least` tasks are running."""
    while m.stats.tasks_running < at_least:
        m.wait(1)


def _wait_for_idle():
    """Keep calling m.wait(1) until the manager reports no running tasks."""
    while m.stats.tasks_running > 0:
        m.wait(1)


with workers:
    _wait_for_running(1)

    # hungry-minimum is 2 and two tasks were submitted, one of which is still
    # waiting, so the manager should be hungry for one more task.
    assert m.hungry() == 1

    m.tune("hungry-minimum", 5)

    # Same queue with hungry-minimum raised to 5: one task waiting, so the
    # manager should be hungry for 4 tasks.
    assert m.hungry() == 4

    for task_id in (i_1, i_2):
        m.cancel_by_task_id(task_id)
    _wait_for_idle()

    # Nothing is queued any more: the expected value is the larger of
    # hungry-minimum (5) and 2 x worker_cores.
    assert m.hungry() == 2 * worker_cores

    one_core_task = vine.Task(command)
    one_core_task.set_cores(1)
    one_core_id = m.submit(one_core_task)
    _wait_for_running(1)

    # A one-core task is running: the larger of hungry-minimum (5) and
    # 2 x worker_cores minus the core in use.
    assert m.hungry() == worker_cores * 2 - 1

    factor = 3
    m.tune("hungry-minimum-factor", factor)

    # Same situation as above, but with the tuned factor replacing the 2.
    assert m.hungry() == worker_cores * factor - 1

    rest_of_cores_task = vine.Task(command)
    rest_of_cores_task.set_cores(worker_cores - 1)
    rest_of_cores_id = m.submit(rest_of_cores_task)
    _wait_for_running(2)

    # Every core of the worker is busy now, so only hungry-minimum (5) remains.
    assert m.hungry() == 5

    for task_id in (one_core_id, rest_of_cores_id):
        m.cancel_by_task_id(task_id)
    _wait_for_idle()

    # A one-core task that asks for twice its proportional share of memory.
    mem_task = int(2 * worker_memory / worker_cores)
    memory_bound_task = vine.Task(command)
    memory_bound_task.set_cores(1)
    memory_bound_task.set_memory(mem_task)
    memory_bound_id = m.submit(memory_bound_task)
    _wait_for_running(1)

    # Memory should be the limiting resource here.
    expected_by_memory = (factor * worker_memory - mem_task) / mem_task
    assert m.hungry() == expected_by_memory

    m.cancel_by_task_id(memory_bound_id)

    # Two tasks that together occupy all 12 cores but almost no memory or disk.
    cores_t_6 = 1
    cores_t_7 = 11
    filler_tasks = []
    for cores in (cores_t_6, cores_t_7):
        filler = vine.Task(command)
        filler.set_cores(cores)
        filler.set_memory(1)
        filler.set_disk(1)
        m.submit(filler)
        filler_tasks.append(filler)
    _wait_for_running(2)

    # A two-core task submitted after both fillers are running.
    cores_t_8 = 2
    waiting_task = vine.Task(command)
    waiting_task.set_cores(cores_t_8)
    waiting_id = m.submit(waiting_task)

    factor = 10
    m.tune("hungry-minimum-factor", factor)

    # The average cores of waiting tasks should be the limiting factor: each
    # new task would get two cores, minus one for the task already waiting.
    expected_by_cores = (factor * worker_cores - cores_t_6 - cores_t_7) / cores_t_8 - 1
    print(m.hungry(), expected_by_cores)
    assert m.hungry() == expected_by_cores