1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
|
// $Id: test_reactors.cpp 91671 2010-09-08 18:39:23Z johnnyw $
// Perform a torture test of multiple ACE_Reactors and ACE_Tasks in
// the same process... Thanks to Detlef Becker for contributing this.
#include "ace/Reactor.h"
#include "ace/Service_Config.h"
#include "ace/Task.h"
#include "ace/Atomic_Op.h"
#if defined (ACE_HAS_THREADS)
#include "ace/Recursive_Thread_Mutex.h"
static const int NUM_INVOCATIONS = 10;
static const int MAX_TASKS = 20;
class Test_Task : public ACE_Task<ACE_MT_SYNCH>
{
public:
Test_Task (void);
~Test_Task (void);
//FUZZ: disable check_for_lack_ACE_OS
virtual int open (void *args = 0);
virtual int close (u_long flags = 0);
//FUZZ: enable check_for_lack_ACE_OS
virtual int svc (void);
virtual int handle_input (ACE_HANDLE handle);
virtual int handle_close (ACE_HANDLE fd,
ACE_Reactor_Mask close_mask);
private:
int handled_;
static int task_count_;
};
int Test_Task::task_count_ = 0;
static ACE_Atomic_Op<ACE_Thread_Mutex, int> done_count = MAX_TASKS * 2;
static ACE_Recursive_Thread_Mutex reclock_;
Test_Task::Test_Task (void)
: handled_ (0)
{
ACE_GUARD (ACE_Recursive_Thread_Mutex, ace_mon, reclock_);
Test_Task::task_count_++;
ACE_DEBUG ((LM_DEBUG,
"(%t) TT+ Test_Task::task_count_ = %d\n",
Test_Task::task_count_));
}
Test_Task::~Test_Task (void)
{
ACE_GUARD (ACE_Recursive_Thread_Mutex, ace_mon, reclock_);
ACE_DEBUG ((LM_DEBUG,
"(%t) TT- Test_Task::task_count_ = %d\n",
Test_Task::task_count_));
}
int
Test_Task::open (void *args)
{
this->reactor ((ACE_Reactor *) args);
return this->activate (THR_NEW_LWP);
}
int
Test_Task::close (u_long)
{
ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, ace_mon, reclock_, -1);
Test_Task::task_count_--;
ACE_DEBUG ((LM_DEBUG,
"(%t) close Test_Task::task_count_ = %d\n",
Test_Task::task_count_));
return 0;
}
int
Test_Task::svc (void)
{
for (int i = 0; i < NUM_INVOCATIONS; i++)
{
ACE_OS::thr_yield ();
// ACE_DEBUG ((LM_DEBUG, "(%t) calling notify %d\n", i));
if (this->reactor ()->notify (this, ACE_Event_Handler::READ_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "notify"), -1);
// ACE_DEBUG ((LM_DEBUG, "(%t) leaving notify %d\n", i));
}
return 0;
}
int
Test_Task::handle_close (ACE_HANDLE,
ACE_Reactor_Mask)
{
ACE_DEBUG ((LM_DEBUG, "(%t) handle_close\n"));
return 0;
}
int
Test_Task::handle_input (ACE_HANDLE)
{
ACE_DEBUG ((LM_DEBUG, "(%t) handle_input\n"));
this->handled_++;
if (this->handled_ == NUM_INVOCATIONS)
{
done_count--;
ACE_DEBUG ((LM_DEBUG,
"(%t) handle_input, handled_ = %d, done_count = %d\n",
this->handled_, done_count.value ()));
}
ACE_OS::thr_yield ();
return -1;
}
static void *
worker (void *args)
{
ACE_Reactor *reactor = (ACE_Reactor *) args;
reactor->owner (ACE_Thread::self ());
ACE_Time_Value timeout (4);
for (;;)
{
//ACE_DEBUG ((LM_DEBUG, "(%t) calling handle_events\n"));
switch (reactor->handle_events (timeout))
{
case -1:
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "reactor"), 0);
/* NOTREACHED */
case 0:
ACE_ERROR_RETURN ((LM_ERROR, "(%t) timeout\n"), 0);
/* NOTREACHED */
}
// ACE_DEBUG ((LM_DEBUG, "(%t) done with handle_events\n"));
}
ACE_NOTREACHED(return 0);
}
int
ACE_TMAIN (int, ACE_TCHAR *[])
{
ACE_Reactor *react1 = ACE_Reactor::instance ();
ACE_Reactor *react2 = new ACE_Reactor ();
Test_Task tt1[MAX_TASKS];
Test_Task tt2[MAX_TASKS];
for (int i = 0; i < MAX_TASKS; i++)
{
tt1[i].open (react1);
tt2[i].open (react2);
}
if (ACE_Thread_Manager::instance ()->spawn
(ACE_THR_FUNC (worker), (void *) react1, THR_NEW_LWP) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), -1);
else if (ACE_Thread_Manager::instance ()->spawn
(ACE_THR_FUNC (worker), (void *) react2, THR_NEW_LWP) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), -1);
ACE_Thread_Manager::instance ()->wait ();
ACE_DEBUG ((LM_DEBUG, "(%t) done\n"));
return 0;
}
#else
int
ACE_TMAIN (int, ACE_TCHAR *[])
{
ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
return 0;
}
#endif /* ACE_HAS_THREADS */
|