File: TestAsyncMessagesPerformance.java

package info (click to toggle)
tomcat11 11.0.18-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 47,520 kB
  • sloc: java: 370,500; xml: 56,763; jsp: 4,787; sh: 1,304; perl: 324; makefile: 25; ansic: 14
file content (165 lines) | stat: -rw-r--r-- 7,077 bytes parent folder | download | duplicates (9)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tomcat.websocket.server;

import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import jakarta.websocket.ClientEndpointConfig;
import jakarta.websocket.ContainerProvider;
import jakarta.websocket.MessageHandler;
import jakarta.websocket.Session;
import jakarta.websocket.WebSocketContainer;

import org.junit.Assert;
import org.junit.Test;

import org.apache.catalina.Context;
import org.apache.catalina.servlets.DefaultServlet;
import org.apache.catalina.startup.Tomcat;
import org.apache.catalina.startup.TomcatBaseTest;
import org.apache.tomcat.websocket.TesterAsyncTiming;
import org.apache.tomcat.websocket.TesterMessageCountClient.TesterProgrammaticEndpoint;

/*
 * This test is very timing sensitive. Any failures need to be checked to see if the thresholds just need adjusting to
 * support a wider range of platforms and/or Java versions or if the failure is an indication of a performance drop in
 * the WebSocket implementation.
 */
public class TestAsyncMessagesPerformance extends TomcatBaseTest {

    /**
     * Starts Tomcat with {@link TesterAsyncTiming.Config} registered as an application listener, connects a
     * programmatic WebSocket client to {@code TesterAsyncTiming.Config.PATH}, sends a single text message and then
     * checks the size and spacing of the binary messages received in response.
     *
     * @throws Exception if Tomcat fails to start, the client fails to connect or the wait for data is interrupted
     */
    @Test
    public void testAsyncTiming() throws Exception {

        Tomcat tomcat = getTomcatInstance();
        // No file system docBase required
        Context ctx = getProgrammaticRootContext();
        ctx.addApplicationListener(TesterAsyncTiming.Config.class.getName());
        Tomcat.addServlet(ctx, "default", new DefaultServlet());
        ctx.addServletMappingDecoded("/", "default");

        tomcat.start();

        WebSocketContainer wsContainer = ContainerProvider.getWebSocketContainer();
        ClientEndpointConfig clientEndpointConfig = ClientEndpointConfig.Builder.create().build();

        // Session is Closeable. Always close it so the client connection is released even if an assertion fails.
        try (Session wsSession = wsContainer.connectToServer(TesterProgrammaticEndpoint.class, clientEndpointConfig,
                new URI("ws://localhost:" + getPort() + TesterAsyncTiming.Config.PATH))) {

            AsyncTimingClientHandler handler = new AsyncTimingClientHandler();
            wsSession.addMessageHandler(ByteBuffer.class, handler);
            wsSession.getBasicRemote().sendText("Hello");

            System.out.println("Sent Hello message, waiting for data");
            // Bounded wait so a stalled connection fails the test rather than hanging the test run
            Assert.assertTrue("Timed out waiting for the expected number of messages", handler.waitForLatch());
            Assert.assertFalse(handler.hasFailed());
        }
    }

    /**
     * Client side handler that records whether the received binary chunks have the expected sizes and how often the
     * gaps between them fall outside the expected ranges.
     * <p>
     * Apart from {@code fail} and {@code latch}, the fields are plain (non-volatile). They are written in
     * {@link #onMessage(ByteBuffer, boolean)} and read in {@link #hasFailed()}, which the test only calls after
     * {@link #waitForLatch()} has returned {@code true}.
     */
    private static class AsyncTimingClientHandler implements MessageHandler.Partial<ByteBuffer> {

        // The pause before the first chunk of each iteration is nominally 50ms. Anything under 40ms is suspicious.
        private static final long MIN_PAUSE_NANOS = TimeUnit.MILLISECONDS.toNanos(40);
        // Chunks within an iteration are expected to arrive less than 0.5ms apart.
        private static final long MAX_GAP_NANOS = TimeUnit.MICROSECONDS.toNanos(500);

