1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
|
//
// Copyright (c) ZeroC, Inc. All rights reserved.
//
#include <Ice/Ice.h>
#include <IceUtil/IceUtil.h>
#include <IceStorm/IceStorm.h>
#include <Single.h>
#include <Controller.h>
#include <TestHelper.h>
using namespace std;
using namespace Ice;
using namespace IceStorm;
using namespace Test;
class ControllerI : public Controller
{
public:
virtual void stop(const Ice::Current& c)
{
c.adapter->getCommunicator()->shutdown();
}
};
class PublishThread : public IceUtil::Thread, public IceUtil::Mutex
{
public:
PublishThread(const SinglePrx& single) :
_single(single),
_published(0),
_destroy(false)
{
}
virtual void run()
{
while(true)
{
{
Lock sync(*this);
if(_destroy)
{
cout << _published << endl;
break;
}
}
try
{
_single->event(_published);
IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(1));
}
catch(const Ice::UnknownException&)
{
// This is expected if we publish to a replica that is
// going down.
continue;
}
++_published;
}
}
void destroy()
{
Lock sync(*this);
_destroy = true;
}
private:
const SinglePrx _single;
int _published;
bool _destroy;
};
typedef IceUtil::Handle<PublishThread> PublishThreadPtr;
class Publisher : public Test::TestHelper
{
public:
void run(int, char**);
};
void
Publisher::run(int argc, char** argv)
{
Ice::CommunicatorHolder communicator = initialize(argc, argv);
PropertiesPtr properties = communicator->getProperties();
string managerProxy = properties->getProperty("IceStormAdmin.TopicManager.Default");
if(managerProxy.empty())
{
ostringstream os;
os << argv[0] << ": property `IceStormAdmin.TopicManager.Default' is not set";
throw invalid_argument(os.str());
}
IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(
communicator->stringToProxy(managerProxy));
if(!manager)
{
ostringstream os;
os << argv[0] << ": `" << managerProxy << "' is not running";
throw invalid_argument(os.str());
}
TopicPrx topic = manager->retrieve("single");
assert(topic);
//
// Get a publisher object, create a twoway proxy, disable
// connection caching and then cast to a Single object.
//
SinglePrx single = SinglePrx::uncheckedCast(topic->getPublisher()->ice_twoway()->ice_connectionCached(false));
ObjectAdapterPtr adapter = communicator->createObjectAdapterWithEndpoints("ControllerAdapter", "tcp");
Ice::ObjectPrx controller = adapter->addWithUUID(new ControllerI);
adapter->activate();
cout << communicator->proxyToString(controller) << endl;
PublishThreadPtr t = new PublishThread(single);
t->start();
communicator->waitForShutdown();
t->destroy();
t->getThreadControl().join();
}
DEFINE_TEST(Publisher)
|