1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
# -*- coding: iso-8859-1 -*-
# Copyright (C) 2005-2009 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Store and retrieve open connections.
"""
import time
from .. import log, LOG_CACHE
from ..decorators import synchronized
from ..lock import get_lock
# Lock passed to @synchronized on the ConnectionPool methods that read or
# modify the connection table and the per-host wait settings
# (host_wait, add, get, release, remove_expired, clear).
_lock = get_lock("connection")
# Separate lock passed to @synchronized on ConnectionPool.wait_for_host(),
# which may call time.sleep() inside the decorated method.
_wait_lock = get_lock("connwait")
class ConnectionPool (object):
    """Thread-safe cache, storing a set of connections for URL retrieval.

    Every pool entry is a mutable three-element list::

        [connection, status, expiration time]

    and is stored under a hashable key, usually the tuple
    (type, host, user, pass).
    """

    def __init__ (self, wait=0):
        """
        Initialize an empty connection dictionary which will have entries
        of the form::

            key -> [connection, status, expiration time]

        Connection can be any open connection object (HTTP, FTP, ...).
        Status is either 'available' or 'busy'.
        Expiration time is the point of time in seconds when this
        connection will be timed out.
        The identifier key is usually a tuple (type, host, user, pass),
        but it can be any immutable Python object.

        @param wait - default number of seconds to wait between two
            requests to the same host
        @raise ValueError if wait is negative
        """
        # open connections
        # {(type, host, user, pass) -> [connection, status, expiration time]}
        self.connections = {}
        # {host -> due time}
        self.times = {}
        # {host -> wait}
        self.host_waits = {}
        if wait < 0:
            raise ValueError("negative wait value %d" % wait)
        self.wait = wait

    @synchronized(_lock)
    def host_wait (self, host, wait):
        """Set a host specific time to wait between requests.

        @param host - host the wait time applies to
        @param wait - number of seconds to wait, must not be negative
        @raise ValueError if wait is negative
        """
        if wait < 0:
            raise ValueError("negative wait value %d" % wait)
        self.host_waits[host] = wait

    @synchronized(_lock)
    def add (self, key, conn, timeout):
        """Add connection to the pool with given identifier key and timeout
        in seconds. An existing entry with the same key is replaced."""
        self.connections[key] = [conn, 'available', time.time() + timeout]

    @synchronized(_wait_lock)
    def wait_for_host (self, host):
        """Sleep until the next request to the given host is due, then
        record the due time of the following request.

        The delay between requests is the host specific wait set with
        host_wait(), or the pool default if none was set.
        """
        t = time.time()
        due_time = self.times.get(host)
        if due_time is not None and due_time > t:
            wait = due_time - t
            log.debug(LOG_CACHE,
              "waiting for %.01f seconds on connection to %s", wait, host)
            time.sleep(wait)
            # measure again, the sleep took a while
            t = time.time()
        self.times[host] = t + self.host_waits.get(host, self.wait)

    @synchronized(_lock)
    def get (self, key):
        """
        Get open connection if available.

        @param key - connection key to look for
        @ptype key - tuple (type, host, user, pass)
        @return: Open connection object or None if none is available.
        @rtype None or FTPConnection or HTTP(S)Connection
        """
        conn_data = self.connections.get(key)
        if conn_data is None:
            # not found
            return None
        if conn_data[1] == 'busy':
            # Connection is in use. This is checked before the expiration
            # test so that a connection is never closed while somebody
            # else is still working with it.
            return None
        t = time.time()
        if t > conn_data[2]:
            # timed out
            self._remove_connection(key)
            return None
        # mark busy and return
        conn_data[1] = 'busy'
        # NOTE(review): the expiration time is reset to the checkout time,
        # so after release() the next get() treats the entry as timed
        # out -- confirm that this single reuse is intended.
        conn_data[2] = t
        return conn_data[0]

    @synchronized(_lock)
    def release (self, key):
        """Mark an open and reusable connection as available. Unknown
        keys are ignored."""
        if key in self.connections:
            self.connections[key][1] = 'available'

    @synchronized(_lock)
    def remove_expired (self):
        """Remove expired connections from this pool. Busy connections
        are left alone."""
        t = time.time()
        # collect the keys first; the dictionary must not change size
        # while it is being iterated
        to_delete = [key for key, conn_data in self.connections.items()
                     if conn_data[1] == 'available' and t > conn_data[2]]
        for key in to_delete:
            self._remove_connection(key)

    def _remove_connection (self, key):
        """Close and remove a connection (not thread-safe, internal use
        only).

        @raise KeyError if key is not in the pool
        """
        conn_data = self.connections.pop(key)
        try:
            # index 0 holds the connection object; index 1 is only the
            # status string
            conn_data[0].close()
        except Exception:
            # closing is best effort, ignore close errors
            pass

    @synchronized(_lock)
    def clear (self):
        """Remove all connections from this cache, even if busy."""
        # iterate over a snapshot of the keys since entries get deleted
        for key in list(self.connections):
            self._remove_connection(key)
|