        // Nominal pause per iteration, taken from the expected data sequence described in onMessage()
        private static final long EXPECTED_PAUSE_MILLIS = 50;
        // Allow four times the nominal run time before giving up on receiving all of the data
        private static final long WAIT_TIMEOUT_MILLIS =
                TesterAsyncTiming.Config.ITERATIONS * EXPECTED_PAUSE_MILLIS * 4;

        private long lastMessage = 0;
        private int sequence = 0;
        private int count = 0;
        private long seqZeroTimingFailureCount = 0;
        private long seqOneTimingFailureCount = 0;
        private long seqTwoTimingFailureCount = 0;

        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile boolean fail = false;

        /**
         * Checks one received chunk against the expected size for its position in the sequence and against the
         * expected gap since the previous chunk.
         *
         * @param message the received chunk
         * @param last    unused
         */
        @Override
        public void onMessage(ByteBuffer message, boolean last) {
            // Expected received data is:
            // 1 * 16k message in 2 * 8k chunks
            // 1 * 4k message in 1 * 4k chunk
            // 50 ms pause
            // loop
            long now = System.nanoTime();

            // Use the message count to detect the first message. System.nanoTime() has an arbitrary origin so zero is
            // not a safe "no previous message" marker.
            if (count == 0) {
                // First message. There is no previous message to measure against so don't check.
                sequence++;
            } else {
                long diff = now - lastMessage;

                if (sequence == 0) {
                    sequence++;
                    checkSize("SEQ0", message, 8192);
                    if (diff < MIN_PAUSE_NANOS) {
                        System.out.println("SEQ0: Expected diff > 40ms but was [" + diff + "], count [" + count + "]");
                        seqZeroTimingFailureCount++;
                    }
                } else if (sequence == 1) {
                    sequence++;
                    checkSize("SEQ1", message, 8192);
                    // Gap between 2* 8k chunks of 16k message expected to be less than 0.5ms
                    if (diff > MAX_GAP_NANOS) {
                        System.out.println("SEQ1: Expected diff < 500,000 but was [" + diff + "], count [" + count + "]");
                        seqOneTimingFailureCount++;
                    }
                } else if (sequence == 2) {
                    sequence = 0;
                    checkSize("SEQ2", message, 4096);
                    // Gap between 16k message and 4k message expected to be less than 0.5ms
                    if (diff > MAX_GAP_NANOS) {
                        System.out.println("SEQ2: Expected diff < 500,000 but was [" + diff + "], count [" + count + "]");
                        seqTwoTimingFailureCount++;
                    }
                }
            }
            lastMessage = now;

            count++;
            // Three chunks are expected per iteration
            if (count >= TesterAsyncTiming.Config.ITERATIONS * 3) {
                latch.countDown();
            }
        }

        /*
         * Reports a size mismatch and marks the test as failed. A wrong size is always a failure, unlike the timing
         * checks which tolerate a limited number of misses.
         */
        private void checkSize(String label, ByteBuffer message, int expectedSize) {
            if (message.capacity() != expectedSize) {
                System.out.println(label + ": Expected size " + expectedSize + " but was [" + message.capacity() +
                        "], count [" + count + "]");
                fail = true;
            }
        }

        /**
         * Waits for the expected number of chunks to be received.
         *
         * @return {@code true} if all the expected chunks arrived, {@code false} if the wait timed out first
         *
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public boolean waitForLatch() throws InterruptedException {
            return latch.await(WAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        }

        /**
         * Reports the overall result.
         *
         * @return {@code true} if any chunk had an unexpected size or if any of the timing failure counts exceeded
         *             its tolerance
         */
        public boolean hasFailed() {
            // Total iterations are 1500
            if (fail) {
                return true;
            }
            // The 50ms pause after the short message may very rarely appear to be less than 40ms
            if (seqZeroTimingFailureCount > 1) {
                return true;
            }
            // The two chunks of the 16k message may rarely be more than 0.5ms apart
            if (seqOneTimingFailureCount > 10) {
                return true;
            }
            // The short message may often be more than 0.5ms after the long message
            return seqTwoTimingFailureCount > 100;
        }
    }
}