1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
// Original author: ekr@rtfm.com
#ifndef transportflow_h__
#define transportflow_h__
#include <deque>
#include <queue>
#include <string>
#include "nscore.h"
#include "nsISupportsImpl.h"
#include "mozilla/Scoped.h"
#include "transportlayer.h"
#include "m_cpp_utils.h"
#include "nsAutoPtr.h"
// A stack of transport layers acts as a flow.
// Generally, one reads and writes to the top layer.
// This code has a confusing hybrid threading model which
// probably needs some eventual refactoring.
// TODO(ekr@rtfm.com): Bug 844891
//
// TransportFlows are not inherently bound to a thread *but*
// TransportLayers can be. If any layer in a flow is bound
// to a given thread, then all layers in the flow MUST be
// bound to that thread and you can only manipulate the
// flow (push layers, write, etc.) on that thread.
//
// The sole official exception to this is that you are
// allowed to *destroy* a flow off the bound thread provided
// that there are no listeners on its signals. This exception
// is designed to allow idioms where you create the flow
// and then something goes wrong and you destroy it and
// you don't want to bother with a thread dispatch.
//
// Eventually we hope to relax the "no listeners"
// restriction by thread-locking the signals, but previous
// attempts have caused deadlocks.
//
// Most of these invariants are enforced by hard asserts
// (i.e., those which fire even in production builds).
namespace mozilla {
class TransportFlow final : public nsISupports,
public sigslot::has_slots<> {
public:
TransportFlow()
: id_("(anonymous)"),
state_(TransportLayer::TS_NONE),
layers_(new std::deque<TransportLayer *>) {}
explicit TransportFlow(const std::string id)
: id_(id),
state_(TransportLayer::TS_NONE),
layers_(new std::deque<TransportLayer *>) {}
const std::string& id() const { return id_; }
// Layer management. Note PushLayer() is not thread protected, so
// either:
// (a) Do it in the thread handling the I/O
// (b) Do it before you activate the I/O system
//
// The flow takes ownership of the layers after a successful
// push.
nsresult PushLayer(TransportLayer *layer);
// Convenience function to push multiple layers on. Layers
// are pushed on in the order that they are in the queue.
// Any failures cause the flow to become inoperable and
// destroys all the layers including those already pushed.
// TODO(ekr@rtfm.com): Change layers to be ref-counted.
nsresult PushLayers(nsAutoPtr<std::queue<TransportLayer *> > layers);
TransportLayer *top() const;
TransportLayer *GetLayer(const std::string& id) const;
// Wrappers for whatever TLayer happens to be the top layer
// at the time. This way you don't need to do top()->Foo().
TransportLayer::State state(); // Current state
TransportResult SendPacket(const unsigned char *data, size_t len);
// State has changed. Reflects the top flow.
sigslot::signal2<TransportFlow *, TransportLayer::State>
SignalStateChange;
// Data received on the flow
sigslot::signal3<TransportFlow*, const unsigned char *, size_t>
SignalPacketReceived;
bool Contains(TransportLayer *layer) const;
NS_DECL_THREADSAFE_ISUPPORTS
private:
~TransportFlow();
DISALLOW_COPY_ASSIGN(TransportFlow);
// Check if we are on the right thread
void CheckThread() const {
if (!CheckThreadInt())
MOZ_CRASH();
}
bool CheckThreadInt() const {
bool on;
if (!target_) // OK if no thread set.
return true;
if (NS_FAILED(target_->IsOnCurrentThread(&on)))
return false;
return on;
}
void EnsureSameThread(TransportLayer *layer);
void StateChange(TransportLayer *layer, TransportLayer::State state);
void StateChangeInt(TransportLayer::State state);
void PacketReceived(TransportLayer* layer, const unsigned char *data,
size_t len);
static void DestroyFinal(nsAutoPtr<std::deque<TransportLayer *> > layers);
// Overload needed because we use deque internally and queue externally.
static void ClearLayers(std::deque<TransportLayer *>* layers);
static void ClearLayers(std::queue<TransportLayer *>* layers);
std::string id_;
TransportLayer::State state_;
ScopedDeletePtr<std::deque<TransportLayer *> > layers_;
nsCOMPtr<nsIEventTarget> target_;
};
} // close namespace
#endif
|