1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772
|
"""
Integration tests for authentication.
Unit tests for auth classes also exist in tests/test_auth.py
"""
import hashlib
import netrc
import os
import sys
import threading
import typing
from urllib.request import parse_keqv_list
import anyio
import pytest
import httpx
from ..common import FIXTURES_DIR
class App:
    """
    Mock handler that reports back the credentials it received.

    Every response carries a JSON body of the form ``{"auth": <value>}`` where
    the value is the request's `Authorization` header (or `None`). When
    `auth_header` is non-empty it is sent back as a `www-authenticate` header.
    """

    def __init__(self, auth_header: str = "", status_code: int = 200) -> None:
        self.status_code = status_code
        self.auth_header = auth_header

    def __call__(self, request: httpx.Request) -> httpx.Response:
        payload = {"auth": request.headers.get("Authorization")}
        # Only advertise a challenge header when one was configured.
        if self.auth_header:
            response_headers = {"www-authenticate": self.auth_header}
        else:
            response_headers = {}
        return httpx.Response(
            self.status_code, headers=response_headers, json=payload
        )
class DigestApp:
    """
    Mock handler that issues Digest challenges before answering.

    While fewer than `send_response_after_attempt` challenges have been sent,
    each call is answered with a 401 carrying a `www-authenticate` Digest
    challenge. After that, calls get a 200 whose JSON body echoes the
    `Authorization` header of the request.
    """

    def __init__(
        self,
        algorithm: str = "SHA-256",
        send_response_after_attempt: int = 1,
        qop: str = "auth",
        regenerate_nonce: bool = True,
    ) -> None:
        self.algorithm = algorithm
        self.send_response_after_attempt = send_response_after_attempt
        self.qop = qop
        self._regenerate_nonce = regenerate_nonce
        # Number of challenges issued so far.
        self._response_count = 0

    def __call__(self, request: httpx.Request) -> httpx.Response:
        if self._response_count >= self.send_response_after_attempt:
            echoed = {"auth": request.headers.get("Authorization")}
            return httpx.Response(200, json=echoed)
        return self.challenge_send(request)

    def challenge_send(self, request: httpx.Request) -> httpx.Response:
        """Count the attempt and return a 401 response with a Digest challenge."""
        self._response_count += 1

        if self._regenerate_nonce:
            nonce = hashlib.sha256(os.urandom(8)).hexdigest()
        else:
            nonce = "ee96edced2a0b43e4869e96ebe27563f369c1205a049d06419bb51d8aeddf3d3"

        opaque = "ee6378f3ee14ebfd2fff54b70a91a7c9390518047f242ab2271380db0e14bda1"
        fields = {
            "nonce": nonce,
            "qop": self.qop,
            "opaque": opaque,
            "algorithm": self.algorithm,
            "stale": "FALSE",
        }

        # Fields with an empty value (e.g. qop="") are left out of the challenge.
        rendered = []
        for name, value in fields.items():
            if value:
                rendered.append('{}="{}"'.format(name, value))
        challenge_str = ", ".join(rendered)

        return httpx.Response(
            401,
            headers={
                "www-authenticate": (
                    f'Digest realm="httpx@example.org", {challenge_str}'
                ),
            },
        )
class RepeatAuth(httpx.Auth):
    """
    Mock auth scheme that sends the request `repeat` times, collecting the
    'WWW-Authenticate' header of each intermediate response, and then sends
    one final request whose 'Authorization' header joins those values with
    dots.
    """

    requires_request_body = True

    def __init__(self, repeat: int) -> None:
        self.repeat = repeat

    def auth_flow(
        self, request: httpx.Request
    ) -> typing.Generator[httpx.Request, httpx.Response, None]:
        collected = []

        for attempt in range(self.repeat):
            request.headers["Authorization"] = f"Repeat {attempt}"
            intermediate = yield request
            collected.append(intermediate.headers["www-authenticate"])

        # Final request: carries every collected header value, dot-separated.
        joined = ".".join(collected)
        request.headers["Authorization"] = f"Repeat {joined}"
        yield request
class ResponseBodyAuth(httpx.Auth):
    """
    Mock auth scheme that first sends `token` as the 'Authorization' header,
    then re-sends the request with the text of the first response as the
    'Authorization' header.
    """

    requires_response_body = True

    def __init__(self, token: str) -> None:
        self.token = token

    def auth_flow(
        self, request: httpx.Request
    ) -> typing.Generator[httpx.Request, httpx.Response, None]:
        request.headers["Authorization"] = self.token
        first_response = yield request

        # Feed the body text of the first response back as the credentials.
        request.headers["Authorization"] = first_response.text
        yield request
