diff --git a/sith.py b/sith.py index 6f07d882..e35a4f5b 100755 --- a/sith.py +++ b/sith.py @@ -6,20 +6,37 @@ Sith attacks (and helps debugging) Jedi. Randomly search Python files and run Jedi on it. Exception and used arguments are recorded to ``./record.json`` (specified by --record):: - %(prog)s random /path/to/sourcecode + ./sith.py random /path/to/sourcecode Redo recorded exception:: - %(prog)s redo + ./sith.py redo -Fallback to pdb when error is raised:: +Show recorded exception:: - %(prog)s --pdb random - %(prog)s --pdb redo + ./sith.py show +Usage: + sith.py [--pdb|--ipdb|--pudb] [-d] [-m=] [-f] [--record=] random [] + sith.py [--pdb|--ipdb|--pudb] [--d] [-f] redo + sith.py [--pdb|--ipdb|--pudb] [--d] [-f] run + sith.py show + sith.py -h | --help + +Options: + -h --help Show this screen. + --record= Exceptions are recorded in here [default: record.json]. + -f, --fs-cache By default, file system cache is off for reproducibility. + -m, --maxtries= Maximum of random tries [default: 100] + -d, --debug Jedi print debugging when an error is raised. + --pdb Launch pdb when error is raised. + --ipdb Launch ipdb when error is raised. + --pudb Launch pudb when error is raised. """ from __future__ import print_function, division, unicode_literals +from docopt import docopt + import json import os import random @@ -28,284 +45,110 @@ import traceback import jedi -_unspecified = object() - - -class SourceCode(object): - - def __init__(self, path): - self.path = path - with open(path) as f: - self.source = f.read() - self.lines = self.source.splitlines() - self.maxline = len(self.lines) - - def choose_script_args(self): - line = random.randint(1, self.maxline) - column = random.randint(0, len(self.lines[line - 1])) - return (self.source, line, column, self.path) - class SourceFinder(object): + _files = None - def __init__(self, rootpath): - self.rootpath = rootpath - self.files = list(self.search_files()) - - def search_files(self): - for root, dirnames, filenames in os.walk(self.rootpath): + @staticmethod + def fetch(file_path): + for root, dirnames, filenames in os.walk(file_path): for name in filenames: if name.endswith('.py'): yield os.path.join(root, name) - def choose_source(self): - # FIXME: try same file for several times - return SourceCode(random.choice(self.files)) + @classmethod + def files(cls, file_path): + if cls._files is None: + cls._files = list(cls.fetch(file_path)) + return cls._files -class BaseAttacker(object): +class TestCase(object): + def __init__(self, operation, path, line, column, traceback=None): + self.operation = operation + self.path = path + self.line = line + self.column = column + self.traceback = traceback - def __init__(self): - self.record = {'data': []} + @classmethod + def from_cache(cls, record): + with open(record) as f: + dct = json.load(f) + return cls(**dct) - def attack(self, operation, *args): - script = jedi.Script(*args) - op = getattr(script, operation) - op() + @classmethod + def generate(cls, file_path): + operations = [ + 'completions', 'goto_assignments', 'goto_definitions', 'usages', + 'call_signatures'] + operation = random.choice(operations) - def add_record(self, exc_info, operation, args): - (_type, value, tb) = exc_info - self.record['data'].append({ - 'traceback': traceback.format_tb(tb), - 'error': repr(value), - 'operation': operation, - 'args': args, - }) - - def get_record(self, recid): - return self.record['data'][recid] - - def save_record(self, path): - directory = os.path.dirname(os.path.abspath(path)) - if not os.path.isdir(directory): - os.makedirs(directory) - with open(path, 'w') as f: - json.dump(self.record, f) - - def load_record(self, path): + path = random.choice(SourceFinder.files(file_path)) with open(path) as f: - self.record = json.load(f) - return self.record + source = f.read() + lines = source.splitlines() - def add_arguments(self, parser): - parser.set_defaults(func=self.do_run) + line = random.randint(1, max(len(lines), 1)) + column = random.randint(0, len(lines[line - 1])) + return cls(operation, path, line, column) - def get_help(self): - for line in self.__doc__.splitlines(): - line = line.strip() - if line: - return line - - -class MixinPrinter(object): - - def print_record(self, recid=-1): - data = self.get_record(recid) - print(*data['traceback'], end='') - print(""" -{error} is raised by running Script(...).{operation}() with -line : {args[1]} -column: {args[2]} -path : {args[3]} -""".format(**data)) - - -class MixinLoader(object): - - def add_arguments(self, parser): - super(MixinLoader, self).add_arguments(parser) - parser = parser.add_argument( - 'recid', default=0, nargs='?', type=int, help=""" - This option currently has no effect as random attack record - only one error. - """) - - def do_run(self, record, recid): - self.load_record(record) - - -class AttackReporter(object): - - def __init__(self): - self.tries = 0 - self.errors = 0 - - def __iter__(self): - return self - - def __next__(self): - self.tries += 1 - sys.stderr.write('.') - sys.stderr.flush() - return self.tries - - next = __next__ - - def error(self): - self.errors += 1 - sys.stderr.write('\n') - sys.stderr.flush() - print('{0}th error is encountered after {1} tries.' - .format(self.errors, self.tries)) - - -class RandomAttacker(MixinPrinter, BaseAttacker): - - """ - Randomly run Script().() against files under . - """ - - operations = [ - 'completions', 'goto_assignments', 'goto_definitions', 'usages', - 'call_signatures'] - - def choose_operation(self): - return random.choice(self.operations) - - def generate_attacks(self, maxtries, finder): - for _ in range(maxtries): - src = finder.choose_source() - operation = self.choose_operation() - yield (operation, src.choose_script_args()) - - def do_run(self, record, rootpath, maxtries): - finder = SourceFinder(rootpath) - reporter = AttackReporter() - for (operation, args) in self.generate_attacks(maxtries, finder): - reporter.next() - try: - self.attack(operation, *args) - except jedi.NotFoundError: - pass - except Exception: - self.add_record(sys.exc_info(), operation, args) - reporter.error() - self.print_record() - raise - finally: - self.save_record(record) - - def add_arguments(self, parser): - super(RandomAttacker, self).add_arguments(parser) - parser.add_argument( - '--maxtries', '-l', default=10000, type=int) - parser.add_argument( - 'rootpath', default='.', nargs='?', - help='root directory to look for Python files.') - - -class RedoAttacker(MixinLoader, BaseAttacker): - - """ - Redo recorded attack. - """ - - def do_run(self, record, recid): - super(RedoAttacker, self).do_run(record, recid) - data = self.get_record(recid) + def run(self, debugger, record=None): try: - self.attack(data['operation'], *data['args']) - except: - traceback.print_exc() - raise - - -class ShowRecord(MixinLoader, MixinPrinter, BaseAttacker): - - """ - Show recorded errors. - """ - - def do_run(self, record, recid): - super(ShowRecord, self).do_run(record, recid) - self.print_record() - - -class AttackApp(object): - - def __init__(self): - self.parsers = [] - self.attackers = [] - - def run(self, args=None): - parser = self.get_parser() - self.do_run(**vars(parser.parse_args(args))) - - def do_run(self, func, debugger, fs_cache, **kwds): - if fs_cache is _unspecified: - jedi.settings.use_filesystem_cache = False - else: - jedi.settings.cache_directory = fs_cache - - try: - func(**kwds) - except: + with open(self.path) as f: + script = jedi.Script(f.read(), self.line, self.column, + self.path) + getattr(script, self.operation)() + except jedi.NotFoundError: + pass + except Exception: + self.traceback = traceback.format_exc() + if record is not None: + with open(record, 'w') as f: + json.dump(self.__dict__, f) + self.show() if debugger: einfo = sys.exc_info() pdb = __import__(debugger) pdb.post_mortem(einfo if debugger == 'pudb' else einfo[2]) - sys.exit(1) + return False + return True - def add_parser(self, attacker_class, *args, **kwds): - attacker = attacker_class() - parser = self.subparsers.add_parser( - *args, - help=attacker.get_help(), - description=attacker.__doc__, - **kwds) - attacker.add_arguments(parser) + def show(self): + print(self.traceback) + print(("Error with running Script(...).{operation}() with\n" + "\tpath: {path}\n" + "\tline: {line}\n" + "\tcolumn: {column}").format(**self.__dict__)) - # Not required, just fore debugging: - self.parsers.append(parser) - self.attackers.append(attacker) +def main(arguments): + debugger = 'pdb' if arguments['--pdb'] else \ + 'ipdb' if arguments['--ipdb'] else \ + 'pudb' if arguments['--pudb'] else None + record = arguments['--record'] - def get_parser(self): - parser = argparse.ArgumentParser( - formatter_class=argparse.RawDescriptionHelpFormatter, - description=__doc__) - parser.add_argument( - '--record', '-R', default='record.json', - help='Exceptions are recorded in here (default: %(default)s).') - parser.add_argument( - '--pdb', dest='debugger', const='pdb', action='store_const', - help='Launch pdb when error is raised.') - parser.add_argument( - '--ipdb', dest='debugger', const='ipdb', action='store_const', - help='Launch ipdb when error is raised.') - parser.add_argument( - '--pudb', dest='debugger', const='pudb', action='store_const', - help='Launch pudb when error is raised.') - parser.add_argument( - '--fs-cache', '-C', default=_unspecified, - help=""" - By default, file system cache is off for reproducibility. - Pass a temporary directory to use file system cache. - It is set to ``jedi.settings.cache_directory``. - """) + jedi.settings.use_filesystem_cache = arguments['--fs-cache'] + if arguments['--debug']: + jedi.set_debug_function() - self.subparsers = parser.add_subparsers() - self.add_parser(RandomAttacker, 'random') - self.add_parser(RedoAttacker, 'redo') - self.add_parser(ShowRecord, 'show') - - return parser + if arguments['redo'] or arguments['show']: + t = TestCase.from_cache(record) + if arguments['show']: + t.show() + else: + t.run(debugger) + elif arguments['run']: + TestCase(arguments[''], arguments[''], + arguments[''], arguments['']).run(debugger) + else: + for _ in range(int(arguments['--maxtries'])): + t = TestCase.generate(arguments[''] or '.') + if not t.run(debugger, record): + break + print('.', end='') + sys.stdout.flush() if __name__ == '__main__': - try: - import argparse - except ImportError: - print('The argparse module (Python>=2.7) is needed to run sith.') - sys.exit(1) - app = AttackApp() - app.run() + arguments = docopt(__doc__) + main(arguments)