File: consumer_queue_channel.cpp

package info (click to toggle)
android-platform-tools 34.0.5-12
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 150,900 kB
  • sloc: cpp: 805,786; java: 293,500; ansic: 128,288; xml: 127,491; python: 41,481; sh: 14,245; javascript: 9,665; cs: 3,846; asm: 2,049; makefile: 1,917; yacc: 440; awk: 368; ruby: 183; sql: 140; perl: 88; lex: 67
file content (166 lines) | stat: -rw-r--r-- 5,421 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
#include <pdx/channel_handle.h>
#include <private/dvr/consumer_queue_channel.h>
#include <private/dvr/producer_channel.h>

using android::pdx::ErrorStatus;
using android::pdx::RemoteChannelHandle;
using android::pdx::Status;
using android::pdx::rpc::DispatchRemoteMethod;
using android::pdx::rpc::RemoteMethodError;

namespace android {
namespace dvr {

ConsumerQueueChannel::ConsumerQueueChannel(
    BufferHubService* service, int buffer_id, int channel_id,
    const std::shared_ptr<Channel>& producer, bool silent)
    : BufferHubChannel(service, buffer_id, channel_id, kConsumerQueueType),
      producer_(producer),
      capacity_(0),
      silent_(silent) {
  GetProducer()->AddConsumer(this);
}

ConsumerQueueChannel::~ConsumerQueueChannel() {
  ALOGD_IF(TRACE, "ConsumerQueueChannel::~ConsumerQueueChannel: channel_id=%d",
           channel_id());

  if (auto producer = GetProducer()) {
    producer->RemoveConsumer(this);
  }
}

bool ConsumerQueueChannel::HandleMessage(Message& message) {
  ATRACE_NAME("ConsumerQueueChannel::HandleMessage");
  auto producer = GetProducer();
  if (!producer) {
    RemoteMethodError(message, EPIPE);
    return true;
  }

  switch (message.GetOp()) {
    case BufferHubRPC::CreateConsumerQueue::Opcode:
      DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
          *producer, &ProducerQueueChannel::OnCreateConsumerQueue, message);
      return true;

    case BufferHubRPC::GetQueueInfo::Opcode:
      DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
          *producer, &ProducerQueueChannel::OnGetQueueInfo, message);
      return true;

    case BufferHubRPC::ConsumerQueueImportBuffers::Opcode:
      DispatchRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(
          *this, &ConsumerQueueChannel::OnConsumerQueueImportBuffers, message);
      return true;

    default:
      return false;
  }
}

std::shared_ptr<ProducerQueueChannel> ConsumerQueueChannel::GetProducer()
    const {
  return std::static_pointer_cast<ProducerQueueChannel>(producer_.lock());
}

void ConsumerQueueChannel::HandleImpulse(Message& /* message */) {
  ATRACE_NAME("ConsumerQueueChannel::HandleImpulse");
}

BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const {
  BufferHubChannel::BufferInfo info;
  if (auto producer = GetProducer()) {
    // If producer has not hung up, copy most buffer info from the producer.
    info = producer->GetBufferInfo();
  }
  info.id = buffer_id();
  info.capacity = capacity_;
  return info;
}

void ConsumerQueueChannel::RegisterNewBuffer(
    const std::shared_ptr<ProducerChannel>& producer_channel,
    size_t producer_slot) {
  ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu silent=%d",
           __FUNCTION__, buffer_id(), producer_channel->buffer_id(),
           producer_slot, silent_);
  // Only register buffers if the queue is not silent.
  if (silent_) {
    return;
  }

  auto status = producer_channel->CreateConsumerStateMask();
  if (!status.ok()) {
    ALOGE("%s: Failed to create consumer state mask: %s", __FUNCTION__,
          status.GetErrorMessage().c_str());
    return;
  }
  uint64_t consumer_state_mask = status.get();

  pending_buffer_slots_.emplace(producer_channel, producer_slot,
                                consumer_state_mask);
  // Signal the client that there is new buffer available.
  SignalAvailable();
}

Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
  std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
  ATRACE_NAME(__FUNCTION__);
  ALOGD_IF(TRACE, "%s: pending_buffer_slots=%zu", __FUNCTION__,
           pending_buffer_slots_.size());

  // Indicate this is a silent queue that will not import buffers.
  if (silent_)
    return ErrorStatus(EBADR);

  while (!pending_buffer_slots_.empty()) {
    auto producer_channel =
        pending_buffer_slots_.front().producer_channel.lock();
    size_t producer_slot = pending_buffer_slots_.front().producer_slot;
    uint64_t consumer_state_mask =
        pending_buffer_slots_.front().consumer_state_mask;
    pending_buffer_slots_.pop();

    // It's possible that the producer channel has expired. When this occurs,
    // ignore the producer channel.
    if (producer_channel == nullptr) {
      ALOGW("%s: producer channel has already been expired.", __FUNCTION__);
      continue;
    }

    auto status =
        producer_channel->CreateConsumer(message, consumer_state_mask);

    // If no buffers are imported successfully, clear available and return an
    // error. Otherwise, return all consumer handles already imported
    // successfully, but keep available bits on, so that the client can retry
    // importing remaining consumer buffers.
    if (!status) {
      ALOGE("%s: Failed create consumer: %s", __FUNCTION__,
            status.GetErrorMessage().c_str());
      if (buffer_handles.empty()) {
        ClearAvailable();
        return status.error_status();
      } else {
        return {std::move(buffer_handles)};
      }
    }

    buffer_handles.emplace_back(status.take(), producer_slot);
  }

  ClearAvailable();
  return {std::move(buffer_handles)};
}

void ConsumerQueueChannel::OnProducerClosed() {
  ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d",
           buffer_id());
  producer_.reset();
  Hangup();
}

}  // namespace dvr
}  // namespace android