class SyncOrAsyncAuth(httpx.Auth):
    """
    Mock auth scheme with separate sync and async implementations.

    The sync flow sets the header to "sync-auth" under a `threading.Lock`;
    the async flow sets it to "async-auth" under an `anyio.Lock`.
    """

    def __init__(self) -> None:
        self._async_lock = anyio.Lock()
        self._lock = threading.Lock()

    def sync_auth_flow(
        self, request: httpx.Request
    ) -> typing.Generator[httpx.Request, httpx.Response, None]:
        # The request is yielded while the thread lock is still held.
        with self._lock:
            request.headers["Authorization"] = "sync-auth"
            yield request

    async def async_auth_flow(
        self, request: httpx.Request
    ) -> typing.AsyncGenerator[httpx.Request, httpx.Response]:
        # The request is yielded while the async lock is still held.
        async with self._async_lock:
            request.headers["Authorization"] = "async-auth"
            yield request
@pytest.mark.anyio
async def test_basic_auth() -> None:
    """A `(user, password)` tuple passed per request yields a Basic header."""
    credentials = ("user", "password123")
    transport = httpx.MockTransport(App())

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=credentials)

    assert response.status_code == 200
    assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="}
@pytest.mark.anyio
async def test_basic_auth_with_stream() -> None:
    """
    Credentials configured on the client are applied to streamed requests.

    See: https://github.com/encode/httpx/pull/1312
    """
    credentials = ("user", "password123")
    transport = httpx.MockTransport(App())

    async with httpx.AsyncClient(transport=transport, auth=credentials) as client:
        async with client.stream("GET", "https://example.org/") as response:
            await response.aread()

    assert response.status_code == 200
    assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="}
@pytest.mark.anyio
async def test_basic_auth_in_url() -> None:
    """Credentials embedded in the URL are sent as a Basic header."""
    transport = httpx.MockTransport(App())

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://user:password123@example.org/")

    assert response.status_code == 200
    assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="}
@pytest.mark.anyio
async def test_basic_auth_on_session() -> None:
    """Credentials given to the client constructor apply to its requests."""
    credentials = ("user", "password123")
    transport = httpx.MockTransport(App())

    async with httpx.AsyncClient(transport=transport, auth=credentials) as client:
        response = await client.get("https://example.org/")

    assert response.status_code == 200
    assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="}
@pytest.mark.anyio
async def test_custom_auth() -> None:
    """A plain callable passed as `auth` can set the Authorization header."""

    def add_token(request: httpx.Request) -> httpx.Request:
        request.headers["Authorization"] = "Token 123"
        return request

    transport = httpx.MockTransport(App())

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=add_token)

    assert response.status_code == 200
    assert response.json() == {"auth": "Token 123"}
def test_netrc_auth_credentials_exist() -> None:
    """
    With `NetRCAuth` in use, a request to a host listed in the netrc file
    is sent with that host's credentials.
    """
    netrc_auth = httpx.NetRCAuth(str(FIXTURES_DIR / ".netrc"))
    transport = httpx.MockTransport(App())

    with httpx.Client(transport=transport, auth=netrc_auth) as client:
        response = client.get("http://netrcexample.org")

    expected_body = {
        "auth": "Basic ZXhhbXBsZS11c2VybmFtZTpleGFtcGxlLXBhc3N3b3Jk"
    }
    assert response.status_code == 200
    assert response.json() == expected_body
def test_netrc_auth_credentials_do_not_exist() -> None:
    """
    With `NetRCAuth` in use, a request to a host that is absent from the
    netrc file is sent without credentials.
    """
    netrc_auth = httpx.NetRCAuth(str(FIXTURES_DIR / ".netrc"))
    transport = httpx.MockTransport(App())

    with httpx.Client(transport=transport, auth=netrc_auth) as client:
        response = client.get("http://example.org")

    assert response.status_code == 200
    assert response.json() == {"auth": None}
@pytest.mark.skipif(
    sys.version_info >= (3, 11),
    reason="netrc files without a password are valid from Python >= 3.11",
)
def test_netrc_auth_nopassword_parse_error() -> None:  # pragma: no cover
    """
    netrc parsing differs between Python versions: before 3.11 an entry
    without a password is invalid, and the resulting parse error should
    propagate out of `NetRCAuth`.
    """
    nopassword_path = str(FIXTURES_DIR / ".netrc-nopassword")

    with pytest.raises(netrc.NetrcParseError):
        httpx.NetRCAuth(nopassword_path)
