Skip to content
Snippets Groups Projects
rpl 10.4 KiB
Newer Older
Ruben Rodriguez's avatar
Ruben Rodriguez committed
#!/usr/bin/python

import optparse, sys, os, tempfile, re
try: import readline
except ImportError: pass
from stat import *

def show_license(*eat):
	"""Print the license notice on standard output and exit with status 0.

	Used as the optparse callback for -L/--license; the arguments optparse
	passes to a callback are accepted through *eat and ignored.

	Raises:
		SystemExit: always, with exit code 0.
	"""
	# A single parenthesised argument prints identically under the Python 2
	# print statement and the Python 3 print function.
	print("""rpl - replace strings in files
Copyright (C) 2004-2005 Goran Weinholt <weinholt@debian.org>
Copyright (C) 2004 Christian Haggstrom <chm@c00.info>

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
""")
	sys.exit(0)

def get_files(filenames, recurse, suffixen, verbose, hidden_files):
	"""Expand a list of paths into the regular files that should be processed.

	Args:
		filenames: paths to examine (files and/or directories).
		recurse: when true, descend into directories; otherwise directories
			are skipped.
		suffixen: list of filename suffixes; when non-empty only regular
			files ending with one of them are kept.
		verbose: when true, report scanned/skipped directories and skipped
			hidden entries on stderr.
		hidden_files: when false, directory entries starting with '.' are
			ignored while recursing.

	Returns:
		A list of (filename, lstat_result) tuples, one per accepted file.
		Paths that cannot be stat'ed or listed are reported on stderr and
		skipped instead of aborting the run.
	"""
	# str.endswith() accepts a tuple, so one call tests every suffix.
	suffixes = tuple(suffixen or ())
	new_files = []
	for filename in filenames:
		try:
			# lstat, not stat: symbolic links are examined themselves and
			# therefore end up in the "not a regular file" branch.
			perms = os.lstat(filename)
		except OSError as e:
			sys.stderr.write("\nrpl: Unable to read permissions of %s."
							 % filename)
			sys.stderr.write("\nrpl: Error: %s" % e)
			sys.stderr.write("\nrpl: SKIPPING %s\n\n" % filename)
			continue

		if S_ISDIR(perms.st_mode):
			if not recurse:
				if verbose:
					sys.stderr.write("Directory: %s skipped.\n" % filename)
				continue
			if verbose:
				sys.stderr.write("Scanning Directory: %s\n" % filename)
			try:
				entries = os.listdir(filename)
			except OSError as e:
				# An unreadable directory must not abort the whole run.
				sys.stderr.write("\nrpl: Unable to read directory %s."
								 % filename)
				sys.stderr.write("\nrpl: Error: %s" % e)
				sys.stderr.write("\nrpl: SKIPPING %s\n\n" % filename)
				continue
			for f in entries:
				if not hidden_files and f.startswith('.'):
					if verbose:
						sys.stderr.write("Skipping: %s (hidden)\n"
										 % os.path.join(filename, f))
					continue
				new_files += get_files([os.path.join(filename, f)],
									   recurse, suffixen, verbose,
									   hidden_files)
		elif S_ISREG(perms.st_mode):
			if suffixes and not filename.endswith(suffixes):
				sys.stderr.write("Skipping: %s (suffix not in list)\n"
								 % filename)
				continue
			new_files.append((filename, perms))
		else:
			sys.stderr.write("Skipping: %s (not a regular file)\n"
							 % filename)
	return new_files

def unescape(s):
	"""Return s with backslash escape sequences expanded.

	Recognised forms are one to three octal digits, \\xHH with exactly two
	hex digits, and the single-character escapes n r t v a f b and the
	backslash itself. Any other backslash sequence is left untouched.
	"""
	def expand(found):
		# eval only ever sees text admitted by the pattern below, i.e. one
		# well-formed escape sequence wrapped in double quotes.
		return eval('"%s"' % found.group(0))

	pattern = r'\\([0-7]{1,3}|x[0-9a-fA-F]{2}|[nrtvafb\\])'
	return re.sub(pattern, expand, s)

