1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
|
//=============================================================================
/**
* @file Future_Stress_Test.cpp
*
* This example tests the ACE_Future set() and get() operations under
* concurrent access in a multithreaded environment.
*
* Usage: Future_Stress_Test [-t <duration in seconds>]
* [-n <number of threads>]
*
* @see https://github.com/DOCGroup/ACE_TAO/issues/2163
*
* @author Andres Kruse <Frank.Hilliger@cs-sol.de>
*/
//=============================================================================
#include "test_config.h"
#include <ace/Time_Value.h>
#include <ace/Countdown_Time.h>
#include <ace/Future.h>
#include <ace/Get_Opt.h>
#include <random>
#if defined (ACE_HAS_THREADS)
/**
 * @struct Worker_Config
 *
 * @brief Work item handed from a runner thread to one worker thread.
 *
 * The worker evaluates <tt>a + b * c</tt> and publishes the value
 * through @c result.  The integer operands are zero-initialized so a
 * default-constructed object never carries indeterminate values.
 */
struct Worker_Config
{
  /// Addend of the expression evaluated by the worker.
  int a = 0;
  /// First factor of the expression evaluated by the worker.
  int b = 0;
  /// Second factor of the expression evaluated by the worker.
  int c = 0;
  /// Future the worker sets with the computed value.
  ACE_Future<int> result;
};
void* worker (void* args)
{
Worker_Config* config = static_cast<Worker_Config*>(args);
int r = config->a + config->b * config->c;
config->result.set(r);
return 0;
}
void* runner (void* args)
{
ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) runner start\n")));
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1,1000000);
ACE_Time_Value* duration = static_cast<ACE_Time_Value*>(args);
ACE_Countdown_Time timer(duration);
timer.start();
uint64_t runNum = 0;
do
{
if( ++runNum % 5000 == 0 )
{
ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) runner iteration %u\n"), runNum));
}
ACE_Future<int> result;
Worker_Config config;
config.a = dis(gen);
config.b = dis(gen);
config.c = dis(gen);
config.result = result;
ACE_hthread_t thread_id;
int expected_res = config.a+config.b*config.c;
int actual_res = -1;
if (ACE_Thread::spawn((ACE_THR_FUNC)worker,
static_cast<void*>(&config), THR_NEW_LWP | THR_JOINABLE, 0,
&thread_id) == -1)
{
ACE_ERROR ((LM_INFO,
ACE_TEXT ("worker thread spawn failed\n")));
}
result.get(actual_res);
if( actual_res != expected_res )
{
// hit the bug...
ACE_ERROR ((LM_INFO,
ACE_TEXT ("unexpected ACE_Future result\n")));
abort();
}
ACE_THR_FUNC_RETURN status;
ACE_Thread::join(thread_id, &status);
timer.update();
} while( *duration != ACE_Time_Value::zero );
ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) runner done\n"), runNum));
return 0;
}
int
run_main (int argc, ACE_TCHAR *argv[])
{
ACE_START_TEST (ACE_TEXT ("Future_Stress_Test"));
ACE_Time_Value duration(5);
long n_threads = 5;
ACE_Get_Opt getopt (argc, argv, ACE_TEXT ("t:n:"));
bool valid = true;
int c;
while ((c = getopt ()) != -1 && valid)
{
//FUZZ: enable check_for_lack_ACE_OS
switch (c)
{
case 't':
duration.set(ACE_OS::atoi (getopt.opt_arg ()));
break;
case 'n':
n_threads = ACE_OS::atoi (getopt.opt_arg ());
break;
default:
ACE_ERROR ((LM_ERROR,
"Usage: Future_Stress_Test [-t <duration in seconds>]"
"\t[-n <number of threads>]\n"));
valid = false;
break;
}
}
if (valid)
{
ACE_Thread_Manager::instance ()->spawn_n (n_threads,
ACE_THR_FUNC (runner),
static_cast<void*>(&duration),
THR_NEW_LWP | THR_DETACHED);
ACE_Thread_Manager::instance ()->wait ();
ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) All threads finished, cleanup and exit\n")));
}
ACE_END_TEST;
return 0;
}
#else
int
run_main (int, ACE_TCHAR *[])
{
ACE_ERROR ((LM_INFO,
ACE_TEXT ("threads not supported on this platform\n")));
}
#endif /* ACE_HAS_THREADS */
|