File: oauth_schema_registry.py

package info (click to toggle)
python-confluent-kafka 2.11.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,660 kB
  • sloc: python: 30,428; ansic: 9,487; sh: 1,477; makefile: 192
file content (68 lines) | stat: -rw-r--r-- 2,876 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2025 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Examples of setting up Schema Registry with OAuth with static token,
# Client Credentials, and custom functions


# CUSTOM OAuth configuration takes in a custom function, config for that
# function, and returns the fields shown below. All fields must be returned
# for OAuth authentication to work


from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient


def main():
    static_oauth_config = {'url': 'https://psrc-123456.us-east-1.aws.confluent.cloud',
                           'bearer.auth.credentials.source': 'STATIC_TOKEN',
                           'bearer.auth.token': 'static-token',
                           'bearer.auth.logical.cluster': 'lsrc-12345',
                           'bearer.auth.identity.pool.id': 'pool-abcd'}
    static_oauth_sr_client = SchemaRegistryClient(static_oauth_config)
    print(static_oauth_sr_client.get_subjects())

    client_credentials_oauth_config = {
        'url': 'https://psrc-123456.us-east-1.aws.confluent.cloud',
        'bearer.auth.credentials.source': 'OAUTHBEARER',
        'bearer.auth.client.id': 'client-id',
        'bearer.auth.client.secret': 'client-secret',
        'bearer.auth.scope': 'schema_registry',
        'bearer.auth.issuer.endpoint.url': 'https://yourauthprovider.com/v1/token',
        'bearer.auth.logical.cluster': 'lsrc-12345',
        'bearer.auth.identity.pool.id': 'pool-abcd'}

    client_credentials_oauth_sr_client = SchemaRegistryClient(client_credentials_oauth_config)
    print(client_credentials_oauth_sr_client.get_subjects())

    def custom_oauth_function(config):
        return config

    custom_config = {'bearer.auth.token': 'example-token',
                     'bearer.auth.logical.cluster': 'lsrc-12345', 'bearer.auth.identity.pool.id': 'pool-abcd'}

    custom_sr_config = {'url': 'https://psrc-123456.us-east-1.aws.confluent.cloud',
                        'bearer.auth.credentials.source': 'CUSTOM',
                        'bearer.auth.custom.provider.function': custom_oauth_function,
                        'bearer.auth.custom.provider.config': custom_config}

    custom_sr_client = SchemaRegistryClient(custom_sr_config)
    print(custom_sr_client.get_subjects())


if __name__ == '__main__':
    main()