File: test_helpers_config.py

package info (click to toggle)
python-es-client 8.17.4-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 520 kB
  • sloc: python: 2,452; sh: 239; makefile: 17
file content (43 lines) | stat: -rw-r--r-- 1,342 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
"""Test helpers.config"""

from unittest import TestCase
import pytest
from dotmap import DotMap  # type: ignore
from elasticsearch8 import Elasticsearch
from es_client.defaults import ES_DEFAULT
from es_client.exceptions import ESClientException
from es_client.helpers import config
from . import CACRT, HOST, PASS, USER

CONFIG = {
    "elasticsearch": {
        "other_settings": {"username": USER, "password": PASS},
        "client": {"hosts": HOST, "ca_certs": CACRT},
    }
}


class TestGetClient(TestCase):
    """Test get_client functionality"""

    def test_basic_operation(self):
        """Ensure basic operation"""
        assert isinstance(config.get_client(configdict=CONFIG), Elasticsearch)

    def test_raises_when_no_connection(self):
        """
        Ensures that an exception is raised when it cannot connect to Elasticsearch
        """
        client_args = DotMap()
        other_args = DotMap()
        client_args.update(DotMap(ES_DEFAULT))
        client_args.hosts = ["http://127.0.0.123:12345"]
        client_args.request_timeout = 0.1
        cnf = {
            "elasticsearch": {
                "client": client_args.toDict(),
                "other_settings": other_args.toDict(),
            }
        }
        with pytest.raises(ESClientException):
            _ = config.get_client(configdict=cnf)