1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
|
# SPDX-FileCopyrightText: All Contributors to the PyTango project
# SPDX-License-Identifier: LGPL-3.0-or-later
import os
import pytest
import socket
from time import sleep
from subprocess import Popen
from tango import DeviceProxy, DevState, DevFailed
# Helpers
def start_database(port, inst):
python_ver = os.environ.get('PYTHON', '/usr/bin/python')
tests_directory = os.path.abspath("tests")
cmd = f"{python_ver} -m tango.databaseds.database --port={port} --logging_level=2 {inst}"
proc = Popen(cmd.split(), cwd=tests_directory)
sleep(1)
return proc
def get_device_proxy(name, retries=400, delay=0.03):
count = 0
while count < retries:
try:
proxy = DeviceProxy(name)
return proxy
except DevFailed as exc:
last_error = str(exc)
sleep(delay)
count += 1
raise RuntimeError(
f"Database proxy did not start up within {count * delay:.1f} sec!\n"
f"Last error: {last_error}."
)
def get_open_port():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("", 0))
s.listen(1)
port = s.getsockname()[1]
s.close()
return port
@pytest.fixture()
def tango_database_test():
port = get_open_port()
inst = 2
proc = start_database(port, inst)
proxy = get_device_proxy(f"tango://127.0.0.1:{port}/sys/database/2")
yield proxy
proc.terminate()
# Tests
@pytest.mark.skip(reason="This test is failing when multiple python versions installed and it is temporarily disabled.")
def test_ping(tango_database_test):
duration = tango_database_test.ping(wait=True)
assert isinstance(duration, int)
@pytest.mark.skip(reason="This test is failing when multiple python versions installed and it is temporarily disabled.")
def test_status(tango_database_test):
assert tango_database_test.status() == "The device is in ON state."
@pytest.mark.skip(reason="This test is failing when multiple python versions installed and it is temporarily disabled.")
def test_state(tango_database_test):
assert tango_database_test.state() == DevState.ON
@pytest.mark.skip(reason="This test is failing when multiple python versions installed and it is temporarily disabled.")
def test_device_property(tango_database_test):
test_property_name = "test property"
test_property_value = "test property text"
tango_database_test.put_property({test_property_name: test_property_value})
return_property_list = tango_database_test.get_property_list("*")
assert len(return_property_list) == 1
assert return_property_list[0] == test_property_name
return_property = tango_database_test.get_property("test property")
assert return_property[test_property_name][0] == test_property_value
@pytest.mark.skip(reason="This test is failing when multiple python versions installed and it is temporarily disabled.")
def test_info(tango_database_test):
info = tango_database_test.info()
assert info.dev_class == "DataBase"
assert info.doc_url == "Doc URL = http://www.tango-controls.org"
assert info.server_id == "DataBaseds/2"
|