1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
|
// *****************************************************************************
// * This file is part of the FreeFileSync project. It is distributed under *
// * GNU General Public License: https://www.gnu.org/licenses/gpl-3.0 *
// * Copyright (C) Zenju (zenju AT freefilesync DOT org) - All Rights Reserved *
// *****************************************************************************
#ifndef STREAM_BUFFER_H_08492572089560298
#define STREAM_BUFFER_H_08492572089560298
#include <thread>
//#include <condition_variable>
#include "ring_buffer.h"
#include "string_tools.h"
//#include "thread.h"
namespace zen
{
/* implement streaming API on top of libcurl's icky callback-based design
+ curl uses READBUFFER_SIZE download buffer size, but hands the data out via the awkward sendf.c::chop_write(), which writes in small chunks of CURL_MAX_WRITE_SIZE (16 kB)
=> support copying arbitrarily-large files: https://freefilesync.org/forum/viewtopic.php?t=4471
=> maximum performance through async processing (prefetching + output buffer!)
=> cost per worker thread creation ~ 1/20 ms */
/*  Bounded byte pipe between an "output thread" (producer: write()/tryWrite()/closeStream()/setWriteError())
    and an "input thread" (consumer: read()/tryRead()/setReadError()).
    - ring buffer, EOF flag and both error slots are only touched while holding lockStream_
    - the byte counters are std::atomic and can be queried without taking the mutex
    - an error reported by one side is rethrown on the other side's next blocking call */
class AsyncStreamBuffer
{
public:
    explicit AsyncStreamBuffer(size_t capacity) { ringBuf_.reserve(capacity); }

    //context of input thread, blocking
    //throw <write error>; fills all "bytesToRead" bytes unless end of stream comes first; returns number of bytes copied
    size_t read(void* buffer, size_t bytesToRead)
    {
        std::byte* const target = static_cast<std::byte*>(buffer);
        size_t bytesDone = 0;

        std::unique_lock lock(lockStream_);
        while (bytesDone < bytesToRead)
        {
            const size_t chunkSize = tryReadImpl(lock, target + bytesDone, bytesToRead - bytesDone); //throw <write error>
            if (chunkSize == 0) //end of file
                break;

            conditionBytesRead_.notify_all(); //buffer space was freed => wake up a writer waiting for room
            bytesDone += chunkSize;
        }
        return bytesDone;
    }

    //context of input thread, blocking
    //throw <write error>; may return short; only 0 means EOF! CONTRACT: bytesToRead > 0!
    size_t tryRead(void* buffer, size_t bytesToRead)
    {
        const size_t bytesRead = [&]
        {
            std::unique_lock lock(lockStream_);
            return tryReadImpl(lock, buffer, bytesToRead); //throw <write error>
        }(); //mutex is released here

        if (bytesRead != 0)
            conditionBytesRead_.notify_all(); //...*outside* the lock
        return bytesRead;
    }

    //context of output thread, blocking
    //throw <read error>; returns only after all "bytesToWrite" bytes were handed over to the buffer
    void write(const void* buffer, size_t bytesToWrite)
    {
        const std::byte* const source = static_cast<const std::byte*>(buffer);
        size_t bytesDone = 0;

        std::unique_lock lock(lockStream_);
        while (bytesDone < bytesToWrite)
        {
            bytesDone += tryWriteWhileImpl(lock, source + bytesDone, bytesToWrite - bytesDone); //throw <read error>
            conditionBytesWritten_.notify_all(); //new data available => wake up a reader waiting for input
        }
    }

    //context of output thread, blocking
    //throw <read error>; may return short! CONTRACT: bytesToWrite > 0
    size_t tryWrite(const void* buffer, size_t bytesToWrite)
    {
        const size_t bytesWritten = [&]
        {
            std::unique_lock lock(lockStream_);
            return tryWriteWhileImpl(lock, buffer, bytesToWrite); //throw <read error>
        }(); //mutex is released here

        conditionBytesWritten_.notify_all(); //...*outside* the lock
        return bytesWritten;
    }

    //context of output thread
    //mark end of stream: once the buffer has been drained, the reader gets 0 (= EOF)
    void closeStream()
    {
        std::unique_lock lock(lockStream_);
        assert(!eof_ && !errorWrite_);
        eof_ = true;
        lock.unlock();

        conditionBytesWritten_.notify_all(); //reader may be waiting for data that will never come
    }

    //context of input thread
    //store the reader's failure; it is rethrown inside the writer's next (or currently blocked) write call
    void setReadError(const std::exception_ptr& error)
    {
        std::unique_lock lock(lockStream_);
        assert(error && !errorRead_);
        if (!errorRead_) //keep the first error
            errorRead_ = error;
        lock.unlock();

        conditionBytesRead_.notify_all(); //the writer waits on this condition
    }

