1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
|
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_EDK_SYSTEM_CHANNEL_H_
#define MOJO_EDK_SYSTEM_CHANNEL_H_
#include <stdint.h>
#include "base/containers/hash_tables.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/strings/string_piece.h"
#include "base/synchronization/lock.h"
#include "base/threading/thread_checker.h"
#include "mojo/edk/embedder/scoped_platform_handle.h"
#include "mojo/edk/system/channel_endpoint.h"
#include "mojo/edk/system/channel_endpoint_id.h"
#include "mojo/edk/system/incoming_endpoint.h"
#include "mojo/edk/system/message_in_transit.h"
#include "mojo/edk/system/raw_channel.h"
#include "mojo/edk/system/system_impl_export.h"
#include "mojo/public/c/system/types.h"
namespace mojo {
namespace embedder {
class PlatformSupport;
}
namespace system {
class ChannelEndpointClient;
class ChannelManager;
class MessageInTransitQueue;
// This class is mostly thread-safe. It must be created on an I/O thread.
// |Init()| must be called on that same thread before it becomes thread-safe (in
// particular, before references are given to any other thread) and |Shutdown()|
// must be called on that same thread before destruction. Its public methods are
// otherwise thread-safe. (Many private methods are restricted to the creation
// thread.) It may be destroyed on any thread, in the sense that the last
// reference to it may be released on any thread, with the proviso that
// |Shutdown()| must have been called first (so the pattern is that a "main"
// reference is kept on its creation thread and is released after |Shutdown()|
// is called, but other threads may have temporarily "dangling" references).
//
// Note the lock order (in order of allowable acquisition):
// |ChannelEndpointClient| (e.g., |MessagePipe|), |ChannelEndpoint|, |Channel|.
// Thus |Channel| may not call into |ChannelEndpoint| with |Channel|'s lock
// held.
class MOJO_SYSTEM_IMPL_EXPORT Channel
: public base::RefCountedThreadSafe<Channel>,
public RawChannel::Delegate {
public:
// |platform_support| (typically owned by |Core|) must remain alive until
// after |Shutdown()| is called.
explicit Channel(embedder::PlatformSupport* platform_support);
// This must be called on the creation thread before any other methods are
// called, and before references to this object are given to any other
// threads. |raw_channel| should be uninitialized. Returns true on success. On
// failure, no other methods should be called (including |Shutdown()|).
bool Init(scoped_ptr<RawChannel> raw_channel);
// Sets the channel manager associated with this channel. This should be set
// at most once and only called before |WillShutdownSoon()| (and
// |Shutdown()|).
void SetChannelManager(ChannelManager* channel_manager);
// This must be called on the creation thread before destruction (which can
// happen on any thread).
void Shutdown();
// Signals that |Shutdown()| will be called soon (this may be called from any
// thread, unlike |Shutdown()|). Warnings will be issued if, e.g., messages
// are written after this is called; other warnings may be suppressed. (This
// may be called multiple times, or not at all.)
//
// If set, the channel manager associated with this channel will be reset.
void WillShutdownSoon();
// Called to set (i.e., attach and run) the bootstrap (first) endpoint on the
// channel. Both the local and remote IDs are the bootstrap ID (given by
// |ChannelEndpointId::GetBootstrap()|).
//
// (Bootstrapping is symmetric: Both sides call this, which will establish the
// first connection across a channel.)
void SetBootstrapEndpoint(scoped_refptr<ChannelEndpoint> endpoint);
// This forwards |message| verbatim to |raw_channel_|.
bool WriteMessage(scoped_ptr<MessageInTransit> message);
// See |RawChannel::IsWriteBufferEmpty()|.
// TODO(vtl): Maybe we shouldn't expose this, and instead have a
// |FlushWriteBufferAndShutdown()| or something like that.
bool IsWriteBufferEmpty();
// Removes the given endpoint from this channel (|local_id| and |remote_id|
// are specified as an optimization; the latter should be an invalid
// |ChannelEndpointId| if the endpoint is not yet running). Note: If this is
// called, the |Channel| will *not* call
// |ChannelEndpoint::DetachFromChannel()|.
void DetachEndpoint(ChannelEndpoint* endpoint,
ChannelEndpointId local_id,
ChannelEndpointId remote_id);
// Returns the size of a serialized endpoint (see |SerializeEndpoint...()| and
// |DeserializeEndpoint()| below). This value will remain constant for a given
// instance of |Channel|.
size_t GetSerializedEndpointSize() const;
// Endpoint serialization methods: From the |Channel|'s point of view, there
// are three cases (discussed further below) and thus three methods.
//
// All three methods have a |destination| argument, which should be a buffer
// to which auxiliary information will be written and which should be
// transmitted to the peer |Channel| by some other means, but using this
// |Channel|. It should be a buffer of (at least) the size returned by
// |GetSerializedEndpointSize()| (exactly that much data will be written).
//
// All three also have a |message_queue| argument, which if non-null is the
// queue of messages already received by the endpoint to be serialized.
//
// Note that "serialize" really means "send" -- the |endpoint| will be sent
// "immediately". The contents of the |destination| buffer can then be used to
// claim the rematerialized endpoint from the peer |Channel|. (|destination|
// must be sent using this |Channel|, since otherwise it may be received
// before it is valid to the peer |Channel|.)
//
// Case 1: The endpoint's peer is already closed.
//
// Case 2: The endpoint's peer is local (i.e., it has a
// |ChannelEndpointClient| but no peer |ChannelEndpoint|).
//
// Case 3: The endpoint's peer is remote (i.e., it has a peer
// |ChannelEndpoint|). (This has two subcases: the peer endpoint may be on
// this |Channel| or another |Channel|.)
void SerializeEndpointWithClosedPeer(void* destination,
MessageInTransitQueue* message_queue);
// This one returns the |ChannelEndpoint| for the serialized endpoint (which
// can be used by, e.g., a |ProxyMessagePipeEndpoint|.
scoped_refptr<ChannelEndpoint> SerializeEndpointWithLocalPeer(
void* destination,
MessageInTransitQueue* message_queue,
ChannelEndpointClient* endpoint_client,
unsigned endpoint_client_port);
void SerializeEndpointWithRemotePeer(
void* destination,
MessageInTransitQueue* message_queue,
scoped_refptr<ChannelEndpoint> peer_endpoint);
// Deserializes an endpoint that was sent from the peer |Channel| (using
// |SerializeEndpoint...()|. |source| should be (a copy of) the data that
// |SerializeEndpoint...()| wrote, and must be (at least)
// |GetSerializedEndpointSize()| bytes. This returns the deserialized
// |IncomingEndpoint| (which can be converted into a |MessagePipe|) or null on
// error.
scoped_refptr<IncomingEndpoint> DeserializeEndpoint(const void* source);
// See |RawChannel::GetSerializedPlatformHandleSize()|.
size_t GetSerializedPlatformHandleSize() const;
embedder::PlatformSupport* platform_support() const {
return platform_support_;
}
private:
friend class base::RefCountedThreadSafe<Channel>;
~Channel() override;
// |RawChannel::Delegate| implementation (only called on the creation thread):
void OnReadMessage(
const MessageInTransit::View& message_view,
embedder::ScopedPlatformHandleVectorPtr platform_handles) override;
void OnError(Error error) override;
// Helpers for |OnReadMessage| (only called on the creation thread):
void OnReadMessageForEndpoint(
const MessageInTransit::View& message_view,
embedder::ScopedPlatformHandleVectorPtr platform_handles);
void OnReadMessageForChannel(
const MessageInTransit::View& message_view,
embedder::ScopedPlatformHandleVectorPtr platform_handles);
// Handles "attach and run endpoint" messages.
bool OnAttachAndRunEndpoint(ChannelEndpointId local_id,
ChannelEndpointId remote_id);
// Handles "remove endpoint" messages.
bool OnRemoveEndpoint(ChannelEndpointId local_id,
ChannelEndpointId remote_id);
// Handles "remove endpoint ack" messages.
bool OnRemoveEndpointAck(ChannelEndpointId local_id);
// Handles errors (e.g., invalid messages) from the remote side. Callable from
// any thread.
void HandleRemoteError(const base::StringPiece& error_message);
// Handles internal errors/failures from the local side. Callable from any
// thread.
void HandleLocalError(const base::StringPiece& error_message);
// Helper for |SerializeEndpoint...()|: Attaches the given (non-bootstrap)
// endpoint to this channel and runs it. This assigns the endpoint both local
// and remote IDs. This will also send a |kSubtypeChannelAttachAndRunEndpoint|
// message to the remote side to tell it to create an endpoint as well. This
// returns the *remote* ID (one for which |is_remote()| returns true).
//
// TODO(vtl): Maybe limit the number of attached message pipes.
ChannelEndpointId AttachAndRunEndpoint(
scoped_refptr<ChannelEndpoint> endpoint);
// Helper to send channel control messages. Returns true on success. Should be
// called *without* |lock_| held. Callable from any thread.
bool SendControlMessage(MessageInTransit::Subtype subtype,
ChannelEndpointId source_id,
ChannelEndpointId destination_id);
base::ThreadChecker creation_thread_checker_;
embedder::PlatformSupport* const platform_support_;
// Note: |ChannelEndpointClient|s (in particular, |MessagePipe|s) MUST NOT be
// used under |lock_|. E.g., |lock_| can only be acquired after
// |MessagePipe::lock_|, never before. Thus to call into a
// |ChannelEndpointClient|, a reference should be acquired from
// |local_id_to_endpoint_map_| under |lock_| and then the lock released.
base::Lock lock_; // Protects the members below.
scoped_ptr<RawChannel> raw_channel_;
bool is_running_;
// Set when |WillShutdownSoon()| is called.
bool is_shutting_down_;
// Has a reference to us.
ChannelManager* channel_manager_;
typedef base::hash_map<ChannelEndpointId, scoped_refptr<ChannelEndpoint>>
IdToEndpointMap;
// Map from local IDs to endpoints. If the endpoint is null, this means that
// we're just waiting for the remove ack before removing the entry.
IdToEndpointMap local_id_to_endpoint_map_;
// Note: The IDs generated by this should be checked for existence before use.
LocalChannelEndpointIdGenerator local_id_generator_;
typedef base::hash_map<ChannelEndpointId, scoped_refptr<IncomingEndpoint>>
IdToIncomingEndpointMap;
// Map from local IDs to incoming endpoints (i.e., those received inside other
// messages, but not yet claimed via |DeserializeEndpoint()|).
IdToIncomingEndpointMap incoming_endpoints_;
// TODO(vtl): We need to keep track of remote IDs (so that we don't collide
// if/when we wrap).
RemoteChannelEndpointIdGenerator remote_id_generator_;
DISALLOW_COPY_AND_ASSIGN(Channel);
};
} // namespace system
} // namespace mojo
#endif // MOJO_EDK_SYSTEM_CHANNEL_H_
|