1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
|
#include <pdx/channel_handle.h>
#include <private/dvr/consumer_queue_channel.h>
#include <private/dvr/producer_channel.h>
using android::pdx::ErrorStatus;
using android::pdx::RemoteChannelHandle;
using android::pdx::Status;
using android::pdx::rpc::DispatchRemoteMethod;
using android::pdx::rpc::RemoteMethodError;
namespace android {
namespace dvr {
// Creates a consumer queue channel attached to |producer|.
//
// |service|, |buffer_id| and |channel_id| are forwarded to the
// BufferHubChannel base together with kConsumerQueueType. |producer| is kept
// in producer_ and recovered later through GetProducer(). |silent| marks a
// queue that neither registers nor imports buffers (see RegisterNewBuffer()
// and OnConsumerQueueImportBuffers()). The capacity starts at zero.
ConsumerQueueChannel::ConsumerQueueChannel(
    BufferHubService* service, int buffer_id, int channel_id,
    const std::shared_ptr<Channel>& producer, bool silent)
    : BufferHubChannel(service, buffer_id, channel_id, kConsumerQueueType),
      producer_(producer),
      capacity_(0),
      silent_(silent) {
  // Every other user of GetProducer() in this file tolerates a missing
  // producer. Do the same here instead of dereferencing a null pointer when
  // |producer| is empty.
  if (auto producer_queue = GetProducer()) {
    producer_queue->AddConsumer(this);
  } else {
    ALOGE(
        "ConsumerQueueChannel::ConsumerQueueChannel: producer is null, "
        "channel_id=%d",
        channel_id);
  }
}
// Unregisters this consumer from its producer queue, provided the producer
// can still be reached through GetProducer().
ConsumerQueueChannel::~ConsumerQueueChannel() {
  ALOGD_IF(TRACE, "ConsumerQueueChannel::~ConsumerQueueChannel: channel_id=%d",
           channel_id());

  auto producer_queue = GetProducer();
  if (producer_queue != nullptr) {
    producer_queue->RemoveConsumer(this);
  }
}
// Dispatches an incoming message by opcode.
//
// Returns true when the message was consumed here (including the case where
// the producer is gone and the message is answered with EPIPE), and false for
// opcodes this channel does not recognize.
bool ConsumerQueueChannel::HandleMessage(Message& message) {
  ATRACE_NAME("ConsumerQueueChannel::HandleMessage");

  auto producer_queue = GetProducer();
  if (producer_queue == nullptr) {
    // Without a producer no request can be served; fail the call.
    RemoteMethodError(message, EPIPE);
    return true;
  }

  bool handled = true;
  switch (message.GetOp()) {
    // Queue creation and queue info are answered by the producer queue.
    case BufferHubRPC::CreateConsumerQueue::Opcode:
      DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
          *producer_queue, &ProducerQueueChannel::OnCreateConsumerQueue,
          message);
      break;

    case BufferHubRPC::GetQueueInfo::Opcode:
      DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
          *producer_queue, &ProducerQueueChannel::OnGetQueueInfo, message);
      break;

    // Buffer import is the only request served by this channel itself.
    case BufferHubRPC::ConsumerQueueImportBuffers::Opcode:
      DispatchRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(
          *this, &ConsumerQueueChannel::OnConsumerQueueImportBuffers, message);
      break;

