1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
import datetime
import logging
import threading
import os
import weakref
from irods import DEFAULT_CONNECTION_TIMEOUT
from irods.connection import Connection
from irods.ticket import Ticket
logger = logging.getLogger(__name__)
def attribute_from_return_value(attrname):
def deco(method):
def method_(self, *s, **kw):
ret = method(self, *s, **kw)
setattr(self, attrname, ret)
return ret
return method_
return deco
DEFAULT_APPLICATION_NAME = "python-irodsclient"
def _adjust_timeout_to_pool_default(conn):
set_timeout = conn.socket.gettimeout()
desired_value = conn.pool.connection_timeout
if desired_value == set_timeout:
return
conn.socket.settimeout(desired_value)
class Pool:
def __init__(
self, account, application_name="", connection_refresh_time=-1, session=None
):
"""
Pool( account , application_name='' )
Create an iRODS connection pool; 'account' is an irods.account.iRODSAccount instance and
'application_name' specifies the application name as it should appear in an 'ips' listing.
"""
self.session_ref = weakref.ref(session) if session is not None else lambda: None
self._thread_local = threading.local()
self.account = account
self._lock = threading.RLock()
self.active = set()
self.idle = set()
self.connection_timeout = DEFAULT_CONNECTION_TIMEOUT
self.application_name = (
os.environ.get("spOption", "")
or application_name
or DEFAULT_APPLICATION_NAME
)
if connection_refresh_time > 0:
self.refresh_connection = True
self.connection_refresh_time = connection_refresh_time
else:
self.refresh_connection = False
self.connection_refresh_time = None
@property
def _conn(self):
return getattr(self._thread_local, "_conn", None)
@_conn.setter
def _conn(self, conn_):
setattr(self._thread_local, "_conn", conn_)
@attribute_from_return_value("_conn")
def get_connection(self):
new_conn = False
with self._lock:
try:
conn = self.idle.pop()
curr_time = datetime.datetime.now()
# If 'refresh_connection' flag is True and the connection was
# created more than 'connection_refresh_time' seconds ago,
# release the connection (as its stale) and create a new one
if (
self.refresh_connection
and (curr_time - conn.create_time).total_seconds()
> self.connection_refresh_time
):
logger.debug(
f"Connection with id {id(conn)} was created more than {self.connection_refresh_time} seconds ago. "
"Releasing the connection and creating a new one."
)
# Since calling disconnect() repeatedly is safe, we call disconnect()
# here explicitly, instead of relying on the garbage collector to clean
# up the object and call disconnect(). This makes the behavior of the
# code more predictable as we are not relying on when garbage collector is called
conn.disconnect()
conn = Connection(self, self.account)
new_conn = True
logger.debug(f"Created new connection with id: {id(conn)}")
except KeyError:
conn = Connection(self, self.account)
new_conn = True
logger.debug(
f"No connection found in idle set. Created a new connection with id: {id(conn)}"
)
self.active.add(conn)
sess = self.session_ref()
if sess and sess.ticket__ and not sess.ticket_applied.get(conn, False):
Ticket._lowlevel_api_request(conn, "session", sess.ticket__)
sess.ticket_applied[conn] = True
logger.debug(f"Adding connection with id {id(conn)} to active set")
# If the connection we're about to make active was cached, it already has a socket object internal to it,
# so we potentially have to modify it to have the desired timeout.
if not new_conn:
_adjust_timeout_to_pool_default(conn)
logger.debug(f"num active: {len(self.active)}")
logger.debug(f"num idle: {len(self.idle)}")
return conn
def release_connection(self, conn, destroy=False):
with self._lock:
if conn in self.active:
self.active.remove(conn)
logger.debug(f"Removed connection with id: {id(conn)} from active set")
if not destroy:
# If 'refresh_connection' flag is True, update connection's 'last_used_time'
if self.refresh_connection:
conn.last_used_time = datetime.datetime.now()
self.idle.add(conn)
logger.debug(f"Added connection with id: {id(conn)} to idle set")
elif conn in self.idle and destroy:
logger.debug(f"Destroying connection with id: {id(conn)}")
self.idle.remove(conn)
logger.debug(f"num active: {len(self.active)}")
logger.debug(f"num idle: {len(self.idle)}")
|