1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431
|
from __future__ import annotations
import json
from typing import Any, Optional
from consul import Check
from consul.callback import CB
class Agent:
"""
The Agent endpoints are used to interact with a local Consul agent.
Usually, services and checks are registered with an agent, which then
takes on the burden of registering with the Catalog and performing
anti-entropy to recover from outages.
"""
def __init__(self, agent) -> None:
self.agent = agent
self.service = Agent.Service(agent)
self.check = Agent.Check(agent)
self.connect = Agent.Connect(agent)
def self(self):
"""
Returns configuration of the local agent and member information.
"""
return self.agent.http.get(CB.json(), "/v1/agent/self")
def services(self) -> Any:
"""
Returns all the services that are registered with the local agent.
These services were either provided through configuration files, or
added dynamically using the HTTP API. It is important to note that
the services known by the agent may be different than those
reported by the Catalog. This is usually due to changes being made
while there is no leader elected. The agent performs active
anti-entropy, so in most situations everything will be in sync
within a few seconds.
"""
return self.agent.http.get(CB.json(), "/v1/agent/services")
def service_definition(self, service_id):
"""
Returns a service definition for a single instance that is registered
with the local agent.
"""
return self.agent.http.get(CB.json(), f"/v1/agent/service/{service_id}")
def checks(self) -> Any:
"""
Returns all the checks that are registered with the local agent.
These checks were either provided through configuration files, or
added dynamically using the HTTP API. Similar to services,
the checks known by the agent may be different than those
reported by the Catalog. This is usually due to changes being made
while there is no leader elected. The agent performs active
anti-entropy, so in most situations everything will be in sync
within a few seconds.
"""
return self.agent.http.get(CB.json(), "/v1/agent/checks")
def members(self, wan: bool = False):
"""
Returns all the members that this agent currently sees. This may
vary by agent, use the nodes api of Catalog to retrieve a cluster
wide consistent view of members.
For agents running in server mode, setting *wan* to *True* returns
the list of WAN members instead of the LAN members which is
default.
"""
params = []
if wan:
params.append(("wan", 1))
return self.agent.http.get(CB.json(), "/v1/agent/members", params=params)
def maintenance(self, enable: bool, reason: Optional[str] = None, token: str | None = None):
"""
The node maintenance endpoint can place the agent into
"maintenance mode".
*enable* is either 'true' or 'false'. 'true' enables maintenance
mode, 'false' disables maintenance mode.
*reason* is an optional string. This is simply to aid human
operators.
"""
params: list[tuple[str, Any]] = []
params.append(("enable", enable))
if reason:
params.append(("reason", reason))
headers = self.agent.prepare_headers(token)
return self.agent.http.put(CB.boolean(), "/v1/agent/maintenance", params=params, headers=headers)
def join(self, address: str, wan: bool = False, token: str | None = None):
"""
This endpoint instructs the agent to attempt to connect to a
given address.
*address* is the ip to connect to.
*wan* is either 'true' or 'false'. For agents running in server
mode, 'true' causes the agent to attempt to join using the WAN
pool. Default is 'false'.
"""
params = []
if wan:
params.append(("wan", 1))
headers = self.agent.prepare_headers(token)
return self.agent.http.put(CB.boolean(), f"/v1/agent/join/{address}", params=params, headers=headers)
def force_leave(self, node: str, token: str | None = None):
"""
This endpoint instructs the agent to force a node into the left
state. If a node fails unexpectedly, then it will be in a failed
state. Once in the failed state, Consul will attempt to reconnect,
and the services and checks belonging to that node will not be
cleaned up. Forcing a node into the left state allows its old
entries to be removed.
*node* is the node to change state for.
"""
headers = self.agent.prepare_headers(token)
return self.agent.http.put(CB.boolean(), f"/v1/agent/force-leave/{node}", headers=headers)
class Service:
def __init__(self, agent) -> None:
self.agent = agent
def register(
self,
name: str,
service_id=None,
address=None,
port: Optional[int] = None,
tags=None,
check=None,
token: str | None = None,
meta=None,
weights=None,
# *deprecated* use check parameter
script=None,
interval=None,
ttl: Optional[int] = None,
http=None,
timeout=None,
enable_tag_override: bool = False,
extra_checks=None,
replace_existing_checks=False,
):
"""
Add a new service to the local agent. There is more
documentation on services
`here <http://www.consul.io/docs/agent/services.html>`_.
*name* is the name of the service.
If the optional *service_id* is not provided it is set to
*name*. You cannot have duplicate *service_id* entries per
agent, so it may be necessary to provide one.
*address* will default to the address of the agent if not
provided.
An optional health *check* can be created for this service is
one of `Check.script`_, `Check.http`_, `Check.tcp`_,
`Check.ttl`_ or `Check.docker`_.
*token* is an optional `ACL token`_ to apply to this request.
Note this call will return successful even if the token doesn't
have permissions to register this service.
*meta* specifies arbitrary KV metadata linked to the service
formatted as {k1:v1, k2:v2}.
*weights* specifies weights for the service; default to
{"Passing": 1, "Warning": 1}.
*script*, *interval*, *ttl*, *http*, and *timeout* arguments
are deprecated. use *check* instead.
*replace_existing_checks* Missing health checks from the request will
be deleted from the agent.
Using this parameter allows to idempotently register a service and its
checks without having to manually deregister checks.
*enable_tag_override* is an optional bool that enable you
to modify a service tags from servers(consul agent role server)
Default is set to False.
This option is only for >=v0.6.0 version on both agent and
servers.
for more information
https://www.consul.io/docs/agent/services.html
"""
if extra_checks is None:
extra_checks = []
payload: dict[str, Any] = {}
payload["name"] = name
if enable_tag_override:
payload["enabletagoverride"] = enable_tag_override
if service_id:
payload["id"] = service_id
if address:
payload["address"] = address
if port:
payload["port"] = port
if tags:
payload["tags"] = tags
if meta:
payload["meta"] = meta
if check:
payload["checks"] = [check] + extra_checks
if weights:
payload["weights"] = weights
else:
payload.update(
Check._compat( # pylint: disable=protected-access
script=script, interval=interval, ttl=ttl, http=http, timeout=timeout
)
)
params = []
if replace_existing_checks:
params.append(("replace-existing-checks", "true"))
headers = self.agent.prepare_headers(token)
return self.agent.http.put(
CB.boolean(), "/v1/agent/service/register", params=params, headers=headers, data=json.dumps(payload)
)
def deregister(self, service_id: str, token: str | None = None):
"""
Used to remove a service from the local agent. The agent will
take care of deregistering the service with the Catalog. If
there is an associated check, that is also deregistered.
"""
headers = self.agent.prepare_headers(token)
return self.agent.http.put(CB.boolean(), f"/v1/agent/service/deregister/{service_id}", headers=headers)
def maintenance(self, service_id: str, enable: bool, reason: Optional[str] = None, token: str | None = None):
"""
The service maintenance endpoint allows placing a given service
into "maintenance mode".
*service_id* is the id of the service that is to be targeted
for maintenance.
*enable* is either 'true' or 'false'. 'true' enables
maintenance mode, 'false' disables maintenance mode.
*reason* is an optional string. This is simply to aid human
operators.
"""
params: list[tuple[str, Any]] = []
params.append(("enable", enable))
if reason:
params.append(("reason", reason))
headers = self.agent.prepare_headers(token)
return self.agent.http.put(
CB.boolean(), f"/v1/agent/service/maintenance/{service_id}", params=params, headers=headers
)
class Check:
def __init__(self, agent) -> None:
self.agent = agent
def register(
self,
name: str,
check=None,
check_id=None,
notes=None,
service_id=None,
token: str | None = None,
# *deprecated* use check parameter
script=None,
interval=None,
ttl: Optional[int] = None,
http=None,
timeout=None,
):
"""
Register a new check with the local agent. More documentation
on checks can be found `here
<http://www.consul.io/docs/agent/checks.html>`_.
*name* is the name of the check.
*check* is one of `Check.script`_, `Check.http`_, `Check.tcp`_
`Check.ttl`_ or `Check.docker`_ and is required.
If the optional *check_id* is not provided it is set to *name*.
*check_id* must be unique for this agent.
*notes* is not used by Consul, and is meant to be human
readable.
Optionally, a *service_id* can be specified to associate a
registered check with an existing service.
*token* is an optional `ACL token`_ to apply to this request.
Note this call will return successful even if the token doesn't
have permissions to register this check.
*script*, *interval*, *ttl*, *http*, and *timeout* arguments
are deprecated. use *check* instead.
Returns *True* on success.
"""
payload = {"name": name}
assert check or script or ttl or http, "check is required"
if check:
payload.update(check)
else:
payload.update(
Check._compat(script=script, interval=interval, ttl=ttl, http=http, timeout=timeout)["check"]
)
if check_id:
payload["id"] = check_id
if notes:
payload["notes"] = notes
if service_id:
payload["serviceid"] = service_id
headers = self.agent.prepare_headers(token)
return self.agent.http.put(
CB.boolean(), "/v1/agent/check/register", headers=headers, data=json.dumps(payload)
)
def deregister(self, check_id: str, token: str | None = None):
"""
Remove a check from the local agent.
"""
headers = self.agent.prepare_headers(token)
return self.agent.http.put(CB.boolean(), f"/v1/agent/check/deregister/{check_id}", headers=headers)
def ttl_pass(self, check_id: str, notes=None, token: str | None = None):
"""
Mark a ttl based check as passing. Optional notes can be
attached to describe the status of the check.
"""
params = []
if notes:
params.append(("note", notes))
headers = self.agent.prepare_headers(token)
return self.agent.http.put(CB.boolean(), f"/v1/agent/check/pass/{check_id}", params=params, headers=headers)
def ttl_fail(self, check_id: str, notes=None, token: str | None = None):
"""
Mark a ttl based check as failing. Optional notes can be
attached to describe why check is failing. The status of the
check will be set to critical and the ttl clock will be reset.
"""
params = []
if notes:
params.append(("note", notes))
headers = self.agent.prepare_headers(token)
return self.agent.http.put(CB.boolean(), f"/v1/agent/check/fail/{check_id}", params=params, headers=headers)
def ttl_warn(self, check_id: str, notes=None, token: str | None = None):
"""
Mark a ttl based check with warning. Optional notes can be
attached to describe the warning. The status of the
check will be set to warn and the ttl clock will be reset.
"""
params = []
if notes:
params.append(("note", notes))
headers = self.agent.prepare_headers(token)
return self.agent.http.put(CB.boolean(), f"/v1/agent/check/warn/{check_id}", params=params, headers=headers)
class Connect:
def __init__(self, agent) -> None:
self.agent = agent
self.ca = Agent.Connect.CA(agent)
def authorize(self, target, client_cert_uri, client_cert_serial, token: str | None = None):
"""
Tests whether a connection attempt is authorized between
two services.
More information is available
`here <https://www.consul.io/api-docs/agent/connect>`_.
*target* is the name of the service that is being requested.
*client_cert_uri* The unique identifier for the requesting
client.
*client_cert_serial* The colon-hex-encoded serial number for
the requesting client cert.
"""
payload = {"Target": target, "ClientCertURI": client_cert_uri, "ClientCertSerial": client_cert_serial}
headers = self.agent.prepare_headers(token)
return self.agent.http.put(
CB.json(), "/v1/agent/connect/authorize", headers=headers, data=json.dumps(payload)
)
class CA:
def __init__(self, agent) -> None:
self.agent = agent
def roots(self):
return self.agent.http.get(CB.json(), "/v1/agent/connect/ca/roots")
def leaf(self, service, token: str | None = None):
headers = self.agent.prepare_headers(token)
return self.agent.http.get(CB.json(), f"/v1/agent/connect/ca/leaf/{service}", headers=headers)
|