File: ZKMocks.h

package info (click to toggle)
zookeeper 3.4.13-6%2Bdeb11u1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 17,744 kB
  • sloc: java: 60,531; xml: 18,356; cpp: 11,970; ansic: 9,192; sh: 3,112; python: 2,382; makefile: 261; perl: 114; javascript: 29
file content (509 lines) | stat: -rw-r--r-- 15,521 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef ZKMOCKS_H_
#define ZKMOCKS_H_

#include <zookeeper.h>
#include "src/zk_adaptor.h"

#include "Util.h"
#include "LibCMocks.h"
#include "MocksBase.h"

// *****************************************************************************
// Sets internal zhandle_t members to certain values to simulate the client
// connected state.
// This function should only be used with the single-threaded Async API tests!
void forceConnected(zhandle_t* zh); 

/**
 * Gracefully terminates zookeeper I/O and completion threads.
 *
 * @param zh the zookeeper handle passed to the implementation
 */
void terminateZookeeperThreads(zhandle_t* zh);

// *****************************************************************************
// Abstract watcher action: a base class whose event callbacks all default to
// no-ops, plus a "triggered" flag that is written under synchronized(mx_).
struct SyncedBoolCondition;

class WatcherAction
{
public:
    WatcherAction(): triggered_(false) {}
    virtual ~WatcherAction() {}

    // Event callbacks; every default implementation is empty.
    virtual void onSessionExpired(zhandle_t*) {}
    virtual void onConnectionEstablished(zhandle_t*) {}
    virtual void onConnectionLost(zhandle_t*) {}
    virtual void onNodeValueChanged(zhandle_t*, const char* path) {}
    virtual void onNodeDeleted(zhandle_t*, const char* path) {}
    virtual void onChildChanged(zhandle_t*, const char* path) {}

    // Returns a predicate object; the definition lives outside this header.
    SyncedBoolCondition isWatcherTriggered() const;

    // Raises the "triggered" flag inside a synchronized(mx_) section.
    void setWatcherTriggered()
    {
        synchronized(mx_);
        triggered_ = true;
    }

protected:
    mutable Mutex mx_;  // used to synchronize access to triggered_
    bool triggered_;    // false until setWatcherTriggered() is called
};
// Watcher callback used by the tests.
// zh->context is a pointer to a WatcherAction instance;
// based on the event type and state, the watcher calls a specific watcher
// action method.
void activeWatcher(zhandle_t *zh, int type, int state, const char *path,void* ctx);

// *****************************************************************************
// A set of async completion signatures. Each handler is a virtual no-op, so a
// subclass only has to override the completion kind it is interested in.
class AsyncCompletion
{
public:
    virtual ~AsyncCompletion() {}

    // ACL completion
    virtual void aclCompl(int rc, ACL_vector *acl, Stat *stat) {}
    // data completion
    virtual void dataCompl(int rc, const char *value, int len, const Stat *stat) {}
    // stat completion
    virtual void statCompl(int rc, const Stat *stat) {}
    // single string completion
    virtual void stringCompl(int rc, const char *value) {}
    // string vector completion
    virtual void stringsCompl(int rc, const String_vector *strings) {}
    // completion carrying only a return code
    virtual void voidCompl(int rc) {}
};
// Free-function completion callbacks: one overload per AsyncCompletion
// handler signature, each with a trailing `const void *data` argument.
// NOTE(review): presumably `data` points to the AsyncCompletion instance the
// call is forwarded to -- confirm against the definitions in the .cc file.
void asyncCompletion(int rc, ACL_vector *acl,Stat *stat, const void *data);
void asyncCompletion(int rc, const char *value, int len, const Stat *stat, 
        const void *data);
void asyncCompletion(int rc, const Stat *stat, const void *data);
void asyncCompletion(int rc, const char *value, const void *data);
void asyncCompletion(int rc,const String_vector *strings, const void *data);
void asyncCompletion(int rc, const void *data);

// *****************************************************************************
// some common predicates to use with ensureCondition():
// Predicate: true when zoo_state() of the handle is ZOO_CONNECTED_STATE
struct ClientConnected
{
    ClientConnected(zhandle_t* zh): zh_(zh) {}