    //context of output thread
    //store the writer's failure; it is rethrown inside the reader's next (or currently blocked) read call
    void setWriteError(const std::exception_ptr& error)
    {
        std::unique_lock lock(lockStream_);
        assert(error && !errorWrite_);
        if (!errorWrite_) //keep the first error
            errorWrite_ = error;
        lock.unlock();

        conditionBytesWritten_.notify_all(); //the reader waits on this condition
    }

#if 0
    //function not needed: when writing is completed successfully, no further error can occur!
    // => caveat: writing is NOT done (yet) when closeStream() is called!
    //context of *output* thread
    void checkReadErrors() //throw <read error>
    {
        std::lock_guard lock(lockStream_);
        if (errorRead_)
            std::rethrow_exception(errorRead_); //throw <read error>
    }

    //function not needed: when EOF is reached (without errors), reading is done => no further error can occur!
    //context of *input* thread
    void checkWriteErrors() //throw <write error>
    {
        std::lock_guard lock(lockStream_);
        if (errorWrite_)
            std::rethrow_exception(errorWrite_); //throw <write error>
    }
#endif

    //running totals; atomics => no mutex needed for the query
    uint64_t getTotalBytesWritten() const { return totalBytesWritten_; }
    uint64_t getTotalBytesRead   () const { return totalBytesRead_; }

private:
    AsyncStreamBuffer           (const AsyncStreamBuffer&) = delete;
    AsyncStreamBuffer& operator=(const AsyncStreamBuffer&) = delete;

    //context of input thread, blocking; caller must hold "ul" on lockStream_
    //throw <write error>; may return short; only 0 means EOF! CONTRACT: bytesToRead > 0!
    size_t tryReadImpl(std::unique_lock<std::mutex>& ul, void* buffer, size_t bytesToRead)
    {
        if (bytesToRead == 0) //"read() with a count of 0 returns zero" => indistinguishable from end of file! => check!
            throw std::logic_error(std::string(__FILE__) + '[' + numberTo<std::string>(__LINE__) + "] Contract violation!");

        assert(isLocked(lockStream_));
        assert(!errorRead_);

        //sleep until there is something to report: data, end of stream, or a writer failure
        const auto readerMayProceed = [this] { return !ringBuf_.empty() || eof_ || errorWrite_; };
        conditionBytesWritten_.wait(ul, readerMayProceed);

        if (errorWrite_)
            std::rethrow_exception(errorWrite_); //throw <write error>

        //hand out as much as is available, but no more than requested (0 <=> buffer empty and eof_ set)
        const size_t chunkSize = std::min(bytesToRead, ringBuf_.size());

        ringBuf_.extract_front(static_cast<std::byte*>(buffer),
                               static_cast<std::byte*>(buffer) + chunkSize);
        totalBytesRead_ += chunkSize;
        return chunkSize;
    }

    //context of output thread, blocking; caller must hold "ul" on lockStream_
    //throw <read error>; may return short! CONTRACT: bytesToWrite > 0
    size_t tryWriteWhileImpl(std::unique_lock<std::mutex>& ul, const void* buffer, size_t bytesToWrite)
    {
        if (bytesToWrite == 0)
            throw std::logic_error(std::string(__FILE__) + '[' + numberTo<std::string>(__LINE__) + "] Contract violation!");

        assert(isLocked(lockStream_));
        assert(!eof_ && !errorWrite_);

        /*  => can't use InterruptibleThread's interruptibleWait() :(
            -> AsyncStreamBuffer is used for input and output streaming
            => both AsyncStreamBuffer::write()/read() would have to implement interruptibleWait()
            => one of these usually called from main thread
            => but interruptibleWait() cannot be called from main thread! */

        //sleep until at least one byte of room is available, or the reader has failed
        const auto writerMayProceed = [this] { return ringBuf_.size() < ringBuf_.capacity() || errorRead_; };
        conditionBytesRead_.wait(ul, writerMayProceed);

        if (errorRead_)
            std::rethrow_exception(errorRead_); //throw <read error>

        //store as much as fits into the remaining room, but no more than offered
        const size_t chunkSize = std::min(bytesToWrite, ringBuf_.capacity() - ringBuf_.size());

        ringBuf_.insert_back(static_cast<const std::byte*>(buffer),
                             static_cast<const std::byte*>(buffer) + chunkSize);
        totalBytesWritten_ += chunkSize;
        return chunkSize;
    }

    std::mutex lockStream_; //guards ringBuf_, eof_, errorWrite_, errorRead_
    RingBuffer<std::byte> ringBuf_; //prefetch/output buffer
    bool eof_ = false; //set by closeStream()

    std::exception_ptr errorWrite_; //set by setWriteError(), rethrown on the reading side
    std::exception_ptr errorRead_;  //set by setReadError(),  rethrown on the writing side

    std::condition_variable conditionBytesWritten_; //reader waits here; signalled after writes, closeStream(), setWriteError()
    std::condition_variable conditionBytesRead_;    //writer waits here; signalled after reads, setReadError()

    std::atomic<uint64_t> totalBytesWritten_{0}; //std:atomic is uninitialized by default!
    std::atomic<uint64_t> totalBytesRead_   {0}; //
};
}
#endif //STREAM_BUFFER_H_08492572089560298
|