File: msk_iam.go

package info (click to toggle)
golang-github-segmentio-kafka-go 0.4.49%2Bds1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 2,292 kB
  • sloc: sh: 17; makefile: 10
file content (180 lines) | stat: -rw-r--r-- 5,471 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package aws_msk_iam_v2

import (
	"context"
	"encoding/json"
	"errors"
	"net/http"
	"net/url"
	"runtime"
	"strconv"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	signer "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
	"github.com/segmentio/kafka-go/sasl"
)

const (
	// These constants come from https://github.com/aws/aws-msk-iam-auth#details and
	// https://github.com/aws/aws-msk-iam-auth/blob/main/src/main/java/software/amazon/msk/auth/iam/internals/AWS4SignedPayloadGenerator.java.
	signAction       = "kafka-cluster:Connect"
	signPayload      = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" // the hex encoded SHA-256 of an empty string
	signService      = "kafka-cluster"
	signVersion      = "2020_10_22"
	signActionKey    = "action"
	signHostKey      = "host"
	signUserAgentKey = "user-agent"
	signVersionKey   = "version"
	queryActionKey   = "Action"
	queryExpiryKey   = "X-Amz-Expires"
)

var signUserAgent = "kafka-go/sasl/aws_msk_iam_v2/" + runtime.Version()

// Mechanism implements sasl.Mechanism for the AWS_MSK_IAM mechanism, based on the official java implementation:
// https://github.com/aws/aws-msk-iam-auth
type Mechanism struct {
	// The sigv4.Signer of aws-sdk-go-v2 to use when signing the request. Required.
	Signer *signer.Signer
	// The aws.Config.Credentials or config.CredentialsProvider of aws-sdk-go-v2. Required.
	Credentials aws.CredentialsProvider
	// The region where the msk cluster is hosted, e.g. "us-east-1". Required.
	Region string
	// The time the request is planned for. Optional, defaults to time.Now() at time of authentication.
	SignTime time.Time
	// The duration for which the presigned request is active. Optional, defaults to 5 minutes.
	Expiry time.Duration
}

func (m *Mechanism) Name() string {
	return "AWS_MSK_IAM"
}

func (m *Mechanism) Next(ctx context.Context, challenge []byte) (bool, []byte, error) {
	// After the initial step, the authentication is complete
	// kafka will return error if it rejected the credentials, so we'll only
	// arrive here on success.
	return true, nil, nil
}

// Start produces the authentication values required for AWS_MSK_IAM. It produces the following json as a byte array,
// making use of the aws-sdk to produce the signed output.
//
//	{
//	  "version" : "2020_10_22",
//	  "host" : "<broker host>",
//	  "user-agent": "<user agent string from the client>",
//	  "action": "kafka-cluster:Connect",
//	  "x-amz-algorithm" : "<algorithm>",
//	  "x-amz-credential" : "<clientAWSAccessKeyID>/<date in yyyyMMdd format>/<region>/kafka-cluster/aws4_request",
//	  "x-amz-date" : "<timestamp in yyyyMMdd'T'HHmmss'Z' format>",
//	  "x-amz-security-token" : "<clientAWSSessionToken if any>",
//	  "x-amz-signedheaders" : "host",
//	  "x-amz-expires" : "<expiration in seconds>",
//	  "x-amz-signature" : "<AWS SigV4 signature computed by the client>"
//	}
func (m *Mechanism) Start(ctx context.Context) (sess sasl.StateMachine, ir []byte, err error) {
	signedMap, err := m.preSign(ctx)
	if err != nil {
		return nil, nil, err
	}

	signedJson, err := json.Marshal(signedMap)
	return m, signedJson, err
}

// preSign produces the authentication values required for AWS_MSK_IAM.
func (m *Mechanism) preSign(ctx context.Context) (map[string]string, error) {
	req, err := buildReq(ctx, defaultExpiry(m.Expiry))
	if err != nil {
		return nil, err
	}

	creds, err := m.Credentials.Retrieve(ctx)
	if err != nil {
		return nil, err
	}

	signedUrl, header, err := m.Signer.PresignHTTP(ctx, creds, req, signPayload, signService, m.Region, defaultSignTime(m.SignTime))
	if err != nil {
		return nil, err
	}

	u, err := url.Parse(signedUrl)
	if err != nil {
		return nil, err
	}
	return buildSignedMap(u, header), nil
}

// buildReq builds http.Request for aws PreSign.
func buildReq(ctx context.Context, expiry time.Duration) (*http.Request, error) {
	query := url.Values{
		queryActionKey: {signAction},
		queryExpiryKey: {strconv.FormatInt(int64(expiry/time.Second), 10)},
	}
	saslMeta := sasl.MetadataFromContext(ctx)
	if saslMeta == nil {
		return nil, errors.New("missing sasl metadata")
	}

	signUrl := url.URL{
		Scheme:   "kafka",
		Host:     saslMeta.Host,
		Path:     "/",
		RawQuery: query.Encode(),
	}

	req, err := http.NewRequest(http.MethodGet, signUrl.String(), nil)
	if err != nil {
		return nil, err
	}

	return req, nil
}

// buildSignedMap builds signed string map which will be used to authenticate with MSK.
func buildSignedMap(u *url.URL, header http.Header) map[string]string {
	signedMap := map[string]string{
		signVersionKey:   signVersion,
		signHostKey:      u.Host,
		signUserAgentKey: signUserAgent,
		signActionKey:    signAction,
	}
	// The protocol requires lowercase keys.
	for key, vals := range header {
		signedMap[strings.ToLower(key)] = vals[0]
	}
	for key, vals := range u.Query() {
		signedMap[strings.ToLower(key)] = vals[0]
	}

	return signedMap
}

// defaultExpiry set default expiration time if user doesn't define Mechanism.Expiry.
func defaultExpiry(v time.Duration) time.Duration {
	if v == 0 {
		return 5 * time.Minute
	}
	return v
}

// defaultSignTime set default sign time if user doesn't define Mechanism.SignTime.
func defaultSignTime(v time.Time) time.Time {
	if v.IsZero() {
		return time.Now()
	}
	return v
}

// NewMechanism provides
func NewMechanism(awsCfg aws.Config) *Mechanism {
	return &Mechanism{
		Signer:      signer.NewSigner(),
		Credentials: awsCfg.Credentials,
		Region:      awsCfg.Region,
	}
}