    bool operator()() const
    {
        const bool connected = (ZOO_CONNECTED_STATE == zoo_state(zh_));
        return connected;
    }

    zhandle_t* zh_;  // handle being observed
};
// Predicate: true when zoo_state() of the handle is ZOO_EXPIRED_SESSION_STATE
struct SessionExpired
{
    SessionExpired(zhandle_t* zh): zh_(zh) {}

    bool operator()() const
    {
        const bool expired = (ZOO_EXPIRED_SESSION_STATE == zoo_state(zh_));
        return expired;
    }

    zhandle_t* zh_;  // handle being observed
};
// Predicate: checks if the IO thread has stopped; CheckedPthread must be active.
// Only the handle is stored here; operator() is defined outside this header.
struct IOThreadStopped{
    IOThreadStopped(zhandle_t* zh):zh_(zh){}
    // evaluates the condition for zh_
    bool operator()() const;
    // handle whose IO thread is being checked
    zhandle_t* zh_;
};

// A synchronized boolean condition: reads a bool through a reference inside a
// synchronized(mx_) section. Both the bool and the mutex are held by
// reference, so they must outlive this object.
struct SyncedBoolCondition
{
    SyncedBoolCondition(const bool& cond, Mutex& mx): cond_(cond), mx_(mx) {}

    bool operator()() const
    {
        synchronized(mx_);
        const bool current = cond_;
        return current;
    }

    const bool& cond_;  // the observed flag
    Mutex& mx_;         // mutex used while reading cond_
};

// A synchronized integer comparison: inside a synchronized(mx_) section,
// compares an int read through a reference with a fixed expected value.
struct SyncedIntegerEqual
{
    SyncedIntegerEqual(const int& cond, int expected, Mutex& mx)
        : cond_(cond), expected_(expected), mx_(mx)
    {
    }

    bool operator()() const
    {
        synchronized(mx_);
        const bool matches = (expected_ == cond_);
        return matches;
    }

    const int& cond_;     // the observed integer
    const int expected_;  // value it is compared against
    Mutex& mx_;           // mutex used while reading cond_
};

// *****************************************************************************
// make sure to call zookeeper_close() even in presence of exceptions 
struct CloseFinally{
    CloseFinally(zhandle_t** zh):zh_(zh){}
    ~CloseFinally(){
        execute();
    }
    int execute(){
        if(zh_==0)return ZOK;
        zhandle_t* lzh=*zh_;
        *zh_=0;
        disarm();
        return zookeeper_close(lzh);
    }
    void disarm(){zh_=0;}
    zhandle_t ** zh_;
};

struct TestClientId: clientid_t{
    static const int SESSION_ID=123456789;
    static const char* PASSWD;
    TestClientId(){
        client_id=SESSION_ID;
        memcpy(passwd,PASSWD,sizeof(passwd));
    }
};

// *****************************************************************************
// special client id recognized by the ZK server simulator
extern TestClientId testClientId;
// The expansion is parenthesized so that it stays a single expression whatever
// operators surround the macro at the use site (e.g. TEST_CLIENT_ID->client_id).
#define TEST_CLIENT_ID (&testClientId)

// *****************************************************************************
//
// A connect_req with helpers for recognizing and parsing a handshake buffer.
struct HandshakeRequest: public connect_req
{
    // Parses a request out of the buffer; defined outside this header.
    static HandshakeRequest* parse(const std::string& buf);

    // Quick and dirty pre-check done before parsing: only the buffer length
    // is compared with HANDSHAKE_REQ_SIZE.
    static bool isValid(const std::string& buf)
    {
        const bool sizeMatches = (buf.size() == HANDSHAKE_REQ_SIZE);
        return sizeMatches;
    }
};

// *****************************************************************************
// flush_send_queue mock: counts invocations and returns a configurable code.
// The instance registers itself in mock_ on construction and clears mock_ on
// destruction.
class Mock_flush_send_queue: public Mock
{
public:
    Mock_flush_send_queue(): counter(0), callReturns(ZOK)
    {
        mock_ = this;
    }
    ~Mock_flush_send_queue()
    {
        mock_ = 0;
    }

    int counter;      // number of call() invocations so far
    int callReturns;  // value handed back by call(); ZOK by default

