1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
|
//
// Copyright (c) ZeroC, Inc. All rights reserved.
//
#include <Ice/Ice.h>
#include <IceUtil/Options.h>
#include <IceUtil/IceUtil.h>
#include <IceStorm/IceStorm.h>
#include <TestHelper.h>
#include <Single.h>
using namespace std;
using namespace Ice;
using namespace IceStorm;
using namespace Test;
//
// Test helper whose run() publishes events on the IceStorm topic
// named "single".
//
class Publisher : public Test::TestHelper
{
public:

    // Runs the publisher with the program's command-line arguments.
    void run(int argc, char** argv);
};
void
Publisher::run(int argc, char** argv)
{
Ice::CommunicatorHolder communicator = initialize(argc, argv);
IceUtilInternal::Options opts;
opts.addOpt("", "cycle");
try
{
opts.parse(argc, (const char**)argv);
}
catch(const IceUtilInternal::BadOptException& e)
{
ostringstream os;
os << argv[0] << ": " << e.reason;
throw invalid_argument(os.str());
}
PropertiesPtr properties = communicator->getProperties();
string managerProxy = properties->getProperty("IceStormAdmin.TopicManager.Default");
if(managerProxy.empty())
{
ostringstream os;
os << argv[0] << ": property `IceStormAdmin.TopicManager.Default' is not set";
throw invalid_argument(os.str());
}
IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(
communicator->stringToProxy(managerProxy));
if(!manager)
{
ostringstream os;
os << argv[0] << ": `" << managerProxy << "' is not running";
throw invalid_argument(os.str());
}
TopicPrx topic;
while(true)
{
try
{
topic = manager->retrieve("single");
break;
}
// This can happen if the replica group loses the majority
// during retrieve. In this case we retry.
catch(const Ice::UnknownException&)
{
continue;
}
catch(const IceStorm::NoSuchTopic& e)
{
ostringstream os;
os << argv[0] << ": NoSuchTopic: " << e.name;
throw invalid_argument(os.str());
}
}
assert(topic);
//
// Get a publisher object, create a twoway proxy and then cast to
// a Single object.
//
if(opts.isSet("cycle"))
{
Ice::ObjectPrx prx = topic->getPublisher()->ice_twoway();
vector<SinglePrx> single;
Ice::EndpointSeq endpoints = prx->ice_getEndpoints();
for(Ice::EndpointSeq::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
{
if((*p)->toString().substr(0, 3) != "udp")
{
Ice::EndpointSeq e;
e.push_back(*p);
single.push_back(SinglePrx::uncheckedCast(prx->ice_endpoints(e)));
}
}
if(single.size() <= 1)
{
ostringstream os;
os << argv[0] << ": Not enough endpoints in publisher proxy";
throw invalid_argument(os.str());
}
size_t which = 0;
for(size_t i = 0; i < 1000; ++i)
{
single[which]->event(static_cast<Ice::Int>(i));
which = (which + 1) % single.size();
}
}
else
{
SinglePrx single = SinglePrx::uncheckedCast(topic->getPublisher()->ice_twoway());
for(int i = 0; i < 1000; ++i)
{
single->event(i);
}
}
}
//
// NOTE(review): DEFINE_TEST is not defined in this file; presumably it
// comes from TestHelper.h and generates the program entry point that
// runs Publisher -- confirm against that header.
//
DEFINE_TEST(Publisher)
|