1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356
|
// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_CORE_PORTS_NODE_H_
#define MOJO_CORE_PORTS_NODE_H_
#include <stddef.h>
#include <stdint.h>
#include <unordered_map>
#include "base/component_export.h"
#include "base/containers/flat_map.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/scoped_refptr.h"
#include "base/synchronization/lock.h"
#include "mojo/core/ports/event.h"
#include "mojo/core/ports/name.h"
#include "mojo/core/ports/port.h"
#include "mojo/core/ports/port_ref.h"
#include "mojo/core/ports/user_data.h"
namespace mojo {
namespace core {
namespace ports {
// Integer result codes for the ports layer. OK is zero and every error code is
// negative. Within this header, Node::GetMessage() documents returning
// ERROR_PORT_PEER_CLOSED when the port's peer has closed, and
// Node::MergeLocalPorts() documents returning OK on success.
// NOTE(review): presumably every int-returning Node method reports one of
// these codes, and the remaining per-code meanings follow their names; this
// header does not spell that out, so confirm against node.cc.
enum : int {
OK = 0,
ERROR_PORT_UNKNOWN = -10,
ERROR_PORT_EXISTS = -11,
ERROR_PORT_STATE_UNEXPECTED = -12,
ERROR_PORT_CANNOT_SEND_SELF = -13,
ERROR_PORT_PEER_CLOSED = -14,
ERROR_PORT_CANNOT_SEND_PEER = -15,
ERROR_NOT_IMPLEMENTED = -100,
};
// Status record for a single port; Node::GetStatus() takes a pointer to one of
// these as its output argument.
//
// Every member carries a default initializer so that a PortStatus which is
// declared but never filled in holds well-defined values (all flags false, all
// counters zero) instead of indeterminate ones. The struct remains an
// aggregate, so brace-initialization with explicit values keeps working.
//
// NOTE(review): apart from |unacknowledged_message_count|, the per-field
// meanings below are inferred from the field names; confirm against the
// implementation of Node::GetStatus().
struct PortStatus {
  // Presumably true when at least one message is queued on the port.
  bool has_messages = false;
  // Presumably true while the port is in a state where it receives messages.
  bool receiving_messages = false;
  // Presumably true once the port's peer has been closed.
  bool peer_closed = false;
  // Presumably true when the port's peer lives on a different node.
  bool peer_remote = false;
  // Presumably the number of messages currently queued on the port.
  size_t queued_message_count = 0;
  // Presumably the total byte size of the queued messages.
  size_t queued_num_bytes = 0;
  // Number of unacknowledged messages; see
  // Node::SetAcknowledgeRequestInterval() for how acknowledgements are
  // requested.
  size_t unacknowledged_message_count = 0;
};
// Plain record of six addressing/sequence fields. Node::ConvertToProxy() takes
// a pointer to one of these as its |pending_update| output argument.
//
// NOTE(review): presumably this captures the parameters of an
// UpdatePreviousPeer event that still has to be sent once the port has been
// converted to a proxy; the per-field roles below are inferred from names.
// Confirm against node.cc.
struct PendingUpdatePreviousPeer {
  // Presumably the node that should receive the update.
  NodeName receiver;
  // Presumably the port on |receiver| that the update is addressed to.
  PortName port;
  // Presumably the port the update originates from.
  PortName from_port;
  // |sequence_num| is a raw integer, so without an initializer it would be
  // indeterminate in a default-initialized instance; start it at zero.
  uint64_t sequence_num = 0;
  // Presumably the new previous-peer address (node and port) to install.
  NodeName new_prev_node;
  PortName new_prev_port;
};
// Forward declarations. Within this header these types are only referred to
// through pointers (MessageFilter* in Node::GetMessage(); NodeDelegate* in the
// Node and Node::DelegateHolder declarations), so no definition is pulled in
// here.
class MessageFilter;
class NodeDelegate;
// A Node maintains a collection of Ports (see port.h) indexed by unique 128-bit
// addresses (names), performing routing and processing of events among the
// Ports within the Node and to or from other Nodes in the system. Typically
// (and practically, in all uses today) there is a single Node per system
// process. Thus a Node boundary effectively models a process boundary.
//
// New Ports can be created uninitialized using CreateUninitializedPort (and
// later initialized using InitializePort), or created in a fully initialized
// state using CreatePortPair(). Initialized ports have exactly one conjugate
// port which is the ultimate receiver of any user messages sent by that port.
// See SendUserMessage().
//
// In addition to routing user message events, various control events are used
// by Nodes to coordinate Port behavior and lifetime within and across Nodes.
// See Event documentation for description of different types of events used by
// a Node to coordinate behavior.
class COMPONENT_EXPORT(MOJO_CORE_PORTS) Node {
public:
// Selects how strict CanShutdownCleanly() is about ports that remain on this
// node; see the comment on CanShutdownCleanly() for the effect of
// ALLOW_LOCAL_PORTS.
enum class ShutdownPolicy {
DONT_ALLOW_LOCAL_PORTS,
ALLOW_LOCAL_PORTS,
};
// Does not take ownership of the delegate.
Node(const NodeName& name, NodeDelegate* delegate);
// A Node is neither copy-constructible nor copy-assignable.
Node(const Node&) = delete;
Node& operator=(const Node&) = delete;
~Node();
// Returns true iff there are no open ports referring to another node or ports
// in the process of being transferred from this node to another. If this
// returns false, then to ensure clean shutdown, it is necessary to keep the
// node alive and continue routing messages to it via AcceptMessage. This
// method may be called again after AcceptMessage to check if the Node is now
// ready to be destroyed.
//
// If |policy| is set to |ShutdownPolicy::ALLOW_LOCAL_PORTS|, this will return
// |true| even if some ports remain alive, as long as none of them are proxies
// to another node.
//
// NOTE(review): no method named AcceptMessage is declared in this class; the
// event entry point declared below is AcceptEvent(). Confirm which one the
// paragraph above means.
bool CanShutdownCleanly(
ShutdownPolicy policy = ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS);
// Lookup the named port.
// NOTE(review): presumably stores a reference to the port named |port_name|
// in |*port_ref| and returns a result code; confirm in node.cc.
int GetPort(const PortName& port_name, PortRef* port_ref);
// Creates a port on this node. Before the port can be used, it must be
// initialized using InitializePort. This method is useful for bootstrapping
// a connection between two nodes. Generally, ports are created using
// CreatePortPair instead.
int CreateUninitializedPort(PortRef* port_ref);
// Initializes a newly created port.
// NOTE(review): judging by the parameter names, the caller supplies both the
// peer address and the previous-peer address (node name + port name each);
// confirm the exact semantics in node.cc.
int InitializePort(const PortRef& port_ref,
const NodeName& peer_node_name,
const PortName& peer_port_name,
const NodeName& prev_node_name,
const PortName& prev_port_name);
// Generates a new connected pair of ports bound to this node. These ports
// are initialized and ready to go.
int CreatePortPair(PortRef* port0_ref, PortRef* port1_ref);
// User data associated with the port.
int SetUserData(const PortRef& port_ref, scoped_refptr<UserData> user_data);
int GetUserData(const PortRef& port_ref, scoped_refptr<UserData>* user_data);
// Prevents further messages from being sent from this port or delivered to
// this port. The port is removed, and the port's peer is notified of the
// closure after it has consumed all pending messages.
int ClosePort(const PortRef& port_ref);
// Returns the current status of the port.
int GetStatus(const PortRef& port_ref, PortStatus* port_status);
// Returns the next available message on the specified port or returns a null
// message if there are none available. Returns ERROR_PORT_PEER_CLOSED to
// indicate that this port's peer has closed. In such cases GetMessage may
// be called until it yields a null message, indicating that no more messages
// may be read from the port.
//
// If |filter| is non-null, the next available message is returned only if it
// is matched by the filter. If the provided filter does not match the next
// available message, GetMessage() behaves as if there is no message
// available. Ownership of |filter| is not taken, and it must outlive the
// extent of this call.
int GetMessage(const PortRef& port_ref,
std::unique_ptr<UserMessageEvent>* message,
MessageFilter* filter);
// Sends a message from the specified port to its peer. Note that the message
// notification may arrive synchronously (via PortStatusChanged() on the
// delegate) if the peer is local to this Node.
int SendUserMessage(const PortRef& port_ref,
std::unique_ptr<UserMessageEvent> message);
// Makes the port send acknowledge requests to its conjugate to acknowledge
// at least every |sequence_number_acknowledge_interval| messages as they're
// read from the conjugate. The number of unacknowledged messages is exposed
// in the |unacknowledged_message_count| field of PortStatus. This allows
// bounding the number of unread and/or in-transit messages from this port
// to its conjugate between zero and |unacknowledged_message_count|.
int SetAcknowledgeRequestInterval(
const PortRef& port_ref,
uint64_t sequence_number_acknowledge_interval);
// Corresponding to NodeDelegate::ForwardEvent.
int AcceptEvent(const NodeName& from_node, ScopedEvent event);
// Called to merge two ports with each other. If you have two independent
// port pairs A <=> B and C <=> D, the net result of merging B and C is a
// single connected port pair A <=> D.
//
// Note that the behavior of this operation is undefined if either port to be
// merged (B or C above) has ever been read from or written to directly, and
// this must ONLY be called on one side of the merge, though it doesn't matter
// which side.
//
// It is safe for the non-merged peers (A and D above) to be transferred,
// closed, and/or written to before, during, or after the merge.
int MergePorts(const PortRef& port_ref,
const NodeName& destination_node_name,
const PortName& destination_port_name);
// Like above but merges two ports local to this node. Because both ports are
// local this can also verify that neither port has been written to before the
// merge. If this fails for any reason, both ports are closed. Otherwise OK
// is returned and the ports' receiving peers are connected to each other.
int MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref);
// Called to inform this node that communication with another node is lost
// indefinitely. This triggers cleanup of ports bound to this node.
int LostConnectionToNode(const NodeName& node_name);
private:
// Helper to ensure that a Node always calls into its delegate safely, i.e.
// without holding any internal locks.
class DelegateHolder {
public:
DelegateHolder(Node* node, NodeDelegate* delegate);
DelegateHolder(const DelegateHolder&) = delete;
DelegateHolder& operator=(const DelegateHolder&) = delete;
~DelegateHolder();
// Every access to the delegate goes through here: the safety check runs
// first, then the stored delegate pointer is handed out.
NodeDelegate* operator->() const {
EnsureSafeDelegateAccess();
return delegate_;
}
private:
// The check only has an out-of-line implementation when DCHECK_IS_ON();
// otherwise it is an empty inline function.
#if DCHECK_IS_ON()
void EnsureSafeDelegateAccess() const;
#else
void EnsureSafeDelegateAccess() const {}
#endif
// Neither pointer can be reseated after construction (both are const).
const raw_ptr<Node, DanglingUntriaged> node_;
const raw_ptr<NodeDelegate, DanglingUntriaged> delegate_;
};
// Handlers for the individual event types, each taking the target port and
// the typed event.
// NOTE(review): presumably these are dispatched from AcceptEvent() /
// AcceptEventInternal(); confirm in node.cc.
int OnUserMessage(const PortRef& port_ref,
const NodeName& from_node,
std::unique_ptr<UserMessageEvent> message);
int OnPortAccepted(const PortRef& port_ref,
std::unique_ptr<PortAcceptedEvent> event);
int OnObserveProxy(const PortRef& port_ref,
std::unique_ptr<ObserveProxyEvent> event);
int OnObserveProxyAck(const PortRef& port_ref,
std::unique_ptr<ObserveProxyAckEvent> event);
int OnObserveClosure(const PortRef& port_ref,
std::unique_ptr<ObserveClosureEvent> event);
int OnMergePort(const PortRef& port_ref,
std::unique_ptr<MergePortEvent> event);
int OnUserMessageReadAckRequest(
const PortRef& port_ref,
std::unique_ptr<UserMessageReadAckRequestEvent> event);
int OnUserMessageReadAck(const PortRef& port_ref,
std::unique_ptr<UserMessageReadAckEvent> event);
int OnUpdatePreviousPeer(const PortRef& port_ref,
std::unique_ptr<UpdatePreviousPeerEvent> event);
// NOTE(review): presumably these insert into / remove from |ports_| under
// the given name; confirm in node.cc.
int AddPortWithName(const PortName& port_name, scoped_refptr<Port> port);
void ErasePort(const PortName& port_name);
// Check if the event is sent by the previous peer of the port to decide if
// we can check the sequence number.
// This is not the case for example for PortAccepted or broadcasted events.
bool IsEventFromPreviousPeer(const Event& event);
// Internal helpers behind the public entry points of similar names and the
// proxying machinery. Their exact contracts are not documented in this
// header; see node.cc.
int AcceptEventInternal(const PortRef& port_ref,
const NodeName& from_node,
ScopedEvent event);
int SendUserMessageInternal(const PortRef& port_ref,
std::unique_ptr<UserMessageEvent>* message);
int MergePortsInternal(const PortRef& port0_ref,
const PortRef& port1_ref,
bool allow_close_on_bad_state);
void ConvertToProxy(Port* port,
const NodeName& to_node_name,
PortName* port_name,
Event::PortDescriptor* port_descriptor,
PendingUpdatePreviousPeer* pending_update);
int AcceptPort(const PortName& port_name,
const Event::PortDescriptor& port_descriptor);
int PrepareToForwardUserMessage(const PortRef& forwarding_port_ref,
Port::State expected_port_state,
bool ignore_closed_peer,
UserMessageEvent* message,
NodeName* forward_to_node);
int BeginProxying(const PortRef& port_ref);
int ForwardUserMessagesFromProxy(const PortRef& port_ref);
void InitiateProxyRemoval(const PortRef& port_ref);
void TryRemoveProxy(const PortRef& port_ref);
void DestroyAllPortsWithPeer(const NodeName& node_name,
const PortName& port_name);
// Changes the peer node and port name referenced by |local_port|. Note that
// |ports_lock_| MUST be held through the extent of this method.
// |local_port|'s lock must be held if and only if a reference to |local_port|
// exists in |ports_|.
void UpdatePortPeerAddress(const PortName& local_port_name,
Port* local_port,
const NodeName& new_peer_node,
const PortName& new_peer_port);
// Removes an entry from |peer_port_maps_| corresponding to |local_port|'s
// peer address, if valid.
void RemoveFromPeerPortMap(const PortName& local_port_name, Port* local_port);
// Swaps the peer information for two local ports. Used during port merges.
// Note that |ports_lock_| must be held along with each of the two port's own
// locks, through the extent of this method.
void SwapPortPeers(const PortName& port0_name,
Port* port0,
const PortName& port1_name,
Port* port1);
// Sends an acknowledge request to the peer if the port has a non-zero
// |sequence_num_acknowledge_interval|. This needs to be done when the port's
// peer changes, as the previous peer proxy may not have forwarded any prior
// acknowledge request before deleting itself.
void MaybeResendAckRequest(const PortRef& port_ref);
// Forwards a stored acknowledge request to the peer if the proxy has a
// non-zero |sequence_num_acknowledge_interval|.
void MaybeForwardAckRequest(const PortRef& port_ref);
// Sends an acknowledge of the most recently read sequence number to the peer
// if any messages have been read, and the port has a non-zero
// |sequence_num_to_acknowledge|.
void MaybeResendAck(const PortRef& port_ref);
// This node's name; fixed at construction (const member).
const NodeName name_;
// Wrapper through which the delegate is reached; see DelegateHolder above.
const DelegateHolder delegate_;
// Just to clarify readability of the types below.
using LocalPortName = PortName;
using PeerPortName = PortName;
// Guards access to |ports_| and |peer_port_maps_| below.
//
// This must never be acquired while an individual port's lock is held on the
// same thread. Conversely, individual port locks may be acquired while this
// one is held.
//
// Because UserMessage events may execute arbitrary user code during
// destruction, it is also important to ensure that such events are never
// destroyed while this (or any individual Port) lock is held.
base::Lock ports_lock_;
// The ports held by this node, keyed by their local port name.
std::unordered_map<LocalPortName, scoped_refptr<Port>> ports_;
// Maps a peer port name to a list of PortRefs for all local ports which have
// the port name key designated as their peer port. The set of local ports
// which have the same peer port is expected to always be relatively small and
// usually 1. Hence we just use a flat_map of local PortRefs keyed on each
// local port's name.
using PeerPortMap =
std::unordered_map<PeerPortName, base::flat_map<LocalPortName, PortRef>>;
// A reverse mapping which can be used to find all local ports that reference
// a given peer node or a local port that references a specific given peer
// port on a peer node. The key to this map is the corresponding peer node
// name.
std::unordered_map<NodeName, PeerPortMap> peer_port_maps_;
};
} // namespace ports
} // namespace core
} // namespace mojo
#endif // MOJO_CORE_PORTS_NODE_H_
|