File: connection.py

package info (click to toggle)
linkchecker 5.2-2
  • links: PTS
  • area: main
  • in suites: squeeze
  • size: 3,508 kB
  • ctags: 3,805
  • sloc: python: 22,666; lex: 1,114; yacc: 785; makefile: 276; ansic: 95; sh: 68; sql: 19; awk: 4
file content (143 lines) | stat: -rw-r--r-- 4,954 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# -*- coding: iso-8859-1 -*-
# Copyright (C) 2005-2009 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Store and retrieve open connections.
"""

import time
from .. import log, LOG_CACHE
from ..decorators import synchronized
from ..lock import get_lock

# Module-level lock handed to @synchronized for the ConnectionPool methods
# that read or modify the connection table and the per-host wait settings
# (host_wait, add, get, release, remove_expired, clear). Being module-level,
# the same lock object is used by every ConnectionPool instance.
_lock = get_lock("connection")
# Separate module-level lock handed to @synchronized for
# ConnectionPool.wait_for_host().
_wait_lock = get_lock("connwait")

class ConnectionPool (object):
    """Thread-safe cache, storing a set of connections for URL retrieval.

    Besides the connections themselves the pool keeps per-host bookkeeping
    (next allowed request time, host specific wait) that wait_for_host()
    uses to space out requests to the same host.
    """

    def __init__ (self, wait=0):
        """
        Initialize an empty connection dictionary which will have entries
        of the form::
        key -> [connection, status, expiration time]

        Connection can be any open connection object (HTTP, FTP, ...).
        Status is either 'available' or 'busy'.
        Expiration time is the point of time in seconds when this
        connection will be timed out.

        The identifier key is usually a tuple (type, host, user, pass),
        but it can be any immutable Python object.

        @param wait - default number of seconds to wait between two
          requests to the same host
        @ptype wait - number
        @raise ValueError if wait is negative
        """
        # open connections
        # {(type, host, user, pass) -> [connection, status, expiration time]}
        self.connections = {}
        # {host -> due time}
        self.times = {}
        # {host -> wait}
        self.host_waits = {}
        if wait < 0:
            raise ValueError("negative wait value %d" % wait)
        self.wait = wait

    @synchronized(_lock)
    def host_wait (self, host, wait):
        """Set a host specific time to wait between requests.

        @param host - host the wait time applies to
        @param wait - seconds to wait; overrides the pool default for host
        @raise ValueError if wait is negative
        """
        if wait < 0:
            raise ValueError("negative wait value %d" % wait)
        self.host_waits[host] = wait

    @synchronized(_lock)
    def add (self, key, conn, timeout):
        """Add connection to the pool with given identifier key and timeout
        in seconds.

        The new entry starts out as 'available' and expires timeout
        seconds from now. An existing entry stored under the same key is
        replaced.
        """
        self.connections[key] = [conn, 'available', time.time() + timeout]

    @synchronized(_wait_lock)
    def wait_for_host (self, host):
        """Sleep until the next request to host is due, then schedule the
        following one.

        If an earlier call recorded a due time for host that still lies in
        the future, sleep for the remaining time. Afterwards the due time
        for host is set to now plus the host specific wait, falling back
        to the pool default when no host specific wait was set.
        """
        t = time.time()
        if host in self.times:
            due_time = self.times[host]
            if due_time > t:
                wait = due_time - t
                log.debug(LOG_CACHE,
                  "waiting for %.01f seconds on connection to %s", wait, host)
                # NOTE(review): the sleep happens inside the method guarded
                # by _wait_lock, so presumably waits for different hosts are
                # serialized as well - confirm this is intended.
                time.sleep(wait)
                t = time.time()
        self.times[host] = t + self.host_waits.get(host, self.wait)

    @synchronized(_lock)
    def get (self, key):
        """
        Get open connection if available.

        A connection that is handed out is marked 'busy' until release()
        is called for its key. Expired connections that are not in use are
        closed and removed from the pool.

        @param key - connection key to look for
        @ptype key - tuple (type, host, user, pass)
        @return: Open connection object or None if none is available.
        @rtype None or FTPConnection or HTTP(S)Connection
        """
        conn_data = self.connections.get(key)
        if conn_data is None:
            # not found
            return None
        if conn_data[1] == 'busy':
            # Connection is in use. This is checked before the expiration
            # test so that a connection somebody is still working with is
            # never closed from here (same policy as remove_expired()).
            return None
        t = time.time()
        if t > conn_data[2]:
            # timed out
            self._remove_connection(key)
            return None
        # mark busy and return
        conn_data[1] = 'busy'
        # NOTE(review): this overwrites the expiration time with the current
        # time, so once released the entry counts as expired unless add()
        # is called again for the key - presumably callers re-add the
        # connection with a fresh timeout; confirm against callers.
        conn_data[2] = t
        return conn_data[0]

    @synchronized(_lock)
    def release (self, key):
        """Mark an open and reusable connection as available.

        Unknown keys are ignored.
        """
        if key in self.connections:
            self.connections[key][1] = 'available'

    @synchronized(_lock)
    def remove_expired (self):
        """Remove expired connections from this pool.

        Only connections that are currently 'available' are considered;
        busy connections are left alone even if their time is up.
        """
        t = time.time()
        # Collect the keys first: the dictionary must not change size
        # while it is being iterated.
        to_delete = [key for key, conn_data in self.connections.items()
                     if conn_data[1] == 'available' and t > conn_data[2]]
        for key in to_delete:
            self._remove_connection(key)

    def _remove_connection (self, key):
        """Close and remove a connection (not thread-safe, internal use
        only).

        @param key - key of an existing pool entry
        @raise KeyError if key is not in the pool
        """
        conn_data = self.connections.pop(key)
        try:
            # Entry layout is [connection, status, expiration time]; the
            # connection object to close is the first element.
            conn_data[0].close()
        except Exception:
            # ignore close errors
            pass

    @synchronized(_lock)
    def clear (self):
        """Remove all connections from this cache, even if busy."""
        # Iterate over a snapshot of the keys since _remove_connection()
        # deletes entries from the dictionary.
        for key in list(self.connections):
            self._remove_connection(key)