File: TestReconfig.cc

package info (click to toggle)
zookeeper 3.9.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 26,804 kB
  • sloc: java: 121,943; cpp: 13,986; ansic: 12,419; javascript: 11,754; xml: 4,965; python: 2,829; sh: 2,444; makefile: 241; perl: 114
file content (697 lines) | stat: -rw-r--r-- 22,522 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <cppunit/extensions/HelperMacros.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <errno.h>
#include <iostream>
#include <sstream>
#include <arpa/inet.h>
#include <exception>
#include <stdlib.h>

extern "C" {
#include <src/addrvec.h>
}

#include "Util.h"
#include "LibCMocks.h"
#include "ZKMocks.h"

using namespace std;

// Port base for the simulated servers: Client::getServerPort() asserts that
// every port it parses is at least this value.
static const int portOffset = 2000;

class Client
{

private:    
    // Member variables
    zhandle_t *zh;
    unsigned int seed;

public:
    /**
     * Create a client with given connection host string and add to our internal
     * vector of clients. These are disconnected and cleaned up in tearDown().
     */
    Client(const string hosts, unsigned int seed) :
        seed((seed * seed) + 0xAFAFAFAF)
    {
        reSeed();

        zh = zookeeper_init(hosts.c_str(),0,1000,0,0,0);
        CPPUNIT_ASSERT(zh);

        // Set the flag to disable ZK from reconnecting to a different server.
        // Our reconfig test case will do explicit server shuffling through
        // zoo_cycle_next_server, and the reconnection attempts would interfere
        // with the server states the tests cases assume.
        zh->disable_reconnection_attempt = 1;
        reSeed();

        cycleNextServer();
    }

    void close()
    {
        zookeeper_close(zh);
        zh = NULL;
    }

    bool isReconfig()
    {
        return zh->reconfig != 0;
    }

    /**
     * re-seed this client with it's own previously generated seed so its
     * random choices are unique and separate from the other clients
     */
    void reSeed()
    {
        srandom(seed);
        srand48(seed);
    }

    /**
     * Get the server that this client is currently connected to.
     */
    string getServer()
    {
        const char* addrstring = zoo_get_current_server(zh);
        return string(addrstring);
    }

    /**
     * Get the server this client is currently connected to with no port
     * specification.
     */
    string getServerNoPort()
    {
        string addrstring = getServer();
        size_t found = addrstring.find_last_of(":");
        CPPUNIT_ASSERT(found != string::npos);

        // ipv6 address case (to remove leading and trailing bracket)
        if (addrstring.find("[") != string::npos)
        {
            return addrstring.substr(1, found-2);
        }
        else
        {
            return addrstring.substr(0, found);
        }
    }

    /**
     * Get the port of the server this client is currently connected to.
     */
    uint32_t getServerPort()
    {
        string addrstring = getServer();

        size_t found = addrstring.find_last_of(":");
        CPPUNIT_ASSERT(found != string::npos);

        string portStr = addrstring.substr(found+1);

        stringstream ss(portStr);
        uint32_t port;
        ss >> port;

        CPPUNIT_ASSERT(port >= portOffset);

        return port;
    }

    /**
     * Cycle to the next available server on the next connect attempt. It also
     * calls into getServer (above) to return the server connected to.
     */ 
    string cycleNextServer()
    {
        zoo_cycle_next_server(zh);
        return getServer();
    }

    void cycleUntilServer(const string requested)
    {
        // Call cycleNextServer until the one it's connected to is the one
        // specified (disregarding port).
        string first;

        while(true)
        {
            string next = cycleNextServer();
            if (first.empty())
            {
                first = next;
            } 
            // Else we've looped around!
            else if (first == next)
            {
                CPPUNIT_ASSERT(false);
            }

            // Strip port off
            string server = getServerNoPort();

            // If it matches the requested host we're now 'connected' to the right host
            if (server == requested)
            {
                break;
            }
        }
    }

    /**
     * Set servers for this client.
     */
    void setServers(const string new_hosts)
    {
        int rc = zoo_set_servers(zh, new_hosts.c_str());
        CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
    }

    /**
     * Set servers for this client and validate reconfig value matches expected.
     */
    void setServersAndVerifyReconfig(const string new_hosts, bool is_reconfig)
    {
        setServers(new_hosts);
        CPPUNIT_ASSERT_EQUAL(is_reconfig, isReconfig());
    }

    /**
     * Sets the server list this client is connecting to AND if this requires
     * the client to be reconfigured (as dictated by internal client policy)
     * then it will trigger a call to cycleNextServer.
     */
    void setServersAndCycleIfNeeded(const string new_hosts)
    {
        setServers(new_hosts);
        if (isReconfig())
        {
            cycleNextServer();
        }
    }
};

