1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
|
// $Id: Consumer_Router.cpp 91671 2010-09-08 18:39:23Z johnnyw $
#include "ace/OS_NS_stdio.h"
#include "ace/OS_NS_string.h"
#include "ace/Truncate.h"
#include "Consumer_Router.h"
#include "Options.h"
#if defined (ACE_HAS_THREADS)
typedef Acceptor_Factory<Consumer_Handler, CONSUMER_KEY> CONSUMER_FACTORY;
int
Consumer_Handler::open (void *a)
{
CONSUMER_FACTORY *af = (CONSUMER_FACTORY *) a;
this->router_task_ = af->router ();
return this->Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY>::open (a);
}
Consumer_Handler::Consumer_Handler (ACE_Thread_Manager *tm)
: Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY> (tm)
{
}
// Create a new handler that will interact with a consumer and point
// its ROUTER_TASK_ data member to the CONSUMER_ROUTER.
Consumer_Router::Consumer_Router (ACE_Thread_Manager *tm)
: CONSUMER_ROUTER (tm)
{
}
// Initialize the Router..
int
Consumer_Router::open (void *)
{
ACE_ASSERT (this->is_reader ());
ACE_TCHAR *argv[3];
argv[0] = (ACE_TCHAR *) this->name ();
argv[1] = (ACE_TCHAR *) options.consumer_file ();
argv[2] = 0;
if (this->init (1, &argv[1]) == -1)
return -1;
// Make this an active object.
// return this->activate (options.t_flags ());
// Until that's done, return 1 to indicate that the object wasn't activated.
return 1;
}
int
Consumer_Router::close (u_long)
{
ACE_ASSERT (this->is_reader ());
this->peer_map_.close ();
this->msg_queue ()->deactivate();
return 0;
}
// Handle incoming messages in a separate thread..
int
Consumer_Router::svc (void)
{
ACE_Message_Block *mb = 0;
ACE_ASSERT (this->is_reader ());
if (options.debug ())
ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) starting svc in %s\n"),
this->name ()));
while (this->getq (mb) > 0)
if (this->put_next (mb) == -1)
ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) put_next failed in %s\n"),
this->name ()), -1);
return 0;
// Note the implicit ACE_OS::thr_exit() via destructor.
}
// Send a MESSAGE_BLOCK to the supplier(s)..
int
Consumer_Router::put (ACE_Message_Block *mb, ACE_Time_Value *)
{
ACE_ASSERT (this->is_reader ());
if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
{
this->control (mb);
return this->put_next (mb);
}
else
{
//printf("consumer-Router is routing : send_peers\n");
return this->send_peers (mb);
}
}
// Return information about the Client_Router ACE_Module..
int
Consumer_Router::info (ACE_TCHAR **strp, size_t length) const
{
ACE_TCHAR buf[BUFSIZ];
ACE_UPIPE_Addr addr;
const ACE_TCHAR *module_name = this->name ();
ACE_UPIPE_Acceptor &sa = (ACE_UPIPE_Acceptor &) *this->acceptor_;
if (sa.get_local_addr (addr) == -1)
return -1;
#if !defined (ACE_WIN32) && defined (ACE_USES_WCHAR)
# define FMTSTR ACE_TEXT ("%ls\t %ls/ %ls")
#else
# define FMTSTR ACE_TEXT ("%s\t %s/ %s")
#endif
ACE_OS::sprintf (buf, FMTSTR,
module_name, ACE_TEXT ("upipe"),
ACE_TEXT ("# consumer router\n"));
if (*strp == 0 && (*strp = ACE_OS::strdup (module_name)) == 0)
return -1;
else
ACE_OS::strncpy (*strp, module_name, length);
return ACE_Utils::truncate_cast<int> (ACE_OS::strlen (module_name));
}
#endif /* ACE_HAS_THREADS */
|