File: gssapi_kerberos.go

package info (click to toggle)
golang-github-ibm-sarama 1.46.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,072 kB
  • sloc: makefile: 40; sh: 30
file content (321 lines) | stat: -rw-r--r-- 8,765 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
package sarama

import (
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"strings"
	"time"

	"github.com/jcmturner/gofork/encoding/asn1"
	"github.com/jcmturner/gokrb5/v8/asn1tools"
	"github.com/jcmturner/gokrb5/v8/gssapi"
	"github.com/jcmturner/gokrb5/v8/iana/chksumtype"
	"github.com/jcmturner/gokrb5/v8/iana/keyusage"
	"github.com/jcmturner/gokrb5/v8/messages"
	"github.com/jcmturner/gokrb5/v8/types"
)

// Constants used by the GSSAPI/Kerberos SASL handshake in this file.
//
//   - TOK_ID_KRB_AP_REQ is written as a big-endian uint16 in front of the
//     marshaled AP_REQ by createKrb5Token.
//   - GSS_API_GENERIC_TAG is the first byte of the header produced by
//     appendGSSAPIHeader.
//   - KRB5_USER_AUTH, KRB5_KEYTAB_AUTH and KRB5_CCACHE_AUTH are not referenced
//     in this file; presumably they are the accepted values of
//     GSSAPIConfig.AuthType — TODO confirm against the Kerberos client.
//   - GSS_API_INITIAL, GSS_API_VERIFY and GSS_API_FINISH are the values of
//     GSSAPIKerberosAuth.step: Login sets GSS_API_INITIAL and initSecContext
//     advances the step to GSS_API_VERIFY and then GSS_API_FINISH.
const (
	TOK_ID_KRB_AP_REQ   = 256
	GSS_API_GENERIC_TAG = 0x60
	KRB5_USER_AUTH      = 1
	KRB5_KEYTAB_AUTH    = 2
	KRB5_CCACHE_AUTH    = 3
	GSS_API_INITIAL     = 1
	GSS_API_VERIFY      = 2
	GSS_API_FINISH      = 3
)

// GSSAPIConfig carries the settings handed to
// GSSAPIKerberosAuth.NewKerberosClientFunc when a Kerberos client is created.
//
// Within this file only BuildSpn is read directly: when it is non-nil, spn
// calls it to build the service principal name instead of using the default
// "<service>/<host>" form.
//
// NOTE(review): the remaining fields are consumed outside this file. By name,
// AuthType presumably selects one of the KRB5_*_AUTH constants, and
// KeyTabPath, CCachePath, KerberosConfigPath, Username, Password and Realm
// presumably supply the matching credentials — confirm against the
// KerberosClient implementation.
type GSSAPIConfig struct {
	AuthType           int
	KeyTabPath         string
	CCachePath         string
	KerberosConfigPath string
	ServiceName        string
	Username           string
	Password           string
	Realm              string
	DisablePAFXFAST    bool
	BuildSpn           BuildSpnFunc
}

// GSSAPIKerberosAuth holds the state of one GSSAPI/Kerberos handshake.
//
// Config is passed to NewKerberosClientFunc, which Authorize and AuthorizeV2
// call to obtain the KerberosClient they use. ticket and encKey are filled in
// by Login from the client's GetServiceTicket result. step records the current
// handshake stage (GSS_API_INITIAL, GSS_API_VERIFY or GSS_API_FINISH); it is
// reset by Login and advanced by initSecContext.
type GSSAPIKerberosAuth struct {
	Config                *GSSAPIConfig
	ticket                messages.Ticket
	encKey                types.EncryptionKey
	NewKerberosClientFunc func(config *GSSAPIConfig) (KerberosClient, error)
	step                  int
}

// KerberosClient is the set of Kerberos operations this file needs from a
// client created by GSSAPIKerberosAuth.NewKerberosClientFunc.
type KerberosClient interface {
	// Login is called once by GSSAPIKerberosAuth.Login before a service
	// ticket is requested.
	Login() error
	// GetServiceTicket returns the ticket and key for spn; Login stores both
	// on the GSSAPIKerberosAuth for use during the handshake.
	GetServiceTicket(spn string) (messages.Ticket, types.EncryptionKey, error)
	// Domain supplies the first argument to types.NewAuthenticator when the
	// AP_REQ is built.
	Domain() string
	// CName supplies the client principal name passed to
	// types.NewAuthenticator when the AP_REQ is built.
	CName() types.PrincipalName
	// Destroy is deferred by Authorize and AuthorizeV2 right after the client
	// is created.
	Destroy()
}