    virtual int call(zhandle_t* zh, int timeout)
    {
        ++counter;
        return callReturns;
    }

    static Mock_flush_send_queue* mock_;
};

// *****************************************************************************
// get_xid mock: call() returns a fixed, configurable value (XID by default).
// The instance registers itself in mock_ on construction and clears mock_ on
// destruction.
class Mock_get_xid: public Mock
{
public:
    static const int32_t XID = 123456;

    Mock_get_xid(int retValue = XID): callReturns(retValue)
    {
        mock_ = this;
    }
    ~Mock_get_xid()
    {
        mock_ = 0;
    }

    int callReturns;  // value handed back by call()

    virtual int call()
    {
        return callReturns;
    }

    static Mock_get_xid* mock_;
};

// *****************************************************************************
// activateWatcher mock: call() is a no-op meant to be overridden.
// The instance registers itself in mock_ on construction and clears mock_ on
// destruction.
class Mock_activateWatcher: public Mock
{
public:
    Mock_activateWatcher()
    {
        mock_ = this;
    }
    virtual ~Mock_activateWatcher()
    {
        mock_ = 0;
    }

    virtual void call(zhandle_t *zh, watcher_registration_t* reg, int rc) {}

    static Mock_activateWatcher* mock_;
};

// implementation helper, defined outside this header
class ActivateWatcherWrapper;
// Tracks watcher activation through an ActivateWatcherWrapper.
// All members are defined outside this header.
class WatcherActivationTracker{
public:
    WatcherActivationTracker();
    ~WatcherActivationTracker();
    
    // NOTE(review): presumably selects the watcher context to follow --
    // confirm against the implementation
    void track(void* ctx);
    // returns a predicate object reporting whether the watcher was activated
    SyncedBoolCondition isWatcherActivated() const;
private:
    ActivateWatcherWrapper* wrapper_;
};

// *****************************************************************************
// deliverWatchers mock: call() is a no-op meant to be overridden.
// The instance registers itself in mock_ on construction and clears mock_ on
// destruction.
class Mock_deliverWatchers: public Mock
{
public:
    Mock_deliverWatchers()
    {
        mock_ = this;
    }
    virtual ~Mock_deliverWatchers()
    {
        mock_ = 0;
    }

    virtual void call(zhandle_t* zh, int type, int state, const char* path, watcher_object_list **) {}

    static Mock_deliverWatchers* mock_;
};

// implementation helper, defined outside this header
class DeliverWatchersWrapper;
// Tracks watcher deliveries through a DeliverWatchersWrapper.
// All members are defined outside this header.
class WatcherDeliveryTracker{
public:
    // filters deliveries by state and type
    WatcherDeliveryTracker(int type,int state,bool terminateCompletionThread=true);
    ~WatcherDeliveryTracker();
    
    // if the thread termination was requested (see the ctor params)
    // this function will wait for the I/O and completion threads to 
    // terminate before returning a SyncedBoolCondition instance
    SyncedBoolCondition isWatcherProcessingCompleted() const;
    // resets the delivery counter
    void resetDeliveryCounter();
    // returns a predicate object comparing the delivery counter with `expected`
    SyncedIntegerEqual deliveryCounterEquals(int expected) const;
private:
    DeliverWatchersWrapper* deliveryWrapper_;
};

// *****************************************************************************
// a zookeeper Stat wrapper
struct NodeStat: public Stat
{
    NodeStat(){
        czxid=0;
        mzxid=0;
        ctime=0;
        mtime=0;
        version=1;
        cversion=0;
        aversion=0;
        ephemeralOwner=0;
    }
    NodeStat(const Stat& other){
        memcpy(this,&other,sizeof(*this));
    }
};

// *****************************************************************************
// Abstract server Response: the base of every message the simulated server
// can hand back to the client.
class Response
{
public:
    virtual ~Response() {}

    // Hook for responses that carry a transaction id; ignored by default.
    virtual void setXID(int32_t xid) {}

    // This method is used by the ZookeeperServer class to serialize
    // the instance of Response.
    virtual std::string toString() const = 0;
};

