File: remote_logger.cc

package info (click to toggle)
pdns-recursor 4.0.4-1+deb9u3~bpo8+1
  • links: PTS, VCS
  • area: main
  • in suites: jessie-backports
  • size: 5,484 kB
  • sloc: cpp: 36,380; sh: 11,771; makefile: 305; xml: 37
file content (125 lines) | stat: -rw-r--r-- 3,103 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#include <unistd.h>
#include "remote_logger.hh"
#include "config.h"
#ifdef PDNS_CONFIG_ARGS
#include "logger.hh"
#define WE_ARE_RECURSOR
#else
#include "dolog.hh"
#endif

bool RemoteLogger::reconnect()
{
  if (d_socket >= 0) {
    close(d_socket);
    d_socket = -1;
  }
  try {
    d_socket = SSocket(d_remote.sin4.sin_family, SOCK_STREAM, 0);
    setNonBlocking(d_socket);
    SConnectWithTimeout(d_socket, d_remote, d_timeout);
  }
  catch(const std::exception& e) {
#ifdef WE_ARE_RECURSOR
    L<<Logger::Warning<<"Error connecting to remote logger "<<d_remote.toStringWithPort()<<": "<<e.what()<<std::endl;
#else
    warnlog("Error connecting to remote logger %s: %s", d_remote.toStringWithPort(), e.what());
#endif
    return false;
  }
  return true;
}

bool RemoteLogger::sendData(const char* buffer, size_t bufferSize)
{
  size_t pos = 0;
  while(pos < bufferSize) {
    ssize_t written = write(d_socket, buffer + pos, bufferSize - pos);
    if (written == -1) {
      int res = errno;
      if (res == EWOULDBLOCK || res == EAGAIN) {
        return false;
      }
      else if (res != EINTR) {
        reconnect();
        return false;
      }
    }
    else if (written == 0) {
      reconnect();
      return false;
    }
    else {
      pos += (size_t) written;
    }
  }

  return true;
}

void RemoteLogger::worker()
{
  if (d_asyncConnect) {
    reconnect();
  }

  while(true) {
    std::string data;
    {
      std::unique_lock<std::mutex> lock(d_writeMutex);
      d_queueCond.wait(lock, [this]{return (!d_writeQueue.empty()) || d_exiting;});
      if (d_exiting) {
        return;
      }
      data = d_writeQueue.front();
      d_writeQueue.pop();
    }

    try {
      uint16_t len = data.length();
      len = htons(len);
      writen2WithTimeout(d_socket, &len, sizeof(len), (int) d_timeout);
      writen2WithTimeout(d_socket, data.c_str(), data.length(), (int) d_timeout);
    }
    catch(const std::runtime_error& e) {
#ifdef WE_ARE_RECURSOR
      L<<Logger::Info<<"Error sending data to remote logger "<<d_remote.toStringWithPort()<<": "<< e.what()<<endl;
#else
      vinfolog("Error sending data to remote logger (%s): %s", d_remote.toStringWithPort(), e.what());
#endif
      while (!reconnect()) {
        sleep(d_reconnectWaitTime);
      }
    }
  }
}

void RemoteLogger::queueData(const std::string& data)
{
  {
    std::unique_lock<std::mutex> lock(d_writeMutex);
    if (d_writeQueue.size() >= d_maxQueuedEntries) {
      d_writeQueue.pop();
    }
    d_writeQueue.push(data);
  }
  d_queueCond.notify_one();
}

RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedEntries, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_maxQueuedEntries(maxQueuedEntries), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect), d_thread(&RemoteLogger::worker, this)
{
  if (!d_asyncConnect) {
    reconnect();
  }
}

RemoteLogger::~RemoteLogger()
{
  d_exiting = true;
  if (d_socket >= 0) {
    close(d_socket);
    d_socket = -1;
  }
  d_queueCond.notify_one();
  d_thread.join();
}