1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
|
#include <unistd.h>
#include "remote_logger.hh"
#include <sys/uio.h>
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef PDNS_CONFIG_ARGS
#include "logger.hh"
#define WE_ARE_RECURSOR
#else
#include "dolog.hh"
#endif
/**
 * Append one length-prefixed frame to the buffer.
 *
 * A frame is a 2-byte length in network byte order followed by the bytes of
 * `str`. When the frame does not fit in the remaining capacity, flush() is
 * called once to try to make room before giving up.
 *
 * @param str payload to queue; at most 65535 bytes, because the length prefix
 *            is 16 bits wide.
 * @throws std::runtime_error if the payload is too large for the prefix, if
 *         the frame still does not fit after flushing ("Full!"), or if
 *         flush() itself fails.
 */
void CircularWriteBuffer::write(const std::string& str)
{
  // The prefix is a uint16_t: a longer payload would have its length silently
  // truncated while the full payload is still appended, which would
  // desynchronise the framing for everything that follows.
  constexpr size_t maxPayload = 0xFFFF;
  if (str.size() > maxPayload) {
    throw std::runtime_error("Message too large for a 16-bit length prefix");
  }

  const size_t needed = 2 + str.size();
  if (d_buffer.size() + needed > d_buffer.capacity()) {
    flush();
  }
  if (d_buffer.size() + needed > d_buffer.capacity()) {
    throw std::runtime_error("Full!");
  }

  const uint16_t len = htons(static_cast<uint16_t>(str.size()));
  const char* ptr = reinterpret_cast<const char*>(&len);
  d_buffer.insert(d_buffer.end(), ptr, ptr + 2);
  d_buffer.insert(d_buffer.end(), str.begin(), str.end());
}
/**
 * Try to write the buffered bytes to d_fd with a single writev() call and
 * drop whatever was accepted from the front of the buffer.
 *
 * The buffer may be stored as up to two contiguous segments (array_one() and
 * array_two()); both are handed to writev() so no copy is needed.
 *
 * A partial write is not an error: the unsent tail simply stays buffered.
 * Likewise, if the descriptor cannot accept data right now (EAGAIN /
 * EWOULDBLOCK) nothing is removed and the call returns normally, so the
 * caller can retry later.
 *
 * @throws std::runtime_error on any other writev() failure, or with "EOF"
 *         when writev() reports that zero bytes were written.
 */
void CircularWriteBuffer::flush()
{
  if (d_buffer.empty()) { // not optional, we report EOF otherwise
    return;
  }

  auto arr1 = d_buffer.array_one();
  auto arr2 = d_buffer.array_two();

  struct iovec iov[2];
  int pos = 0;
  for (const auto& arr : {arr1, arr2}) {
    if (arr.second) {
      iov[pos].iov_base = arr.first;
      iov[pos].iov_len = arr.second;
      ++pos;
    }
  }

  // writev() returns ssize_t; keep the full width and retry if a signal
  // interrupted the call before anything was written.
  ssize_t res;
  do {
    res = writev(d_fd, iov, pos);
  } while (res < 0 && errno == EINTR);

  if (res < 0) {
    const int err = errno;
    if (err == EAGAIN || err == EWOULDBLOCK) {
      // The socket is used in non-blocking mode: a full send queue is a
      // transient condition, not a reason to tear the connection down.
      return;
    }
    throw std::runtime_error("Couldn't flush a thing: " + std::string(strerror(err)));
  }
  if (res == 0) {
    throw std::runtime_error("EOF");
  }

  if (static_cast<size_t>(res) == d_buffer.size()) {
    d_buffer.clear();
  }
  else {
    // Partial write: discard exactly the bytes that were sent.
    while (res--) {
      d_buffer.pop_front();
    }
  }
}
/**
 * Set up a logger for `remote` and start its maintenance thread.
 *
 * Unless `asyncConnect` is set, a connection attempt is made right away and,
 * if it succeeds, the write buffer is created immediately. Otherwise (or if
 * that attempt fails) establishing the connection is left to the maintenance
 * thread.
 *
 * @param remote            address of the remote log receiver
 * @param timeout           value passed to the connect helper in reconnect()
 * @param maxQueuedBytes    size handed to the write buffer on creation
 * @param reconnectWaitTime seconds the maintenance thread sleeps per iteration
 * @param asyncConnect      when true, skip the connection attempt here
 */
RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect):
  d_remote(remote),
  d_maxQueuedBytes(maxQueuedBytes),
  d_timeout(timeout),
  d_reconnectWaitTime(reconnectWaitTime),
  d_asyncConnect(asyncConnect)
{
  // Short-circuit keeps reconnect() from being called in async mode.
  const bool connectedNow = !d_asyncConnect && reconnect();
  if (connectedNow) {
    d_writer = make_unique<CircularWriteBuffer>(d_socket, d_maxQueuedBytes);
  }

  d_thread = std::thread(&RemoteLogger::maintenanceThread, this);
}
/**
 * Drop any existing socket and try to open a fresh non-blocking TCP
 * connection to d_remote.
 *
 * On failure the error is logged as a warning, any descriptor opened during
 * the attempt is closed again and d_socket is left at -1, so a failed
 * attempt never leaves a half-initialised socket behind.
 *
 * @return true if the connection was established, false otherwise.
 */
bool RemoteLogger::reconnect()
{
  if (d_socket >= 0) {
    close(d_socket);
    d_socket = -1;
  }

  try {
    d_socket = SSocket(d_remote.sin4.sin_family, SOCK_STREAM, 0);
    setNonBlocking(d_socket);
    SConnectWithTimeout(d_socket, d_remote, d_timeout);
  }
  catch (const std::exception& e) {
    // The socket may have been created before a later step failed; release
    // it now instead of holding it until the next attempt.
    if (d_socket >= 0) {
      close(d_socket);
      d_socket = -1;
    }
#ifdef WE_ARE_RECURSOR
    L<<Logger::Warning<<"Error connecting to remote logger "<<d_remote.toStringWithPort()<<": "<<e.what()<<std::endl;
#else
    warnlog("Error connecting to remote logger %s: %s", d_remote.toStringWithPort(), e.what());
#endif
    return false;
  }

  return true;
}
/**
 * Queue one message for delivery to the remote logger.
 *
 * If no write buffer exists, the message is counted as dropped and discarded.
 * If writing into the buffer throws, the message is also counted as dropped
 * and the connection state (write buffer and socket) is torn down.
 *
 * @param data message payload to hand to the write buffer
 */
void RemoteLogger::queueData(const std::string& data)
{
  // Unlocked pre-check: without a writer there is nowhere to queue the data.
  if (!d_writer) {
    d_drops++;
    return;
  }

  std::lock_guard<std::mutex> guard(d_mutex);

  // Re-check under the lock: the maintenance thread may have reset the
  // writer in the meantime.
  if (!d_writer) {
    return;
  }

  try {
    d_writer->write(data);
  }
  catch (const std::exception&) {
    d_drops++;
    d_writer.reset();
    close(d_socket);
    d_socket = -1;
  }
}
/**
 * Body of the background thread started by the constructor.
 *
 * Until d_exiting is set, each iteration either tries to (re)connect and
 * create the write buffer, or flushes the existing buffer; a failed flush
 * tears down the buffer and the socket. Every iteration ends with a sleep of
 * d_reconnectWaitTime seconds. Any exception escaping the loop is reported
 * on stderr and ends the thread.
 */
void RemoteLogger::maintenanceThread()
try
{
  while (!d_exiting) {
    if (!d_writer) {
      // Only this thread ever creates the writer, so it cannot appear
      // between the check above and the assignment below.
      if (reconnect()) {
        std::unique_lock<std::mutex> guard(d_mutex);
        d_writer = make_unique<CircularWriteBuffer>(d_socket, d_maxQueuedBytes);
      }
    }
    else {
      std::unique_lock<std::mutex> guard(d_mutex);
      // queueData() may have reset the writer before we got the lock.
      if (d_writer) {
        try {
          d_writer->flush();
        }
        catch (const std::exception&) {
          d_writer.reset();
          close(d_socket);
          d_socket = -1;
        }
      }
    }

    sleep(d_reconnectWaitTime);
  }
}
catch (const std::exception& e)
{
  cerr<<"Thead died on: "<<e.what()<<endl;
}
/**
 * Stop the maintenance thread and release the socket.
 *
 * The thread checks d_exiting once per loop iteration and sleeps for
 * d_reconnectWaitTime seconds between iterations, so destruction can block
 * for roughly that long.
 */
RemoteLogger::~RemoteLogger()
{
  d_exiting = true;

  // Join before touching the socket: while the thread is still running it
  // may flush on d_socket or call reconnect(), which would open a new
  // descriptor after we closed the old one and leak it.
  if (d_thread.joinable()) {
    d_thread.join();
  }

  if (d_socket >= 0) {
    close(d_socket);
    d_socket = -1;
  }
}
|