File: test_mtls.py

package info (click to toggle)
python-pycrowdsec 0.0.5-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 332 kB
  • sloc: python: 879; sh: 6; makefile: 4
file content (64 lines) | stat: -rw-r--r-- 2,059 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import pytest
from requests.exceptions import HTTPError
import json

from pycrowdsec.client import StreamDecisionClient


def test_tls_mutual(crowdsec, certs_dir):
    """TLS with two-way bouncer/lapi authentication"""

    lapi_env = {
        "CACERT_FILE": "/etc/ssl/crowdsec/ca.crt",
        "LAPI_CERT_FILE": "/etc/ssl/crowdsec/lapi.crt",
        "LAPI_KEY_FILE": "/etc/ssl/crowdsec/lapi.key",
        "USE_TLS": "true",
        "LOCAL_API_URL": "https://localhost:8080",
    }

    certs = certs_dir(lapi_hostname="lapi")

    volumes = {
        certs: {"bind": "/etc/ssl/crowdsec", "mode": "ro"},
    }

    with crowdsec(environment=lapi_env, volumes=volumes) as cs:
        cs.wait_for_log("*CrowdSec Local API listening*")
        # TODO: wait_for_https
        cs.wait_for_http(8080, "/health", want_status=None)

        port = cs.probe.get_bound_port("8080")
        lapi_url = f"https://localhost:{port}/"

        bouncer = StreamDecisionClient(
            "",
            lapi_url,
            key_path=(certs / "bouncer.key").as_posix(),
            cert_path=(certs / "bouncer.crt").as_posix(),
            ca_cert_path=(certs / "ca.crt").as_posix(),
            user_agent="bouncer_under_test",
        )

        bouncer.cycle("true")
        res = cs.cont.exec_run("cscli bouncers list -o json")
        assert res.exit_code == 0
        bouncers = json.loads(res.output)
        assert len(bouncers) == 1
        assert bouncers[0]["name"].startswith("@")
        assert bouncers[0]["auth_type"] == "tls"
        assert bouncers[0]["type"] == "bouncer_under_test"

        bouncer = StreamDecisionClient(
            "",
            lapi_url,
            key_path=(certs / "agent.key").as_posix(),
            cert_path=(certs / "agent.crt").as_posix(),
            ca_cert_path=(certs / "ca.crt").as_posix(),
        )

        with pytest.raises(HTTPError, match="403"):
            bouncer.cycle("true")

        cs.wait_for_log(
            "*client certificate OU (?agent-ou?) doesn't match expected OU (?bouncer-ou?)*"
        )