1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
|
//=============================================================================
/**
* @file Task_Ex_Test.cpp
*
* $Id: Task_Ex_Test.cpp 93638 2011-03-24 13:16:05Z johnnyw $
*
* This test program illustrates the ACE_Task_Ex class, whose ACE_Message_Queue_Ex
* can hold user-defined messages instead of ACE_Message_Block objects.
*
*
*
* @author Kobi Cohen-Arazi <kobi-co@barak-online.net>
*/
//=============================================================================
#include "test_config.h"
#include "Task_Ex_Test.h"
#include "ace/Task_Ex_T.h"
#include "ace/Log_Msg.h"
#include "ace/Auto_Ptr.h"
#if defined (ACE_HAS_THREADS)
/// Default test parameters:
///  - PRODUCER_THREADS_NO: number of producer threads spawned by run_main.
///  - CONSUMER_THREADS_NO: number of threads activated by Consumer::open.
///  - NUMBER_OF_MSGS: number of messages each producer enqueues; a consumer
///    thread stops once it dequeues the message with id NUMBER_OF_MSGS-1.
#if defined (ACE_VXWORKS)
// This is a very expensive test on VxWorks, so the sizes are reduced there;
// otherwise it would never finish in time :-)
const ACE_INT32 PRODUCER_THREADS_NO=10;
const ACE_INT32 CONSUMER_THREADS_NO=10;
const ACE_INT32 NUMBER_OF_MSGS=200;
#else
const ACE_INT32 PRODUCER_THREADS_NO=20;
const ACE_INT32 CONSUMER_THREADS_NO=20;
const ACE_INT32 NUMBER_OF_MSGS=2000;
#endif
/// @class Consumer
/// Task whose threads consume User_Defined_Msg objects taken from the
/// task's own message queue.
class Consumer : public ACE_Task_Ex<ACE_MT_SYNCH, User_Defined_Msg>
{
public:
//FUZZ: disable check_for_lack_ACE_OS
/// Activate the task, spawning CONSUMER_THREADS_NO threads.
/// The pointer argument is not used.
/// @return 0 on success, -1 if the threads could not be activated.
///FUZZ: enable check_for_lack_ACE_OS
int open (void*);
/// Thread entry point: dequeues messages and destroys each one, until a
/// message with id NUMBER_OF_MSGS-1 is received or getq() fails.
/// @return always 0.
virtual int svc (void);
private:
};
int Consumer::open (void*)
{
if(this->activate (THR_NEW_LWP | THR_JOINABLE,
CONSUMER_THREADS_NO)==-1)
{
ACE_ERROR_RETURN((LM_ERROR,
ACE_TEXT("Consumer::open Error spanwing thread %p\n"),
"err="),
-1);
}
return 0;
}
int Consumer::svc ()
{
User_Defined_Msg* pMsg=0;
while(this->getq (pMsg)!=-1)
{
ACE_TEST_ASSERT (pMsg!=0);
auto_ptr<User_Defined_Msg> pAuto(pMsg);
ACE_DEBUG((LM_DEBUG,
ACE_TEXT("Consumer::svc got msg id=%d\n"),
pMsg->msg_id ()));
if(pMsg->msg_id ()==NUMBER_OF_MSGS-1)
break;
}
ACE_DEBUG((LM_INFO,
ACE_TEXT("Consumer::svc ended thread %t\n")));
return 0;
}
/// Producer thread function: allocates NUMBER_OF_MSGS user-defined messages
/// with ids 0 .. NUMBER_OF_MSGS-1 and enqueues them on the Consumer that
/// @a arg points to.
/// @param arg pointer to the Consumer receiving the messages.
/// @return 0 on success; -1 (cast to ACE_THR_FUNC_RETURN) if @a arg is null,
///         a message cannot be allocated, or putq() fails.
ACE_THR_FUNC_RETURN producer (void *arg)
{
  Consumer* c = static_cast<Consumer*> (arg);
  ACE_TEST_ASSERT (c != 0);
  if (c == 0)
    {
      ACE_ERROR ((LM_ERROR,
                  ACE_TEXT ("producer Error casting to consumer\n")));
      // ACE_THR_FUNC_RETURN differs per platform (pointer or integer), so a
      // C-style cast is kept as the one form valid for both.
      return (ACE_THR_FUNC_RETURN)-1;
    }
  for (int i = 0; i != NUMBER_OF_MSGS; ++i)
    {
      User_Defined_Msg* pMsg = 0;
      ACE_NEW_NORETURN (pMsg, User_Defined_Msg (i));
      if (pMsg == 0)
        {
          // %p consumes an ACE_TCHAR string, hence ACE_TEXT on the prefix.
          ACE_ERROR ((LM_ERROR,
                      ACE_TEXT ("producer Error allocating data %p\n"),
                      ACE_TEXT ("err=")));
          return (ACE_THR_FUNC_RETURN)-1;
        }
      if (c->putq (pMsg) == -1)
        {
          // Log before cleaning up so %p still reports putq's errno.
          ACE_ERROR ((LM_ERROR,
                      ACE_TEXT ("producer Error putq data %p\n"),
                      ACE_TEXT ("err=")));
          // The message was not enqueued, so no consumer will ever release
          // it; free it here instead of leaking it.
          delete pMsg;
          return (ACE_THR_FUNC_RETURN)-1;
        }
    }
  return 0;
}
#endif /* ACE_HAS_THREADS */
/// Test entry point. With thread support it opens a Consumer task, spawns
/// PRODUCER_THREADS_NO producer threads that feed it, and waits for every
/// thread managed by the ACE_Thread_Manager singleton to finish. Without
/// thread support it only logs that the test cannot run.
/// @return 0 on success (or when threads are unsupported), -1 on failure.
int
run_main (int, ACE_TCHAR *[])
{
  ACE_START_TEST (ACE_TEXT ("Task_Ex_Test"));
#if defined (ACE_HAS_THREADS)
  Consumer c;
  if (c.open (0) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("main Error opening consumer\n")), -1);

  int result = ACE_Thread_Manager::instance ()->spawn_n (PRODUCER_THREADS_NO,
                                                         ACE_THR_FUNC (producer),
                                                         static_cast<void*> (&c));
  if (result == -1)
    {
      // Log first so that %p reports the errno left by spawn_n. %p consumes
      // an ACE_TCHAR string, hence ACE_TEXT on the prefix.
      ACE_ERROR ((LM_ERROR,
                  ACE_TEXT ("main Error spawning threads %p\n"),
                  ACE_TEXT ("err=")));
      // The consumer threads were already started by open() and use c's
      // queue; returning right away would destroy c while they still run.
      // Deactivate the queue so pending getq()/putq() calls fail, then wait
      // for the threads to leave before c goes out of scope.
      c.msg_queue ()->deactivate ();
      ACE_Thread_Manager::instance ()->wait ();
      return -1;
    }

  // Wait for all producer and consumer threads.
  int wait_result = ACE_Thread_Manager::instance ()->wait ();
  if (wait_result == -1)
    {
      ACE_ERROR ((LM_ERROR,
                  ACE_TEXT ("main Error Thread_Manager->wait %p\n"),
                  ACE_TEXT ("err=")));
      return -1;
    }
#else
  ACE_ERROR ((LM_INFO,
              ACE_TEXT ("threads not supported on this platform\n")));
#endif /* ACE_HAS_THREADS */
  ACE_END_TEST;
  return 0;
}
|