/**
 * CppUnit fixture exercising zoo_set_servers / zoo_cycle_next_server through
 * the Client wrapper, plus the addrvec_contains helper.
 *
 * Simulated servers are addressed as 10.10.10.N:(portOffset + N); the counter
 * for such a server lives at numClientsPerHost[N - 1].
 */
class Zookeeper_reconfig : public CPPUNIT_NS::TestFixture
{
    CPPUNIT_TEST_SUITE(Zookeeper_reconfig);

    // Test cases
    CPPUNIT_TEST(testcycleNextServer);
    CPPUNIT_TEST(testMigrateOrNot);
    CPPUNIT_TEST(testMigrationCycle);
    CPPUNIT_TEST(testAddrVecContainsIPv4);
#ifdef AF_INET6
    CPPUNIT_TEST(testAddrVecContainsIPv6);
#endif

    // In threaded mode each 'create' is a thread -- it's not practical to create
    // 10,000 threads to test load balancing. The load balancing code can easily
    // be tested in single threaded mode as concurrency doesn't affect the algorithm.
#ifndef THREADED
    CPPUNIT_TEST(testMigrateProbability);
    CPPUNIT_TEST(testLoadBalancing);
#endif

    CPPUNIT_TEST_SUITE_END();

    FILE *logfile;

    // Allowed deviation, in percent, from the ideal clients-per-server count.
    double slackPercent;
    static const int numClients = 10000;
    static const int portOffset = 2000;

    vector<Client> clients;
    // Per-server client counters, indexed by (port - portOffset - 1).
    vector<uint32_t> numClientsPerHost;

public:
    Zookeeper_reconfig() :
        slackPercent(10.0)
    {
      logfile = openlogfile("Zookeeper_reconfig");
    }

    ~Zookeeper_reconfig() 
    {
      if (logfile) 
      {
        fflush(logfile);
        fclose(logfile);
        logfile = 0;
      }
    }

    /**
     * Route client logging to the fixture's log file, request deterministic
     * connection order and start every test with zeroed counters.
     */
    void setUp()
    {
        zoo_set_log_stream(logfile);
        zoo_deterministic_conn_order(1);

        // assign() (rather than resize()) also resets counters that already exist.
        numClientsPerHost.assign(numClients, static_cast<uint32_t>(0));
    }

    /**
     * Close every client created through createClient() and forget them, so
     * no closed client can be reached through the vector afterwards.
     */
    void tearDown()
    {
        for (unsigned int i = 0; i < clients.size(); i++)
        {
            clients.at(i).close();
        }

        clients.clear();
    }

    /**
     * Create a client with given connection host string and add to our internal
     * vector of clients. These are disconnected and cleaned up in tearDown().
     *
     * The returned reference points into the vector, so it is only safe to use
     * until the next createClient() call.
     */
    Client& createClient(const string &hosts)
    {
        Client client(hosts, clients.size());
        clients.push_back(client);

        return clients.back();
    }

    /**
     * Same as createClient(hosts) only it takes a specific host that this client
     * should simulate being connected to.
     *
     * The membership check is a plain substring search of 'hosts'.
     */
    Client& createClient(const string &hosts, const string &host)
    {
        // Ensure requested host is in the list
        const size_t found = hosts.find(host);
        CPPUNIT_ASSERT(found != hosts.npos);

        Client client(hosts, clients.size());
        client.cycleUntilServer(host);
        clients.push_back(client);

        return clients.back();
    }

    /**
     * Create a connection host list starting at 'start' and stopping at 'stop'
     * where start >= stop. This creates a connection string with host:port pairs
     * separated by ", ". The given 'octet' is the starting octet that is used
     * as the last octet in the host's IP (0 means "use start"). This is
     * decremented on each iteration. Each port will be portOffset + octet.
     *
     * Returns an empty string when start < stop.
     */
    string createHostList(uint32_t start, uint32_t stop = 1, uint32_t octet = 0)
    {
        if (octet == 0)
        {
            octet = start;
        }

        stringstream ss;

        if (start < stop)
        {
            return ss.str();
        }

        // Terminate on equality instead of 'i >= stop': with unsigned
        // counters that condition never becomes false when stop is 0.
        for (uint32_t i = start; ; i--, octet--)
        {
            ss << "10.10.10." << octet << ":" << portOffset + octet;

            if (i == stop)
            {
                break;
            }

            ss << ", ";
        }

        return ss.str();
    }

