File: test_throttle.py

package info (click to toggle)
ansible-core 2.19.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 32,944 kB
  • sloc: python: 181,408; cs: 4,929; sh: 4,661; xml: 34; makefile: 21
file content (33 lines) | stat: -rwxr-xr-x 1,023 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#!/usr/bin/env python3

from __future__ import annotations

import os
import sys
import time

# read the args from sys.argv
throttledir, inventory_hostname, max_throttle = sys.argv[1:]
# format/create additional vars
max_throttle = int(max_throttle)
throttledir = os.path.expanduser(throttledir)
throttlefile = os.path.join(throttledir, inventory_hostname)
try:
    # create the file
    with open(throttlefile, 'a'):
        os.utime(throttlefile, None)
    # count the number of files in the dir
    throttlelist = os.listdir(throttledir)
    print("tasks: %d/%d" % (len(throttlelist), max_throttle))
    # if we have too many files, fail
    if len(throttlelist) > max_throttle:
        print(throttlelist)
        raise ValueError("Too many concurrent tasks: %d/%d" % (len(throttlelist), max_throttle))
    time.sleep(1.5)
finally:
    # remove the file, then wait to make sure it's gone
    os.unlink(throttlefile)
    while True:
        if not os.path.exists(throttlefile):
            break
        time.sleep(0.1)