@pytest.mark.anyio
async def test_auth_disable_per_request() -> None:
    """Passing `auth=None` on a request disables the client-level credentials."""
    credentials = ("user", "password123")
    transport = httpx.MockTransport(App())

    async with httpx.AsyncClient(transport=transport, auth=credentials) as client:
        response = await client.get("https://example.org/", auth=None)

    assert response.status_code == 200
    assert response.json() == {"auth": None}
def test_auth_hidden_url() -> None:
    """The repr of a URL with credentials shows `[secure]` for the password."""
    raw_url = "http://example-username:example-password@example.org/"
    parsed = httpx.URL(raw_url)

    # Equality with the original string is unaffected by the masking.
    assert raw_url == parsed
    assert "URL('http://example-username:[secure]@example.org/')" == repr(parsed)
@pytest.mark.anyio
async def test_auth_hidden_header() -> None:
    """The string form of request headers masks the authorization value."""
    credentials = ("example-username", "example-password")
    transport = httpx.MockTransport(App())

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=credentials)

    rendered_headers = str(response.request.headers)
    assert "'authorization': '[secure]'" in rendered_headers
@pytest.mark.anyio
async def test_auth_property() -> None:
    """Assigning a tuple to `client.auth` stores a `BasicAuth` that is then used."""
    transport = httpx.MockTransport(App())

    async with httpx.AsyncClient(transport=transport) as client:
        # No credentials are configured by default.
        assert client.auth is None

        client.auth = ("user", "password123")  # type: ignore
        assert isinstance(client.auth, httpx.BasicAuth)

        response = await client.get("https://example.org/")
        assert response.status_code == 200
        assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="}
@pytest.mark.anyio
async def test_auth_invalid_type() -> None:
    """
    A plain string given as `auth` is rejected with `TypeError` in the client
    constructor, on a per-request call, and on the `auth` property setter.
    """
    url = "https://example.org/"
    app = App()

    with pytest.raises(TypeError):
        httpx.AsyncClient(
            transport=httpx.MockTransport(app),
            auth="not a tuple, not a callable",  # type: ignore
        )

    async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client:
        with pytest.raises(TypeError):
            # The URL must be supplied: without it the call raises TypeError
            # for the missing positional argument, and the invalid `auth`
            # value is never actually checked.
            await client.get(url, auth="not a tuple, not a callable")  # type: ignore

        with pytest.raises(TypeError):
            client.auth = "not a tuple, not a callable"  # type: ignore
@pytest.mark.anyio
async def test_digest_auth_returns_no_auth_if_no_digest_header_in_response() -> None:
    """Without a challenge from the server, DigestAuth sends no credentials."""
    digest = httpx.DigestAuth(username="user", password="password123")
    transport = httpx.MockTransport(App())

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=digest)

    assert response.status_code == 200
    assert response.json() == {"auth": None}
    assert len(response.history) == 0
def test_digest_auth_returns_no_auth_if_alternate_auth_scheme() -> None:
    """
    A 401 whose `www-authenticate` header names a non-Digest scheme is
    returned as-is: no credentials are sent and no retry is recorded.
    """
    url = "https://example.org/"
    auth = httpx.DigestAuth(username="user", password="password123")
    auth_header = "Token ..."
    app = App(auth_header=auth_header, status_code=401)

    # Use the client as a context manager so it is closed after the request,
    # matching the other sync tests in this module.
    with httpx.Client(transport=httpx.MockTransport(app)) as client:
        response = client.get(url, auth=auth)

    assert response.status_code == 401
    assert response.json() == {"auth": None}
    assert len(response.history) == 0
@pytest.mark.anyio
async def test_digest_auth_200_response_including_digest_auth_header() -> None:
    """A Digest challenge header on a 200 response does not trigger a retry."""
    digest = httpx.DigestAuth(username="user", password="password123")
    challenge = 'Digest realm="realm@host.com",qop="auth",nonce="abc",opaque="xyz"'
    transport = httpx.MockTransport(App(auth_header=challenge, status_code=200))

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=digest)

    assert response.status_code == 200
    assert response.json() == {"auth": None}
    assert len(response.history) == 0