// BuildSpnFunc builds a service principal name from a service name and a
// broker host. When GSSAPIConfig.BuildSpn is set, its result replaces the
// default "<serviceName>/<host>" name.
type BuildSpnFunc func(serviceName, host string) string

// writePackage frames payload with a 4-byte big-endian length prefix and
// writes the frame to the broker connection in a single Write call.
//
// It returns the byte count and error reported by that Write. A payload whose
// framed size would exceed math.MaxInt32 is rejected before anything is
// written, in which case the count is 0.
func (krbAuth *GSSAPIKerberosAuth) writePackage(broker *Broker, payload []byte) (int, error) {
	const headerSize = 4 // bytes used by the length prefix

	payloadLen := uint64(len(payload))
	if payloadLen+headerSize > math.MaxInt32 {
		return 0, errors.New("payload too large, will overflow int32")
	}

	frame := make([]byte, headerSize+payloadLen)
	binary.BigEndian.PutUint32(frame[:headerSize], uint32(payloadLen))
	copy(frame[headerSize:], payload)

	return broker.conn.Write(frame)
}

// readPackage reads one length-prefixed frame from the broker connection: a
// 4-byte big-endian length followed by that many payload bytes.
//
// It returns the payload, the number of bytes accounted as read, and any
// error. If the length prefix cannot be read the result is (nil, 0, err). If
// the payload read fails, the (possibly partially filled) payload buffer is
// returned together with a count covering only the length prefix.
//
// NOTE(review): the length comes straight from the peer and is used as the
// allocation size without an upper bound.
func (krbAuth *GSSAPIKerberosAuth) readPackage(broker *Broker) ([]byte, int, error) {
	var header [4]byte
	n, err := io.ReadFull(broker.conn, header[:])
	if err != nil {
		return nil, 0, err
	}
	total := n

	payload := make([]byte, binary.BigEndian.Uint32(header[:]))
	n, err = io.ReadFull(broker.conn, payload)
	if err != nil {
		// Bytes of a partially read payload are intentionally not added here,
		// matching the count callers have always received.
		return payload, total, err
	}
	return payload, total + n, nil
}

// newAuthenticatorChecksum builds the 24-byte checksum field placed in the
// Kerberos authenticator.
//
// Bytes 0-3 hold the little-endian value 16, bytes 4-19 are left zero, and
// bytes 20-23 hold the little-endian OR of gssapi.ContextFlagInteg and
// gssapi.ContextFlagConf.
//
// NOTE(review): this layout looks like the GSS-API authenticator checksum of
// RFC 4121 section 4.1.1 (binding length, channel bindings, flags) — confirm.
func (krbAuth *GSSAPIKerberosAuth) newAuthenticatorChecksum() []byte {
	const (
		checksumSize  = 24
		bindingLength = 16
		flagsOffset   = 20
	)

	checksum := make([]byte, checksumSize)
	binary.LittleEndian.PutUint32(checksum[:4], bindingLength)

	contextFlags := uint32(gssapi.ContextFlagInteg) | uint32(gssapi.ContextFlagConf)
	binary.LittleEndian.PutUint32(checksum[flagsOffset:], contextFlags)

	return checksum
}

// createKrb5Token builds the Kerberos AP_REQ token (RFC 4120,
// https://tools.ietf.org/html/rfc4120#page-84) sent as the first handshake
// message.
//
// The result is the 2-byte big-endian TOK_ID_KRB_AP_REQ identifier followed by
// the marshaled AP_REQ. The AP_REQ is created from ticket, sessionKey and an
// authenticator for (domain, cname) whose checksum is the GSSAPI checksum from
// newAuthenticatorChecksum. Any error from the gokrb5 constructors or from
// marshaling is returned unchanged.
func (krbAuth *GSSAPIKerberosAuth) createKrb5Token(
	domain string,
	cname types.PrincipalName,
	ticket messages.Ticket,
	sessionKey types.EncryptionKey,
) ([]byte, error) {
	authenticator, err := types.NewAuthenticator(domain, cname)
	if err != nil {
		return nil, err
	}
	authenticator.Cksum = types.Checksum{
		CksumType: chksumtype.GSSAPI,
		Checksum:  krbAuth.newAuthenticatorChecksum(),
	}

	apReq, err := messages.NewAPReq(ticket, sessionKey, authenticator)
	if err != nil {
		return nil, err
	}

	encoded, err := apReq.Marshal()
	if err != nil {
		return nil, err
	}

	// Token identifier first, then the encoded AP_REQ.
	token := make([]byte, 2, 2+len(encoded))
	binary.BigEndian.PutUint16(token, TOK_ID_KRB_AP_REQ)
	return append(token, encoded...), nil
}

