File: client.go

package info (click to toggle)
golang-github-lensesio-schema-registry 0.1.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 152 kB
  • sloc: makefile: 4
file content (166 lines) | stat: -rw-r--r-- 5,069 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
// Package schemaregistry provides a client for Confluent's Kafka Schema Registry REST API.
package schemaregistry

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"path"
)

// DefaultUrl is the address where a local schema registry listens by default.
var DefaultUrl = "http://localhost:8081"

// These numbers are used by the schema registry to communicate errors.
const (
	subjectNotFound = 40401
	schemaNotFound  = 40403
)

// The Schema type is an object produced by the schema registry.
type Schema struct {
	Schema  string `json:"schema"`  // The actual AVRO schema
	Subject string `json:"subject"` // Subject where the schema is registered for
	Version int    `json:"version"` // Version within this subject
	Id      int    `json:"id"`      // Registry's unique id
}

type simpleSchema struct {
	Schema string `json:"schema"`
}

// A ConfluentError is an error as communicated by the schema registry.
// Some day this type might be exposed so that callers can do type assertions on it.
type confluentError struct {
	ErrorCode int    `json:"error_code"`
	Message   string `json:"message"`
}

// Error makes confluentError implement the error interface.
func (ce confluentError) Error() string {
	return fmt.Sprintf("%s (%d)", ce.Message, ce.ErrorCode)
}

type httpDoer interface {
	Do(req *http.Request) (resp *http.Response, err error)
}

// A Client is a client for the schema registry.
type Client interface {
	Subjects() (subjects []string, err error)
	Versions(subject string) (versions []int, err error)
	RegisterNewSchema(subject, schema string) (int, error)
	IsRegistered(subject, schema string) (bool, Schema, error)
	GetSchemaById(id int) (string, error)
	GetSchemaBySubject(subject string, ver int) (s Schema, err error)
	GetLatestSchema(subject string) (s Schema, err error)
}

type client struct {
	url    url.URL
	client httpDoer
}

func parseSchemaRegistryError(resp *http.Response) error {
	var ce confluentError
	if err := json.NewDecoder(resp.Body).Decode(&ce); err != nil {
		return err
	}
	return ce
}

// do performs http requests and json (de)serialization.
func (c *client) do(method, urlPath string, in interface{}, out interface{}) error {
	u := c.url
	u.Path = path.Join(u.Path, urlPath)
	var rdp io.Reader
	if in != nil {
		var wr *io.PipeWriter
		rdp, wr = io.Pipe()
		go func() {
			wr.CloseWithError(json.NewEncoder(wr).Encode(in))
		}()
	}
	req, err := http.NewRequest(method, u.String(), rdp)
	req.Header.Add("Accept", "application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json")
	if err != nil {
		return err
	}
	resp, err := c.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return parseSchemaRegistryError(resp)
	}
	return json.NewDecoder(resp.Body).Decode(out)
}

// Subjects returns all registered subjects.
func (c *client) Subjects() (subjects []string, err error) {
	err = c.do("GET", "subjects", nil, &subjects)
	return
}

// Versions returns all schema version numbers registered for this subject.
func (c *client) Versions(subject string) (versions []int, err error) {
	err = c.do("GET", fmt.Sprintf("subjects/%s/versions", subject), nil, &versions)
	return
}

// RegisterNewSchema registers the given schema for this subject.
func (c *client) RegisterNewSchema(subject, schema string) (int, error) {
	var resp struct {
		Id int `json:"id"`
	}
	err := c.do("POST", fmt.Sprintf("/subjects/%s/versions", subject), simpleSchema{schema}, &resp)
	return resp.Id, err
}

// IsRegistered tells if the given schema is registred for this subject.
func (c *client) IsRegistered(subject, schema string) (bool, Schema, error) {
	var fs Schema
	err := c.do("POST", fmt.Sprintf("/subjects/%s", subject), simpleSchema{schema}, &fs)
	// schema not found?
	if ce, confluentErr := err.(confluentError); confluentErr && ce.ErrorCode == schemaNotFound {
		return false, fs, nil
	}
	// error?
	if err != nil {
		return false, fs, err
	}
	// so we have a schema then
	return true, fs, nil
}

// GetSchemaById returns the schema for some id.
// The schema registry only provides the schema itself, not the id, subject or version.
func (c *client) GetSchemaById(id int) (string, error) {
	var s Schema
	err := c.do("GET", fmt.Sprintf("/schemas/ids/%d", id), nil, &s)
	return s.Schema, err
}

// GetSchemaBySubject returns the schema for a particular subject and version.
func (c *client) GetSchemaBySubject(subject string, ver int) (s Schema, err error) {
	err = c.do("GET", fmt.Sprintf("/subjects/%s/versions/%d", subject, ver), nil, &s)
	return
}

// GetLatestSchema returns the latest version of the subject's schema.
func (c *client) GetLatestSchema(subject string) (s Schema, err error) {
	err = c.do("GET", fmt.Sprintf("/subjects/%s/versions/latest", subject), nil, &s)
	return
}

// NewClient returns a new Client that connects to baseurl.
func NewClient(baseurl string) (Client, error) {
	u, err := url.Parse(baseurl)
	if err != nil {
		return nil, err
	}
	return &client{*u, http.DefaultClient}, nil
}