@pytest.mark.anyio
async def test_digest_auth_401_response_without_digest_auth_header() -> None:
    """A 401 without any challenge header is returned without a retry."""
    digest = httpx.DigestAuth(username="user", password="password123")
    transport = httpx.MockTransport(App(auth_header="", status_code=401))

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=digest)

    assert response.status_code == 401
    assert response.json() == {"auth": None}
    assert len(response.history) == 0
@pytest.mark.parametrize(
    "algorithm,expected_hash_length,expected_response_length",
    [
        ("MD5", 64, 32),
        ("MD5-SESS", 64, 32),
        ("SHA", 64, 40),
        ("SHA-SESS", 64, 40),
        ("SHA-256", 64, 64),
        ("SHA-256-SESS", 64, 64),
        ("SHA-512", 64, 128),
        ("SHA-512-SESS", 64, 128),
    ],
)
@pytest.mark.anyio
async def test_digest_auth(
    algorithm: str, expected_hash_length: int, expected_response_length: int
) -> None:
    """
    After one challenge, the retried request carries a Digest header whose
    fields match the challenge and whose `response` length fits the algorithm.
    """
    digest = httpx.DigestAuth(username="user", password="password123")
    transport = httpx.MockTransport(DigestApp(algorithm=algorithm))

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=digest)

    assert response.status_code == 200
    assert len(response.history) == 1

    body = typing.cast(typing.Dict[str, typing.Any], response.json())
    scheme, _, raw_fields = body["auth"].partition(" ")
    assert scheme == "Digest"

    # Split 'key=value, key=value' into a mapping; values keep their quotes.
    digest_data = dict(item.strip().split("=") for item in raw_fields.split(","))

    assert digest_data["username"] == '"user"'
    assert digest_data["realm"] == '"httpx@example.org"'
    assert "nonce" in digest_data
    assert digest_data["uri"] == '"/"'
    # The "+ 2" terms below account for the surrounding quotes.
    assert len(digest_data["response"]) == expected_response_length + 2
    assert len(digest_data["opaque"]) == expected_hash_length + 2
    assert digest_data["algorithm"] == algorithm
    assert digest_data["qop"] == "auth"
    assert digest_data["nc"] == "00000001"
    assert len(digest_data["cnonce"]) == 16 + 2
@pytest.mark.anyio
async def test_digest_auth_no_specified_qop() -> None:
    """
    When the challenge carries no `qop`, the Digest header omits `qop`,
    `nc` and `cnonce`.
    """
    digest = httpx.DigestAuth(username="user", password="password123")
    transport = httpx.MockTransport(DigestApp(qop=""))

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=digest)

    assert response.status_code == 200
    assert len(response.history) == 1

    body = typing.cast(typing.Dict[str, typing.Any], response.json())
    scheme, _, raw_fields = body["auth"].partition(" ")
    assert scheme == "Digest"

    # Split 'key=value, key=value' into a mapping; values keep their quotes.
    digest_data = dict(item.strip().split("=") for item in raw_fields.split(","))

    assert "qop" not in digest_data
    assert "nc" not in digest_data
    assert "cnonce" not in digest_data
    assert digest_data["username"] == '"user"'
    assert digest_data["realm"] == '"httpx@example.org"'
    # The "+ 2" terms below account for the surrounding quotes.
    assert len(digest_data["nonce"]) == 64 + 2
    assert digest_data["uri"] == '"/"'
    assert len(digest_data["response"]) == 64 + 2
    assert len(digest_data["opaque"]) == 64 + 2
    assert digest_data["algorithm"] == "SHA-256"
@pytest.mark.parametrize("qop", ("auth, auth-int", "auth,auth-int", "unknown,auth"))
@pytest.mark.anyio
async def test_digest_auth_qop_including_spaces_and_auth_returns_auth(qop: str) -> None:
    """A `qop` list that includes "auth" still authenticates after one retry."""
    digest = httpx.DigestAuth(username="user", password="password123")
    transport = httpx.MockTransport(DigestApp(qop=qop))

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=digest)

    assert response.status_code == 200
    assert len(response.history) == 1
@pytest.mark.anyio
async def test_digest_auth_qop_auth_int_not_implemented() -> None:
    """A challenge offering only `qop="auth-int"` raises NotImplementedError."""
    digest = httpx.DigestAuth(username="user", password="password123")
    transport = httpx.MockTransport(DigestApp(qop="auth-int"))

    async with httpx.AsyncClient(transport=transport) as client:
        with pytest.raises(NotImplementedError):
            await client.get("https://example.org/", auth=digest)