// *****************************************************************************
// Handshake response
class HandshakeResponse: public Response
{
public:
    // Fills in protocol version 1, a timeout of 10000, the given session id
    // and a fixed 16-byte password.
    HandshakeResponse(int64_t sessId = 1)
        : protocolVersion(1)
        , timeOut(10000)
        , sessionId(sessId)
        , passwd_len(sizeof(passwd))
    {
        const char* const fixedPasswd = "1234567890123456";
        // exactly sizeof(passwd) bytes are copied; no terminator is stored
        memcpy(passwd, fixedPasswd, sizeof(passwd));
    }

    int32_t protocolVersion;
    int32_t timeOut;
    int64_t sessionId;
    int32_t passwd_len;
    char passwd[16];

    // serialization is defined outside this header
    virtual std::string toString() const ;
};

// zoo_get() response: holds an xid, a private copy of the node data, a return
// code and a Stat.
class ZooGetResponse: public Response
{
public:
    // `len` bytes starting at `data` are copied into the response.
    ZooGetResponse(const char* data, int len, int32_t xid = 0, int rc = ZOK,
                   const Stat& stat = NodeStat())
        : xid_(xid)
        , data_(data, len)
        , rc_(rc)
        , stat_(stat)
    {
    }

    // serialization is defined outside this header
    virtual std::string toString() const;

    virtual void setXID(int32_t xid)
    {
        xid_ = xid;
    }

private:
    int32_t xid_;
    std::string data_;
    int rc_;
    Stat stat_;
};

// zoo_exists(), zoo_set() response: holds an xid, a return code and a Stat.
class ZooStatResponse: public Response
{
public:
    ZooStatResponse(int32_t xid = 0, int rc = ZOK, const Stat& stat = NodeStat())
        : xid_(xid)
        , rc_(rc)
        , stat_(stat)
    {
    }

    // serialization is defined outside this header
    virtual std::string toString() const;

    virtual void setXID(int32_t xid)
    {
        xid_ = xid;
    }

private:
    int32_t xid_;
    int rc_;
    Stat stat_;
};

// zoo_get_children() response: holds a copy of the child name list and a
// return code; the xid starts at 0 until setXID() is called.
class ZooGetChildrenResponse: public Response
{
public:
    typedef std::vector<std::string> StringVector;

    ZooGetChildrenResponse(const StringVector& v, int rc = ZOK)
        : xid_(0)
        , strings_(v)
        , rc_(rc)
    {
    }

    // serialization is defined outside this header
    virtual std::string toString() const;

    virtual void setXID(int32_t xid)
    {
        xid_ = xid;
    }

    int32_t xid_;
    StringVector strings_;
    int rc_;
};

// PING response: carries no data of its own
class PingResponse: public Response
{
public:
    // serialization is defined outside this header
    virtual std::string toString() const;    
};

// watcher znode event: holds an event type and a copy of the znode path
class ZNodeEvent: public Response
{
public:
    ZNodeEvent(int type, const char* path)
        : type_(type)
        , path_(path)
    {
    }

    // serialization is defined outside this header
    virtual std::string toString() const;

private:
    int type_;
    std::string path_;
};

// ****************************************************************************
// Zookeeper server simulator

class ZookeeperServer: public Mock_socket
{
public:
    // A fresh simulator has the "server down" simulation switched off (skip
    // count -1), both triggers cleared, connectReturns preset to -1 and
    // connectErrno preset to EWOULDBLOCK.
    ZookeeperServer()
        : serverDownSkipCount_(-1)
        , sessionExpired(false)
        , connectionLost(false)
    {
        connectReturns = -1;
        connectErrno = EWOULDBLOCK;
    }

    // Frees every response still held in either queue.
    virtual ~ZookeeperServer()
    {
        clearRecvQueue();
        clearRespQueue();
    }

    // For the descriptor FD: drops all queued responses, then defers to
    // Mock_socket::callClose(). Any other descriptor is forwarded to
    // LIBC_SYMBOLS.close().
    virtual int callClose(int fd)
    {
        if (fd == FD) {
            clearRecvQueue();
            clearRespQueue();
            return Mock_socket::callClose(fd);
        }
        return LIBC_SYMBOLS.close(fd);
    }

