File: test_cfe_db.py

package info (click to toggle)
cfengine3 3.24.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 37,552 kB
  • sloc: ansic: 163,161; sh: 10,296; python: 2,950; makefile: 1,744; lex: 784; yacc: 633; perl: 211; pascal: 157; xml: 21; sed: 13
file content (76 lines) | stat: -rw-r--r-- 2,636 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import ctypes

# Database names, listed in the order of the dbid enum in
# libpromises/dbm_api.h.  get_db_id() reports a name's position in this
# tuple as its ID, so entries must not be reordered or removed.
dbs = (
    "classes",             # 0  (deprecated)
    "variables",           # 1  (deprecated)
    "performance",         # 2
    "checksums",           # 3  (deprecated)
    "filestats",           # 4  (deprecated)
    "changes",             # 5
    "observations",        # 6
    "state",               # 7
    "lastseen",            # 8
    "audit",               # 9
    "locks",               # 10
    "history",             # 11
    "measure",             # 12
    "static",              # 13
    "scalars",             # 14
    "windows_registry",    # 15
    "cache",               # 16
    "license",             # 17
    "value",               # 18
    "agent_execution",     # 19
    "bundles",             # 20 (deprecated)
    "packages_installed",  # 21 new package promise: installed packages list
    "packages_updates",    # 22 new package promise: available updates list
    "cookies",             # 23 Enterprise reporting cookies (duplicate host detection)
)

def get_db_id(db_name):
    """Return the position of *db_name* within the ``dbs`` tuple.

    That position is used as the integer database ID handed to the
    libpromises wrappers in this module.

    Raises:
        ValueError: if *db_name* is not one of the entries in ``dbs``.
    """
    position = dbs.index(db_name)
    return position


class _SimulationStruct(ctypes.Structure):
    """Opaque ctypes structure with no fields declared.

    Only used through ``ctypes.POINTER(_SimulationStruct)`` as the handle
    type returned by SimulateDBLoad and accepted by StopSimulation.
    """

class _FilamentStruct(ctypes.Structure):
    """Opaque ctypes structure with no fields declared.

    Only used through ``ctypes.POINTER(_FilamentStruct)`` as the handle
    type returned by FillUpDB and accepted by RemoveFilament.
    """

# Handle to the libpromises shared library (C calling convention).  The name
# is version-pinned, so it has to match the library the dynamic loader finds.
_promises = ctypes.cdll.LoadLibrary("libpromises.so.3.0.6")

# C prototype as declared to ctypes: one argtype per wrapper parameter, in
# the same order as the Python signature below.
_promises.SimulateDBLoad.argtypes = [
    ctypes.c_int,   # db_id
    ctypes.c_int,   # read_keys_refresh_s
    ctypes.c_long,  # read_min_interval_ms
    ctypes.c_long,  # read_max_interval_ms
    ctypes.c_int,   # write_sample_size_pct
    ctypes.c_int,   # write_prune_interval_s
    ctypes.c_long,  # write_min_interval_ms
    ctypes.c_long,  # write_max_interval_ms
    ctypes.c_long,  # iter_min_interval_ms
    ctypes.c_long,  # iter_max_interval_ms
]
_promises.SimulateDBLoad.restype = ctypes.POINTER(_SimulationStruct)


def SimulateDBLoad(db_id,
                   read_keys_refresh_s=5,
                   read_min_interval_ms=100,
                   read_max_interval_ms=200,
                   write_sample_size_pct=50,
                   write_prune_interval_s=10,
                   write_min_interval_ms=200,
                   write_max_interval_ms=400,
                   iter_min_interval_ms=1000,
                   iter_max_interval_ms=2000):
    """Call libpromises' SimulateDBLoad and return the pointer it yields.

    All arguments are forwarded unchanged, in order, to the C function.
    *db_id* is an integer database ID (see get_db_id()).  The remaining
    parameters are tuning values for the simulation; judging by their name
    suffixes they are presumably seconds (_s), milliseconds (_ms) and a
    percentage (_pct) -- confirm against the C implementation.

    Returns:
        A ``ctypes.POINTER(_SimulationStruct)`` instance, suitable for
        passing to StopSimulation().
    """
    forwarded = (
        db_id,
        read_keys_refresh_s,
        read_min_interval_ms,
        read_max_interval_ms,
        write_sample_size_pct,
        write_prune_interval_s,
        write_min_interval_ms,
        write_max_interval_ms,
        iter_min_interval_ms,
        iter_max_interval_ms,
    )
    return _promises.SimulateDBLoad(*forwarded)

_promises.StopSimulation.argtypes = [ctypes.POINTER(_SimulationStruct)]
_promises.StopSimulation.restype = None  # the C function returns void


def StopSimulation(simulation):
    """Pass *simulation* to libpromises' StopSimulation.

    *simulation* is expected to be a ``ctypes.POINTER(_SimulationStruct)``
    value such as the one returned by SimulateDBLoad().  Returns None.
    """
    native_stop = _promises.StopSimulation
    native_stop(simulation)

_promises.FillUpDB.argtypes = [
    ctypes.c_int,  # db_id
    ctypes.c_int,  # usage
]
_promises.FillUpDB.restype = ctypes.POINTER(_FilamentStruct)


def FillUpDB(db_id, usage):
    """Call libpromises' FillUpDB and return the pointer it yields.

    *db_id* is an integer database ID (see get_db_id()); *usage* is an
    integer forwarded as-is (presumably a target usage level for the
    database -- confirm against the C implementation).

    Returns:
        A ``ctypes.POINTER(_FilamentStruct)`` instance, suitable for
        passing to RemoveFilament().
    """
    filament = _promises.FillUpDB(db_id, usage)
    return filament

_promises.RemoveFilament.argtypes = [ctypes.POINTER(_FilamentStruct)]
_promises.RemoveFilament.restype = None  # the C function returns void


def RemoveFilament(filament):
    """Pass *filament* to libpromises' RemoveFilament.

    *filament* is expected to be a ``ctypes.POINTER(_FilamentStruct)``
    value such as the one returned by FillUpDB().  Returns None.
    """
    native_remove = _promises.RemoveFilament
    native_remove(filament)