@pytest.mark.anyio
async def test_digest_auth_qop_must_be_auth_or_auth_int() -> None:
    """A challenge with an unrecognised `qop` value raises ProtocolError."""
    digest = httpx.DigestAuth(username="user", password="password123")
    transport = httpx.MockTransport(DigestApp(qop="not-auth"))

    async with httpx.AsyncClient(transport=transport) as client:
        with pytest.raises(httpx.ProtocolError):
            await client.get("https://example.org/", auth=digest)
@pytest.mark.anyio
async def test_digest_auth_incorrect_credentials() -> None:
    """If the retried request is challenged again, that 401 is returned."""
    digest = httpx.DigestAuth(username="user", password="password123")
    # The app answers the first two requests with a challenge.
    transport = httpx.MockTransport(DigestApp(send_response_after_attempt=2))

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=digest)

    assert response.status_code == 401
    assert len(response.history) == 1
@pytest.mark.anyio
async def test_digest_auth_reuses_challenge() -> None:
    """A second request with the same auth object needs no new challenge."""
    target = "https://example.org/"
    digest = httpx.DigestAuth(username="user", password="password123")
    transport = httpx.MockTransport(DigestApp())

    async with httpx.AsyncClient(transport=transport) as client:
        first = await client.get(target, auth=digest)
        second = await client.get(target, auth=digest)

        assert first.status_code == 200
        assert second.status_code == 200

        # Only the first exchange involved a 401 round trip.
        assert len(first.history) == 1
        assert len(second.history) == 0
@pytest.mark.anyio
async def test_digest_auth_resets_nonce_count_after_401() -> None:
    """
    When a later request is challenged again, the new nonce is adopted and
    the nonce count starts over.
    """
    target = "https://example.org/"
    digest = httpx.DigestAuth(username="user", password="password123")
    app = DigestApp()

    async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client:
        response_1 = await client.get(target, auth=digest)
        assert response_1.status_code == 200
        assert len(response_1.history) == 1

        first_fields = parse_keqv_list(
            response_1.request.headers["Authorization"].split(", ")
        )
        first_nonce = first_fields["nonce"]
        first_nc = first_fields["nc"]

        # with this we now force a 401 on a subsequent (but initial) request
        app.send_response_after_attempt = 2

        # we expect the client again to try to authenticate,
        # i.e. the history length must be 1
        response_2 = await client.get(target, auth=digest)
        assert response_2.status_code == 200
        assert len(response_2.history) == 1

        second_fields = parse_keqv_list(
            response_2.request.headers["Authorization"].split(", ")
        )
        second_nonce = second_fields["nonce"]
        second_nc = second_fields["nc"]

    # ensures that the auth challenge was reset
    assert first_nonce != second_nonce
    # ensures the nonce count is reset when the authentication failed
    assert first_nc == second_nc
@pytest.mark.parametrize(
    "auth_header",
    [
        'Digest realm="httpx@example.org", qop="auth"',  # missing fields
        'Digest realm="httpx@example.org", qop="auth,au',  # malformed fields list
    ],
)
@pytest.mark.anyio
async def test_async_digest_auth_raises_protocol_error_on_malformed_header(
    auth_header: str,
) -> None:
    """An incomplete or malformed Digest challenge raises ProtocolError (async)."""
    digest = httpx.DigestAuth(username="user", password="password123")
    transport = httpx.MockTransport(App(auth_header=auth_header, status_code=401))

    async with httpx.AsyncClient(transport=transport) as client:
        with pytest.raises(httpx.ProtocolError):
            await client.get("https://example.org/", auth=digest)
@pytest.mark.parametrize(
    "auth_header",
    [
        'Digest realm="httpx@example.org", qop="auth"',  # missing fields
        'Digest realm="httpx@example.org", qop="auth,au',  # malformed fields list
    ],
)
def test_sync_digest_auth_raises_protocol_error_on_malformed_header(
    auth_header: str,
) -> None:
    """An incomplete or malformed Digest challenge raises ProtocolError (sync)."""
    digest = httpx.DigestAuth(username="user", password="password123")
    transport = httpx.MockTransport(App(auth_header=auth_header, status_code=401))

    with httpx.Client(transport=transport) as client:
        with pytest.raises(httpx.ProtocolError):
            client.get("https://example.org/", auth=digest)
