1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254
|
#include "DatabaseConnectionPool.h"
// required for MYSQL error constants
#include <errmsg.h>
struct DatabaseConnectionPool::DbConnection
{
MYSQL *db;
omni_semaphore the_sema;
};
int DatabaseConnectionPool::conn_pool_size;
DatabaseConnectionPool::DatabaseConnectionPool(Tango::DeviceImpl *dev,
const char* mysql_user,
const char* mysql_password,
const char* mysql_host,
const char* mysql_db_name)
: Tango::LogAdapter(dev),
m_mysql_svr_version(0)
{
m_conn_pool = new DbConnection[conn_pool_size];
for (int loop = 0;loop < conn_pool_size;loop++)
m_conn_pool[loop].db = NULL;
create_connection_pool(mysql_user, mysql_password, mysql_host, mysql_db_name);
}
DatabaseConnectionPool::~DatabaseConnectionPool()
{
for (int loop = 0;loop < conn_pool_size;loop++)
{
if (m_conn_pool[loop].db != NULL)
mysql_close(m_conn_pool[loop].db);
}
delete [] m_conn_pool;
}
//+------------------------------------------------------------------
/**
* method: create_connection_pool()
*
* description: Create the MySQL connections pool
*
*/
//+------------------------------------------------------------------
void DatabaseConnectionPool::create_connection_pool(const char *mysql_user,
const char *mysql_password,
const char *mysql_host,
const char *mysql_db_name)
{
//
// Check on provided MySQl user and password
//
if (mysql_user != NULL && mysql_password != NULL)
{
WARN_STREAM << "DatabaseConnectionPool::create_connection_pool(): mysql database user = " << mysql_user
<< " , password = " << mysql_password << std::endl;
}
const char *host;
std::string my_host;
std::string ho,port;
unsigned int port_num = 0;
if (mysql_host != NULL)
{
my_host = mysql_host;
WARN_STREAM << "DatabaseConnectionPool::create_connection_pool(): mysql host = " << mysql_host << std::endl;
std::string::size_type pos = my_host.find(':');
if (pos != std::string::npos)
{
ho = my_host.substr(0,pos);
pos++;
port = my_host.substr(pos);
std::stringstream ss(port);
ss >> port_num;
if (!ss)
port_num = 0;
host = ho.c_str();
}
else
host = my_host.c_str();
WARN_STREAM << "DatabaseConnectionPool::create_connection_pool(): mysql host = " << host << ", port = " << port_num << std::endl;
}
else
host = NULL;
for (int loop = 0;loop < conn_pool_size;loop++)
{
base_connect(loop);
//
// Inmplement a retry. On some OS (Ubuntu 10.10), it may happens that MySQl needs some time to start.
// This retry should cover this case
// We also have to support case when this server is started while mysql is not ready yet
// (this has been experienced on Ubuntu after a reboot when the ureadahead cache being invalidated
// by a package installing file in /etc/init.d
// Bloody problem!!!
//
WARN_STREAM << "Going to connect to MySQL for conn. " << loop << std::endl;
if (!mysql_real_connect(m_conn_pool[loop].db, host, mysql_user, mysql_password, mysql_db_name, port_num, NULL, CLIENT_MULTI_STATEMENTS | CLIENT_FOUND_ROWS))
{
if (loop == 0)
{
int retry = 5;
while (retry > 0)
{
sleep(1);
int db_err = mysql_errno(m_conn_pool[loop].db);
WARN_STREAM << "Connection to MySQL failed with error " << db_err << std::endl;
if (db_err == CR_CONNECTION_ERROR || db_err == CR_CONN_HOST_ERROR)
{
mysql_close(m_conn_pool[loop].db);
m_conn_pool[loop].db = NULL;
base_connect(loop);
}
WARN_STREAM << "Going to retry to connect to MySQL for connection " << loop << std::endl;
if (!mysql_real_connect(m_conn_pool[loop].db, host, mysql_user, mysql_password, mysql_db_name, port_num, NULL, CLIENT_MULTI_STATEMENTS | CLIENT_FOUND_ROWS))
{
WARN_STREAM << "Connection to MySQL (re-try) failed with error " << mysql_errno(m_conn_pool[loop].db) << std::endl;
retry--;
if (retry == 0)
{
WARN_STREAM << "Throw exception because no MySQL connection possible after 5 re-tries" << std::endl;
TangoSys_MemStream out_stream;
out_stream << "Failed to connect to TANGO database (error = " << mysql_error(m_conn_pool[loop].db) << ")" << std::ends;
Tango::Except::throw_exception("CANNOT_CONNECT_MYSQL",
out_stream.str(),
"DatabaseConnectionPool::init_device()");
}
}
else
{
WARN_STREAM << "MySQL connection succeed after retry" << std::endl;
retry = 0;
}
}
}
else
{
WARN_STREAM << "Failed to connect to MySQL for conn. " << loop << ". No re-try in this case" << std::endl;
TangoSys_MemStream out_stream;
out_stream << "Failed to connect to TANGO database (error = " << mysql_error(m_conn_pool[loop].db) << ")" << std::ends;
Tango::Except::throw_exception("CANNOT_CONNECT_MYSQL",
out_stream.str(),
"DatabaseConnectionPool::init_device()");
}
}
}
m_mysql_svr_version = mysql_get_server_version(m_conn_pool[0].db);
m_last_sem_wait = 0;
}
//+------------------------------------------------------------------
/**
* method: base_connect()
*
* description: Basic action to build a Mysql connection
*
*/
//+------------------------------------------------------------------
void DatabaseConnectionPool::base_connect(int loop)
{
//
// Initialise mysql database structure and connect to TANGO database
//
m_conn_pool[loop].db = mysql_init(m_conn_pool[loop].db);
mysql_options(m_conn_pool[loop].db,MYSQL_READ_DEFAULT_GROUP,"client");
my_bool my_auto_reconnect=1;
if (mysql_options(m_conn_pool[loop].db,MYSQL_OPT_RECONNECT,&my_auto_reconnect) !=0)
{
ERROR_STREAM << "DatabaseConnectionPool: error setting mysql auto reconnection: " << mysql_error(m_conn_pool[loop].db) << std::endl;
}
else
{
WARN_STREAM << "DatabaseConnectionPool: set mysql auto reconnect to true" << std::endl;
}
}
//+----------------------------------------------------------------------------
//
// method : DatabaseConnectionPool::get_connection()
//
// description : Get a MySQl connection from the connection pool
//
//-----------------------------------------------------------------------------
int DatabaseConnectionPool::get_connection()
{
//
// Get a MySQL connection and lock it
// If none available, wait for one
//
int loop = 0;
while (m_conn_pool[loop].the_sema.trywait() == 0)
{
loop++;
if (loop == conn_pool_size)
{
int sem_to_wait;
{
omni_mutex_lock oml(m_sem_wait_mutex);
sem_to_wait = m_last_sem_wait++;
if (m_last_sem_wait == conn_pool_size)
m_last_sem_wait = 0;
}
loop = sem_to_wait;
WARN_STREAM << "Waiting for one free MySQL connection on semaphore " << loop << std::endl;
m_conn_pool[loop].the_sema.wait();
break;
}
}
return loop;
}
void DatabaseConnectionPool::release_connection(int con_nb)
{
m_conn_pool[con_nb].the_sema.post();
}
MYSQL* DatabaseConnectionPool::GetDatabase(int con_nb)
{
DEBUG_STREAM << "Using MySQL connection with semaphore " << con_nb << std::endl;
auto db = m_conn_pool[con_nb].db;
auto err = mysql_ping(db);
if(err)
{
ERROR_STREAM << "DatabaseConnectionPool::GetDatabase: Error pinging the database: " << mysql_error(db);
}
return db;
}
void DatabaseConnectionPool::set_conn_pool_size(int si)
{
conn_pool_size = si;
}
|