1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340
|
package ldap
import (
"bytes"
"errors"
"io"
"net"
"net/http"
"net/http/httptest"
"runtime"
"sync"
"testing"
"time"
"gopkg.in/asn1-ber.v1"
)
func TestUnresponsiveConnection(t *testing.T) {
// The do-nothing server that accepts requests and does nothing
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
}))
defer ts.Close()
c, err := net.Dial(ts.Listener.Addr().Network(), ts.Listener.Addr().String())
if err != nil {
t.Fatalf("error connecting to localhost tcp: %v", err)
}
// Create an Ldap connection
conn := NewConn(c, false)
conn.SetTimeout(time.Millisecond)
conn.Start()
defer conn.Close()
// Mock a packet
packet := ber.Encode(ber.ClassUniversal, ber.TypeConstructed, ber.TagSequence, nil, "LDAP Request")
packet.AppendChild(ber.NewInteger(ber.ClassUniversal, ber.TypePrimitive, ber.TagInteger, conn.nextMessageID(), "MessageID"))
bindRequest := ber.Encode(ber.ClassApplication, ber.TypeConstructed, ApplicationBindRequest, nil, "Bind Request")
bindRequest.AppendChild(ber.NewInteger(ber.ClassUniversal, ber.TypePrimitive, ber.TagInteger, 3, "Version"))
packet.AppendChild(bindRequest)
// Send packet and test response
msgCtx, err := conn.sendMessage(packet)
if err != nil {
t.Fatalf("error sending message: %v", err)
}
defer conn.finishMessage(msgCtx)
packetResponse, ok := <-msgCtx.responses
if !ok {
t.Fatalf("no PacketResponse in response channel")
}
packet, err = packetResponse.ReadPacket()
if err == nil {
t.Fatalf("expected timeout error")
}
if err.Error() != "ldap: connection timed out" {
t.Fatalf("unexpected error: %v", err)
}
}
// TestFinishMessage tests that we do not enter deadlock when a goroutine makes
// a request but does not handle all responses from the server.
func TestConn(t *testing.T) {
ptc := newPacketTranslatorConn()
defer ptc.Close()
conn := NewConn(ptc, false)
conn.Start()
// Test sending 5 different requests in series. Ensure that we can
// get a response packet from the underlying connection and also
// ensure that we can gracefully ignore unhandled responses.
for i := 0; i < 5; i++ {
t.Logf("serial request %d", i)
// Create a message and make sure we can receive responses.
msgCtx := testSendRequest(t, ptc, conn)
testReceiveResponse(t, ptc, msgCtx)
// Send a few unhandled responses and finish the message.
testSendUnhandledResponsesAndFinish(t, ptc, conn, msgCtx, 5)
t.Logf("serial request %d done", i)
}
// Test sending 5 different requests in parallel.
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
t.Logf("parallel request %d", i)
// Create a message and make sure we can receive responses.
msgCtx := testSendRequest(t, ptc, conn)
testReceiveResponse(t, ptc, msgCtx)
// Send a few unhandled responses and finish the message.
testSendUnhandledResponsesAndFinish(t, ptc, conn, msgCtx, 5)
t.Logf("parallel request %d done", i)
}(i)
}
wg.Wait()
// We cannot run Close() in a defer because t.FailNow() will run it and
// it will block if the processMessage Loop is in a deadlock.
conn.Close()
}
func testSendRequest(t *testing.T, ptc *packetTranslatorConn, conn *Conn) (msgCtx *messageContext) {
var msgID int64
runWithTimeout(t, time.Second, func() {
msgID = conn.nextMessageID()
})
requestPacket := ber.Encode(ber.ClassUniversal, ber.TypeConstructed, ber.TagSequence, nil, "LDAP Request")
requestPacket.AppendChild(ber.NewInteger(ber.ClassUniversal, ber.TypePrimitive, ber.TagInteger, msgID, "MessageID"))
var err error
runWithTimeout(t, time.Second, func() {
msgCtx, err = conn.sendMessage(requestPacket)
if err != nil {
t.Fatalf("unable to send request message: %s", err)
}
})
// We should now be able to get this request packet out from the other
// side.
runWithTimeout(t, time.Second, func() {
if _, err = ptc.ReceiveRequest(); err != nil {
t.Fatalf("unable to receive request packet: %s", err)
}
})
return msgCtx
}
func testReceiveResponse(t *testing.T, ptc *packetTranslatorConn, msgCtx *messageContext) {
// Send a mock response packet.
responsePacket := ber.Encode(ber.ClassUniversal, ber.TypeConstructed, ber.TagSequence, nil, "LDAP Response")
responsePacket.AppendChild(ber.NewInteger(ber.ClassUniversal, ber.TypePrimitive, ber.TagInteger, msgCtx.id, "MessageID"))
runWithTimeout(t, time.Second, func() {
if err := ptc.SendResponse(responsePacket); err != nil {
t.Fatalf("unable to send response packet: %s", err)
}
})
// We should be able to receive the packet from the connection.
runWithTimeout(t, time.Second, func() {
if _, ok := <-msgCtx.responses; !ok {
t.Fatal("response channel closed")
}
})
}
func testSendUnhandledResponsesAndFinish(t *testing.T, ptc *packetTranslatorConn, conn *Conn, msgCtx *messageContext, numResponses int) {
// Send a mock response packet.
responsePacket := ber.Encode(ber.ClassUniversal, ber.TypeConstructed, ber.TagSequence, nil, "LDAP Response")
responsePacket.AppendChild(ber.NewInteger(ber.ClassUniversal, ber.TypePrimitive, ber.TagInteger, msgCtx.id, "MessageID"))
// Send extra responses but do not attempt to receive them on the
// client side.
for i := 0; i < numResponses; i++ {
runWithTimeout(t, time.Second, func() {
if err := ptc.SendResponse(responsePacket); err != nil {
t.Fatalf("unable to send response packet: %s", err)
}
})
}
// Finally, attempt to finish this message.
runWithTimeout(t, time.Second, func() {
conn.finishMessage(msgCtx)
})
}
func runWithTimeout(t *testing.T, timeout time.Duration, f func()) {
runtime.Gosched()
done := make(chan struct{})
go func() {
f()
close(done)
}()
runtime.Gosched()
select {
case <-done: // Success!
case <-time.After(timeout):
_, file, line, _ := runtime.Caller(1)
t.Fatalf("%s:%d timed out", file, line)
}
}
// packetTranslatorConn is a helful type which can be used with various tests
// in this package. It implements the net.Conn interface to be used as an
// underlying connection for a *ldap.Conn. Most methods are no-ops but the
// Read() and Write() methods are able to translate ber-encoded packets for
// testing LDAP requests and responses.
//
// Test cases can simulate an LDAP server sending a response by calling the
// SendResponse() method with a ber-encoded LDAP response packet. Test cases
// can simulate an LDAP server receiving a request from a client by calling the
// ReceiveRequest() method which returns a ber-encoded LDAP request packet.
type packetTranslatorConn struct {
lock sync.Mutex
isClosed bool
responseCond sync.Cond
requestCond sync.Cond
responseBuf bytes.Buffer
requestBuf bytes.Buffer
}
var errPacketTranslatorConnClosed = errors.New("connection closed")
func newPacketTranslatorConn() *packetTranslatorConn {
conn := &packetTranslatorConn{}
conn.responseCond = sync.Cond{L: &conn.lock}
conn.requestCond = sync.Cond{L: &conn.lock}
return conn
}
// Read is called by the reader() loop to receive response packets. It will
// block until there are more packet bytes available or this connection is
// closed.
func (c *packetTranslatorConn) Read(b []byte) (n int, err error) {
c.lock.Lock()
defer c.lock.Unlock()
for !c.isClosed {
// Attempt to read data from the response buffer. If it fails
// with an EOF, wait and try again.
n, err = c.responseBuf.Read(b)
if err != io.EOF {
return n, err
}
c.responseCond.Wait()
}
return 0, errPacketTranslatorConnClosed
}
// SendResponse writes the given response packet to the response buffer for
// this conection, signalling any goroutine waiting to read a response.
func (c *packetTranslatorConn) SendResponse(packet *ber.Packet) error {
c.lock.Lock()
defer c.lock.Unlock()
if c.isClosed {
return errPacketTranslatorConnClosed
}
// Signal any goroutine waiting to read a response.
defer c.responseCond.Broadcast()
// Writes to the buffer should always succeed.
c.responseBuf.Write(packet.Bytes())
return nil
}
// Write is called by the processMessages() loop to send request packets.
func (c *packetTranslatorConn) Write(b []byte) (n int, err error) {
c.lock.Lock()
defer c.lock.Unlock()
if c.isClosed {
return 0, errPacketTranslatorConnClosed
}
// Signal any goroutine waiting to read a request.
defer c.requestCond.Broadcast()
// Writes to the buffer should always succeed.
return c.requestBuf.Write(b)
}
// ReceiveRequest attempts to read a request packet from this connection. It
// will block until it is able to read a full request packet or until this
// connection is closed.
func (c *packetTranslatorConn) ReceiveRequest() (*ber.Packet, error) {
c.lock.Lock()
defer c.lock.Unlock()
for !c.isClosed {
// Attempt to parse a request packet from the request buffer.
// If it fails with an unexpected EOF, wait and try again.
requestReader := bytes.NewReader(c.requestBuf.Bytes())
packet, err := ber.ReadPacket(requestReader)
switch err {
case io.EOF, io.ErrUnexpectedEOF:
c.requestCond.Wait()
case nil:
// Advance the request buffer by the number of bytes
// read to decode the request packet.
c.requestBuf.Next(c.requestBuf.Len() - requestReader.Len())
return packet, nil
default:
return nil, err
}
}
return nil, errPacketTranslatorConnClosed
}
// Close closes this connection causing Read() and Write() calls to fail.
func (c *packetTranslatorConn) Close() error {
c.lock.Lock()
defer c.lock.Unlock()
c.isClosed = true
c.responseCond.Broadcast()
c.requestCond.Broadcast()
return nil
}
func (c *packetTranslatorConn) LocalAddr() net.Addr {
return (*net.TCPAddr)(nil)
}
func (c *packetTranslatorConn) RemoteAddr() net.Addr {
return (*net.TCPAddr)(nil)
}
func (c *packetTranslatorConn) SetDeadline(t time.Time) error {
return nil
}
func (c *packetTranslatorConn) SetReadDeadline(t time.Time) error {
return nil
}
func (c *packetTranslatorConn) SetWriteDeadline(t time.Time) error {
return nil
}
|