1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
|
from __future__ import annotations
import abc
import collections
import logging
import os
import urllib
from typing import TYPE_CHECKING, Any, Optional
from consul.api.acl import ACL
from consul.api.agent import Agent
from consul.api.catalog import Catalog
from consul.api.connect import Connect
from consul.api.coordinates import Coordinate
from consul.api.event import Event
from consul.api.health import Health
from consul.api.kv import KV
from consul.api.operator import Operator
from consul.api.query import Query
from consul.api.session import Session
from consul.api.status import Status
from consul.api.txn import Txn
from consul.exceptions import ConsulException
if TYPE_CHECKING:
from types import TracebackType
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Immutable record with three positional fields: code, headers and body.
# NOTE(review): presumably what HTTPClient implementations hand back for each
# request -- confirm against the concrete clients.
Response = collections.namedtuple("Response", "code headers body")
class HTTPClient(metaclass=abc.ABCMeta):
    """Abstract base class for the HTTP transport of a Consul client.

    This class only stores the connection settings and builds request URIs.
    Subclasses must implement ``get``, ``put``, ``delete``, ``post`` and
    ``close``; the class cannot be instantiated until they do.
    """

    def __init__(
        self, host: str = "127.0.0.1", port: int = 8500, scheme: str = "http", verify: bool = True, cert=None
    ) -> None:
        """Store the connection settings and precompute ``base_uri``.

        *host*, *port* and *scheme* are combined into
        ``<scheme>://<host>:<port>``. *verify* and *cert* are stored as-is;
        this base class does not use them itself.
        """
        self.host = host
        self.port = port
        self.scheme = scheme
        self.verify = verify
        self.base_uri = f"{self.scheme}://{self.host}:{self.port}"
        self.cert = cert

    def uri(self, path: str, params: list[tuple[str, Any]] | None = None) -> str:
        """Return the absolute URI for *path*, with *params* as query string.

        *path* is percent-encoded, leaving ``/`` and ``:`` untouched. A falsy
        *params* (``None`` or an empty list) adds no query string.
        """
        # Import the submodule explicitly: a bare ``import urllib`` does not
        # guarantee that ``urllib.parse`` has been loaded.
        from urllib.parse import quote, urlencode

        uri = self.base_uri + quote(path, safe="/:")
        if params:
            uri = f"{uri}?{urlencode(params)}"
        return uri

    # NOTE(review): *callback* is presumably applied to the response by the
    # concrete implementations -- confirm against the subclasses.

    @abc.abstractmethod
    def get(self, callback, path, params=None, headers: Optional[dict[str, str]] = None):
        """GET *path*; must be implemented by subclasses."""
        raise NotImplementedError

    @abc.abstractmethod
    def put(self, callback, path, params=None, data: str = "", headers: Optional[dict[str, str]] = None):
        """PUT *data* to *path*; must be implemented by subclasses."""
        raise NotImplementedError

    @abc.abstractmethod
    def delete(self, callback, path, params=None, headers: Optional[dict[str, str]] = None):
        """DELETE *path*; must be implemented by subclasses."""
        raise NotImplementedError

    @abc.abstractmethod
    def post(self, callback, path, params=None, data: str = "", headers: Optional[dict[str, str]] = None):
        """POST *data* to *path*; must be implemented by subclasses."""
        raise NotImplementedError

    @abc.abstractmethod
    def close(self):
        """Release the transport's resources; must be implemented by subclasses."""
        raise NotImplementedError
