1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
|
# -*- coding: iso-8859-1 -*-
# Copyright (C) 2005-2010 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
File and path utilities.
"""
import os
import locale
import stat
import fnmatch
def write_file (filename, content, backup=False, callback=None):
    """Overwrite a possibly existing file with new content. Do this
    in a manner that does not leave truncated or broken files behind.

    The content is first stored in C{filename + ".tmp"}. An existing
    file is then moved to C{filename + ".bak"} and the temporary file
    is renamed to C{filename}. If storing the content fails, the
    temporary file is removed again and the existing file is not
    touched. Errors from opening or writing the temporary file and from
    the rename/remove calls are propagated to the caller.

    @param filename: name of file to write
    @type filename: string
    @param content: file content to write
    @type content: string
    @param backup: if backup file should be left
    @type backup: bool
    @param callback: non-default storage function
    @type callback: None or function taking two parameters (fileobj, content)
    """
    tmpname = filename + ".tmp"
    bakname = filename + ".bak"
    # first write in a temp file
    f = open(tmpname, 'wb')
    try:
        try:
            if callback is None:
                f.write(content)
            else:
                callback(f, content)
        finally:
            # close the handle even if storing the content failed
            f.close()
    except Exception:
        # do not leave a truncated temp file behind
        if os.path.exists(tmpname):
            os.remove(tmpname)
        raise
    # move orig file to backup
    if os.path.exists(filename):
        # os.rename() does not replace an existing target on Windows,
        # so remove a backup left over from an earlier call first
        if os.path.exists(bakname):
            os.remove(bakname)
        os.rename(filename, bakname)
    # move temp file to orig
    os.rename(tmpname, filename)
    # remove backup
    if not backup and os.path.exists(bakname):
        os.remove(bakname)
def has_module (name):
    """Test if given module can be imported.

    The module is imported with the builtin C{__import__} function
    instead of executing a generated C{import} statement, so the given
    name is only ever treated as a module name and never run as code.

    @param name: (dotted) name of the module to import
    @type name: string
    @return: flag if import is successful
    @rtype: bool
    """
    try:
        __import__(name)
    except ImportError:
        return False
    return True
class GlobDirectoryWalker (object):
    """Walk a directory tree and hand out the paths of all entries
    whose name matches a shell-style pattern.

    Iteration uses the sequence protocol: C{__getitem__} is called
    again and again until it raises C{IndexError}.
    """

    def __init__ (self, directory, pattern="*"):
        """Remember the start directory and the name pattern.

        @param directory: directory the traversal starts in
        @type directory: string
        @param pattern: fnmatch pattern the entry names are tested with
        @type pattern: string
        """
        self.pattern = pattern
        # directories that still have to be listed
        self.stack = [directory]
        # entries of the directory in progress and the read position
        self.files = []
        self.index = 0

    def __getitem__ (self, index):
        """Return the next matching path. The index argument is not used.

        @raise IndexError: when there is no directory left to visit
        """
        while True:
            if self.index >= len(self.files):
                # The current listing is used up, continue with the
                # next directory. pop() on an empty stack raises the
                # IndexError that ends the iteration.
                self.directory = self.stack.pop()
                self.files = os.listdir(self.directory)
                self.index = 0
                continue
            entry = self.files[self.index]
            self.index += 1
            path = os.path.join(self.directory, entry)
            # descend into real directories only, not into symlinks
            is_real_dir = os.path.isdir(path) and not os.path.islink(path)
            if is_real_dir:
                self.stack.append(path)
            if fnmatch.fnmatch(entry, self.pattern):
                return path
# Short alias: rglob(directory, pattern) creates a GlobDirectoryWalker.
rglob = GlobDirectoryWalker
class Buffer (object):
    """Holds buffered data.

    Written chunks are collected in a list and only joined together
    when the buffer is flushed.
    """

    def __init__ (self, empty=''):
        """Initialize buffer.

        @param empty: empty value of the buffered data type; used as
            initial buffer content and to join the written chunks
        """
        self.empty = self.buf = empty
        self.tmpbuf = []
        # length of the data currently held (flushed data is not counted)
        self.pos = 0

    def __len__ (self):
        """Buffer length, i.e. the amount of data not yet flushed."""
        return self.pos

    def write (self, data):
        """Write data to buffer.

        @param data: chunk to append; must be of the same type as the
            empty value given to the constructor
        """
        self.tmpbuf.append(data)
        self.pos += len(data)

    def flush (self, overlap=0):
        """Flush buffered data and return it.

        @param overlap: amount of trailing data to hold back in the
            buffer; only honoured if the buffer holds more than that,
            otherwise everything is flushed
        @type overlap: int
        @return: the flushed data
        """
        self.buf += self.empty.join(self.tmpbuf)
        self.tmpbuf = []
        if overlap and overlap < self.pos:
            data = self.buf[:-overlap]
            self.buf = self.buf[-overlap:]
        else:
            data = self.buf
            self.buf = self.empty
        # The returned data has left the buffer; without this update
        # the length would grow forever and the overlap check above
        # would work on a stale value in later calls.
        self.pos = len(self.buf)
        return data
def get_mtime (filename):
    """Look up the modification time of a file.

    @param filename: name of the file to inspect
    @type filename: string
    @return: the ST_MTIME entry of the stat result, or zero if the
        file could not be stat'ed
    """
    try:
        info = os.stat(filename)
    except OSError:
        return 0
    return info[stat.ST_MTIME]
# Encoding used for filenames, selected through the environment
# variables described at
# http://developer.gnome.org/doc/API/2.0/glib/glib-running.html
if "G_FILENAME_ENCODING" not in os.environ:
    if "G_BROKEN_FILENAMES" in os.environ:
        FSCODING = locale.getpreferredencoding()
    else:
        FSCODING = "utf-8"
else:
    # the value is split at commas and only the first entry is used
    FSCODING = os.environ["G_FILENAME_ENCODING"].split(",", 1)[0]
    if FSCODING == "@locale":
        FSCODING = locale.getpreferredencoding()
def pathencode (path):
    """Encode a unicode path with the filename encoding FSCODING.

    The path is returned unchanged if it is not a unicode object or if
    os.path reports that unicode filenames are supported. Characters
    that cannot be encoded are substituted ("replace" error handling).

    @param path: the path to encode
    @type path: string or unicode
    @return: the possibly encoded path
    """
    if not isinstance(path, unicode):
        return path
    if os.path.supports_unicode_filenames:
        return path
    return path.encode(FSCODING, "replace")
# Cache for the modified check done by has_changed():
# {absolute filename -> mtime}
_mtime_cache = {}
def has_changed (filename):
    """Check if filename has changed since the last check. If this
    is the first check, assume the file is changed.

    The modification time seen by every call is remembered, so a
    modification is reported by exactly one check and not by all
    following ones.

    @param filename: name of the file to check
    @type filename: string
    @return: True on the first check or if the modification time is
        newer than the one seen by the previous check
    @rtype: bool
    """
    key = os.path.abspath(filename)
    mtime = get_mtime(key)
    previous = _mtime_cache.get(key)
    # Always store the current value: comparing against the mtime of
    # the very first check would report a once-modified file as
    # changed forever.
    _mtime_cache[key] = mtime
    if previous is None:
        return True
    return mtime > previous
|