1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
|
/*
* Copyright (c) 2021, 2025, Oracle and/or its affiliates.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2.0,
* as published by the Free Software Foundation.
*
* This program is designed to work with certain software (including
* but not limited to OpenSSL) that is licensed under separate terms,
* as designated in a particular file or component or in included license
* documentation. The authors of MySQL hereby grant you an additional
* permission to link the program and your derivative works with the
* separately licensed software that they have either included with
* the program or referenced in the documentation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License, version 2.0, for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef PLUGIN_X_CLIENT_XCYCLIC_BUFFER_H_
#define PLUGIN_X_CLIENT_XCYCLIC_BUFFER_H_
#include <algorithm>
#include <memory>
#include "my_inttypes.h"
namespace xcl {
class Cyclic_buffer {
public:
Cyclic_buffer(const uint64_t size) { change_size(size); }
void change_size(const uint64_t size) {
m_buffer_size = size;
m_buffer.reset(new uint8_t[size]);
m_buffer_offset = 0;
m_buffer_data_in = 0;
}
void put(const uint8_t *data_in, uint64_t size) {
assert(size <= space_left());
uint64_t out_size;
uint8_t *out_ptr;
while (size) {
begin_direct_update(&out_ptr, &out_size);
const auto copy_block_size = std::min(size, out_size);
memcpy(out_ptr, data_in, copy_block_size);
end_direct_update(copy_block_size);
size -= copy_block_size;
out_ptr += copy_block_size;
}
}
uint64_t get(uint8_t *buffer_data_out, uint64_t buffer_data_size) {
const auto copy_without_roll =
std::min(buffer_data_size, used_without_roll());
memcpy(buffer_data_out, m_buffer.get() + m_buffer_offset,
copy_without_roll);
buffer_data_size -= copy_without_roll;
buffer_data_out += copy_without_roll;
m_buffer_data_in -= copy_without_roll;
m_buffer_offset = (m_buffer_offset + copy_without_roll) % m_buffer_size;
const auto copy_roll = std::min(buffer_data_size, m_buffer_data_in);
memcpy(buffer_data_out, m_buffer.get() + m_buffer_offset, copy_roll);
m_buffer_data_in -= copy_roll;
m_buffer_offset = (m_buffer_offset + copy_roll) % m_buffer_size;
return copy_roll + copy_without_roll;
}
uint64_t space_left() const { return m_buffer_size - m_buffer_data_in; }
uint64_t space_used() const { return m_buffer_data_in; }
bool begin_direct_update(uint8_t **out_ptr, uint64_t *out_size) {
const uint64_t data_end_offset =
(m_buffer_offset + m_buffer_data_in) % m_buffer_size;
*out_size = (m_buffer_offset > data_end_offset)
? m_buffer_offset - data_end_offset
: m_buffer_size - data_end_offset;
*out_ptr = m_buffer.get() + data_end_offset;
if (m_buffer_data_in == m_buffer_size) *out_size = 0;
return *out_size > 0;
}
void end_direct_update(uint64_t commit_size) {
assert(commit_size <= space_left());
m_buffer_data_in = m_buffer_data_in + commit_size;
}
private:
uint64_t used_without_roll() const {
const auto to_end = m_buffer_size - m_buffer_offset;
return std::min(to_end, m_buffer_data_in);
}
uint64_t m_buffer_size;
std::unique_ptr<uint8_t[]> m_buffer;
uint64_t m_buffer_offset{0};
uint64_t m_buffer_data_in{0};
};
} // namespace xcl
#endif // PLUGIN_X_CLIENT_XCYCLIC_BUFFER_H_
|