class Consul:
    """Base client for the Consul HTTP API.

    Holds the transport (``self.http``), the default token, datacenter and
    consistency mode, and one attribute per API area (``kv``, ``agent``,
    ``catalog``, ...). Subclasses supply the transport by implementing
    :meth:`http_connect`.
    """

    def __init__(
        self,
        host: str | None = None,
        port: int | None = None,
        token: str | None = None,
        scheme: str | None = None,
        consistency: str = "default",
        dc=None,
        verify: bool | None = None,
        cert=None,
    ) -> None:
        """
        *host* and *port* locate the Consul agent. When neither is given and
        the ``CONSUL_HTTP_ADDR`` environment variable is set, both are read
        from it (``<host>:<port>``). Anything still missing defaults to
        ``127.0.0.1`` and ``8500``.

        *token* is an optional `ACL token`_. If supplied it will be used by
        default for all requests made with this client session. It's still
        possible to override this token by passing a token explicitly for a
        request. Note that the ``CONSUL_HTTP_TOKEN`` environment variable,
        when set, takes precedence over this argument.

        *scheme* is the URL scheme. When omitted, ``https`` is used if the
        ``CONSUL_HTTP_SSL`` environment variable equals ``true``
        (case-insensitive), otherwise ``http``.

        *consistency* sets the consistency mode to use by default for all reads
        that support the consistency option. It's still possible to override
        this by passing explicitly for a given request. *consistency* can be
        either 'default', 'consistent' or 'stale'.

        *dc* is the datacenter that this agent will communicate with.
        By default the datacenter of the host is used.

        *verify* is whether to verify the SSL certificate for HTTPS requests.
        When omitted it is read from ``CONSUL_HTTP_SSL_VERIFY`` (``true``,
        case-insensitive, enables verification) and defaults to ``True``.

        *cert* client side certificates for HTTPS requests

        Raises ``ConsulException`` if ``CONSUL_HTTP_ADDR`` is used and is not
        of the form ``<host>:<port>``, and ``AssertionError`` if
        *consistency* is not one of the accepted values.
        """
        env_addr = os.getenv("CONSUL_HTTP_ADDR")
        if env_addr and not (host or port):
            try:
                host, env_port = env_addr.split(":")
                # Convert to int so the value matches the annotated type; an
                # empty port ("host:") still falls back to the default below.
                port = int(env_port) if env_port else None
            except ValueError as err:
                raise ConsulException(
                    f"CONSUL_HTTP_ADDR ({env_addr}) invalid, does not match <host>:<port>"
                ) from err
        if not host:
            host = "127.0.0.1"
        if not port:
            port = 8500

        if scheme is None:
            use_ssl = os.getenv("CONSUL_HTTP_SSL")
            scheme = "https" if use_ssl and use_ssl.lower() == "true" else "http"
        if verify is None:
            ssl_verify = os.getenv("CONSUL_HTTP_SSL_VERIFY")
            verify = ssl_verify.lower() == "true" if ssl_verify else True

        # Validate before connecting so a bad argument never leaves an open
        # transport behind. Raised explicitly (not via ``assert``) so the check
        # survives ``python -O``; AssertionError is kept for compatibility.
        if consistency not in ("default", "consistent", "stale"):
            raise AssertionError("consistency must be either default, consistent or stale")

        self.http = self.http_connect(host, port, scheme, verify, cert)
        # The environment variable wins over the *token* argument when set.
        self.token = os.getenv("CONSUL_HTTP_TOKEN", token)
        self.scheme = scheme
        self.dc = dc
        self.consistency = consistency

        # One helper object per API area, each bound to this client.
        self.event = Event(self)
        self.kv = KV(self)
        self.txn = Txn(self)
        self.agent = Agent(self)
        self.catalog = Catalog(self)
        self.health = Health(self)
        self.session = Session(self)
        self.acl = ACL(self)
        self.status = Status(self)
        self.query = Query(self)
        self.coordinate = Coordinate(self)
        self.operator = Operator(self)
        self.connect = Connect(self)

    def __enter__(self):
        """Return the client itself for use in a ``with`` block."""
        return self

    async def __aenter__(self):
        """Return the client itself for use in an ``async with`` block."""
        return self

    def __exit__(
        self, exc_type: Optional[type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
    ) -> None:
        """Close the transport on leaving a ``with`` block; exceptions propagate."""
        self.http.close()

    async def __aexit__(
        self, exc_type: Optional[type[BaseException]], exc: Optional[BaseException], tb: Optional[TracebackType]
    ) -> None:
        """Await the transport's ``close()`` on leaving an ``async with`` block."""
        await self.http.close()

    @abc.abstractmethod
    def http_connect(self, host: str, port: int, scheme, verify: bool = True, cert=None):
        """Create and return the transport for this client.

        Subclasses are expected to override this. NOTE(review): this class
        does not use an ABC metaclass, so the ``abstractmethod`` marker is not
        enforced here and this default implementation returns ``None``.
        """

    def prepare_headers(self, token: Optional[str] = None) -> dict[str, str]:
        """Return the request headers, including the ACL token when available.

        An explicit *token* takes precedence over the client's default token;
        when neither is set an empty dict is returned.
        """
        headers: dict[str, str] = {}
        effective_token = token or self.token
        if effective_token:
            headers["X-Consul-Token"] = effective_token
        return headers
|