1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
|
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"fmt"
"io"
"net"
"net/http"
"net/url"
"strings"
"time"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/version"
"github.com/coreos/go-semver/semver"
)
// Sentinel errors shared by the rafthttp helpers.
var (
	// errMemberNotFound signals that a member could not be found.
	errMemberNotFound = fmt.Errorf("member not found")
	// errMemberRemoved signals that the member has been permanently removed
	// from the cluster; checkPostResponse returns it when a peer answers a
	// POST with 403 Forbidden.
	errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster")
)
// NewListener creates the listener on which raft messages from peers are
// received.
// The listener is obtained from transport.NewTimeoutListener using the
// package's connection read/write timeouts, so that broken streams are
// identified promptly.
func NewListener(u url.URL, tlsinfo *transport.TLSInfo) (net.Listener, error) {
	addr, scheme := u.Host, u.Scheme
	return transport.NewTimeoutListener(addr, scheme, tlsinfo, ConnReadTimeout, ConnWriteTimeout)
}
// NewRoundTripper builds the roundTripper used to send requests to the
// rafthttp listener of remote peers.
func NewRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
	// A timeout transport is used so that it pairs with the timeout listeners
	// on the remote side. Read/write timeouts are disabled (zero) because the
	// message carried by a request may take a long time to write out before
	// the response can be read.
	const noTimeout = 0
	return transport.NewTimeoutTransport(tlsInfo, dialTimeout, noTimeout, noTimeout)
}
// newStreamRoundTripper builds the roundTripper used to send stream requests
// to the rafthttp listener of remote peers.
// Unlike NewRoundTripper it applies the connection read/write timeouts, so a
// broken connection is detected promptly and as few messages as possible are
// sent over it.
func newStreamRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
	return transport.NewTimeoutTransport(
		tlsInfo,
		dialTimeout,
		ConnReadTimeout,
		ConnWriteTimeout,
	)
}
// createPostRequest builds the HTTP POST request that carries a raft message.
//
// The request targets u with its path replaced by path, uses body as payload
// and ct as Content-Type. It also carries the sender ID (from), the local
// server and minimum cluster versions, the cluster ID (cid) and, when urls is
// non-nil, the local peer URLs.
// A failure to construct the request is reported through plog.Panicf.
func createPostRequest(u url.URL, path string, body io.Reader, ct string, urls types.URLs, from, cid types.ID) *http.Request {
	// u was passed by value, so overriding its path does not touch the
	// caller's URL.
	u.Path = path
	target := u.String()

	req, err := http.NewRequest("POST", target, body)
	if err != nil {
		plog.Panicf("unexpected new request error (%v)", err)
	}

	// Headers are applied in a fixed order.
	headers := [][2]string{
		{"Content-Type", ct},
		{"X-Server-From", from.String()},
		{"X-Server-Version", version.Version},
		{"X-Min-Cluster-Version", version.MinClusterVersion},
		{"X-Etcd-Cluster-ID", cid.String()},
	}
	for _, kv := range headers {
		req.Header.Set(kv[0], kv[1])
	}
	setPeerURLsHeader(req, urls)

	return req
}
// checkPostResponse inspects the response to the HTTP POST request that sent
// a raft message and translates it into an error.
//
// It returns nil for 204 No Content, errMemberRemoved for 403 Forbidden, and
// for 412 Precondition Failed either errIncompatibleVersion or
// errClusterIDMismatch depending on the response body (a single trailing
// newline is ignored). Anything else yields a descriptive error.
func checkPostResponse(resp *http.Response, body []byte, req *http.Request, to types.ID) error {
	code := resp.StatusCode

	if code == http.StatusNoContent {
		return nil
	}
	if code == http.StatusForbidden {
		return errMemberRemoved
	}
	if code != http.StatusPreconditionFailed {
		return fmt.Errorf("unexpected http status %s while posting to %q", http.StatusText(code), req.URL.String())
	}

	// Precondition failed: the body names the reason the peer rejected us.
	reason := strings.TrimSuffix(string(body), "\n")
	if reason == errIncompatibleVersion.Error() {
		plog.Errorf("request sent was ignored by peer %s (server version incompatible)", to)
		return errIncompatibleVersion
	}
	if reason == errClusterIDMismatch.Error() {
		remoteCID := resp.Header.Get("X-Etcd-Cluster-ID")
		localCID := req.Header.Get("X-Etcd-Cluster-ID")
		plog.Errorf("request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)",
			to, remoteCID, localCID)
		return errClusterIDMismatch
	}
	return fmt.Errorf("unhandled error %q when precondition failed", string(body))
}
// reportCriticalError makes a best-effort attempt to deliver err on errc.
// The send never blocks: when the channel cannot accept the value right now,
// err is discarded. That is acceptable because the fact that an error has
// happened is already being reported, which is good enough.
func reportCriticalError(err error, errc chan<- error) {
	select {
	default:
		// errc is not ready to receive; drop err instead of waiting.
	case errc <- err:
		// Delivered.
	}
}
// compareMajorMinorVersion compares a and b using only their major and minor
// components; every other field is ignored.
// It returns -1 if a < b, 1 if a > b, and 0 if they are equal.
func compareMajorMinorVersion(a, b *semver.Version) int {
	// Copy just the major/minor fields so that nothing else takes part in
	// the comparison.
	left := semver.Version{Major: a.Major, Minor: a.Minor}
	right := semver.Version{Major: b.Major, Minor: b.Minor}

	if left.LessThan(right) {
		return -1
	}
	if right.LessThan(left) {
		return 1
	}
	return 0
}
// serverVersion extracts the server version from the "X-Server-Version"
// header of h.
// When the header is missing or empty, "2.0.0" is assumed for backward
// compatibility with etcd 2.0. The parse result is passed through
// semver.Must.
func serverVersion(h http.Header) *semver.Version {
	if reported := h.Get("X-Server-Version"); reported != "" {
		return semver.Must(semver.NewVersion(reported))
	}
	// backward compatibility with etcd 2.0
	return semver.Must(semver.NewVersion("2.0.0"))
}
// minClusterVersion extracts the minimum cluster version from the
// "X-Min-Cluster-Version" header of h.
// When the header is missing or empty, "2.0.0" is assumed for backward
// compatibility with etcd 2.0. The parse result is passed through
// semver.Must.
func minClusterVersion(h http.Header) *semver.Version {
	// backward compatibility with etcd 2.0
	const fallback = "2.0.0"

	raw := h.Get("X-Min-Cluster-Version")
	if len(raw) == 0 {
		raw = fallback
	}
	return semver.Must(semver.NewVersion(raw))
}
// checkVersionCompability verifies that a remote member is compatible with
// the local one, comparing major and minor versions only.
//
// name identifies the remote in error messages, server is the remote's server
// version and minCluster is the minimum cluster version it announced.
// An error is returned when the remote server version is below the local
// minimum cluster version, or when the remote's minimum cluster version is
// above the local server version; otherwise nil.
func checkVersionCompability(name string, server, minCluster *semver.Version) error {
	localServer := semver.Must(semver.NewVersion(version.Version))
	localMinCluster := semver.Must(semver.NewVersion(version.MinClusterVersion))

	switch {
	case compareMajorMinorVersion(server, localMinCluster) == -1:
		// The remote is older than what the local member can work with.
		return fmt.Errorf("remote version is too low: remote[%s]=%s, local=%s", name, server, localServer)
	case compareMajorMinorVersion(minCluster, localServer) == 1:
		// The remote requires something newer than the local member.
		return fmt.Errorf("local version is too low: remote[%s]=%s, local=%s", name, server, localServer)
	default:
		return nil
	}
}
// setPeerURLsHeader advertises the local peer urls for peer discovery by
// storing them, comma-separated, in the "X-PeerURLs" header of req.
// A nil urls leaves the request untouched.
func setPeerURLsHeader(req *http.Request, urls types.URLs) {
	if urls == nil {
		// often not set in unit tests
		return
	}

	encoded := make([]string, 0, len(urls))
	for i := range urls {
		encoded = append(encoded, urls[i].String())
	}
	req.Header.Set("X-PeerURLs", strings.Join(encoded, ","))
}
// addRemoteFromRequest registers the sender of r as a remote peer on tr.
// The sender ID is read from the "X-Server-From" header and its URLs from the
// comma-separated "X-PeerURLs" header. Nothing happens when the ID cannot be
// parsed or when no URLs were supplied.
func addRemoteFromRequest(tr Transporter, r *http.Request) {
	from, err := types.IDFromString(r.Header.Get("X-Server-From"))
	if err != nil {
		return
	}

	urls := r.Header.Get("X-PeerURLs")
	if urls == "" {
		return
	}
	tr.AddRemote(from, strings.Split(urls, ","))
}
|