File: RpcDispatcherInterruptTest.java

package info (click to toggle)
libjgroups-java 2.12.2.Final-6
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 8,712 kB
  • sloc: java: 109,098; xml: 9,423; sh: 149; makefile: 2
file content (112 lines) | stat: -rw-r--r-- 3,306 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package org.jgroups.blocks;


import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.tests.ChannelTestBase;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.Iterator;
import java.util.Map;

/**
 * Tests interruption of a blocked call with the timeout and a thread pool
 * @author Bela Ban
 */
@Test(groups=Global.STACK_DEPENDENT)
public class RpcDispatcherInterruptTest extends ChannelTestBase {
    private RpcDispatcher disp, disp2;
    private JChannel ch, ch2;

    @BeforeMethod
    void setUp() throws Exception {
        ch=createChannel(true);
        modifyStack(ch);
        ServerObject obj=new ServerObject();
        disp=new RpcDispatcher(ch, null, null, obj);
        ch.connect("RpcDispatcherInterruptTest");

        ch2=createChannel(ch);
        ServerObject obj2=new ServerObject();
        disp2=new RpcDispatcher(ch2, null, null, obj2);
        ch2.connect("RpcDispatcherInterruptTest");
    }

    @AfterMethod
    void tearDown() throws Exception {
        ch2.close();
        disp2.stop();
        ch.close();
        disp.stop();
    }


    public void testMethodCallWithTimeoutNoInterrupt() {
        long timeout, block_time;
        RspList rsps;

        timeout=0;
        block_time=0;
        rsps=call(timeout, block_time);
        checkResults(rsps, 2, true);

        timeout=0;
        block_time=1000L;
        rsps=call(timeout, block_time);
        checkResults(rsps, 2, true);

        timeout=1000;
        block_time=0L;
        rsps=call(timeout, block_time);
        checkResults(rsps, 2, true);

        timeout=1000;
        block_time=10000L;
        rsps=call(timeout, block_time);
        checkResults(rsps, 2, false);
    }


    private static void modifyStack(JChannel ch) {
        ProtocolStack stack=ch.getProtocolStack();
        GMS gms=(GMS)stack.findProtocol(GMS.class);
        if(gms != null)
            gms.setLogCollectMessages(false);
    }

    private RspList call(long timeout, long block_time) {
        long start, stop, diff;
        System.out.println("calling with timeout=" + timeout + ", block_time=" + block_time);
        start=System.currentTimeMillis();
        RspList retval=disp.callRemoteMethods(null, "foo", new Object[]{block_time}, new Class[]{long.class}, GroupRequest.GET_ALL, timeout);
        stop=System.currentTimeMillis();
        diff=stop-start;
        System.out.println("rsps (in " + diff + "ms:)\n" + retval);
        return retval;
    }

    private static void checkResults(RspList rsps, int num, boolean received) {
        assertEquals("responses: " + rsps, num, rsps.size());        
        for(Iterator<Map.Entry<Address,Rsp>> it=rsps.entrySet().iterator(); it.hasNext();) {
            Map.Entry<Address,Rsp> entry=it.next();
            Rsp rsp=entry.getValue();
            assertEquals("rsp: " + rsp, rsp.wasReceived(), received);
        }
    }


    static class ServerObject {

        public static void foo(long timeout) {
            Util.sleep(timeout);
        }
    }
}