    /**
     * Gets the lower bound of the number of clients per server that we expect
     * based on the probabilistic load balancing algorithm implemented by the
     * client code.
     */
    double lowerboundClientsPerServer(int numClients, int numServers)
    {
        return (1 - slackPercent/100.0) * numClients / numServers;
    }

    /**
     * Gets the upper bound of the number of clients per server that we expect
     * based on the probabilistic load balancing algorithm implemented by the
     * client code.
     */
    double upperboundClientsPerServer(int numClients, int numServers)
    {
        return (1 + slackPercent/100.0) * numClients / numServers;
    }

    /**
     * Update all the clients to use a new list of servers. This will also cause
     * the client to cycle to the next server as needed (e.g. due to a reconfig).
     * It then updates the number of clients connected to the server based on
     * this change.
     * 
     * Afterwards it validates that all of the servers in the new list
     * (hosts stop..start, i.e. counter indices stop-1 .. start-1) have the
     * correct amount of clients based on the probabilistic load balancing
     * algorithm, and resets those counters. Counters of servers outside the
     * new list are left untouched so callers can assert they stayed at zero.
     */
    void updateAllClientsAndServers(int start, int stop = 1)
    {
        const string newServers = createHostList(start, stop);
        const int numServers = start - stop + 1;

        for (int i = 0; i < numClients; i++) {

            Client &client = clients.at(i);
            client.reSeed();

            client.setServersAndCycleIfNeeded(newServers);
            numClientsPerHost.at(client.getServerPort() - portOffset - 1)++;
        }

        const double upper = upperboundClientsPerServer(numClients, numServers);
        const double lower = lowerboundClientsPerServer(numClients, numServers);

        // Host N is counted at index N-1, so the new list occupies indices
        // [stop-1, start-1]. Bounding the loop by numServers would skip the
        // highest hosts whenever stop > 1.
        for (int index = stop - 1; index < start; index++) {

            const uint32_t actual = numClientsPerHost.at(index);

            if (actual > upper)
            {
                cout << "INDEX=" << index << " too many -- actual=" << actual 
                     << " expected=" << upper << endl;
            }

            CPPUNIT_ASSERT(actual <= upper);

            if (actual < lower)
            {
                cout << "INDEX=" << index << " too few -- actual=" << actual 
                     << " expected=" << lower << endl;
            }

            CPPUNIT_ASSERT(actual >= lower);
            numClientsPerHost.at(index) = 0; // prepare for next test
        }
    }

    /*-------------------------------------------------------------------------*
     * TESTCASES
     *------------------------------------------------------------------------*/

    /**
     * Very basic sunny day test to ensure basic functionality of zoo_set_servers
     * and zoo_cycle_next_server.
     */
    void testcycleNextServer()
    {
        const string initial_hosts = createHostList(10); // 2010..2001
        const string new_hosts = createHostList(4);      // 2004..2001

        Client &client = createClient(initial_hosts);

        client.setServersAndVerifyReconfig(new_hosts, true);

        // Only checks that repeated cycling works; the servers returned are
        // not inspected.
        for (int i = 0; i < 10; i++)
        {
            client.cycleNextServer();
        }
    }

    /**
     * Test the migration policy implicit within the probabilistic load balancing
     * algorithm the Client implements. Tests all the corner cases whereby the
     * list of servers is decreased, increased, and stays the same. Also combines
     * various combinations of the currently connected server being in the new
     * configuration and not.
     */
    void testMigrateOrNot()
    {
        const string initial_hosts = createHostList(4); // 2004..2001

        Client &client = createClient(initial_hosts, "10.10.10.3");

        // Ensemble size decreasing, my server is in the new list
        client.setServersAndVerifyReconfig(createHostList(3), false);

        // Ensemble size decreasing, my server is NOT in the new list
        client.setServersAndVerifyReconfig(createHostList(2), true);

        // Ensemble size stayed the same, my server is NOT in the new list
        client.setServersAndVerifyReconfig(createHostList(2), true);

        // Ensemble size increased, my server is not in the new ensemble
        client.setServers(createHostList(4));
        client.cycleUntilServer("10.10.10.1");
        client.setServersAndVerifyReconfig(createHostList(7,2), true);
    }

