#!/usr/bin/python import optparse, sys, os, tempfile, re try: import readline except ImportError: pass from stat import * def show_license(*eat): print """rpl - replace strings in files Copyright (C) 2004-2005 Goran Weinholt Copyright (C) 2004 Christian Haggstrom This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA """ sys.exit(0) def get_files(filenames, recurse, suffixen, verbose, hidden_files): new_files = [] for filename in filenames: try: perms = os.lstat(filename) except OSError, e: sys.stderr.write("\nrpl: Unable to read permissions of %s." % filename) sys.stderr.write("\nrpl: Error: %s" % e) sys.stderr.write("\nrpl: SKIPPING %s\n\n" % filename) continue if S_ISDIR(perms.st_mode): if recurse: if verbose: sys.stderr.write("Scanning Directory: %s\n" % filename) for f in os.listdir(filename): if not hidden_files and f.startswith('.'): if verbose: sys.stderr.write("Skipping: %s (hidden)\n" % os.path.join(filename, f)) continue new_files += get_files([os.path.join(filename, f)], recurse, suffixen, verbose, hidden_files) else: if verbose: sys.stderr.write("Directory: %s skipped.\n" % filename) continue elif S_ISREG(perms.st_mode): if suffixen != [] and \ not True in [ filename.endswith(s) for s in suffixen ]: sys.stderr.write("Skipping: %s (suffix not in list)\n" % filename) continue new_files += [(filename, perms)] else: sys.stderr.write("Skipping: %s (not a regular file)\n" % filename) return new_files def unescape(s): regex = re.compile(r'\\([0-7]{1,3}|x[0-9a-fA-F]{2}|[nrtvafb\\])') return regex.sub(lambda match: eval('"%s"' % match.group()), s) def blockrepl(instream, outstream, regex, before, after, blocksize=None): patlen = len(before) sum = 0 if not blocksize: blocksize = 2*patlen tonext = '' while 1: block = instream.read(blocksize) if not block: break parts = regex.split(tonext+block) sum += len(parts)-1 lastpart = parts[-1] if lastpart: tonext = lastpart[-patlen:] parts[-1] = lastpart[:-len(tonext)] else: tonext = '' outstream.write(after.join(parts)) outstream.write(tonext) return sum def main(): # First we parse the command line arguments... usage = "usage: %prog [options] old_string new_string target_file(s)" parser = optparse.OptionParser(usage, version="%prog 1.5.2") parser.add_option("-L", "--license", action="callback", callback=show_license, help="show the software license") parser.add_option("-x", metavar="SUFFIX", action="append", dest="suffixen", default=[], help="specify file suffix to match") parser.add_option("-i", "--ignore-case", action="store_true", dest="ignore_case", default=False, help="do a case insensitive match") parser.add_option("-w", "--whole-words", action="store_true", dest="whole_words", default=False, help="whole words (old_string matches on word boundaries only)") parser.add_option("-b", "--backup", action="store_true", dest="do_backup", default=False, help="make a backup before overwriting files") parser.add_option("-q", "--quiet", action="store_true", dest="quiet", default=False, help="quiet mode") parser.add_option("-v", "--verbose", action="store_true", dest="verbose", default=False, help="verbose mode") parser.add_option("-s", "--dry-run", action="store_true", dest="dry_run", default=False, help="simulation mode") parser.add_option("-R", "--recursive", action="store_true", dest="recurse", default=False, help="recurse into subdirectories") parser.add_option("-e", "--escape", action="store_true", dest="escapes", default=False, help="expand escapes in old_string and new_string") parser.add_option("-p", "--prompt", action="store_true", dest="prompt", default=False, help="prompt before modifying each file") parser.add_option("-f", "--force", action="store_true", dest="force", default=False, help="ignore errors when trying to preserve permissions") parser.add_option("-d", "--keep-times", action="store_true", dest="keep_times", default=False, help="keep the modification times on modified files") parser.add_option("-t", "--use-tmpdir", action="store_true", dest="use_tmpdir", default=False, help="use $TMPDIR for storing temporary files") parser.add_option("-a", "--all", action="store_true", dest="hidden_files", default=False, help="do not ignore files and directories starting with .") (opts, args) = parser.parse_args() # args should now contain old_str, new_str and a list of files/dirs if len(args) < 3: parser.error("must have at least three arguments") if args[0] == "": parser.error("must have something to replace") old_str = args[0] new_str = args[1] files = args[2:] # See if all the files actually exist for file in files: if not os.path.exists(file): sys.stderr.write("\nrpl: File \"%s\" not found.\n" % file) sys.exit(os.EX_DATAERR) if new_str == "" and not opts.quiet: sys.stderr.write("Really DELETE all occurences of %s " % old_str) if opts.ignore_case: sys.stderr.write("(ignoring case)? (Y/[N]) ") else: sys.stderr.write("(case sensitive)? (Y/[N]) ") line = raw_input() if line != "" and line[0] in "nN": sys.stderr.write("\nrpl: User cancelled operation.\n") sys.exit(os.EX_TEMPFAIL) # Tell the user what is going to happen if opts.dry_run: sys.stderr.write("Simulating replacement of \"%s\" with \"%s\" " % (old_str, new_str)) else: sys.stderr.write("Replacing \"%s\" with \"%s\" " % (old_str, new_str)) if opts.ignore_case: sys.stderr.write("(ignoring case) ") else: sys.stderr.write("(case sensitive) ") if opts.whole_words: sys.stderr.write("(whole words only)\n") else: sys.stderr.write("(partial words matched)\n") if opts.dry_run and not opts.quiet: sys.stderr.write("The files listed below would be modified in a replace operation.\n") if opts.escapes: old_str = unescape(old_str) new_str = unescape(new_str) if opts.whole_words: regex = re.compile(r"(?:(?<=\s)|^)" + re.escape(old_str) + r"(?=\s|$)", opts.ignore_case and re.I or 0) else: regex = re.compile(re.escape(old_str), opts.ignore_case and re.I or 0) total_matches = 0 files = get_files(files, opts.recurse, opts.suffixen, opts.verbose, opts.hidden_files) for filename, perms in files: # Open the input file try: f = open(filename, "rb") except IOError, e: sys.stderr.write("\nrpl: Unable to open %s for reading." % fn) sys.stderr.write("\nrpl: Error: %s" % e) sys.stderr.write("\nrpl: SKIPPING %s\n\n" % fn) continue # Find out where we should put the temporary file if opts.use_tmpdir: tempfile.tempdir = None else: tempfile.tempdir = os.path.dirname(filename) # Create the output file try: o, tmp_path = tempfile.mkstemp("", ".tmp.") o = os.fdopen(o, "wb") except OSError, e: sys.stderr.write("\nrpl: Unable to create temp file.") sys.stderr.write("\nrpl: Error: %s" % e) sys.stderr.write("\nrpl: (Type \"rpl -h\" and consider \"-t\" to specify temp file location.)") sys.stderr.write("\nrpl: SKIPPING %s\n\n" % filename) continue # Set permissions and owner try: os.chown(tmp_path, perms.st_uid, perms.st_gid) os.chmod(tmp_path, perms.st_mode) except OSError, e: sys.stderr.write("\nrpl: Unable to set owner/group/perms of %s" % filename) sys.stderr.write("\nrpl: Error: %s" % e) if opts.force: sys.stderr.write("\nrpl: WARNING: New owner/group/perms may not match!\n\n") else: sys.stderr.write("\nrpl: SKIPPING %s!\n\n" % filename) os.unlink(tmp_path) continue if opts.verbose and not opts.dry_run: sys.stderr.write("Processing: %s\n" % filename) elif not opts.quiet and not opts.dry_run: sys.stderr.write(".") sys.stderr.flush() # Do the actual work now matches = blockrepl(f, o, regex, old_str, new_str, 1024) f.close() o.close() if matches == 0: os.unlink(tmp_path) continue if opts.dry_run: try: fn = os.path.realpath(filename) except OSError, e: fn = filename if not opts.quiet: sys.stderr.write(" %s\n" % fn) os.unlink(tmp_path) total_matches += matches continue if opts.prompt: sys.stderr.write("\nSave '%s' ? ([Y]/N) " % filename) line = "" while line == "" or line[0] not in "Yy\nnN": line = raw_input() if line[0] in "nN": sys.stderr.write("Not Saved.\n") os.unlink(tmp_path) continue sys.stderr.write("Saved.\n") if opts.do_backup: try: os.rename(filename, filename + "~") except OSError, e: sys.stderr.write("rpl: An error occured renaming %s to %s." % (filename, filename + "~")) sys.stderr.write("\nrpl: Error: %s" % e) continue # Rename the file try: os.rename(tmp_path, filename) except OSError, e: sys.stderr.write("rpl: An error occured replacing %s with %s." % (tmp_path, filename)) sys.stderr.write("\nrpl: Error: %s" % e) os.unlink(tmp_path) continue # Restore the times if opts.keep_times: try: os.utime(filename, (perms.st_atime, perms.st_mtime)) except OSError, e: sys.stderr.write("\nrpl: An error occured setting the access time and mod time of the file %s.", filename) sys.stderr.write("\nrpl: Error: %s" % e) total_matches += matches # We're about to exit, give a summary if not opts.quiet: if opts.dry_run: sys.stderr.write("\nA Total of %lu matches found in %lu file%s searched." % (total_matches, len(files), len(files) != 1 and "s" or "")) sys.stderr.write("\nNone replaced (simulation mode).\n") else: sys.stderr.write("\nA Total of %lu matches replaced in %lu file%s searched.\n" % (total_matches, len(files), len(files) != 1 and "s" or "")) if __name__ == "__main__": main()