// appendGSSAPIHeader prefixes payload with the mechanism-independent GSS-API
// token header described in RFC 2743, section 3.1
// (https://tools.ietf.org/html/rfc2743#page-81).
//
// The returned slice is laid out as:
//
//	GSS_API_GENERIC_TAG | ASN.1 length of (OID + payload) | Kerberos 5 OID | payload
//
// An error is returned only if the OID cannot be marshaled.
func (krbAuth *GSSAPIKerberosAuth) appendGSSAPIHeader(payload []byte) ([]byte, error) {
	mechOID, err := asn1.Marshal(gssapi.OIDKRB5.OID())
	if err != nil {
		return nil, err
	}

	// The encoded length covers everything that follows it: OID plus payload.
	lengthField := asn1tools.MarshalLengthBytes(len(mechOID) + len(payload))

	token := make([]byte, 0, 1+len(lengthField)+len(mechOID)+len(payload))
	token = append(token, GSS_API_GENERIC_TAG)
	token = append(token, lengthField...)
	token = append(token, mechOID...)
	token = append(token, payload...)
	return token, nil
}

// initSecContext produces the next client token of the GSSAPI handshake and
// advances krbAuth.step.
//
//   - GSS_API_INITIAL: builds the AP_REQ token from the ticket and key stored
//     by Login, wraps it in the GSS-API header, and moves to GSS_API_VERIFY.
//     bytes is ignored in this step.
//   - GSS_API_VERIFY: treats bytes as the broker's wrap token, verifies it
//     with the session key, answers with an initiator wrap token carrying the
//     same payload, and moves to GSS_API_FINISH.
//   - any other step: returns (nil, nil) without changing state.
//
// A token that fails verification always yields a non-nil error, even if the
// verifier itself reported no error, so a rejected token can never be mistaken
// for a successful step that produced an empty response.
func (krbAuth *GSSAPIKerberosAuth) initSecContext(
	client KerberosClient,
	bytes []byte,
) ([]byte, error) {
	switch krbAuth.step {
	case GSS_API_INITIAL:
		aprBytes, err := krbAuth.createKrb5Token(
			client.Domain(),
			client.CName(),
			krbAuth.ticket,
			krbAuth.encKey)
		if err != nil {
			return nil, err
		}
		krbAuth.step = GSS_API_VERIFY
		return krbAuth.appendGSSAPIHeader(aprBytes)
	case GSS_API_VERIFY:
		wrapTokenReq := gssapi.WrapToken{}
		if err := wrapTokenReq.Unmarshal(bytes, true); err != nil {
			return nil, err
		}
		// Validate the broker's response before echoing its payload back.
		isValid, err := wrapTokenReq.Verify(krbAuth.encKey, keyusage.GSSAPI_ACCEPTOR_SEAL)
		if err != nil {
			return nil, err
		}
		if !isValid {
			// Guard against a verifier that reports failure without an error;
			// returning (nil, nil) here would let the handshake continue.
			return nil, errors.New("kerberos: wrap token from broker failed verification")
		}

		wrapTokenResponse, err := gssapi.NewInitiatorWrapToken(wrapTokenReq.Payload, krbAuth.encKey)
		if err != nil {
			return nil, err
		}
		krbAuth.step = GSS_API_FINISH
		return wrapTokenResponse.Marshal()
	}
	return nil, nil
}

// spn returns the service principal name used to request a ticket for broker.
//
// The host part is taken from broker.addr. When Config.BuildSpn is set it is
// called with the configured service name and that host; otherwise the name
// is "<service>/<host>".
func (krbAuth *GSSAPIKerberosAuth) spn(broker *Broker) string {
	// The SplitHostPort error is discarded: if addr cannot be split, host is
	// the empty string and is used as-is.
	host, _, _ := net.SplitHostPort(broker.addr)

	if build := krbAuth.Config.BuildSpn; build != nil {
		return build(broker.conf.Net.SASL.GSSAPI.ServiceName, host)
	}
	return fmt.Sprintf("%s/%s", broker.conf.Net.SASL.GSSAPI.ServiceName, host)
}