    default:
      handled = false;
      break;
  }
  return handled;
}
// Returns the producer queue this consumer is attached to, or an empty
// pointer when producer_ can no longer be locked.
std::shared_ptr<ProducerQueueChannel> ConsumerQueueChannel::GetProducer()
    const {
  auto locked_channel = producer_.lock();
  return std::static_pointer_cast<ProducerQueueChannel>(locked_channel);
}
// Impulse handler for this channel. The message argument is ignored; the body
// only invokes ATRACE_NAME with the function's name.
void ConsumerQueueChannel::HandleImpulse(Message& /* message */) {
ATRACE_NAME("ConsumerQueueChannel::HandleImpulse");
}
// Builds the BufferInfo describing this consumer queue.
//
// While the producer is reachable its BufferInfo is used as the starting
// point; otherwise a default-constructed BufferInfo is used. In both cases the
// id and capacity fields are overwritten with this channel's own values.
BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const {
  BufferHubChannel::BufferInfo result;

  auto producer_queue = GetProducer();
  if (producer_queue != nullptr) {
    result = producer_queue->GetBufferInfo();
  }

  result.id = buffer_id();
  result.capacity = capacity_;
  return result;
}
// Queues a newly allocated producer buffer so that the client can import it.
//
// |producer_channel| is the buffer's producer and |producer_slot| the slot it
// occupies in the producer queue. Silent queues ignore the call. Otherwise a
// consumer state mask is requested from the producer channel, the buffer is
// appended to pending_buffer_slots_, and the channel is signalled as
// available. If the mask cannot be created the buffer is not queued and an
// error is logged.
void ConsumerQueueChannel::RegisterNewBuffer(
    const std::shared_ptr<ProducerChannel>& producer_channel,
    size_t producer_slot) {
  ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu silent=%d",
           __FUNCTION__, buffer_id(), producer_channel->buffer_id(),
           producer_slot, silent_);

  // A silent queue never registers buffers.
  if (silent_) return;

  auto mask_status = producer_channel->CreateConsumerStateMask();
  if (mask_status.ok()) {
    uint64_t state_mask = mask_status.get();
    pending_buffer_slots_.emplace(producer_channel, producer_slot, state_mask);

    // Let the client know there is something to import.
    SignalAvailable();
  } else {
    ALOGE("%s: Failed to create consumer state mask: %s", __FUNCTION__,
          mask_status.GetErrorMessage().c_str());
  }
}
// Imports every pending buffer into the client that sent |message|.
//
// Returns a list of (consumer channel handle, producer slot) pairs, one per
// buffer imported. A silent queue always fails with EBADR. Pending entries
// whose producer channel has expired are skipped with a warning.
//
// If creating a consumer fails:
//   - with nothing imported so far, the available state is cleared and the
//     error is returned;
//   - otherwise the handles gathered so far are returned and the available
//     state is left set, so the client can come back for the entries that are
//     still queued. The entry that failed has already been popped at that
//     point and is not re-queued.
Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
  std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
  ATRACE_NAME(__FUNCTION__);
  ALOGD_IF(TRACE, "%s: pending_buffer_slots=%zu", __FUNCTION__,
           pending_buffer_slots_.size());

  // A silent queue does not import buffers at all.
  if (silent_) {
    return ErrorStatus(EBADR);
  }

  while (!pending_buffer_slots_.empty()) {
    // Read everything needed from the head entry, then drop it. |head| must
    // not be touched after the pop().
    const auto& head = pending_buffer_slots_.front();
    auto buffer_producer = head.producer_channel.lock();
    size_t slot = head.producer_slot;
    uint64_t state_mask = head.consumer_state_mask;
    pending_buffer_slots_.pop();

    // The producer channel may be gone by now; there is nothing to import.
    if (!buffer_producer) {
      ALOGW("%s: producer channel has already been expired.", __FUNCTION__);
      continue;
    }

    auto consumer_status = buffer_producer->CreateConsumer(message, state_mask);
    if (!consumer_status) {
      ALOGE("%s: Failed create consumer: %s", __FUNCTION__,
            consumer_status.GetErrorMessage().c_str());

      // Partial success: hand back what was imported and keep the available
      // state so the client retries.
      if (!buffer_handles.empty()) {
        return {std::move(buffer_handles)};
      }

      // Nothing imported at all: report the failure.
      ClearAvailable();
      return consumer_status.error_status();
    }

    buffer_handles.emplace_back(consumer_status.take(), slot);
  }

  // The pending queue has been drained.
  ClearAvailable();
  return {std::move(buffer_handles)};
}
// Detaches this consumer queue from its producer and hangs up the channel.
// NOTE(review): presumably invoked by the producer queue when it closes; the
// caller is not visible in this file.
void ConsumerQueueChannel::OnProducerClosed() {
ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d",
buffer_id());
// After the reset GetProducer() yields null, so HandleMessage() answers any
// further request with EPIPE.
producer_.reset();
Hangup();
}
} // namespace dvr
} // namespace android
|