    /**
     * This tests that as a client is in reconfig mode it will properly try to
     * connect to all the new servers first. Then it will try to connect to all
     * the 'old' servers that are staying in the new configuration. Finally it
     * will fallback to the normal behavior of trying servers in round-robin.
     */
    void testMigrationCycle()
    {
        int num_initial = 4;
        const string initial_hosts = createHostList(num_initial); // {2004..2001}

        int num_new = 10;
        string new_hosts = createHostList(12, 3);      // {2012..2003}

        // servers from the old list that appear in the new list {2004..2003}
        int num_staying = 2;
        string oldStaying = createHostList(4, 3);

        // servers in the new list that are not in the old list  {2012..2005}
        int num_coming = 8;
        string newComing = createHostList(12, 5);

        // Ensemble in increasing in size, my server is not in the new ensemble
        // load on the old servers must be decreased, so must connect to one of
        // new servers (pNew = 1)
        Client &client = createClient(initial_hosts, "10.10.10.1");
        client.setServersAndVerifyReconfig(new_hosts, true);

        // Since we're in reconfig mode, next connect should be from new list
        // We should try all the new servers *BEFORE* trying any old servers
        string seen;
        for (int i = 0; i < num_coming; i++) {
            client.cycleNextServer();

            // Assert next server is in the 'new' list
            stringstream next;
            next << client.getServerNoPort() << ":" << client.getServerPort();
            size_t found = newComing.find(next.str());
            CPPUNIT_ASSERT_MESSAGE(next.str() + " not in newComing list",
                                   found != string::npos);

            // Assert not in seen list then append
            found = seen.find(next.str());
            CPPUNIT_ASSERT_MESSAGE(next.str() + " in seen list",
                                   found == string::npos);
            seen += next.str() + ", ";
        }

        // Now it should start connecting to the old servers
        seen.clear();
        for (int i = 0; i < num_staying; i++) {
            client.cycleNextServer();

            // Assert it's in the old list
            stringstream next;
            next << client.getServerNoPort() << ":" << client.getServerPort();
            size_t found = oldStaying.find(next.str());
            CPPUNIT_ASSERT(found != string::npos);

            // Assert not in seen list then append
            found = seen.find(next.str());
            CPPUNIT_ASSERT(found == string::npos);
            seen += next.str() + ", ";
        }

        // NOW it goes back to normal as we've tried all the new and old:
        // after one full lap of num_new servers we must be back at 'first'.
        string first = client.cycleNextServer();
        for (int i = 0; i < num_new - 1; i++) {
            client.cycleNextServer();
        }

        CPPUNIT_ASSERT_EQUAL(first, client.cycleNextServer());
    }

    /**
     * Test the migration probability to ensure that it conforms to our expected
     * upper bound of the number of clients per server as we are reconfigured.
     * 
     * In this case, the list of servers is increased and the client's server is
     * in the new list. Whether to move or not depends on the difference of
     * server sizes with probability 1 - |old|/|new| the client disconnects.
     * 
     * In the test below 1-9/10 = 1/10 chance of disconnecting
     */
    void testMigrateProbability()
    {
        const string initial_hosts = createHostList(9); // 10.10.10.9:2009...10.10.10.1:2001
        string new_hosts = createHostList(10); // 10.10.10.10:2010...10.10.10.1:2001

        uint32_t numDisconnects = 0;
        for (int i = 0; i < numClients; i++) {
            Client &client = createClient(initial_hosts, "10.10.10.3");
            client.setServers(new_hosts);
            if (client.isReconfig())
            {
                numDisconnects++;
            }
        }

        // Should be numClients/10 in expectation. Only the upper side
        // (numClients/10 + slackPercent) is enforced here.
        // NOTE(review): the lower bound is not asserted -- confirm whether
        // that is intentional before adding it.
        CPPUNIT_ASSERT(numDisconnects < upperboundClientsPerServer(numClients, 10));
    }