// Login authenticates with client and fetches a service ticket for spn.
//
// On success the ticket and its key are stored on krbAuth, the handshake step
// is reset to GSS_API_INITIAL, and a pointer to a copy of the ticket is
// returned. On failure the error is logged and returned, and krbAuth is left
// unchanged.
func (krbAuth *GSSAPIKerberosAuth) Login(
	client KerberosClient,
	spn string,
) (*messages.Ticket, error) {
	loginErr := client.Login()
	if loginErr != nil {
		Logger.Printf("Kerberos client login error: %s", loginErr)
		return nil, loginErr
	}

	serviceTicket, sessionKey, ticketErr := client.GetServiceTicket(spn)
	if ticketErr != nil {
		Logger.Printf("Kerberos service ticket error for %s: %s", spn, ticketErr)
		return nil, ticketErr
	}

	// Prime the handshake state consumed by initSecContext.
	krbAuth.step = GSS_API_INITIAL
	krbAuth.encKey = sessionKey
	krbAuth.ticket = serviceTicket

	return &serviceTicket, nil
}

// Authorize runs the Kerberos GSSAPI handshake directly on the broker
// connection using length-prefixed frames.
//
// It creates a Kerberos client, logs in and obtains a service ticket, then
// alternates between producing a token with initSecContext and exchanging it
// with the broker until the step reaches GSS_API_FINISH. Every failure is
// logged and returned. The client is destroyed when Authorize returns.
func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error {
	kerberosClient, err := krbAuth.NewKerberosClientFunc(krbAuth.Config)
	if err != nil {
		Logger.Printf("Kerberos client initialization error: %s", err)
		return err
	}
	defer kerberosClient.Destroy()

	serviceTicket, err := krbAuth.Login(kerberosClient, krbAuth.spn(broker))
	if err != nil {
		return err
	}

	// Used only to label log messages.
	serviceName := strings.Join(serviceTicket.SName.NameString, "/")
	principal := serviceName + "@" + serviceTicket.Realm

	var inbound []byte
	for {
		outbound, err := krbAuth.initSecContext(kerberosClient, inbound)
		if err != nil {
			Logger.Printf("Kerberos init error as %s: %s", principal, err)
			return err
		}

		sentAt := time.Now()
		written, err := krbAuth.writePackage(broker, outbound)
		if err != nil {
			Logger.Printf("Kerberos write error as %s: %s", principal, err)
			return err
		}
		broker.updateOutgoingCommunicationMetrics(written)

		// The final token has just been sent; no reply is read for it.
		if krbAuth.step == GSS_API_FINISH {
			return nil
		}
		if krbAuth.step != GSS_API_VERIFY {
			continue
		}

		var read int
		inbound, read, err = krbAuth.readPackage(broker)
		// Incoming metrics are recorded even when the read failed.
		broker.updateIncomingCommunicationMetrics(read, time.Since(sentAt))
		if err != nil {
			Logger.Printf("Kerberos read error as %s: %s", principal, err)
			return err
		}
	}
}

// AuthorizeV2 runs the Kerberos GSSAPI handshake for SASL v2, delegating the
// transport of each token to authSendReceiver.
//
// It creates a Kerberos client, logs in and obtains a service ticket, then
// repeatedly builds a token with initSecContext, hands it to authSendReceiver,
// and feeds the returned SaslAuthBytes into the next round. It returns nil
// once a token has been sent with the step at GSS_API_FINISH. Every failure is
// logged and returned. The client is destroyed when AuthorizeV2 returns.
func (krbAuth *GSSAPIKerberosAuth) AuthorizeV2(
	broker *Broker,
	authSendReceiver func(authBytes []byte) (*SaslAuthenticateResponse, error),
) error {
	kerberosClient, err := krbAuth.NewKerberosClientFunc(krbAuth.Config)
	if err != nil {
		Logger.Printf("Kerberos client initialization error: %s", err)
		return err
	}
	defer kerberosClient.Destroy()

	serviceTicket, err := krbAuth.Login(kerberosClient, krbAuth.spn(broker))
	if err != nil {
		return err
	}

	// Used only to label log messages.
	serviceName := strings.Join(serviceTicket.SName.NameString, "/")
	principal := serviceName + "@" + serviceTicket.Realm

	var inbound []byte
	for {
		outbound, err := krbAuth.initSecContext(kerberosClient, inbound)
		if err != nil {
			Logger.Printf("SASL Kerberos init error as %s: %s", principal, err)
			return err
		}

		reply, err := authSendReceiver(outbound)
		if err != nil {
			Logger.Printf("SASL Kerberos authenticate error as %s: %s", principal, err)
			return err
		}
		inbound = reply.SaslAuthBytes

		if krbAuth.step == GSS_API_FINISH {
			return nil
		}
	}
}