1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302
|
"""Manage a cache of downloaded files.
@var DECOMPRESS_EXTS: a list of file extensions that need to be decompressed
@var DECOMPRESS_FILES: a list of file names that need to be decompressed
"""
from urlparse import urlparse
import os
from twisted.python import log
from twisted.python.filepath import FilePath
from twisted.internet import defer, reactor
from twisted.trial import unittest
from twisted.web2.http import splitHostPort
from Streams import GrowingFileStream, StreamToFile
from Hash import HashObject
from apt_p2p_conf import config
DECOMPRESS_EXTS = ['.gz', '.bz2']
DECOMPRESS_FILES = ['release', 'sources', 'packages']
class CacheError(Exception):
    """Signals a failure while downloading a file into the cache."""
class CacheManager:
    """Manages all downloaded files and requests for cached objects.

    @type cache_dir: L{twisted.python.filepath.FilePath}
    @ivar cache_dir: the directory to use for storing all files
    @type other_dirs: C{list} of L{twisted.python.filepath.FilePath}
    @ivar other_dirs: the other directories that have shared files in them
    @type all_dirs: C{list} of L{twisted.python.filepath.FilePath}
    @ivar all_dirs: all the directories that have cached files in them
    @type db: L{db.DB}
    @ivar db: the database to use for tracking files and hashes
    @type manager: L{apt_p2p.AptP2P}
    @ivar manager: the main program object to send requests to
    @type scanning: C{list} of L{twisted.python.filepath.FilePath}
    @ivar scanning: all the directories that are currently being scanned
        or waiting to be scanned
    """

    def __init__(self, cache_dir, db, manager = None):
        """Initialize the instance and remove any untracked files from the DB.

        @type cache_dir: L{twisted.python.filepath.FilePath}
        @param cache_dir: the directory to use for storing all files
        @type db: L{db.DB}
        @param db: the database to use for tracking files and hashes
        @type manager: L{apt_p2p.AptP2P}
        @param manager: the main program object to send requests to
            (optional, defaults to not calling back with cached files)
        """
        self.cache_dir = cache_dir
        self.other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
        # The cache directory is always the first one scanned
        self.all_dirs = self.other_dirs[:]
        self.all_dirs.insert(0, self.cache_dir)
        self.db = db
        self.manager = manager
        self.scanning = []

        # Init the database, remove old files
        self.db.removeUntrackedFiles(self.all_dirs)

    #{ Scanning directories
    def scanDirectories(self, result = None):
        """Scan the cache directories, hashing new and rehashing changed files.

        @param result: ignored, allows use as a deferred callback (optional)
        """
        assert not self.scanning, "a directory scan is already under way"
        self.scanning = self.all_dirs[:]
        self._scanDirectories()

    def _scanDirectories(self, result = None, walker = None):
        """Walk each directory looking for cached files.

        Processes one file per call, rescheduling itself via the reactor so
        the event loop is never blocked for long.

        @param result: the result of a DHT store request, not used (optional)
        @param walker: the walker to use to traverse the current directory
            (optional, defaults to creating a new walker from the first
            directory in the L{CacheManager.scanning} list)
        """
        # Need to start walking a new directory
        if walker is None:
            # If there are any left, get them
            if self.scanning:
                log.msg('started scanning directory: %s' % self.scanning[0].path)
                walker = self.scanning[0].walk()
            else:
                log.msg('cache directory scan complete')
                return

        try:
            # Get the next file in the directory
            file = walker.next()
        except StopIteration:
            # No files left, go to the next directory
            log.msg('done scanning directory: %s' % self.scanning[0].path)
            self.scanning.pop(0)
            reactor.callLater(0, self._scanDirectories)
            return

        # If it's not a file ignore it
        if not file.isfile():
            reactor.callLater(0, self._scanDirectories, None, walker)
            return

        # If it's already properly in the DB, ignore it
        db_status = self.db.isUnchanged(file)
        if db_status:
            reactor.callLater(0, self._scanDirectories, None, walker)
            return

        # Don't hash files in the cache that are not in the DB
        if self.scanning[0] == self.cache_dir:
            if db_status is None:
                # Unknown file in the cache: leave it alone
                log.msg('ignoring unknown cache file: %s' % file.path)
            else:
                # Changed since it was stored: no longer trustworthy
                log.msg('removing changed cache file: %s' % file.path)
                file.remove()
            reactor.callLater(0, self._scanDirectories, None, walker)
            return

        # Otherwise hash it
        log.msg('start hash checking file: %s' % file.path)
        hash = HashObject()
        df = hash.hashInThread(file)
        df.addBoth(self._doneHashing, file, walker)

    def _doneHashing(self, result, file, walker):
        """If successful, add the hashed file to the DB and inform the main program.

        @param result: the L{Hash.HashObject} from a successful hashing, or
            a failure
        @type file: L{twisted.python.filepath.FilePath}
        @param file: the file that was hashed
        @param walker: the walker to continue the directory scan with
        """
        if isinstance(result, HashObject):
            log.msg('hash check of %s completed with hash: %s' % (file.path, result.hexdigest()))

            # Only set a URL if this is a downloaded file
            url = None
            if self.scanning[0] == self.cache_dir:
                # Cache paths are laid out as <cache_dir>/<host>/<path>, so
                # the relative path plus 'http:/' rebuilds 'http://<host>/<path>'
                url = 'http:/' + file.path[len(self.cache_dir.path):]

            # Store the hashed file in the database
            new_hash = self.db.storeFile(file, result.digest(), True,
                                         ''.join(result.pieceDigests()))

            # Tell the main program to handle the new cache file
            # (guard against manager being None, which __init__ permits,
            # consistent with the check in _save_complete)
            df = None
            if self.manager:
                df = self.manager.new_cached_file(file, result, new_hash, url, True)
            if df is None:
                reactor.callLater(0, self._scanDirectories, None, walker)
            else:
                df.addBoth(self._scanDirectories, walker)
        else:
            # Must have returned an error
            log.msg('hash check of %s failed' % file.path)
            log.err(result)
            reactor.callLater(0, self._scanDirectories, None, walker)

    #{ Downloading files
    def save_file(self, response, hash, url):
        """Save a downloaded file to the cache and stream it.

        The response's stream is replaced by a L{Streams.GrowingFileStream}
        backed by the cache file, so the requester reads the data as it is
        simultaneously written to disk and hashed.

        @type response: L{twisted.web2.http.Response}
        @param response: the response from the download
        @type hash: L{Hash.HashObject}
        @param hash: the hash object containing the expected hash for the file
        @param url: the URI of the actual mirror request
        @rtype: L{twisted.web2.http.Response}
        @return: the final response from the download
        """
        if response.code != 200:
            log.msg('File was not found (%r): %s' % (response, url))
            return response

        log.msg('Returning file: %s' % url)

        # Set the destination path for the file: <cache_dir>/<netloc>/<path>
        # (preauthChild refuses paths that would escape the cache dir)
        parsed = urlparse(url)
        destFile = self.cache_dir.preauthChild(parsed[1] + parsed[2])
        log.msg('Saving returned %r byte file to cache: %s' % (response.stream.length, destFile.path))

        # Make sure there's a free place for the file
        if destFile.exists():
            log.msg('File already exists, removing: %s' % destFile.path)
            destFile.remove()
        if not destFile.parent().exists():
            destFile.parent().makedirs()

        # Determine whether it needs to be decompressed and how
        root, ext = os.path.splitext(destFile.basename())
        if root.lower() in DECOMPRESS_FILES and ext.lower() in DECOMPRESS_EXTS:
            ext = ext.lower()
            decFile = destFile.sibling(root)
            log.msg('Decompressing to: %s' % decFile.path)
            if decFile.exists():
                log.msg('File already exists, removing: %s' % decFile.path)
                decFile.remove()
        else:
            ext = None
            decFile = None

        # Create the new stream from the old one.
        orig_stream = response.stream
        f = destFile.open('w+')
        new_stream = GrowingFileStream(f, orig_stream.length)
        hash.new()
        df = StreamToFile(hash, orig_stream, f, notify = new_stream.updateAvailable,
                          decompress = ext, decFile = decFile).run()
        df.addCallback(self._save_complete, url, destFile, new_stream,
                       response.headers.getHeader('Last-Modified'), decFile)
        df.addErrback(self._save_error, url, destFile, new_stream, decFile)
        response.stream = new_stream

        # Return the modified response with the new stream
        return response

    def _save_complete(self, hash, url, destFile, destStream = None,
                       modtime = None, decFile = None):
        """Update the modification time and inform the main program.

        @type hash: L{Hash.HashObject}
        @param hash: the hash object containing the expected hash for the file
        @param url: the URI of the actual mirror request
        @type destFile: C{twisted.python.FilePath}
        @param destFile: the file where the download was written to
        @type destStream: L{Streams.GrowingFileStream}
        @param destStream: the stream to notify that all data is available
        @type modtime: C{int}
        @param modtime: the modified time of the cached file (seconds since epoch)
            (optional, defaults to not setting the modification time of the file)
        @type decFile: C{twisted.python.FilePath}
        @param decFile: the file where the decompressed download was written to
            (optional, defaults to the file not having been compressed)
        """
        # verify() returns True (hash matched), False (mismatch), or
        # None (no expected hash was available)
        result = hash.verify()
        if result or result is None:
            if destStream:
                destStream.allAvailable()
            if modtime:
                os.utime(destFile.path, (modtime, modtime))

            if result:
                log.msg('Hashes match: %s' % url)
                dht = True
            else:
                log.msg('Hashed file to %s: %s' % (hash.hexdigest(), url))
                dht = False

            new_hash = self.db.storeFile(destFile, hash.digest(), dht,
                                         ''.join(hash.pieceDigests()))

            if self.manager:
                self.manager.new_cached_file(destFile, hash, new_hash, url)

            if decFile:
                # Hash the decompressed file and add it to the DB;
                # strip the compression extension from the URL as well
                decHash = HashObject()
                ext_len = len(destFile.path) - len(decFile.path)
                df = decHash.hashInThread(decFile)
                df.addCallback(self._save_complete, url[:-ext_len], decFile, modtime = modtime)
                df.addErrback(self._save_error, url[:-ext_len], decFile)
        else:
            log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
            # Remove the corrupted download and any decompressed copy
            if destStream:
                destStream.allAvailable(remove = True)
            if decFile:
                decFile.remove()

    def _save_error(self, failure, url, destFile, destStream = None, decFile = None):
        """Remove the destination files.

        @param failure: the failure that occurred during the download
        @param url: the URI of the actual mirror request
        @type destFile: C{twisted.python.FilePath}
        @param destFile: the file where the download was being written to
        @type destStream: L{Streams.GrowingFileStream}
        @param destStream: the stream to notify of the failure, which will
            remove the file itself (optional)
        @type decFile: C{twisted.python.FilePath}
        @param decFile: the decompressed file to remove (optional)
        """
        log.msg('Error occurred downloading %s' % url)
        log.err(failure)
        if destStream:
            # The stream cleans up the incomplete file itself
            destStream.allAvailable(remove = True)
        else:
            # No stream to delegate to, remove the file directly
            destFile.restat(False)
            if destFile.exists():
                log.msg('Removing the incomplete file: %s' % destFile.path)
                destFile.remove()
        if decFile:
            decFile.restat(False)
            if decFile.exists():
                log.msg('Removing the incomplete file: %s' % decFile.path)
                decFile.remove()

    def save_error(self, failure, url):
        """An error has occurred in downloading or saving the file.

        @return: the failure, passed through so later errbacks also see it
        """
        log.msg('Error occurred downloading %s' % url)
        log.err(failure)
        return failure
class TestMirrorManager(unittest.TestCase):
    """Unit tests for the mirror manager."""

    # Class-level defaults; real values are set per-instance in setUp
    timeout = 20
    pending_calls = []
    client = None

    def setUp(self):
        # Use a fresh per-instance list so pending calls from one test
        # cannot leak into another (the class attribute is shared state)
        self.pending_calls = []
        # NOTE(review): CacheManager requires a db argument; this call as
        # written raises TypeError — confirm whether this test is vestigial
        self.client = CacheManager(FilePath('/tmp/.apt-p2p'))

    def tearDown(self):
        # Cancel any delayed calls scheduled during the test
        for p in self.pending_calls:
            if p.active():
                p.cancel()
        self.client = None
|