File: STATE_TRANSFER_Test.java

package info (click to toggle)
libjgroups-java 2.12.2.Final-6
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 8,712 kB
  • sloc: java: 109,098; xml: 9,423; sh: 149; makefile: 2
file content (174 lines) | stat: -rw-r--r-- 5,800 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package org.jgroups.protocols;


import static java.util.concurrent.TimeUnit.SECONDS;

import org.jgroups.*;
import org.jgroups.tests.ChannelTestBase;
import org.jgroups.util.Util;
import org.testng.annotations.Test;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.AfterMethod;

/**
 * An attempt to set up a test case template for protocol regression. <p>
 * Two "processes" are started, and the coordinator keeps sending messages carrying a counter.
 * The 2nd process joins the group and gets the state from the coordinator. The first message
 * received after the state has been set is validated to ensure the total ordering of message
 * delivery. <p>
 * This should cover the fix introduced by rev. 1.12
 * @author Wenbo Zhu
 */
@Test(groups={Global.STACK_DEPENDENT,"known-failures"})
public class STATE_TRANSFER_Test extends ChannelTestBase {
    public static final String GROUP_NAME="STATE_TRANSFER_Test";
    private Coordinator coord;



    /** Creates the coordinator and starts its receive and send threads. */
    @BeforeMethod
    protected void setUp() throws Exception {
        coord=new Coordinator();
        coord.recvLoop();
        coord.sendLoop();
    }

    /** Stops the coordinator (if one was created) and drops the reference to it. */
    @AfterMethod
    protected void tearDown() throws Exception {
        // coord is still null if setUp() failed before assigning it
        if(coord != null) {
            coord.stop();
            coord=null;
        }
    }

    /**
     * First group member: connects to {@link #GROUP_NAME}, sends an incrementing counter once per
     * second and answers {@link GetStateEvent}s with the current counter value.
     */
    class Coordinator extends ChannelListenerAdapter {

        private final JChannel channel;
        private int cnt=0;  // the state; only accessed inside synchronized(Coordinator.this)
        private volatile boolean closed=false;

        String getProps() {
            return channel.getProperties();
        }

        public JChannel getChannel() {
            return channel;
        }

        /**
         * Creates the channel, disables {@code Channel.LOCAL}, registers this object as channel
         * listener and connects to {@link #GROUP_NAME}.
         */
        protected Coordinator() throws Exception {
            channel=createChannel(true);
            channel.setOpt(Channel.LOCAL, Boolean.FALSE);
            channel.addChannelListener(this);
            channel.connect(GROUP_NAME);
        }


        /**
         * Starts a thread which pulls events off the channel until {@link #stop()} is called or the
         * channel reports that it is closed or not connected. A {@link GetStateEvent} is answered
         * with the serialized counter; all other events are discarded.
         */
        public void recvLoop() throws Exception {
            Thread task=new Thread(new Runnable() {
                public void run() {
                    while(!closed) {
                        try {
                            Object evt=channel.receive(0);
                            if(evt instanceof GetStateEvent) {
                                // same lock as the sender, so the returned state cannot change
                                // between reading cnt and handing it to the channel
                                synchronized(Coordinator.this) {
                                    channel.returnState(Util.objectToByteBuffer(Integer.valueOf(cnt)));
                                }
                            }
                        }
                        catch(ChannelNotConnectedException ignored) {
                            break;
                        }
                        catch(ChannelClosedException ignored) {
                            break;
                        }
                        catch(Exception e) {
                            System.err.println(e);
                        }
                    }
                }
            });
            task.start();
        }

        /**
         * Starts a thread which increments the counter and sends it to the group once per second
         * until {@link #stop()} is called, the thread is interrupted, or the channel reports that
         * it is closed or not connected.
         */
        public void sendLoop() throws Exception {
            Thread task=new Thread(new Runnable() {

                public void run() {
                    while(!closed) {
                        try {
                            // increment and send atomically with respect to the state provider
                            synchronized(Coordinator.this) {
                                channel.send(null, null, Integer.valueOf(++cnt));
                            }
                            Thread.sleep(1000);
                        }
                        catch(ChannelNotConnectedException ignored) {
                            break;
                        }
                        catch(ChannelClosedException ignored) {
                            break;
                        }
                        catch(InterruptedException interrupted) {
                            // restore the interrupt status and stop instead of swallowing it
                            Thread.currentThread().interrupt();
                            break;
                        }
                        catch(Exception e) {
                            System.err.println(e);
                        }
                    }
                }
            });
            task.start();
        }

        /** Signals both loops to terminate and closes the channel. */
        public void stop() {
            closed=true;
            channel.close();
        }
    }

    /**
     * Joins the group with a second channel, fetches the state from the coordinator and checks that
     * the first message received after the {@link SetStateEvent} carries the state counter plus one.
     * Fails if the iteration limit is reached or if the channel is closed or disconnected before the
     * check could be made.
     */
    public void testBasicStateSync() throws Exception {
        final int timeout=60; // max. loop iterations; each iteration is followed by a 1 second pause
        Channel channel=null;
        int counter=0;
        boolean verified=false;
        try {
            channel=createChannel(coord.getChannel());
            channel.setOpt(Channel.LOCAL, Boolean.FALSE);

            channel.connect(GROUP_NAME);

            Thread.sleep(1000);

            assertTrue(channel.getState(null, 100000L));

            int cnt=-1; // -1 until the state has been received
            // NOTE(review): receive(0) presumably blocks until an event arrives, in which case the
            // iteration limit bounds the number of received events rather than wall time - confirm
            for(;counter < timeout;SECONDS.sleep(1),counter++) {
                try {
                    Object tmp=channel.receive(0);
                    if(tmp instanceof SetStateEvent) {
                        cnt=((Integer)Util.objectFromByteBuffer(((SetStateEvent)tmp).getArg())).intValue();
                        continue;
                    }
                    // messages seen before the state has been set are skipped
                    if(tmp instanceof Message && cnt != -1) {
                        int msg=((Integer)((Message)tmp).getObject()).intValue();
                        assertEquals(cnt, msg - 1);
                        verified=true;
                        break; // done
                    }
                }
                catch(ChannelNotConnectedException ignored) {
                    break;
                }
                catch(ChannelClosedException ignored) {
                    break;
                }
                catch(Exception e) {
                    System.err.println(e);
                }
            }
        }
        finally {
            // channel is null if createChannel() threw; closing must not mask that failure
            if(channel != null)
                channel.close();
        }
        // asserted outside of finally so that a failure raised inside the try block is not replaced
        assertTrue("Timeout reached", counter < timeout);
        assertTrue("channel was closed or disconnected before the ordering could be verified", verified);
    }
}