    // connection handling
    // what to do when the handshake request comes in?
    // -1 switches the "server down" simulation off; otherwise it is the number
    // of setSO_ERROR() calls left before ECONNREFUSED gets reported
    int serverDownSkipCount_;

    // this will cause getsockopt(zh->fd,SOL_SOCKET,SO_ERROR,&error,&len) return
    // a failure after skipCount dropped to zero, thus simulating a server down
    // condition
    // passing skipCount==-1 will make every connect attempt succeed
    void setServerDown(int skipCount = 0)
    {
        serverDownSkipCount_ = skipCount;
        optvalSO_ERROR = 0;
    }

    // Updates the simulated SO_ERROR value according to the skip counter, then
    // lets Mock_socket fill in the caller's buffer.
    virtual void setSO_ERROR(void *optval, socklen_t len)
    {
        if (serverDownSkipCount_ == 0)
            optvalSO_ERROR = ECONNREFUSED;
        else if (serverDownSkipCount_ != -1)
            --serverDownSkipCount_;
        Mock_socket::setSO_ERROR(optval, len);
    }

    // this is a trigger that gets reset back to false
    // a connect request will return a non-matching session id thus causing
    // the client throw SESSION_EXPIRED
    volatile bool sessionExpired;
    void returnSessionExpired()
    {
        sessionExpired = true;
    }

    // this is a one shot trigger that gets reset back to false
    // next recv call will return 0 length, thus simulating a connection loss
    volatile bool connectionLost;
    void setConnectionLost()
    {
        connectionLost = true;
    }

    // recv
    // this queue is used for server responses: client's recv() system call
    // returns next available message from this queue
    // Each element pairs a response (possibly 0) with an error number. The
    // queue deletes the responses it still holds when it is cleared.
    typedef std::pair<Response*,int> Element;
    typedef std::deque<Element> ResponseList;
    ResponseList recvQueue;
    mutable Mutex recvQMx;
    AtomicInt recvHasMore;

    // Appends a response together with an optional error number.
    ZookeeperServer& addRecvResponse(Response* resp, int errnum = 0)
    {
        return addRecvResponse(Element(resp, errnum));
    }
    // Appends an element that carries only an error number (no response).
    ZookeeperServer& addRecvResponse(int errnum)
    {
        return addRecvResponse(Element(static_cast<Response*>(0), errnum));
    }
    // Appends a ready-made element and bumps the pending counter, all inside
    // a synchronized(recvQMx) section.
    ZookeeperServer& addRecvResponse(const Element& e)
    {
        synchronized(recvQMx);
        recvQueue.push_back(e);
        ++recvHasMore;
        return *this;
    }
    // Resets the pending counter, deletes every queued response and empties
    // the recv queue.
    void clearRecvQueue()
    {
        synchronized(recvQMx);
        recvHasMore = 0;
        for (ResponseList::iterator it = recvQueue.begin(); it != recvQueue.end(); ++it)
            delete it->first;
        recvQueue.clear();
    }

    virtual ssize_t callRecv(int s, void *buf, size_t len, int flags);
    virtual bool hasMoreRecv() const;

    // the operation response queue holds zookeeper operation responses till the
    // operation request has been sent to the server. After that, the operation
    // response gets moved on to the recv queue (see above) ready to be read by
    // the next recv() system call
    // send operation doesn't try to match request to the response
    ResponseList respQueue;
    mutable Mutex respQMx;

    // Appends an operation response together with an optional error number.
    ZookeeperServer& addOperationResponse(Response* resp, int errnum = 0)
    {
        synchronized(respQMx);
        respQueue.push_back(Element(resp, errnum));
        return *this;
    }
    // Deletes every queued operation response and empties the queue.
    void clearRespQueue()
    {
        synchronized(respQMx);
        for (ResponseList::iterator it = respQueue.begin(); it != respQueue.end(); ++it)
            delete it->first;
        respQueue.clear();
    }

    AtomicInt closeSent;
    virtual void notifyBufferSent(const std::string& buffer);
    // simulates an arrival of a client request
    // a callback to be implemented by subclasses (no-op by default)
    virtual void onMessageReceived(const RequestHeader& rh, iarchive* ia);
};

#endif /*ZKMOCKS_H_*/