def blockrepl(instream, outstream, regex, before, after, blocksize=None):
	"""Copy instream to outstream, replacing every match of regex with after.

	The input is read in chunks of blocksize so the whole file never has to
	be held in memory. Matching behaves as if the regex had been applied to
	the complete text:

	* one already-written character is kept in front of the pending text and
	  the search starts after it, so '^' and lookbehinds are evaluated
	  against the real preceding character instead of a chunk edge;
	* a match that touches the end of the pending text is not accepted until
	  more data (or end of input) is seen, so '$' and lookaheads are never
	  decided at a chunk edge.

	This assumes a match is at most len(before) characters long and needs at
	most one character of context on either side, which holds for the
	patterns built from an escaped literal in this file.

	Args:
		instream: object with read(size), the text to scan.
		outstream: object with write(data), receives the result.
		regex: compiled pattern to search for.
		before: the literal being replaced; only its length is used.
		after: replacement text.
		blocksize: read size; defaults to twice len(before).

	Returns:
		The number of replacements made.
	"""
	patlen = len(before)
	if not blocksize:
		# Never ask read() for 0 characters: that would look like end of
		# input and silently drop the data.
		blocksize = max(2 * patlen, 1)
	count = 0
	buf = None	# pending text; None until the first block has been read
	ctx = 0		# leading characters of buf that were already written out
	while True:
		block = instream.read(blocksize)
		at_eof = not block
		if buf is None:
			if at_eof:
				break
			buf = block
		else:
			buf += block
		end = len(buf)
		# Up to `cut` the text is final. Before end of input the last patlen
		# characters may still be the start of a match and are held back.
		if at_eof:
			cut = end
		else:
			cut = max(ctx, end - patlen)
		parts = []
		pos = ctx
		for match in regex.finditer(buf, ctx):
			if not at_eof and match.end() >= end:
				# What follows this match is still unknown; retry it once
				# the next block has been appended.
				cut = min(cut, match.start())
				break
			parts.append(buf[pos:match.start()])
			pos = match.end()
		cut = max(cut, pos)
		parts.append(buf[pos:cut])
		count += len(parts) - 1
		outstream.write(after.join(parts))
		if at_eof:
			break
		# Keep one written character as lookbehind context for the next pass.
		keep_from = max(cut - 1, 0)
		ctx = cut - keep_from
		buf = buf[keep_from:]
	return count

