1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
|
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
/*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
#ifndef INCLUDED_ORCUS_DETAIL_THREAD_PARSER_TOKEN_BUFFER_HPP
#define INCLUDED_ORCUS_DETAIL_THREAD_PARSER_TOKEN_BUFFER_HPP
#include "orcus/exception.hpp"

#include <algorithm>
#include <condition_variable>
#include <cstddef>
#include <mutex>
namespace orcus { namespace detail { namespace thread {
/**
 * Class that manages synchronization of parser tokens used in
 * multi-threaded parsers.
 *
 * A parser thread accumulates tokens in its own buffer and hands them over
 * through check_and_notify() and notify_and_finish().  A client thread picks
 * them up through next_tokens().  The state shared between the two threads
 * is guarded by a single mutex.
 *
 * @tparam TokensT container type used for the token buffers.  It must
 *                 provide empty(), size(), clear() and swap().
 */
template<typename TokensT>
class parser_token_buffer
{
    enum class state_type { parsing_progress, parsing_ended, parsing_aborted };

    typedef TokensT tokens_type;

    // Guards m_tokens, m_state, and all writes to m_token_size_threshold.
    mutable std::mutex m_mtx_tokens;
    // Signaled when the client has taken the pending tokens, or on abort.
    std::condition_variable m_cv_tokens_empty;
    // Signaled when the parser has handed over new tokens, or on abort.
    std::condition_variable m_cv_tokens_ready;

    tokens_type m_tokens; // token buffer used to hand over tokens to the client.

    // Current hand-over threshold.  Only the parser thread modifies it, and
    // only while holding m_mtx_tokens.
    size_t m_token_size_threshold;
    const size_t m_max_token_size;

    state_type m_state;

    /**
     * Only to be called from the parser thread, while holding m_mtx_tokens
     * through the passed lock.
     *
     * Wait until the client thread takes the pending tokens and makes the
     * token buffer empty, or until the parsing is no longer in progress.
     * Returns immediately if the buffer is already empty.
     *
     * @param lock lock currently held on m_mtx_tokens.  It is still owned by
     *             the caller when this function returns or throws.
     *
     * @throw detail::parsing_aborted_error if the parsing has been aborted.
     */
    void wait_until_tokens_empty(std::unique_lock<std::mutex>& lock)
    {
        m_cv_tokens_empty.wait(lock, [this] {
            return m_tokens.empty() || m_state != state_type::parsing_progress;
        });

        if (m_state == state_type::parsing_aborted)
            throw detail::parsing_aborted_error();
    }

public:
    /**
     * @param min_token_size initial token size threshold; values below 1 are
     *                       raised to 1.
     * @param max_token_size the threshold is never doubled past this value.
     *
     * @throw invalid_arg_error if the initial threshold is larger than
     *        max_token_size.
     */
    parser_token_buffer(size_t min_token_size, size_t max_token_size) :
        m_token_size_threshold(std::max<size_t>(min_token_size, 1)),
        m_max_token_size(max_token_size),
        m_state(state_type::parsing_progress)
    {
        if (m_token_size_threshold > m_max_token_size)
            throw invalid_arg_error(
                "initial token size threshold is already larger than the max token size.");
    }

    /**
     * Check the size of the parser token buffer, and if it has reached the
     * current threshold, move it to the client buffer.
     *
     * If the client has not yet taken the previous batch, the threshold is
     * doubled instead so that the parser can keep going, as long as the
     * doubled value stays below the max token size.  Once the threshold
     * cannot grow any further, this call blocks until the client empties
     * the buffer.
     *
     * Call this from the parser thread.
     *
     * @param parser_tokens parser token buffer.  On hand-over it is swapped
     *                      with the client buffer.
     *
     * @throw detail::parsing_aborted_error if the parsing has been aborted.
     */
    void check_and_notify(tokens_type& parser_tokens)
    {
        // Only this (parser) thread writes the threshold, so it can be read
        // here without holding the lock.
        if (parser_tokens.size() < m_token_size_threshold)
            return; // Still below the threshold.

        std::unique_lock<std::mutex> lock(m_mtx_tokens);

        if (!m_tokens.empty() && m_token_size_threshold < (m_max_token_size / 2))
        {
            // The client is still busy with the previous batch.  Double the
            // threshold and continue to parse.
            m_token_size_threshold *= 2;
            return;
        }

        // Either the client buffer is already empty, or the threshold cannot
        // be increased any more and we must wait for the client to finish.
        // Checking and swapping under a single lock also guarantees that no
        // tokens are handed over once abort() has been called.
        wait_until_tokens_empty(lock);

        m_tokens.swap(parser_tokens);
        lock.unlock();
        m_cv_tokens_ready.notify_one();
    }

    /**
     * Move the current parser token buffer to the client buffer, and signal
     * the end of parsing.  Blocks until the client has taken any previously
     * handed-over tokens.
     *
     * Call this from the parser thread.
     *
     * @param parser_tokens parser token buffer.
     *
     * @throw detail::parsing_aborted_error if the parsing has been aborted.
     */
    void notify_and_finish(tokens_type& parser_tokens)
    {
        std::unique_lock<std::mutex> lock(m_mtx_tokens);

        // Wait until the client tokens get used up.  Keeping the lock from
        // the wait through the state change prevents a concurrent abort()
        // from being overwritten with parsing_ended.
        wait_until_tokens_empty(lock);

        m_tokens.swap(parser_tokens);
        m_state = state_type::parsing_ended;
        lock.unlock();
        m_cv_tokens_ready.notify_one();
    }

    /**
     * Abort the parsing: discard the tokens in the client buffer and mark
     * the state as aborted.  Both condition variables are signaled so that
     * a parser blocked waiting for an empty buffer throws
     * detail::parsing_aborted_error, and a client blocked in next_tokens()
     * returns false.
     */
    void abort()
    {
        {
            std::lock_guard<std::mutex> lock(m_mtx_tokens);
            m_tokens.clear();
            m_state = state_type::parsing_aborted;
        }
        m_cv_tokens_empty.notify_all();
        m_cv_tokens_ready.notify_all();
    }

    /**
     * Retrieve the tokens currently in the client token buffer.
     *
     * Call this from the client (non-parser) thread.
     *
     * @param tokens place to move the tokens in the client token buffer to.
     *
     * @return true if the parsing is still in progress, therefore more tokens
     *         are expected, false if this is the last set of tokens.
     */
    bool next_tokens(tokens_type& tokens)
    {
        tokens.clear();

        std::unique_lock<std::mutex> lock(m_mtx_tokens);

        // Wait until the parser passes a new set of tokens, or the parsing
        // is no longer in progress.
        m_cv_tokens_ready.wait(lock, [this] {
            return !m_tokens.empty() || m_state != state_type::parsing_progress;
        });

        // Take the new tokens, and copy the state so that the lock can be
        // released safely before notifying the parser.
        tokens.swap(m_tokens);
        const state_type state = m_state;
        lock.unlock();

        m_cv_tokens_empty.notify_one();
        return state == state_type::parsing_progress;
    }

    /**
     * Return the current token size threshold.  Call this only after the
     * parsing has finished.
     *
     * @return current token size threshold, or 0 if the parsing is still in
     *         progress.
     */
    size_t token_size_threshold() const
    {
        // Lock so that reading the state and threshold does not race with
        // the parser thread, which updates them under the same mutex.
        std::lock_guard<std::mutex> lock(m_mtx_tokens);
        if (m_state == state_type::parsing_progress)
            return 0;
        return m_token_size_threshold;
    }
};
}}}
#endif
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
|