@pytest.mark.anyio
async def test_async_auth_history() -> None:
    """
    Intermediate responses produced during an authentication flow show up
    in the history of the final response (async client).
    """
    repeat_auth = RepeatAuth(repeat=2)
    transport = httpx.MockTransport(App(auth_header="abc"))

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=repeat_auth)

    assert response.status_code == 200
    assert response.json() == {"auth": "Repeat abc.abc"}

    assert len(response.history) == 2
    first_hop, second_hop = response.history
    assert first_hop.json() == {"auth": "Repeat 0"}
    assert second_hop.json() == {"auth": "Repeat 1"}

    # Each intermediate response only knows about the ones before it.
    assert len(second_hop.history) == 1
    assert second_hop.history == [first_hop]

    assert len(first_hop.history) == 0
def test_sync_auth_history() -> None:
    """
    Intermediate responses produced during an authentication flow show up
    in the history of the final response (sync client).
    """
    repeat_auth = RepeatAuth(repeat=2)
    transport = httpx.MockTransport(App(auth_header="abc"))

    with httpx.Client(transport=transport) as client:
        response = client.get("https://example.org/", auth=repeat_auth)

    assert response.status_code == 200
    assert response.json() == {"auth": "Repeat abc.abc"}

    assert len(response.history) == 2
    first_hop, second_hop = response.history
    assert first_hop.json() == {"auth": "Repeat 0"}
    assert second_hop.json() == {"auth": "Repeat 1"}

    # Each intermediate response only knows about the ones before it.
    assert len(second_hop.history) == 1
    assert second_hop.history == [first_hop]

    assert len(first_hop.history) == 0
class ConsumeBodyTransport(httpx.MockTransport):
    """Mock transport that reads the whole request stream before responding."""

    async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
        assert isinstance(request.stream, httpx.AsyncByteStream)
        # Drain the body; the chunks themselves are not needed.
        async for _chunk in request.stream:
            pass
        return self.handler(request)  # type: ignore[return-value]
@pytest.mark.anyio
async def test_digest_auth_unavailable_streaming_body() -> None:
    """
    Digest auth with a streaming request body raises `StreamConsumed` when
    the transport has already read the body before issuing the challenge.
    """
    url = "https://example.org/"
    auth = httpx.DigestAuth(username="user", password="password123")
    app = DigestApp()

    async def streaming_body() -> typing.AsyncIterator[bytes]:
        yield b"Example request body"  # pragma: no cover

    async with httpx.AsyncClient(transport=ConsumeBodyTransport(app)) as client:
        with pytest.raises(httpx.StreamConsumed):
            await client.post(url, content=streaming_body(), auth=auth)
@pytest.mark.anyio
async def test_async_auth_reads_response_body() -> None:
    """
    An auth flow that sets `requires_response_body` can read the response
    body inside the flow (async client).
    """
    body_auth = ResponseBodyAuth("xyz")
    transport = httpx.MockTransport(App())

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=body_auth)

    assert response.status_code == 200
    # The second request carried the first response's body as its credentials.
    assert response.json() == {"auth": '{"auth":"xyz"}'}
def test_sync_auth_reads_response_body() -> None:
    """
    An auth flow that sets `requires_response_body` can read the response
    body inside the flow (sync client).
    """
    body_auth = ResponseBodyAuth("xyz")
    transport = httpx.MockTransport(App())

    with httpx.Client(transport=transport) as client:
        response = client.get("https://example.org/", auth=body_auth)

    assert response.status_code == 200
    # The second request carried the first response's body as its credentials.
    assert response.json() == {"auth": '{"auth":"xyz"}'}
@pytest.mark.anyio
async def test_async_auth() -> None:
    """
    The async client uses the async-specific auth implementation, which
    supports flows that need to perform I/O or use concurrency primitives
    (such as checking a disk-based cache or fetching a token from a remote
    auth server).
    """
    dual_auth = SyncOrAsyncAuth()
    transport = httpx.MockTransport(App())

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=dual_auth)

    assert response.status_code == 200
    assert response.json() == {"auth": "async-auth"}
def test_sync_auth() -> None:
    """The sync client uses the sync-specific auth implementation."""
    dual_auth = SyncOrAsyncAuth()
    transport = httpx.MockTransport(App())

    with httpx.Client(transport=transport) as client:
        response = client.get("https://example.org/", auth=dual_auth)

    assert response.status_code == 200
    assert response.json() == {"auth": "sync-auth"}
|