def main():
	"""Command-line entry point: replace old_string with new_string in files.

	Parses the options, expands the targets with get_files(), and for each
	file streams the content through blockrepl() into a temporary file that
	then replaces the original. Files without a match are left untouched.
	Progress, prompts and the final summary are written to stderr.

	Exits through parser.error() on bad usage, with os.EX_DATAERR when a
	target does not exist, and with os.EX_TEMPFAIL when the user declines
	the delete confirmation.
	"""
	# First we parse the command line arguments...
	usage = "usage: %prog [options] old_string new_string target_file(s)"
	parser = optparse.OptionParser(usage, version="%prog 1.5.2")
	parser.add_option("-L", "--license", action="callback",
					  callback=show_license, help="show the software license")
	parser.add_option("-x", metavar="SUFFIX",
					  action="append", dest="suffixen", default=[],
					  help="specify file suffix to match")
	parser.add_option("-i", "--ignore-case",
					  action="store_true", dest="ignore_case", default=False,
					  help="do a case insensitive match")
	parser.add_option("-w", "--whole-words",
					  action="store_true", dest="whole_words", default=False,
					  help="whole words (old_string matches on word boundaries only)")
	parser.add_option("-b", "--backup",
					  action="store_true", dest="do_backup", default=False,
					  help="make a backup before overwriting files")
	parser.add_option("-q", "--quiet",
					  action="store_true", dest="quiet", default=False,
					  help="quiet mode")
	parser.add_option("-v", "--verbose",
					  action="store_true", dest="verbose", default=False,
					  help="verbose mode")
	parser.add_option("-s", "--dry-run",
					  action="store_true", dest="dry_run", default=False,
					  help="simulation mode")
	parser.add_option("-R", "--recursive",
					  action="store_true", dest="recurse", default=False,
					  help="recurse into subdirectories")
	parser.add_option("-e", "--escape",
					  action="store_true", dest="escapes", default=False,
					  help="expand escapes in old_string and new_string")
	parser.add_option("-p", "--prompt",
					  action="store_true", dest="prompt", default=False,
					  help="prompt before modifying each file")
	parser.add_option("-f", "--force",
					  action="store_true", dest="force", default=False,
					  help="ignore errors when trying to preserve permissions")
	parser.add_option("-d", "--keep-times",
					  action="store_true", dest="keep_times", default=False,
					  help="keep the modification times on modified files")
	parser.add_option("-t", "--use-tmpdir",
					  action="store_true", dest="use_tmpdir", default=False,
					  help="use $TMPDIR for storing temporary files")
	parser.add_option("-a", "--all",
					  action="store_true", dest="hidden_files", default=False,
					  help="do not ignore files and directories starting with .")
	(opts, args) = parser.parse_args()

	# args should now contain old_str, new_str and a list of files/dirs
	if len(args) < 3:
		parser.error("must have at least three arguments")
	if args[0] == "":
		parser.error("must have something to replace")

	old_str = args[0]
	new_str = args[1]
	files = args[2:]

	# See if all the files actually exist
	for path in files:
		if not os.path.exists(path):
			sys.stderr.write("\nrpl: File \"%s\" not found.\n" % path)
			sys.exit(os.EX_DATAERR)

	if new_str == "" and not opts.quiet:
		sys.stderr.write("Really DELETE all occurences of %s " % old_str)
		if opts.ignore_case:
			sys.stderr.write("(ignoring case)? (Y/[N]) ")
		else:
			sys.stderr.write("(case sensitive)? (Y/[N]) ")
		line = raw_input()
		# [N] is the advertised default, so only an explicit yes continues;
		# an empty answer must not delete anything.
		if line == "" or line[0] not in "yY":
			sys.stderr.write("\nrpl:  User cancelled operation.\n")
			sys.exit(os.EX_TEMPFAIL)

	# Tell the user what is going to happen
	if opts.dry_run:
		sys.stderr.write("Simulating replacement of \"%s\" with \"%s\" "
						 % (old_str, new_str))
	else:
		sys.stderr.write("Replacing \"%s\" with \"%s\" " % (old_str, new_str))
	if opts.ignore_case:
		sys.stderr.write("(ignoring case) ")
	else:
		sys.stderr.write("(case sensitive) ")
	if opts.whole_words:
		sys.stderr.write("(whole words only)\n")
	else:
		sys.stderr.write("(partial words matched)\n")
	if opts.dry_run and not opts.quiet:
		sys.stderr.write("The files listed below would be modified in a replace operation.\n")

	if opts.escapes:
		old_str = unescape(old_str)
		new_str = unescape(new_str)
	flags = re.I if opts.ignore_case else 0
	if opts.whole_words:
		# "Whole word" means delimited by whitespace or the text boundaries.
		regex = re.compile(r"(?:(?<=\s)|^)" + re.escape(old_str) + r"(?=\s|$)",
						   flags)
	else:
		regex = re.compile(re.escape(old_str), flags)

	total_matches = 0
	files = get_files(files, opts.recurse, opts.suffixen, opts.verbose, opts.hidden_files)
	for filename, perms in files:
		# Open the input file
		try:
			f = open(filename, "rb")
		except IOError as e:
			sys.stderr.write("\nrpl: Unable to open %s for reading." % filename)
			sys.stderr.write("\nrpl: Error: %s" % e)
			sys.stderr.write("\nrpl: SKIPPING %s\n\n" % filename)
			continue

		# Find out where we should put the temporary file. By default it
		# lives next to the target so the final rename stays on one
		# filesystem.
		if opts.use_tmpdir:
			tempfile.tempdir = None
		else:
			tempfile.tempdir = os.path.dirname(filename)

		# Create the output file
		try:
			o, tmp_path = tempfile.mkstemp("", ".tmp.")
			o = os.fdopen(o, "wb")
		except OSError as e:
			sys.stderr.write("\nrpl: Unable to create temp file.")
			sys.stderr.write("\nrpl: Error: %s" % e)
			sys.stderr.write("\nrpl: (Type \"rpl -h\" and consider \"-t\" to specify temp file location.)")
			sys.stderr.write("\nrpl: SKIPPING %s\n\n" % filename)
			f.close()
			continue

		# Set permissions and owner
		try:
			os.chown(tmp_path, perms.st_uid, perms.st_gid)
			os.chmod(tmp_path, perms.st_mode)
		except OSError as e:
			sys.stderr.write("\nrpl: Unable to set owner/group/perms of %s"
							 % filename)
			sys.stderr.write("\nrpl: Error: %s" % e)
			if opts.force:
				sys.stderr.write("\nrpl: WARNING: New owner/group/perms may not match!\n\n")
			else:
				sys.stderr.write("\nrpl: SKIPPING %s!\n\n" % filename)
				f.close()
				o.close()
				os.unlink(tmp_path)
				continue

		if opts.verbose and not opts.dry_run:
			sys.stderr.write("Processing:  %s\n" % filename)
		elif not opts.quiet and not opts.dry_run:
			sys.stderr.write(".")
			sys.stderr.flush()

		# Do the actual work now; make sure both handles are released even
		# if reading or writing fails part-way.
		try:
			matches = blockrepl(f, o, regex, old_str, new_str, 1024)
		finally:
			f.close()
			o.close()

		if matches == 0:
			os.unlink(tmp_path)
			continue

		if opts.dry_run:
			try:
				fn = os.path.realpath(filename)
			except OSError:
				fn = filename
			if not opts.quiet:
				sys.stderr.write("  %s\n" % fn)
			os.unlink(tmp_path)
			total_matches += matches
			continue

		if opts.prompt:
			sys.stderr.write("\nSave '%s' ? ([Y]/N) " % filename)
			line = raw_input()
			# raw_input() strips the newline, so an empty answer selects the
			# advertised default (Yes); re-ask only on unrecognised input.
			while line != "" and line[0] not in "YyNn":
				line = raw_input()
			if line != "" and line[0] in "nN":
				sys.stderr.write("Not Saved.\n")
				os.unlink(tmp_path)
				continue
			sys.stderr.write("Saved.\n")

		if opts.do_backup:
			try:
				os.rename(filename, filename + "~")
			except OSError as e:
				sys.stderr.write("rpl: An error occured renaming %s to %s." % (filename, filename + "~"))
				sys.stderr.write("\nrpl: Error: %s" % e)
				# The original is untouched; do not leave the temp file behind.
				os.unlink(tmp_path)
				continue

		# Rename the file
		try:
			os.rename(tmp_path, filename)
		except OSError as e:
			sys.stderr.write("rpl: An error occured replacing %s with %s."
							 % (tmp_path, filename))
			sys.stderr.write("\nrpl: Error: %s" % e)
			os.unlink(tmp_path)
			continue

		# Restore the times
		if opts.keep_times:
			try:
				os.utime(filename, (perms.st_atime, perms.st_mtime))
			except OSError as e:
				sys.stderr.write("\nrpl: An error occured setting the access time and mod time of the file %s." % filename)
				sys.stderr.write("\nrpl: Error: %s" % e)
		total_matches += matches

	# We're about to exit, give a summary
	if not opts.quiet:
		plural = "s" if len(files) != 1 else ""
		if opts.dry_run:
			sys.stderr.write("\nA Total of %lu matches found in %lu file%s searched."
							 % (total_matches, len(files), plural))
			sys.stderr.write("\nNone replaced (simulation mode).\n")
		else:
			sys.stderr.write("\nA Total of %lu matches replaced in %lu file%s searched.\n"
							 % (total_matches, len(files), plural))

# Run the command-line tool only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
	main()