1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
|
// $Id: Default_Dispatcher_Impl.cpp 91670 2010-09-08 18:02:26Z johnnyw $
#include "Default_Dispatcher_Impl.h"
#include "ace/Sched_Params.h"
#if ! defined (__ACE_INLINE__)
#include "Default_Dispatcher_Impl.inl"
#endif /* __ACE_INLINE__ */
namespace Kokyu
{
// Construct a dispatcher that is not yet activated and has no tasks.
// The tasks themselves are created later by init_i ().
Default_Dispatcher_Impl::Default_Dispatcher_Impl ()
  : activated_ (0)
{
  // init_i () bails out before touching ntasks_ when the configuration
  // set is empty, so give the task count a defined value here. The
  // loops in activate_i (), find_task_with_preemption_prio () and
  // shutdown_i () are bounded by it. Assigned in the body rather than
  // in the initializer list so the member declaration order in the
  // header does not matter.
  this->ntasks_ = 0;
}
int
Default_Dispatcher_Impl::init_i (const Dispatcher_Attributes& attrs)
{
//create and init the dispatcher tasks here
ACE_DEBUG ((LM_DEBUG, "entering init_t\n" ));
int size = ACE_Utils::truncate_cast<int> (attrs.config_info_set_.size ());
if (size == 0)
{
return -1;
}
this->ntasks_ = size;
Dispatcher_Task_Auto_Ptr * tasks_array=0;
ACE_NEW_RETURN (tasks_array, Dispatcher_Task_Auto_Ptr[ntasks_], -1);
//ACE_DEBUG ((LM_DEBUG, "after new on task array\n" ));
tasks_.reset(tasks_array);
//ACE_DEBUG ((LM_DEBUG, "task array auto_ptr set\n" ));
ConfigInfoSet& config_set =
const_cast<ConfigInfoSet&> (attrs.config_info_set_);
ConfigInfoSet::ITERATOR iter(config_set);
int i=0;
ConfigInfo* config;
for (;i<size && iter.next (config);iter.advance ())
{
//ACE_DEBUG ((LM_DEBUG, "iter = %d\n", i));
Dispatcher_Task* task=0;
ACE_NEW_RETURN (task,
Dispatcher_Task (*config,
ACE_Thread_Manager::instance()),
-1);
auto_ptr<Dispatcher_Task> tmp_task_auto_ptr (task);
tasks_[i++] = tmp_task_auto_ptr;
//I couldn't use reset because MSVC6 auto_ptr does not have reset method.
//So in configurations where the auto_ptr maps to the std::auto_ptr instead
//of ACE auto_ptr, this would be a problem.
//tasks_[i++].reset (task);
}
this->thr_creation_flags_ = attrs.thread_creation_flags ();
if (attrs.immediate_activation_ && !this->activated_)
{
this->activate_i ();
}
curr_config_info_ = attrs.config_info_set_;
return 0;
}
int
Default_Dispatcher_Impl::activate_i ()
{
int i;
if (this->activated_)
return 0;
for(i=0; i<ntasks_; ++i)
{
Priority_t priority =
tasks_[i]->get_curr_config_info ().thread_priority_;
if (this->tasks_[i]->activate (this->thr_creation_flags_,
1, 1, priority) == -1)
{
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("EC (%P|%t) cannot activate queue.")
ACE_TEXT ("Need superuser privilege to run in RT class\n")),
-1);
}
}
this->activated_ = 1;
return 0;
}
// Linear search for the first task whose preemption priority equals
// prio. Returns that task, or 0 when prio is negative or no task matches.
Dispatcher_Task*
Default_Dispatcher_Impl::find_task_with_preemption_prio (Priority_t prio)
{
  // Negative priorities never match a task.
  if (prio < 0)
    return 0;

  int idx = 0;
  while (idx < ntasks_)
    {
      if (tasks_[idx]->preemption_priority () == prio)
        return tasks_[idx].get ();

      ++idx;
    }

  return 0;
}
int
Default_Dispatcher_Impl::dispatch_i (const Dispatch_Command* cmd,
const QoSDescriptor& qos_info)
{
//delegate to the appropriate task
if (qos_info.preemption_priority_ < 0)
return -1;
Dispatcher_Task* task =
find_task_with_preemption_prio (qos_info.preemption_priority_);
//@@VS - We should insert this into the lowest prio queue.
//How do we know that the last queue is the lowest prio queue.
if (task == 0)
task = tasks_[ntasks_-1].get ();
return task->enqueue (cmd, qos_info);
}
int
Default_Dispatcher_Impl::shutdown_i ()
{
//This needs to be revisited based on mode transition and
//consistent cut through the queues
//post shutdown command to all tasks
int i;
for(i=0; i<ntasks_; ++i)
{
QoSDescriptor qos_info;
Shutdown_Task_Command* shutdown_cmd = 0;
ACE_NEW_RETURN (shutdown_cmd, Shutdown_Task_Command, -1);
tasks_[i]->enqueue (shutdown_cmd, qos_info);
}
//wait for all tasks to exit
for (i=0; i<ntasks_; ++i)
{
tasks_[i]->wait ();
}
return 0;
}
// Executing a shutdown command does no work; it only reports -1.
// NOTE(review): presumably the dispatching task treats this -1 as the
// signal to leave its service loop -- confirm against Dispatcher_Task.
int
Shutdown_Task_Command::execute ()
{
  int const shutdown_result = -1;
  return shutdown_result;
}
}
|