1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
|
# -*- coding: iso-8859-1 -*-
# Copyright (C) 2006-2008 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
"""
Store and retrieve cookies.
"""
from linkcheck.decorators import synchronized
import linkcheck.log
import linkcheck.lock
import linkcheck.cookies
# Lock obtained under the name "cookie"; it is passed to the @synchronized
# decorator on the CookieJar methods defined below.
_lock = linkcheck.lock.get_lock("cookie")
class CookieJar (object):
    """
    Cookie storage, implementing the default cookie handling policy for
    LinkChecker.

    Cookies are kept in self.cache, a dictionary mapping a host to the
    set of cookie objects parsed by the most recent add() call for that
    host. add(), get() and __str__() are wrapped with
    @synchronized(_lock).
    """

    def __init__ (self):
        """
        Create an empty cookie cache.
        """
        # host -> set of cookie objects
        self.cache = {}

    @synchronized(_lock)
    def add (self, headers, scheme, host, path):
        """
        Parse cookie values, add to cache.

        Each "Set-Cookie" header is parsed as a NetscapeCookie and each
        "Set-Cookie2" header as an Rfc2965Cookie. A header whose parsing
        raises CookieError is reported through the debug log and skipped.

        headers must provide getallmatchingheaders(name); scheme, host
        and path are handed unchanged to the cookie constructors.

        Returns the set of cookies parsed from headers. The same set is
        stored as the cache entry for host.
        """
        parsers = (
            # RFC 2109 (Netscape) cookie type
            ("Set-Cookie", linkcheck.cookies.NetscapeCookie,
             "Invalid cookie header for %s:%s%s: %r"),
            # RFC 2965 cookie type
            ("Set-Cookie2", linkcheck.cookies.Rfc2965Cookie,
             "Invalid cookie2 header for %s:%s%s: %r"),
        )
        jar = set()
        for header_name, cookie_class, errmsg in parsers:
            for h in headers.getallmatchingheaders(header_name):
                try:
                    jar.add(cookie_class(h, scheme, host, path))
                except linkcheck.cookies.CookieError:
                    # The debug call sits inside an assert, so it is
                    # skipped entirely when Python runs with -O.
                    assert linkcheck.log.debug(linkcheck.LOG_CACHE,
                        errmsg, scheme, host, path, h) is None
        # NOTE(review): this replaces whatever was stored for host before,
        # even when no cookie header was found - confirm this is intended.
        self.cache[host] = jar
        return jar

    @synchronized(_lock)
    def get (self, scheme, host, port, path):
        """
        Cookie cache getter function.

        Returns a list of the cookies cached for host for which both
        check_expired() and is_valid_for(scheme, host, port, path) are
        true. An unknown host yields an empty list and does not add an
        entry to the cache.
        """
        assert linkcheck.log.debug(linkcheck.LOG_CACHE,
            "Get cookies for host %r path %r", host, path) is None
        # Plain lookup instead of setdefault(): reading must not leave an
        # empty set behind for every host that was ever asked for.
        jar = self.cache.get(host, ())
        # NOTE(review): cookies are kept when check_expired() is true;
        # confirm that it returns true for cookies that are still usable.
        return [x for x in jar if x.check_expired() and
                x.is_valid_for(scheme, host, port, path)]

    @synchronized(_lock)
    def __str__ (self):
        """
        Return a short description that includes the whole cache dict.
        """
        # One-element tuple keeps the dict from being treated as a
        # mapping operand of the % operator.
        return "<CookieJar with %s>" % (self.cache,)
|