1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314
|
#!/usr/bin/env python
import optparse, sys, os, tempfile, re, locale
try: import readline
except ImportError: pass
from stat import *
def show_license(*eat):
    """Print the rpl license notice to stdout and exit with status 0.

    Any positional arguments are accepted and ignored, so the function can
    be handed to a caller that passes extra values (such as an option
    parser callback).
    """
    notice = """rpl - replace strings in files
Copyright (C) 2004-2005 Goran Weinholt <weinholt@debian.org>
Copyright (C) 2004 Christian Haggstrom <chm@c00.info>
Copyright (C) 2016 Kevin Coyner <kcoyner@debian.org>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""
    print(notice)
    sys.exit(0)
def get_files(filenames, recurse, suffixen, verbose, hidden_files):
    """Expand *filenames* into the list of regular files to process.

    Args:
        filenames: Paths of files and/or directories to examine.
        recurse: When true, directories are scanned recursively; otherwise
            they are skipped.
        suffixen: List of filename suffixes. When non-empty, only regular
            files whose name ends with one of them are kept.
        verbose: When true, report scanned/skipped directories and hidden
            entries on stderr.
        hidden_files: When true, directory entries starting with '.' are
            processed too instead of being skipped.

    Returns:
        A list of ``(path, lstat_result)`` tuples, one per selected regular
        file. Paths that cannot be examined are reported on stderr and
        left out; nothing is raised for them.
    """
    # str.endswith accepts a tuple, so one call tests every suffix.
    suffixes = tuple(suffixen)
    new_files = []
    for filename in filenames:
        try:
            # lstat, not stat: symbolic links are classified as links and
            # therefore end up in the "not a regular file" branch.
            perms = os.lstat(filename)
        except OSError as e:
            sys.stderr.write("\nrpl: Unable to read permissions of %s." % filename)
            sys.stderr.write("\nrpl: Error: %s" % e)
            sys.stderr.write("\nrpl: SKIPPING %s\n\n" % filename)
            continue

        if S_ISDIR(perms.st_mode):
            if not recurse:
                if verbose:
                    sys.stderr.write("Directory: %s skipped.\n" % filename)
                continue
            if verbose:
                sys.stderr.write("Scanning Directory: %s\n" % filename)
            try:
                entries = os.listdir(filename)
            except OSError as e:
                # An unreadable directory is skipped like an unreadable
                # file instead of aborting the whole run.
                sys.stderr.write("\nrpl: Unable to read directory %s." % filename)
                sys.stderr.write("\nrpl: Error: %s" % e)
                sys.stderr.write("\nrpl: SKIPPING %s\n\n" % filename)
                continue
            for entry in entries:
                path = os.path.join(filename, entry)
                if not hidden_files and entry.startswith('.'):
                    if verbose:
                        sys.stderr.write("Skipping: %s (hidden)\n" % path)
                    continue
                new_files += get_files([path], recurse, suffixen,
                                       verbose, hidden_files)
        elif S_ISREG(perms.st_mode):
            if suffixes and not filename.endswith(suffixes):
                sys.stderr.write("Skipping: %s (suffix not in list)\n" % filename)
                continue
            new_files.append((filename, perms))
        else:
            sys.stderr.write("Skipping: %s (not a regular file)\n" % filename)
    return new_files
# Escape sequences understood by unescape(): 1-3 octal digits, \xHH, or one
# of the single-letter escapes. Compiled once instead of on every call.
_ESCAPE_RE = re.compile(r'\\([0-7]{1,3}|x[0-9a-fA-F]{2}|[nrtvafb\\])')

# Single-character escapes and the character each one stands for.
_SIMPLE_ESCAPES = {
    'n': '\n',
    'r': '\r',
    't': '\t',
    'v': '\v',
    'a': '\a',
    'f': '\f',
    'b': '\b',
    '\\': '\\',
}


def _expand_escape(match):
    """Return the character denoted by one escape sequence match."""
    body = match.group(1)
    if body in _SIMPLE_ESCAPES:
        return _SIMPLE_ESCAPES[body]
    if body[0] == 'x':
        return chr(int(body[1:], 16))
    return chr(int(body, 8))


def unescape(s):
    """Expand backslash escape sequences in *s* and return the result.

    Recognised sequences are octal (``\\N``, ``\\NN``, ``\\NNN``), hexadecimal
    (``\\xHH``) and ``\\n \\r \\t \\v \\a \\f \\b \\\\``. A backslash followed by
    anything else is left untouched.

    The sequences are decoded with an explicit table and ``int()`` rather
    than by passing text to ``eval``.
    """
    return _ESCAPE_RE.sub(_expand_escape, s)
def blockrepl(instream, outstream, regex, before, after, blocksize=None):
    """Copy *instream* to *outstream*, replacing every match of *regex*.

    The input is processed in blocks so that the whole file never has to be
    held in memory.

    Args:
        instream: Binary stream to read from.
        outstream: Binary stream to write to.
        regex: Compiled pattern locating the text to replace. Lookaround of
            at most one character on either side of the match is handled
            across block borders.
        before: The search string; only its length is used, as the number
            of trailing characters held back because they may be the start
            of a match that continues in the next block.
        after: Replacement text written in place of each match.
        blocksize: Number of bytes per read. Defaults to twice the length
            of *before* (at least 1).

    Returns:
        The number of matches that were replaced.
    """
    import codecs  # imported here because the module header does not have it

    patlen = len(before)
    if not blocksize:
        # Never 0: read(0) would look like end-of-file immediately.
        blocksize = max(2 * patlen, 1)

    # Decode and encode with the same codec so untouched text is written
    # back unchanged. On Python 3 'surrogateescape' also round-trips bytes
    # that are not valid UTF-8; Python 2 has no such handler.
    if sys.version_info < (3,):
        errors = 'ignore'
    else:
        errors = 'surrogateescape'
    # Incremental decoding keeps a multi-byte character that is split
    # between two reads intact.
    decoder = codecs.getincrementaldecoder('utf-8')(errors)

    window = u''    # one already-written context character + pending text
    pos = 0         # index in window of the first character not yet written
    count = 0
    at_eof = False
    while not at_eof:
        raw = instream.read(blocksize)
        at_eof = not raw
        window += decoder.decode(raw or b'', final=at_eof)

        # Everything before 'hold' can be written now; the rest might be
        # the beginning of a match completed by the next block.
        if at_eof:
            hold = len(window)
        else:
            hold = max(pos, len(window) - patlen)

        out = []
        # Searching from 'pos' (rather than slicing) lets a lookbehind see
        # the context character and keeps '^' tied to the real file start.
        for match in regex.finditer(window, pos):
            if not at_eof and match.end() >= len(window):
                # What follows the match is unknown yet, so a lookahead or
                # '$' cannot be trusted; decide once more data arrived.
                hold = min(hold, match.start())
                break
            out.append(window[pos:match.start()])
            out.append(after)
            pos = match.end()
            count += 1

        hold = max(hold, pos)
        out.append(window[pos:hold])
        pos = hold
        outstream.write(u''.join(out).encode('utf-8', errors))

        # Drop written text except one character of lookbehind context.
        keep_from = max(pos - 1, 0)
        window = window[keep_from:]
        pos -= keep_from
    return count
# raw_input() only exists on Python 2; Python 3 calls the same thing input().
if sys.version_info < (3,):
    _read_line = raw_input
else:
    _read_line = input


def _build_option_parser():
    """Return the optparse parser describing the rpl command line."""
    usage = "usage: %prog [options] old_string new_string target_file(s)"
    parser = optparse.OptionParser(usage, version="%prog 1.5.6")
    parser.add_option("-L", "--license", action="callback",
                      callback=show_license, help="show the software license")
    parser.add_option("-x", metavar="SUFFIX",
                      action="append", dest="suffixen", default=[],
                      help="specify file suffix to match")
    # All remaining options are plain on/off switches.
    switches = (
        ("-i", "--ignore-case", "ignore_case",
         "do a case insensitive match"),
        ("-w", "--whole-words", "whole_words",
         "whole words (old_string matches on word boundaries only)"),
        ("-b", "--backup", "do_backup",
         "make a backup before overwriting files"),
        ("-q", "--quiet", "quiet",
         "quiet mode"),
        ("-v", "--verbose", "verbose",
         "verbose mode"),
        ("-s", "--dry-run", "dry_run",
         "simulation mode"),
        ("-R", "--recursive", "recurse",
         "recurse into subdirectories"),
        ("-e", "--escape", "escapes",
         "expand escapes in old_string and new_string"),
        ("-p", "--prompt", "prompt",
         "prompt before modifying each file"),
        ("-f", "--force", "force",
         "ignore errors when trying to preserve permissions"),
        ("-d", "--keep-times", "keep_times",
         "keep the modification times on modified files"),
        ("-t", "--use-tmpdir", "use_tmpdir",
         "use $TMPDIR for storing temporary files"),
        ("-a", "--all", "hidden_files",
         "do not ignore files and directories starting with ."),
    )
    for short_flag, long_flag, dest, help_text in switches:
        parser.add_option(short_flag, long_flag, action="store_true",
                          dest=dest, default=False, help=help_text)
    return parser


def _confirm_save(filename):
    """Ask whether the modified *filename* should be saved; return a bool."""
    sys.stderr.write("\nSave '%s' ? ([Y]/N) " % filename)
    while True:
        reply = _read_line()
        # An empty reply selects the advertised default, [Y].
        if reply == "" or reply[0] in "Yy":
            sys.stderr.write("Saved.\n")
            return True
        if reply[0] in "nN":
            sys.stderr.write("Not Saved.\n")
            return False


def _process_file(filename, perms, regex, old_str, new_str, opts):
    """Run the replacement on one file.

    The result is written to a temporary file which replaces *filename*
    only if there were matches and nothing went wrong. Problems are
    reported on stderr.

    Returns:
        The number of matches to add to the running total: 0 when the file
        was skipped, had no match, or was not saved.
    """
    try:
        source = open(filename, "rb")
    except IOError as e:
        sys.stderr.write("\nrpl: Unable to open %s for reading." % filename)
        sys.stderr.write("\nrpl: Error: %s" % e)
        sys.stderr.write("\nrpl: SKIPPING %s\n\n" % filename)
        return 0

    # The temporary file is created next to the target unless -t asks for
    # the default temporary directory (dir=None).
    if opts.use_tmpdir:
        tmp_dir = None
    else:
        tmp_dir = os.path.dirname(filename) or os.curdir
    try:
        fd, tmp_path = tempfile.mkstemp("", ".tmp.", tmp_dir)
        sink = os.fdopen(fd, "wb")
    except OSError as e:
        source.close()
        sys.stderr.write("\nrpl: Unable to create temp file.")
        sys.stderr.write("\nrpl: Error: %s" % e)
        sys.stderr.write("\nrpl: (Type \"rpl -h\" and consider \"-t\" to specify temp file location.)")
        sys.stderr.write("\nrpl: SKIPPING %s\n\n" % filename)
        return 0

    installed = False   # becomes True once the temp file replaced the target
    try:
        # Set permissions and owner
        try:
            os.chown(tmp_path, perms.st_uid, perms.st_gid)
            os.chmod(tmp_path, perms.st_mode)
        except OSError as e:
            sys.stderr.write("\nrpl: Unable to set owner/group/perms of %s" % filename)
            sys.stderr.write("\nrpl: Error: %s" % e)
            if not opts.force:
                sys.stderr.write("\nrpl: SKIPPING %s!\n\n" % filename)
                return 0
            sys.stderr.write("\nrpl: WARNING: New owner/group/perms may not match!\n\n")

        if not opts.dry_run:
            if opts.verbose:
                sys.stderr.write("Processing: %s\n" % filename)
            elif not opts.quiet:
                sys.stderr.write(".")
                sys.stderr.flush()

        # Do the actual work now
        matches = blockrepl(source, sink, regex, old_str, new_str, 1024)
        # Both streams must be closed (and the output flushed) before the
        # temporary file is renamed over the original.
        source.close()
        sink.close()
        if matches == 0:
            return 0

        if opts.dry_run:
            try:
                shown = os.path.realpath(filename)
            except OSError:
                shown = filename
            if not opts.quiet:
                sys.stderr.write(" %s\n" % shown)
            return matches

        if opts.prompt and not _confirm_save(filename):
            return 0

        if opts.do_backup:
            try:
                os.rename(filename, filename + "~")
            except OSError as e:
                sys.stderr.write("rpl: An error occured renaming %s to %s." % (filename, filename + "~"))
                sys.stderr.write("\nrpl: Error: %s" % e)
                return 0

        # Rename the file
        try:
            os.rename(tmp_path, filename)
        except OSError as e:
            sys.stderr.write("rpl: An error occured replacing %s with %s." % (tmp_path, filename))
            sys.stderr.write("\nrpl: Error: %s" % e)
            return 0
        installed = True

        # Restore the times
        if opts.keep_times:
            try:
                os.utime(filename, (perms.st_atime, perms.st_mtime))
            except OSError as e:
                sys.stderr.write("\nrpl: An error occured setting the access time and mod time of the file %s." % filename)
                sys.stderr.write("\nrpl: Error: %s" % e)
        return matches
    finally:
        # Runs on every exit path, so neither the open handles nor an
        # unused temporary file are left behind.
        source.close()
        try:
            sink.close()
        finally:
            if not installed:
                try:
                    os.unlink(tmp_path)
                except OSError:
                    # Best effort: there is nothing useful left to do if
                    # the temporary file cannot be removed.
                    pass


def main():
    """Command-line entry point: replace old_string with new_string in files.

    Parses ``sys.argv``, asks for confirmation when the replacement string
    is empty (unless -q is given), processes every selected file and writes
    progress and a summary to stderr.

    Exits with ``os.EX_DATAERR`` when a named target does not exist and with
    ``os.EX_TEMPFAIL`` when the user declines the delete confirmation; option
    errors exit through ``OptionParser.error``.
    """
    # First we parse the command line arguments...
    parser = _build_option_parser()
    (opts, args) = parser.parse_args()

    # args should now contain old_str, new_str and a list of files/dirs
    if len(args) < 3:
        parser.error("must have at least three arguments")
    if args[0] == "":
        parser.error("must have something to replace")
    old_str = args[0]
    new_str = args[1]
    targets = args[2:]

    # See if all the files actually exist
    for target in targets:
        if not os.path.exists(target):
            sys.stderr.write("\nrpl: File \"%s\" not found.\n" % target)
            sys.exit(os.EX_DATAERR)

    if new_str == "" and not opts.quiet:
        sys.stderr.write("Really DELETE all occurences of %s " % old_str)
        if opts.ignore_case:
            sys.stderr.write("(ignoring case)? (Y/[N]) ")
        else:
            sys.stderr.write("(case sensitive)? (Y/[N]) ")
        reply = _read_line()
        # An empty reply selects the advertised default, [N].
        if reply == "" or reply[0] in "nN":
            sys.stderr.write("\nrpl: User cancelled operation.\n")
            sys.exit(os.EX_TEMPFAIL)

    # Tell the user what is going to happen
    if opts.dry_run:
        sys.stderr.write("Simulating replacement of \"%s\" with \"%s\" "
                         % (old_str, new_str))
    else:
        sys.stderr.write("Replacing \"%s\" with \"%s\" " % (old_str, new_str))
    if opts.ignore_case:
        sys.stderr.write("(ignoring case) ")
    else:
        sys.stderr.write("(case sensitive) ")
    if opts.whole_words:
        sys.stderr.write("(whole words only)\n")
    else:
        sys.stderr.write("(partial words matched)\n")
    if opts.dry_run and not opts.quiet:
        sys.stderr.write("The files listed below would be modified in a replace operation.\n")

    if opts.escapes:
        old_str = unescape(old_str)
        new_str = unescape(new_str)

    flags = re.I if opts.ignore_case else 0
    if opts.whole_words:
        # A "word" is delimited by whitespace or the start/end of the text.
        regex = re.compile(r"(?:(?<=\s)|^)" + re.escape(old_str) + r"(?=\s|$)",
                           flags)
    else:
        regex = re.compile(re.escape(old_str), flags)

    total_matches = 0
    files = get_files(targets, opts.recurse, opts.suffixen, opts.verbose,
                      opts.hidden_files)
    for filename, perms in files:
        total_matches += _process_file(filename, perms, regex,
                                       old_str, new_str, opts)

    # We're about to exit, give a summary
    if not opts.quiet:
        plural = "" if len(files) == 1 else "s"
        if opts.dry_run:
            sys.stderr.write("\nA Total of %lu matches found in %lu file%s searched."
                             % (total_matches, len(files), plural))
            sys.stderr.write("\nNone replaced (simulation mode).\n")
        else:
            sys.stderr.write("\nA Total of %lu matches replaced in %lu file%s searched.\n"
                             % (total_matches, len(files), plural))
# Run the command-line tool only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|