/**
 * Copyright - See the COPYRIGHT that is included with this distribution.
 * pvAccessCPP is distributed subject to a Software License Agreement found
 * in file LICENSE that is included with this distribution.
 */
#include <pv/pvData.h>
#include <pv/pipelineServer.h>
using namespace epics::pvData;
using namespace epics::pvAccess;
/**
 * Builds the introspection data for the elements streamed by this example:
 * a structure holding a single "count" field of type pvInt.
 */
static Structure::const_shared_pointer createDataStructure()
{
    return getFieldCreate()
        ->createFieldBuilder()
        ->add("count", pvInt)
        ->createStructure();
}

// Created once during static initialization and handed out by every session.
static Structure::const_shared_pointer dataStructure = createDataStructure();
class PipelineSessionImpl :
public PipelineSession
{
public:
PipelineSessionImpl(
epics::pvData::PVStructure::shared_pointer const & pvRequest
) :
m_counter(0),
m_max(0)
{
PVStructure::shared_pointer pvOptions = pvRequest->getSubField<PVStructure>("record._options");
if (pvOptions) {
PVString::shared_pointer pvString = pvOptions->getSubField<PVString>("limit");
if (pvString)
{
// note: this throws an exception if conversion fails
m_max = pvString->getAs<int32>();
}
}
}
size_t getMinQueueSize() const {
return 16; //1024;
}
Structure::const_shared_pointer getStructure() const {
return dataStructure;
}
virtual void request(PipelineControl::shared_pointer const & control, size_t elementCount) {
// blocking in this call is not a good thing
// but generating a simple counter data is fast
// we will generate as much elements as we can
size_t count = control->getFreeElementCount();
for (size_t i = 0; i < count; i++) {
MonitorElement::shared_pointer element = control->getFreeElement();
element->pvStructurePtr->getSubField<PVInt>(1 /*"count"*/)->put(m_counter++);
control->putElement(element);
// we reached the limit, no more data
if (m_max != 0 && m_counter == m_max)
{
control->done();
break;
}
}
}
virtual void cancel() {
// noop, no need to clean any data-source resources
}
private:
// NOTE: all the request calls will be made from the same thread, so we do not need sync m_counter
int32 m_counter;
int32 m_max;
};
/**
 * Pipeline service that hands out a new counter session
 * (PipelineSessionImpl) for each pipeline creation request.
 */
class PipelineServiceImpl :
    public PipelineService
{
    /**
     * Creates the session serving one pipeline.
     *
     * Declared with the class's default (private) access, as in the original;
     * presumably invoked through the PipelineService interface.
     *
     * @param pvRequest client request, forwarded unchanged to the session.
     * @return a newly allocated session owned by the returned shared pointer.
     */
    PipelineSession::shared_pointer createPipeline(
        epics::pvData::PVStructure::shared_pointer const & pvRequest
    )
    {
        PipelineSession::shared_pointer session(new PipelineSessionImpl(pvRequest));
        return session;
    }
};
int main()
{
PipelineServer server;
server.registerService("counterPipe", PipelineService::shared_pointer(new PipelineServiceImpl()));
// you can register as many services as you want here ...
server.printInfo();
server.run();
return 0;
}
|