    /**
     * Tests the probabilistic load balancing algorithm implemented by the Client
     * code. 
     * 
     * Test strategy:
     * 
     * (1) Start with 9 servers and 10,000 clients. Remove a server, update
     *     everything, and ensure that the clients are redistributed properly.
     * 
     * (2) Remove two more nodes and repeat the same validations of proper client
     *     redistribution. Ensure no clients are connected to the two removed
     *     nodes.
     * 
     * (3) Remove the first server in the list and simultaneously add the three
     *     previously removed servers. Ensure everything is redistributed and
     *     no clients are connected to the one missing node.
     * 
     * (4) Add the one missing server back into the mix and validate.
     */
    void testLoadBalancing()
    {
        zoo_deterministic_conn_order(0);

        uint32_t numServers = 9;
        const string initial_hosts = createHostList(numServers); // 10.10.10.9:2009...10.10.10.1:2001

        // Create connections to servers
        for (int i = 0; i < numClients; i++) {
            Client &client = createClient(initial_hosts);
            numClientsPerHost.at(client.getServerPort() - portOffset - 1)++;
        }

        for (uint32_t i = 0; i < numServers; i++) {
            CPPUNIT_ASSERT(numClientsPerHost.at(i) <= upperboundClientsPerServer(numClients, numServers));
            CPPUNIT_ASSERT(numClientsPerHost.at(i) >= lowerboundClientsPerServer(numClients, numServers));
            numClientsPerHost.at(i) = 0; // prepare for next test
        }

        // remove last server
        numServers = 8;
        updateAllClientsAndServers(numServers);
        CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers));

        // Remove two more nodes
        numServers = 6;
        updateAllClientsAndServers(numServers);
        CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers));
        CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers+1));
        CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers+2));

        // remove host 0 (first one in list) and add back 6, 7, and 8
        // NOTE(review): with stop == 1 the list built here is still 8..1, so
        // the first host is not actually removed, and the assertion below
        // holds because the counter was just reset by the update. Presumably
        // updateAllClientsAndServers(9, 2) was intended -- confirm before
        // changing, as that alters what the test exercises.
        numServers = 8;
        updateAllClientsAndServers(numServers, 1);
        CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(0));

        // add back host number 0
        numServers = 9;
        updateAllClientsAndServers(numServers);
    }

    /**
     * This tests that client can detect server's ipv4 address change.
     *
     * (1) We generate some address and put in addr, which saddr point to
     * (2) Add all addresses that differ by one bit from the source
     * (3) Add same address, but set ipv6 protocol
     * (4) Ensure, that our address is not equal to any of generated,
     *     and that it equals to itself
     */
    void testAddrVecContainsIPv4() {
        addrvec_t vec;
        addrvec_init(&vec);

        // Value-initialise so that the bytes not covered by the sockaddr_in
        // fields set below are zero when the storage is appended.
        sockaddr_storage addr = sockaddr_storage();
        sockaddr_in* saddr = reinterpret_cast<sockaddr_in*>(&addr);
        saddr->sin_family = AF_INET;
        saddr->sin_port = htons((u_short)1234);
        saddr->sin_addr.s_addr = INADDR_ANY;

        CPPUNIT_ASSERT(sizeof(saddr->sin_addr.s_addr) == 4);

        for (int i = 0; i < 32; i++) {
            // Unsigned shift: a signed 1 << 31 would shift into the sign bit.
            const uint32_t bit = static_cast<uint32_t>(1) << i;

            saddr->sin_addr.s_addr ^= bit;
            addrvec_append(&vec, &addr);
            saddr->sin_addr.s_addr ^= bit;   // restore the source address
        }

        // Same bytes, different address family.
        saddr->sin_family = AF_INET6;
        addrvec_append(&vec, &addr);
        saddr->sin_family = AF_INET;

        CPPUNIT_ASSERT(!addrvec_contains(&vec, &addr));
        addrvec_append(&vec, &addr);
        CPPUNIT_ASSERT(addrvec_contains(&vec, &addr));
        addrvec_free(&vec);
    }

    /**
     * This tests that client can detect server's ipv6 address change.
     *
     * Same logic as in previous testAddrVecContainsIPv4 method,
     * but we keep in mind, that ipv6 is 128-bit long.
     */
#ifdef AF_INET6
    void testAddrVecContainsIPv6() {
        addrvec_t vec;
        addrvec_init(&vec);

        // Value-initialise so that the bytes not covered by the sockaddr_in6
        // fields set below are zero when the storage is appended.
        sockaddr_storage addr = sockaddr_storage();
        sockaddr_in6* saddr = reinterpret_cast<sockaddr_in6*>(&addr);
        saddr->sin6_family = AF_INET6;
        saddr->sin6_port = htons((u_short)1234);
        saddr->sin6_addr = in6addr_any;

        CPPUNIT_ASSERT(sizeof(saddr->sin6_addr.s6_addr) == 16);

        // Flip each of the 128 address bits in turn (16 bytes x 8 bits).
        for (int i = 0; i < 16; i++) {
            for (int j = 0; j < 8; j++) {
                saddr->sin6_addr.s6_addr[i] ^= (1 << j);
                addrvec_append(&vec, &addr);
                saddr->sin6_addr.s6_addr[i] ^= (1 << j);   // restore
            }
        }

        // Same bytes, different address family.
        saddr->sin6_family = AF_INET;
        addrvec_append(&vec, &addr);
        saddr->sin6_family = AF_INET6;

        CPPUNIT_ASSERT(!addrvec_contains(&vec, &addr));
        addrvec_append(&vec, &addr);
        CPPUNIT_ASSERT(addrvec_contains(&vec, &addr));
        addrvec_free(&vec);
    }
#endif
};

// Register the Zookeeper_reconfig suite with CppUnit so the test runner can find it.
CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_reconfig);