File: parser_token_buffer.hpp

package info (click to toggle)
liborcus 0.19.2-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 15,964 kB
  • sloc: xml: 68,559; cpp: 65,209; sh: 4,607; makefile: 2,639; python: 2,223; ansic: 4
file content (188 lines) | stat: -rw-r--r-- 5,453 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
/*
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

#ifndef INCLUDED_ORCUS_DETAIL_THREAD_PARSER_TOKEN_BUFFER_HPP
#define INCLUDED_ORCUS_DETAIL_THREAD_PARSER_TOKEN_BUFFER_HPP

#include "orcus/exception.hpp"

#include <algorithm>
#include <condition_variable>
#include <cstddef>
#include <mutex>

namespace orcus { namespace detail { namespace thread {

/**
 * Class that manages synchronization of parser tokens used in
 * multi-threaded parsers.
 *
 * The parser thread hands batches of tokens over through
 * check_and_notify() and notify_and_finish(); the client thread collects
 * them through next_tokens().  All shared state (the client token buffer,
 * the parsing state and the size threshold) is guarded by a single mutex,
 * and two condition variables signal "buffer became empty" to the parser
 * and "buffer became ready" to the client.
 *
 * @tparam TokensT container type holding the tokens.  It must provide
 *                 size(), empty(), clear() and swap().
 */
template<typename TokensT>
class parser_token_buffer
{
    enum class state_type { parsing_progress, parsing_ended, parsing_aborted };

    using tokens_type = TokensT;

    mutable std::mutex m_mtx_tokens;
    std::condition_variable m_cv_tokens_empty; // signaled when the client has emptied m_tokens, or on abort.
    std::condition_variable m_cv_tokens_ready; // signaled when the parser has filled m_tokens, or on end / abort.

    tokens_type m_tokens; // token buffer used to hand over tokens to the client.

    size_t m_token_size_threshold;
    const size_t m_max_token_size;

    state_type m_state;

    /**
     * Only to be called from the parser thread.
     *
     * Wait until the processor thread takes the new tokens and makes the
     * token buffer empty, or until the parsing state is no longer "in
     * progress".
     *
     * @param lock lock that already owns m_mtx_tokens.  It is still owned
     *             when this method returns, so the caller can update the
     *             buffer without a window in which the state may change.
     *
     * @throws detail::parsing_aborted_error if the parsing has been aborted.
     */
    void wait_until_tokens_empty(std::unique_lock<std::mutex>& lock)
    {
        m_cv_tokens_empty.wait(lock, [this]
        {
            return m_tokens.empty() || m_state != state_type::parsing_progress;
        });

        if (m_state == state_type::parsing_aborted)
            throw detail::parsing_aborted_error();
    }

public:

    /**
     * @param min_token_size initial token size threshold.  A value of 0 is
     *                       treated as 1.
     * @param max_token_size upper bound for the token size threshold.
     *
     * @throws invalid_arg_error if the initial threshold is larger than the
     *         max token size.
     */
    parser_token_buffer(size_t min_token_size, size_t max_token_size) :
        m_token_size_threshold(std::max<size_t>(min_token_size, 1)),
        m_max_token_size(max_token_size),
        m_state(state_type::parsing_progress)
    {
        if (m_token_size_threshold > m_max_token_size)
            throw invalid_arg_error(
                "initial token size threshold is already larger than the max token size.");
    }

