|
27 | 27 | make_sync_client, |
28 | 28 | parse_client_id, |
29 | 29 | parse_client_secret, |
| 30 | + parse_device_code_client_id, |
| 31 | + parse_device_code_client_secret, |
30 | 32 | parse_resource_metadata_url, |
31 | 33 | parse_use_id_token_as_bearer, |
32 | 34 | ) |
@@ -129,6 +131,10 @@ def authenticate(req: falcon.Request) -> AuthContext: |
129 | 131 | _METADATA, client_id="my-client-id", client_secret="my-client-secret" |
130 | 132 | ) |
131 | 133 | _METADATA_WITH_ID_TOKEN = dataclasses.replace(_METADATA, use_id_token_as_bearer=True) |
| 134 | +_METADATA_WITH_DEVICE_CODE_CLIENT_ID = dataclasses.replace(_METADATA, device_code_client_id="device-client-id") |
| 135 | +_METADATA_WITH_DEVICE_CODE_CLIENT_SECRET = dataclasses.replace( |
| 136 | + _METADATA, device_code_client_id="device-client-id", device_code_client_secret="device-client-secret" |
| 137 | +) |
132 | 138 |
|
133 | 139 |
|
134 | 140 | # --------------------------------------------------------------------------- |
@@ -195,6 +201,8 @@ def test_well_known_omits_default_fields(self) -> None: |
195 | 201 | assert "resource_name" not in d |
196 | 202 | assert "client_id" not in d |
197 | 203 | assert "client_secret" not in d |
| 204 | + assert "device_code_client_id" not in d |
| 205 | + assert "device_code_client_secret" not in d |
198 | 206 |
|
199 | 207 | def test_well_known_exempt_from_auth(self) -> None: |
200 | 208 | """Well-known endpoint is accessible even with auth enabled.""" |
@@ -544,6 +552,184 @@ def test_client_discovery_round_trip_with_use_id_token_as_bearer(self) -> None: |
544 | 552 | assert meta is not None |
545 | 553 | assert meta.use_id_token_as_bearer is True |
546 | 554 |
|
| 555 | + def test_device_code_client_id_rejects_unsafe_characters(self) -> None: |
| 556 | + """device_code_client_id with non-URL-safe characters raises ValueError.""" |
| 557 | + with pytest.raises(ValueError, match="URL-safe"): |
| 558 | + OAuthResourceMetadata( |
| 559 | + resource="https://example.com/vgi", |
| 560 | + authorization_servers=("https://auth.example.com",), |
| 561 | + device_code_client_id='bad"id', |
| 562 | + ) |
| 563 | + with pytest.raises(ValueError, match="URL-safe"): |
| 564 | + OAuthResourceMetadata( |
| 565 | + resource="https://example.com/vgi", |
| 566 | + authorization_servers=("https://auth.example.com",), |
| 567 | + device_code_client_id="has space", |
| 568 | + ) |
| 569 | + |
| 570 | + def test_device_code_client_id_in_well_known_json(self) -> None: |
| 571 | + """device_code_client_id appears in well-known JSON when set.""" |
| 572 | + server = RpcServer(_EchoService, _EchoImpl()) |
| 573 | + client = make_sync_client( |
| 574 | + server, signing_key=b"k", oauth_resource_metadata=_METADATA_WITH_DEVICE_CODE_CLIENT_ID |
| 575 | + ) |
| 576 | + resp = client.get("/.well-known/oauth-protected-resource") |
| 577 | + body = json.loads(resp.content) |
| 578 | + assert body["device_code_client_id"] == "device-client-id" |
| 579 | + |
| 580 | + def test_device_code_client_id_in_www_authenticate(self) -> None: |
| 581 | + """device_code_client_id appears in WWW-Authenticate header when set.""" |
| 582 | + _priv, pub = _make_rsa_key() |
| 583 | + auth_fn = _make_local_auth(pub) |
| 584 | + server = RpcServer(_EchoService, _EchoImpl()) |
| 585 | + client = make_sync_client( |
| 586 | + server, |
| 587 | + signing_key=b"k", |
| 588 | + authenticate=auth_fn, |
| 589 | + oauth_resource_metadata=_METADATA_WITH_DEVICE_CODE_CLIENT_ID, |
| 590 | + ) |
| 591 | + resp = client.post( |
| 592 | + "/vgi/echo", |
| 593 | + content=b"garbage", |
| 594 | + headers={"Content-Type": "application/octet-stream"}, |
| 595 | + ) |
| 596 | + assert resp.status_code == 401 |
| 597 | + www_auth = resp.headers.get("www-authenticate", "") |
| 598 | + assert 'device_code_client_id="device-client-id"' in www_auth |
| 599 | + |
| 600 | + def test_device_code_client_id_absent_from_www_authenticate(self) -> None: |
| 601 | + """device_code_client_id absent from WWW-Authenticate when not set.""" |
| 602 | + _priv, pub = _make_rsa_key() |
| 603 | + auth_fn = _make_local_auth(pub) |
| 604 | + server = RpcServer(_EchoService, _EchoImpl()) |
| 605 | + client = make_sync_client( |
| 606 | + server, |
| 607 | + signing_key=b"k", |
| 608 | + authenticate=auth_fn, |
| 609 | + oauth_resource_metadata=_METADATA, |
| 610 | + ) |
| 611 | + resp = client.post( |
| 612 | + "/vgi/echo", |
| 613 | + content=b"garbage", |
| 614 | + headers={"Content-Type": "application/octet-stream"}, |
| 615 | + ) |
| 616 | + assert resp.status_code == 401 |
| 617 | + www_auth = resp.headers.get("www-authenticate", "") |
| 618 | + assert "device_code_client_id" not in www_auth |
| 619 | + |
| 620 | + def test_parse_device_code_client_id_extracts_value(self) -> None: |
| 621 | + """parse_device_code_client_id() extracts value from header.""" |
| 622 | + header = ( |
| 623 | + 'Bearer resource_metadata="https://example.com/.well-known/oauth-protected-resource/vgi"' |
| 624 | + ', device_code_client_id="my-device-app"' |
| 625 | + ) |
| 626 | + assert parse_device_code_client_id(header) == "my-device-app" |
| 627 | + |
| 628 | + def test_parse_device_code_client_id_returns_none_when_absent(self) -> None: |
| 629 | + """parse_device_code_client_id() returns None when not present.""" |
| 630 | + assert parse_device_code_client_id("Bearer") is None |
| 631 | + assert parse_device_code_client_id('Bearer resource_metadata="https://example.com"') is None |
| 632 | + assert parse_device_code_client_id("") is None |
| 633 | + |
| 634 | + def test_client_discovery_round_trip_with_device_code_client_id(self) -> None: |
| 635 | + """Client discovers device_code_client_id set on server.""" |
| 636 | + server = RpcServer(_EchoService, _EchoImpl()) |
| 637 | + client = make_sync_client( |
| 638 | + server, signing_key=b"k", oauth_resource_metadata=_METADATA_WITH_DEVICE_CODE_CLIENT_ID |
| 639 | + ) |
| 640 | + meta = http_oauth_metadata(client=client) |
| 641 | + assert meta is not None |
| 642 | + assert meta.device_code_client_id == "device-client-id" |
| 643 | + |
| 644 | + def test_device_code_client_secret_rejects_unsafe_characters(self) -> None: |
| 645 | + """device_code_client_secret with non-URL-safe characters raises ValueError.""" |
| 646 | + with pytest.raises(ValueError, match="URL-safe"): |
| 647 | + OAuthResourceMetadata( |
| 648 | + resource="https://example.com/vgi", |
| 649 | + authorization_servers=("https://auth.example.com",), |
| 650 | + device_code_client_secret='bad"secret', |
| 651 | + ) |
| 652 | + with pytest.raises(ValueError, match="URL-safe"): |
| 653 | + OAuthResourceMetadata( |
| 654 | + resource="https://example.com/vgi", |
| 655 | + authorization_servers=("https://auth.example.com",), |
| 656 | + device_code_client_secret="has space", |
| 657 | + ) |
| 658 | + |
| 659 | + def test_device_code_client_secret_in_well_known_json(self) -> None: |
| 660 | + """device_code_client_secret appears in well-known JSON when set.""" |
| 661 | + server = RpcServer(_EchoService, _EchoImpl()) |
| 662 | + client = make_sync_client( |
| 663 | + server, signing_key=b"k", oauth_resource_metadata=_METADATA_WITH_DEVICE_CODE_CLIENT_SECRET |
| 664 | + ) |
| 665 | + resp = client.get("/.well-known/oauth-protected-resource") |
| 666 | + body = json.loads(resp.content) |
| 667 | + assert body["device_code_client_secret"] == "device-client-secret" |
| 668 | + |
| 669 | + def test_device_code_client_secret_in_www_authenticate(self) -> None: |
| 670 | + """device_code_client_secret appears in WWW-Authenticate header when set.""" |
| 671 | + _priv, pub = _make_rsa_key() |
| 672 | + auth_fn = _make_local_auth(pub) |
| 673 | + server = RpcServer(_EchoService, _EchoImpl()) |
| 674 | + client = make_sync_client( |
| 675 | + server, |
| 676 | + signing_key=b"k", |
| 677 | + authenticate=auth_fn, |
| 678 | + oauth_resource_metadata=_METADATA_WITH_DEVICE_CODE_CLIENT_SECRET, |
| 679 | + ) |
| 680 | + resp = client.post( |
| 681 | + "/vgi/echo", |
| 682 | + content=b"garbage", |
| 683 | + headers={"Content-Type": "application/octet-stream"}, |
| 684 | + ) |
| 685 | + assert resp.status_code == 401 |
| 686 | + www_auth = resp.headers.get("www-authenticate", "") |
| 687 | + assert 'device_code_client_secret="device-client-secret"' in www_auth |
| 688 | + |
| 689 | + def test_device_code_client_secret_absent_from_www_authenticate(self) -> None: |
| 690 | + """device_code_client_secret absent from WWW-Authenticate when not set.""" |
| 691 | + _priv, pub = _make_rsa_key() |
| 692 | + auth_fn = _make_local_auth(pub) |
| 693 | + server = RpcServer(_EchoService, _EchoImpl()) |
| 694 | + client = make_sync_client( |
| 695 | + server, |
| 696 | + signing_key=b"k", |
| 697 | + authenticate=auth_fn, |
| 698 | + oauth_resource_metadata=_METADATA, |
| 699 | + ) |
| 700 | + resp = client.post( |
| 701 | + "/vgi/echo", |
| 702 | + content=b"garbage", |
| 703 | + headers={"Content-Type": "application/octet-stream"}, |
| 704 | + ) |
| 705 | + assert resp.status_code == 401 |
| 706 | + www_auth = resp.headers.get("www-authenticate", "") |
| 707 | + assert "device_code_client_secret" not in www_auth |
| 708 | + |
| 709 | + def test_parse_device_code_client_secret_extracts_value(self) -> None: |
| 710 | + """parse_device_code_client_secret() extracts value from header.""" |
| 711 | + header = ( |
| 712 | + 'Bearer resource_metadata="https://example.com/.well-known/oauth-protected-resource/vgi"' |
| 713 | + ', device_code_client_id="my-device-app", device_code_client_secret="my-device-secret"' |
| 714 | + ) |
| 715 | + assert parse_device_code_client_secret(header) == "my-device-secret" |
| 716 | + |
| 717 | + def test_parse_device_code_client_secret_returns_none_when_absent(self) -> None: |
| 718 | + """parse_device_code_client_secret() returns None when not present.""" |
| 719 | + assert parse_device_code_client_secret("Bearer") is None |
| 720 | + assert parse_device_code_client_secret('Bearer resource_metadata="https://example.com"') is None |
| 721 | + assert parse_device_code_client_secret("") is None |
| 722 | + |
| 723 | + def test_client_discovery_round_trip_with_device_code_client_secret(self) -> None: |
| 724 | + """Client discovers device_code_client_secret set on server.""" |
| 725 | + server = RpcServer(_EchoService, _EchoImpl()) |
| 726 | + client = make_sync_client( |
| 727 | + server, signing_key=b"k", oauth_resource_metadata=_METADATA_WITH_DEVICE_CODE_CLIENT_SECRET |
| 728 | + ) |
| 729 | + meta = http_oauth_metadata(client=client) |
| 730 | + assert meta is not None |
| 731 | + assert meta.device_code_client_secret == "device-client-secret" |
| 732 | + |
547 | 733 | def test_401_discovery_flow(self) -> None: |
548 | 734 | """Full 401-based discovery: get 401, parse header, fetch metadata.""" |
549 | 735 | _priv, pub = _make_rsa_key() |
|
0 commit comments