1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
|
// vim:sw=2:ai
/*
* Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
* See COPYRIGHT.txt for details.
*/
#include <my_config.h>
#include <stdlib.h>
#include <vector>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/resource.h>
#include "hstcpsvr.hpp"
#include "hstcpsvr_worker.hpp"
#include "thread.hpp"
#include "fatal.hpp"
#include "auto_ptrcontainer.hpp"
#define DBG(x)
namespace dena {
// Callable object used as the body of a worker thread (see the
// thread<worker_throbj> typedef in hstcpsvr). It creates one worker from the
// supplied argument and runs it when invoked.
struct worker_throbj {
  // Builds the worker through the hstcpsvr_worker_i factory.
  worker_throbj(const hstcpsvr_worker_arg& warg)
    : worker(hstcpsvr_worker_i::create(warg))
  {
  }

  // Thread entry point: delegates to the worker's run().
  void operator ()()
  {
    worker->run();
  }

  // The worker instance created in the constructor.
  hstcpsvr_worker_ptr worker;
};
// Concrete implementation of hstcpsvr_i. Holds the state shared with the
// worker threads and the worker threads themselves. Not copyable.
struct hstcpsvr : public hstcpsvr_i, private noncopyable {
  // Reads settings from the given config (see the constructor definition).
  hstcpsvr(const config& c);
  // Stops and joins all workers.
  ~hstcpsvr();
  // Binds the listening socket and starts the worker threads. Returns an
  // empty string on success, otherwise an error description.
  virtual std::string start_listen();

 private:
  // Sets the shutdown flag, joins every worker thread and drops them.
  void stop_workers();

  typedef thread<worker_throbj> worker_thread_type;
  typedef auto_ptrcontainer< std::vector<worker_thread_type *> > threads_type;

  // NOTE: member order is significant for construction/destruction order;
  // keep it as is.
  hstcpsvr_shared_c cshared;          // settings handed to workers by pointer
  volatile hstcpsvr_shared_v vshared; // holds the shutdown flag
  threads_type threads;               // worker threads, filled by start_listen
  // One slot per worker thread; cshared.thread_num_conns points at its
  // storage (set up in the constructor).
  std::vector<unsigned int> thread_num_conns_vec;
};
namespace {

// Compares the soft RLIMIT_NOFILE limit of the process against nfile + 1000
// and prints a warning to stderr when the limit is lower. The extra 1000 is
// presumably headroom for descriptors other than client connections (not
// stated in this file). A getrlimit() failure is reported via fatal_abort().
void
check_nfile(size_t nfile)
{
  struct rlimit limits;
  if (getrlimit(RLIMIT_NOFILE, &limits) != 0) {
    fatal_abort("check_nfile: getrlimit failed");
  }
  const rlim_t wanted = static_cast<rlim_t>(nfile + 1000);
  if (wanted > limits.rlim_cur) {
    fprintf(stderr,
      "[Warning] handlersocket: open_files_limit is too small.\n");
  }
}

} // anonymous namespace
// Populates the shared state from the configuration `c`:
//  - copies the config and defaults "port" to "9999" when it is empty,
//  - reads the numeric/string settings (defaults are the second argument of
//    each get_int/get_str call below),
//  - forces non-blocking sockets whenever epoll is enabled,
//  - enables authentication iff "plain_secret" is non-empty,
//  - creates the database object from the caller's config `c`,
//  - warns if the open-file limit looks too small for
//    num_threads * conn_per_thread connections,
//  - allocates one connection counter per thread and publishes a pointer to
//    the counters through cshared.thread_num_conns (0 if there are none).
hstcpsvr::hstcpsvr(const config& c)
  : cshared(), vshared()
{
  vshared.shutdown = 0;

  config& conf = cshared.conf;
  conf = c; /* take a private copy of the caller's configuration */
  if (conf["port"] == "") {
    conf["port"] = "9999";
  }

  // The settings are read in a fixed order; keep it, since get_int/get_str
  // may have observable side effects such as logging.
  cshared.num_threads = conf.get_int("num_threads", 32);
  cshared.sockargs.nonblocking = conf.get_int("nonblocking", 1);
  cshared.sockargs.use_epoll = conf.get_int("use_epoll", 1);
  if (cshared.sockargs.use_epoll) {
    // epoll mode always uses non-blocking sockets, whatever was configured
    cshared.sockargs.nonblocking = 1;
  }
  cshared.readsize = conf.get_int("readsize", 1);
  cshared.nb_conn_per_thread = conf.get_int("conn_per_thread", 1024);
  cshared.for_write_flag = conf.get_int("for_write", 0);
  cshared.plain_secret = conf.get_str("plain_secret", "");
  cshared.require_auth = !cshared.plain_secret.empty();
  cshared.sockargs.set(conf);

  // The database is created from the original argument, not from the copy
  // with the defaulted port.
  cshared.dbptr = database_i::create(c);

  check_nfile(cshared.num_threads * cshared.nb_conn_per_thread);

  thread_num_conns_vec.resize(cshared.num_threads);
  if (thread_num_conns_vec.empty()) {
    cshared.thread_num_conns = 0;
  } else {
    cshared.thread_num_conns = &thread_num_conns_vec[0];
  }
}
// Signals shutdown and joins all worker threads before the members are
// destroyed.
hstcpsvr::~hstcpsvr()
{
  this->stop_workers();
}
std::string
hstcpsvr::start_listen()
{
std::string err;
if (threads.size() != 0) {
return "start_listen: already running";
}
if (socket_bind(cshared.listen_fd, cshared.sockargs, err) != 0) {
return "bind: " + err;
}
DENA_VERBOSE(20, fprintf(stderr, "bind done\n"));
const size_t stack_size = std::max(
cshared.conf.get_int("stack_size", 1 * 1024LL * 1024), 8 * 1024LL * 1024);
for (long i = 0; i < cshared.num_threads; ++i) {
hstcpsvr_worker_arg arg;
arg.cshared = &cshared;
arg.vshared = &vshared;
arg.worker_id = i;
std::auto_ptr< thread<worker_throbj> > thr(
new thread<worker_throbj>(arg, stack_size));
threads.push_back_ptr(thr);
}
DENA_VERBOSE(20, fprintf(stderr, "threads created\n"));
for (size_t i = 0; i < threads.size(); ++i) {
threads[i]->start();
}
DENA_VERBOSE(20, fprintf(stderr, "threads started\n"));
return std::string();
}
void
hstcpsvr::stop_workers()
{
vshared.shutdown = 1;
for (size_t i = 0; i < threads.size(); ++i) {
threads[i]->join();
}
threads.clear();
}
// Factory for the server: constructs an hstcpsvr from `conf` and returns it
// wrapped in an hstcpsvr_ptr.
hstcpsvr_ptr
hstcpsvr_i::create(const config& conf)
{
  hstcpsvr_ptr server(new hstcpsvr(conf));
  return server;
}
};
|