    /**
     * Check the size of the parser token buffer, and if it exceeds specified
     * threshold, move it to the client buffer.
     *
     * If the client buffer is still occupied, the threshold is doubled (as
     * long as it stays within the max token size) and the call returns
     * without handing anything over; otherwise the call blocks until the
     * client has emptied its buffer.
     *
     * Call this from the parser thread.
     *
     * @param parser_tokens parser token buffer.
     *
     * @throws detail::parsing_aborted_error if the parsing has been aborted.
     */
    void check_and_notify(tokens_type& parser_tokens)
    {
        // The threshold is only ever modified by the parser thread, so it
        // is safe to read it here without taking the lock.
        if (parser_tokens.size() < m_token_size_threshold)
            // Still below the threshold.
            return;

        {
            std::unique_lock<std::mutex> lock(m_mtx_tokens);

            // Don't publish anything into a buffer that has been aborted.
            if (m_state == state_type::parsing_aborted)
                throw detail::parsing_aborted_error();

            if (!m_tokens.empty())
            {
                if (m_token_size_threshold < (m_max_token_size/2))
                {
                    // Double the threshold and continue to parse.
                    m_token_size_threshold *= 2;
                    return;
                }

                // We cannot increase the threshold any more.  Wait for the
                // client to finish.
                wait_until_tokens_empty(lock);
            }

            m_tokens.swap(parser_tokens);
        }

        // Notify after releasing the lock so that the woken client does not
        // immediately block on the mutex.
        m_cv_tokens_ready.notify_one();
    }

    /**
     * Move the current parser token buffer to the client buffer, and signal
     * the end of parsing.
     *
     * Call this from the parser thread.
     *
     * @param parser_tokens parser token buffer.
     *
     * @throws detail::parsing_aborted_error if the parsing has been aborted.
     */
    void notify_and_finish(tokens_type& parser_tokens)
    {
        {
            std::unique_lock<std::mutex> lock(m_mtx_tokens);

            // Wait until the client tokens get used up.  The lock is kept
            // from here through the state change so that a concurrent
            // abort() cannot be overwritten by the "ended" state.
            wait_until_tokens_empty(lock);

            m_tokens.swap(parser_tokens);
            m_state = state_type::parsing_ended;
        }

        m_cv_tokens_ready.notify_one();
    }

    /**
     * Discard the tokens in the client token buffer and mark the parsing as
     * aborted.  Threads blocked on either side of the buffer are woken up so
     * that they can observe the new state.
     */
    void abort()
    {
        {
            std::lock_guard<std::mutex> lock(m_mtx_tokens);
            m_tokens.clear();
            m_state = state_type::parsing_aborted;
        }

        // The state is part of the wait condition on both condition
        // variables, so both sides need to be notified.
        m_cv_tokens_empty.notify_all();
        m_cv_tokens_ready.notify_all();
    }

    /**
     * Retrieve the tokens currently in the client token buffer.
     *
     * Call this from the client (non-parser) thread.
     *
     * @param tokens place to move the tokens in the client token buffer to.
     *               Its previous content is discarded.
     *
     * @return true if the parsing is still in progress, therefore more tokens
     *         are expected, false if this is the last set of tokens.
     */
    bool next_tokens(tokens_type& tokens)
    {
        tokens.clear();

        state_type state;

        {
            // Wait until the parser passes a new set of tokens, or the
            // parsing is no longer in progress.
            std::unique_lock<std::mutex> lock(m_mtx_tokens);
            m_cv_tokens_ready.wait(lock, [this]
            {
                return !m_tokens.empty() || m_state != state_type::parsing_progress;
            });

            // Get the new tokens, and take a copy of the state so that it
            // can be inspected after the lock is released.
            tokens.swap(m_tokens);
            state = m_state;
        }

        // Let the parser know that the client buffer is empty again.
        m_cv_tokens_empty.notify_one();

        return state == state_type::parsing_progress;
    }

    /**
     * Return the current token size threshold.  Call this only after the
     * parsing has finished.
     *
     * @return current token size threshold, or 0 if the parsing is still in
     *         progress.
     */
    size_t token_size_threshold() const
    {
        // Both values are written by the parser thread; read them under the
        // lock so that a call made too early is not a data race.
        std::lock_guard<std::mutex> lock(m_mtx_tokens);

        if (m_state == state_type::parsing_progress)
            return 0;

        return m_token_size_threshold;
    }
};

}}}

#endif

/* vim:set shiftwidth=4 softtabstop=4 expandtab: */