/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include "MessageStorePlugin.h"
#include "StorageProvider.h"
#include "StoreException.h"
#include "qpid/broker/Broker.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
#include "qpid/DataDir.h"
#include "qpid/log/Statement.h"

/*
 * The MessageStore pointer given to the Broker points to static storage.
 * Thus, it cannot be deleted, especially by the broker. To prevent deletion,
 * this no-op deleter is used with the boost::shared_ptr. When the last
 * shared_ptr is destroyed, the deleter is called rather than delete().
 */
namespace {
  class NoopDeleter {
  public:
      NoopDeleter() {}
      void operator()(qpid::broker::MessageStore * /*p*/) {}
  };
}

namespace qpid {
namespace store {

static MessageStorePlugin static_instance_registers_plugin;


MessageStorePlugin::StoreOptions::StoreOptions(const std::string& name) :
    qpid::Options(name)
{
    addOptions()
        ("storage-provider", qpid::optValue(providerName, "PROVIDER"),
         "Name of the storage provider to use.")
        ;
}


void
MessageStorePlugin::earlyInitialize (qpid::Plugin::Target& target)
{
    qpid::broker::Broker* b =
        dynamic_cast<qpid::broker::Broker*>(&target);
    if (0 == b)
        return;        // Only listen to Broker targets

    broker = b;

    // See if there are any storage provider plugins ready. If not, we can't
    // do a message store.
    qpid::Plugin::earlyInitAll(*this);

    if (providers.empty()) {
        QPID_LOG(warning,
                 "Message store plugin: No storage providers available.");
        provider = providers.end();
        return;
    }
    if (!options.providerName.empty()) {
        // If specific one was chosen, locate it in loaded set of providers.
        provider = providers.find(options.providerName);
        if (provider == providers.end())
            throw Exception("Message store plugin: storage provider '" +
                            options.providerName +
                            "' does not exist.");
    }
    else {
        // No specific provider chosen; if there's only one, use it. Else
        // report the need to pick one.
        if (providers.size() > 1) {
            provider = providers.end();
            throw Exception("Message store plugin: multiple provider plugins "
                            "loaded; must either load only one or select one "
                            "using --storage-provider");
        }
        provider = providers.begin();
    }

    provider->second->activate(*this);
    NoopDeleter d;
    boost::shared_ptr<qpid::broker::MessageStore> sp(this, d);
    broker->setStore(sp);
    target.addFinalizer(boost::bind(&MessageStorePlugin::finalizeMe, this));
}

void
MessageStorePlugin::initialize(qpid::Plugin::Target& target)
{
    qpid::broker::Broker* broker =
        dynamic_cast<qpid::broker::Broker*>(&target);
    if (0 == broker)
        return;        // Only listen to Broker targets

    // Pass along the initialize step to the provider that's activated.
    if (provider != providers.end()) {
        provider->second->initialize(*this);
    }
    //    qpid::Plugin::initializeAll(*this);
}

void
MessageStorePlugin::finalizeMe()
{
    finalize();              // Call finalizers on any Provider plugins
}

void
MessageStorePlugin::providerAvailable(const std::string name,
                                      StorageProvider *be)
{
    ProviderMap::value_type newSp(name, be);
    std::pair<ProviderMap::iterator, bool> inserted = providers.insert(newSp);
    if (inserted.second == false)
        QPID_LOG(warning, "Storage provider " << name << " duplicate; ignored.");
}

void
MessageStorePlugin::truncateInit(const bool /*saveStoreContent*/)
{
    QPID_LOG(info, "Store: truncateInit");
}


/**
 * Record the existence of a durable queue
 */
void
MessageStorePlugin::create(broker::PersistableQueue& queue,
                           const framing::FieldTable& args)
{
    if (queue.getName().size() == 0)
    {
        QPID_LOG(error,
                 "Cannot create store for empty (null) queue name - "
                 "ignoring and attempting to continue.");
        return;
    }
    if (queue.getPersistenceId()) {
        THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
    }
    provider->second->create(queue, args);
}

/**
 * Destroy a durable queue
 */
void
MessageStorePlugin::destroy(broker::PersistableQueue& queue)
{
    provider->second->destroy(queue);
}

/**
 * Record the existence of a durable exchange
 */
void
MessageStorePlugin::create(const broker::PersistableExchange& exchange,
                           const framing::FieldTable& args)
{
    if (exchange.getPersistenceId()) {
        THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName());
    }
    provider->second->create(exchange, args);
}

/**
 * Destroy a durable exchange
 */
void
MessageStorePlugin::destroy(const broker::PersistableExchange& exchange)
{
    provider->second->destroy(exchange);
}

/**
 * Record a binding
 */
void
MessageStorePlugin::bind(const broker::PersistableExchange& exchange,
                         const broker::PersistableQueue& queue,
                         const std::string& key,
                         const framing::FieldTable& args)
{
    provider->second->bind(exchange, queue, key, args);
}

/**
 * Forget a binding
 */
void
MessageStorePlugin::unbind(const broker::PersistableExchange& exchange,
                           const broker::PersistableQueue& queue,
                           const std::string& key,
                           const framing::FieldTable& args)
{
    provider->second->unbind(exchange, queue, key, args);
}

/**
 * Record generic durable configuration
 */
void
MessageStorePlugin::create(const broker::PersistableConfig& config)
{
    if (config.getPersistenceId()) {
        THROW_STORE_EXCEPTION("Config item already created: " +
                              config.getName());
    }
    provider->second->create(config);
}

/**
 * Destroy generic durable configuration
 */
void
MessageStorePlugin::destroy(const broker::PersistableConfig& config)
{
    provider->second->destroy(config);
}

/**
 * Stores a message before it has been enqueued
 * (enqueueing automatically stores the message so this is
 * only required if storage is required prior to that
 * point).
 */
void
MessageStorePlugin::stage(const boost::intrusive_ptr<broker::PersistableMessage>& msg)
{
    if (msg->getPersistenceId() == 0 && !msg->isContentReleased()) {
        provider->second->stage(msg);
    }
}

/**
 * Destroys a previously staged message. This only needs
 * to be called if the message is never enqueued. (Once
 * enqueued, deletion will be automatic when the message
 * is dequeued from all queues it was enqueued onto).
 */
void
MessageStorePlugin::destroy(broker::PersistableMessage& msg)
{
    if (msg.getPersistenceId())
        provider->second->destroy(msg);
}

/**
 * Appends content to a previously staged message
 */
void
MessageStorePlugin::appendContent
  (const boost::intrusive_ptr<const broker::PersistableMessage>& msg,
   const std::string& data)
{
    if (msg->getPersistenceId())
        provider->second->appendContent(msg, data);
    else
        THROW_STORE_EXCEPTION("Cannot append content. Message not known to store!");
}

/**
 * Loads (a section) of content data for the specified
 * message (previously stored through a call to stage or
 * enqueue) into data. The offset refers to the content
 * only (i.e. an offset of 0 implies that the start of the
 * content should be loaded, not the headers or related
 * meta-data).
 */
void
MessageStorePlugin::loadContent(const broker::PersistableQueue& queue,
                                const boost::intrusive_ptr<const broker::PersistableMessage>& msg,
                                std::string& data,
                                uint64_t offset,
                                uint32_t length)
{
    if (msg->getPersistenceId())
        provider->second->loadContent(queue, msg, data, offset, length);
    else
        THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
}

/**
 * Enqueues a message, storing the message if it has not
 * been previously stored and recording that the given
 * message is on the given queue.
 *
 * Note: The operation is asynchronous so the return of this function does
 * not mean the operation is complete.
 */
void
MessageStorePlugin::enqueue(broker::TransactionContext* ctxt,
                            const boost::intrusive_ptr<broker::PersistableMessage>& msg,
                            const broker::PersistableQueue& queue)
{
    if (queue.getPersistenceId() == 0) {
        THROW_STORE_EXCEPTION("Queue not created: " + queue.getName());
    }
    provider->second->enqueue(ctxt, msg, queue);
}

/**
 * Dequeues a message, recording that the given message is
 * no longer on the given queue and deleting the message
 * if it is no longer on any other queue.
 *
 * Note: The operation is asynchronous so the return of this function does
 * not mean the operation is complete.
 */
void
MessageStorePlugin::dequeue(broker::TransactionContext* ctxt,
                            const boost::intrusive_ptr<broker::PersistableMessage>& msg,
                            const broker::PersistableQueue& queue)
{
    provider->second->dequeue(ctxt, msg, queue);
}

/**
 * Flushes all async messages to disk for the specified queue
 *
 * Note: The operation is asynchronous so the return of this function does
 * not mean the operation is complete.
 */
void
MessageStorePlugin::flush(const broker::PersistableQueue& queue)
{
    provider->second->flush(queue);
}

/**
 * Returns the number of outstanding AIO's for a given queue
 *
 * If 0, than all the enqueue / dequeues have been stored
 * to disk.
 */
uint32_t
MessageStorePlugin::outstandingQueueAIO(const broker::PersistableQueue& queue)
{
    return provider->second->outstandingQueueAIO(queue);
}

std::auto_ptr<broker::TransactionContext>
MessageStorePlugin::begin()
{
    return provider->second->begin();
}

std::auto_ptr<broker::TPCTransactionContext>
MessageStorePlugin::begin(const std::string& xid)
{
    return provider->second->begin(xid);
}

void
MessageStorePlugin::prepare(broker::TPCTransactionContext& ctxt)
{
    provider->second->prepare(ctxt);
}

void
MessageStorePlugin::commit(broker::TransactionContext& ctxt)
{
    provider->second->commit(ctxt);
}

void
MessageStorePlugin::abort(broker::TransactionContext& ctxt)
{
    provider->second->abort(ctxt);
}

void
MessageStorePlugin::collectPreparedXids(std::set<std::string>& xids)
{
    provider->second->collectPreparedXids(xids);
}

/**
 * Request recovery of queue and message state; inherited from Recoverable
 */
void
MessageStorePlugin::recover(broker::RecoveryManager& recoverer)
{
    ExchangeMap exchanges;
    QueueMap queues;
    MessageMap messages;
    MessageQueueMap messageQueueMap;
    std::vector<std::string> xids;
    PreparedTransactionMap dtxMap;

    provider->second->recoverConfigs(recoverer);
    provider->second->recoverExchanges(recoverer, exchanges);
    provider->second->recoverQueues(recoverer, queues);
    provider->second->recoverBindings(recoverer, exchanges, queues);
    // Important to recover messages before transactions in the SQL-CLFS
    // case. If this becomes a problem, it may be possible to resolve it.
    // If in doubt please raise a jira and notify Steve Huston
    // <shuston@riverace.com>.
    provider->second->recoverMessages(recoverer, messages, messageQueueMap);
    provider->second->recoverTransactions(recoverer, dtxMap);
    // Enqueue msgs where needed.
    for (MessageQueueMap::const_iterator i = messageQueueMap.begin();
         i != messageQueueMap.end();
         ++i) {
        // Locate the message corresponding to the current message Id
        MessageMap::const_iterator iMsg = messages.find(i->first);
        if (iMsg == messages.end()) {
            std::ostringstream oss;
            oss << "No matching message trying to re-enqueue message "
                << i->first;
            THROW_STORE_EXCEPTION(oss.str());
        }
        broker::RecoverableMessage::shared_ptr msg = iMsg->second;
        // Now for each queue referenced in the queue map, locate it
        // and re-enqueue the message.
        for (std::vector<QueueEntry>::const_iterator j = i->second.begin();
             j != i->second.end();
             ++j) {
            // Locate the queue corresponding to the current queue Id
            QueueMap::const_iterator iQ = queues.find(j->queueId);
            if (iQ == queues.end()) {
                std::ostringstream oss;
                oss << "No matching queue trying to re-enqueue message "
                    << " on queue Id " << j->queueId;
                THROW_STORE_EXCEPTION(oss.str());
            }
            // Messages involved in prepared transactions have their status
            // updated accordingly. First, though, restore a message that
            // is expected to be on a queue, including non-transacted
            // messages and those pending dequeue in a dtx.
            if (j->tplStatus != QueueEntry::ADDING)
                iQ->second->recover(msg);
            switch(j->tplStatus) {
            case QueueEntry::ADDING:
                dtxMap[j->xid]->enqueue(iQ->second, msg);
                break;
            case QueueEntry::REMOVING:
                dtxMap[j->xid]->dequeue(iQ->second, msg);
                break;
            default:
                break;
            }
        }
    }
}

}} // namespace qpid::store
