1
0
forked from VimPlug/jedi

completely rewrote sith.py with docopt - also added a run command and debug option

This commit is contained in:
David Halter
2013-07-17 14:00:28 +02:00
parent 2fffaee2c3
commit 8d8b645f11

361
sith.py
View File

@@ -6,20 +6,37 @@ Sith attacks (and helps debugging) Jedi.
Randomly search Python files and run Jedi on it. Exception and used Randomly search Python files and run Jedi on it. Exception and used
arguments are recorded to ``./record.json`` (specified by --record):: arguments are recorded to ``./record.json`` (specified by --record)::
%(prog)s random /path/to/sourcecode ./sith.py random /path/to/sourcecode
Redo recorded exception:: Redo recorded exception::
%(prog)s redo ./sith.py redo
Fallback to pdb when error is raised:: Show recorded exception::
%(prog)s --pdb random ./sith.py show
%(prog)s --pdb redo
Usage:
sith.py [--pdb|--ipdb|--pudb] [-d] [-m=<nr>] [-f] [--record=<file>] random [<path>]
sith.py [--pdb|--ipdb|--pudb] [--d] [-f] redo
sith.py [--pdb|--ipdb|--pudb] [--d] [-f] run <operation> <path> <line> <column>
sith.py show
sith.py -h | --help
Options:
-h --help Show this screen.
--record=<file> Exceptions are recorded in here [default: record.json].
-f, --fs-cache By default, file system cache is off for reproducibility.
-m, --maxtries=<nr> Maximum of random tries [default: 100]
-d, --debug Jedi print debugging when an error is raised.
--pdb Launch pdb when error is raised.
--ipdb Launch ipdb when error is raised.
--pudb Launch pudb when error is raised.
""" """
from __future__ import print_function, division, unicode_literals from __future__ import print_function, division, unicode_literals
from docopt import docopt
import json import json
import os import os
import random import random
@@ -28,284 +45,110 @@ import traceback
import jedi import jedi
_unspecified = object()
class SourceCode(object):
def __init__(self, path):
self.path = path
with open(path) as f:
self.source = f.read()
self.lines = self.source.splitlines()
self.maxline = len(self.lines)
def choose_script_args(self):
line = random.randint(1, self.maxline)
column = random.randint(0, len(self.lines[line - 1]))
return (self.source, line, column, self.path)
class SourceFinder(object): class SourceFinder(object):
_files = None
def __init__(self, rootpath): @staticmethod
self.rootpath = rootpath def fetch(file_path):
self.files = list(self.search_files()) for root, dirnames, filenames in os.walk(file_path):
def search_files(self):
for root, dirnames, filenames in os.walk(self.rootpath):
for name in filenames: for name in filenames:
if name.endswith('.py'): if name.endswith('.py'):
yield os.path.join(root, name) yield os.path.join(root, name)
def choose_source(self): @classmethod
# FIXME: try same file for several times def files(cls, file_path):
return SourceCode(random.choice(self.files)) if cls._files is None:
cls._files = list(cls.fetch(file_path))
return cls._files
class BaseAttacker(object): class TestCase(object):
def __init__(self, operation, path, line, column, traceback=None):
self.operation = operation
self.path = path
self.line = line
self.column = column
self.traceback = traceback
def __init__(self): @classmethod
self.record = {'data': []} def from_cache(cls, record):
with open(record) as f:
dct = json.load(f)
return cls(**dct)
def attack(self, operation, *args): @classmethod
script = jedi.Script(*args) def generate(cls, file_path):
op = getattr(script, operation) operations = [
op() 'completions', 'goto_assignments', 'goto_definitions', 'usages',
'call_signatures']
operation = random.choice(operations)
def add_record(self, exc_info, operation, args): path = random.choice(SourceFinder.files(file_path))
(_type, value, tb) = exc_info
self.record['data'].append({
'traceback': traceback.format_tb(tb),
'error': repr(value),
'operation': operation,
'args': args,
})
def get_record(self, recid):
return self.record['data'][recid]
def save_record(self, path):
directory = os.path.dirname(os.path.abspath(path))
if not os.path.isdir(directory):
os.makedirs(directory)
with open(path, 'w') as f:
json.dump(self.record, f)
def load_record(self, path):
with open(path) as f: with open(path) as f:
self.record = json.load(f) source = f.read()
return self.record lines = source.splitlines()
def add_arguments(self, parser): line = random.randint(1, max(len(lines), 1))
parser.set_defaults(func=self.do_run) column = random.randint(0, len(lines[line - 1]))
return cls(operation, path, line, column)
def get_help(self): def run(self, debugger, record=None):
for line in self.__doc__.splitlines():
line = line.strip()
if line:
return line
class MixinPrinter(object):
def print_record(self, recid=-1):
data = self.get_record(recid)
print(*data['traceback'], end='')
print("""
{error} is raised by running Script(...).{operation}() with
line : {args[1]}
column: {args[2]}
path : {args[3]}
""".format(**data))
class MixinLoader(object):
def add_arguments(self, parser):
super(MixinLoader, self).add_arguments(parser)
parser = parser.add_argument(
'recid', default=0, nargs='?', type=int, help="""
This option currently has no effect as random attack record
only one error.
""")
def do_run(self, record, recid):
self.load_record(record)
class AttackReporter(object):
def __init__(self):
self.tries = 0
self.errors = 0
def __iter__(self):
return self
def __next__(self):
self.tries += 1
sys.stderr.write('.')
sys.stderr.flush()
return self.tries
next = __next__
def error(self):
self.errors += 1
sys.stderr.write('\n')
sys.stderr.flush()
print('{0}th error is encountered after {1} tries.'
.format(self.errors, self.tries))
class RandomAttacker(MixinPrinter, BaseAttacker):
"""
Randomly run Script().<method>() against files under <rootpath>.
"""
operations = [
'completions', 'goto_assignments', 'goto_definitions', 'usages',
'call_signatures']
def choose_operation(self):
return random.choice(self.operations)
def generate_attacks(self, maxtries, finder):
for _ in range(maxtries):
src = finder.choose_source()
operation = self.choose_operation()
yield (operation, src.choose_script_args())
def do_run(self, record, rootpath, maxtries):
finder = SourceFinder(rootpath)
reporter = AttackReporter()
for (operation, args) in self.generate_attacks(maxtries, finder):
reporter.next()
try:
self.attack(operation, *args)
except jedi.NotFoundError:
pass
except Exception:
self.add_record(sys.exc_info(), operation, args)
reporter.error()
self.print_record()
raise
finally:
self.save_record(record)
def add_arguments(self, parser):
super(RandomAttacker, self).add_arguments(parser)
parser.add_argument(
'--maxtries', '-l', default=10000, type=int)
parser.add_argument(
'rootpath', default='.', nargs='?',
help='root directory to look for Python files.')
class RedoAttacker(MixinLoader, BaseAttacker):
"""
Redo recorded attack.
"""
def do_run(self, record, recid):
super(RedoAttacker, self).do_run(record, recid)
data = self.get_record(recid)
try: try:
self.attack(data['operation'], *data['args']) with open(self.path) as f:
except: script = jedi.Script(f.read(), self.line, self.column,
traceback.print_exc() self.path)
raise getattr(script, self.operation)()
except jedi.NotFoundError:
pass
class ShowRecord(MixinLoader, MixinPrinter, BaseAttacker): except Exception:
self.traceback = traceback.format_exc()
""" if record is not None:
Show recorded errors. with open(record, 'w') as f:
""" json.dump(self.__dict__, f)
self.show()
def do_run(self, record, recid):
super(ShowRecord, self).do_run(record, recid)
self.print_record()
class AttackApp(object):
def __init__(self):
self.parsers = []
self.attackers = []
def run(self, args=None):
parser = self.get_parser()
self.do_run(**vars(parser.parse_args(args)))
def do_run(self, func, debugger, fs_cache, **kwds):
if fs_cache is _unspecified:
jedi.settings.use_filesystem_cache = False
else:
jedi.settings.cache_directory = fs_cache
try:
func(**kwds)
except:
if debugger: if debugger:
einfo = sys.exc_info() einfo = sys.exc_info()
pdb = __import__(debugger) pdb = __import__(debugger)
pdb.post_mortem(einfo if debugger == 'pudb' else einfo[2]) pdb.post_mortem(einfo if debugger == 'pudb' else einfo[2])
sys.exit(1) return False
return True
def add_parser(self, attacker_class, *args, **kwds): def show(self):
attacker = attacker_class() print(self.traceback)
parser = self.subparsers.add_parser( print(("Error with running Script(...).{operation}() with\n"
*args, "\tpath: {path}\n"
help=attacker.get_help(), "\tline: {line}\n"
description=attacker.__doc__, "\tcolumn: {column}").format(**self.__dict__))
**kwds)
attacker.add_arguments(parser)
# Not required, just fore debugging: def main(arguments):
self.parsers.append(parser) debugger = 'pdb' if arguments['--pdb'] else \
self.attackers.append(attacker) 'ipdb' if arguments['--ipdb'] else \
'pudb' if arguments['--pudb'] else None
record = arguments['--record']
def get_parser(self): jedi.settings.use_filesystem_cache = arguments['--fs-cache']
parser = argparse.ArgumentParser( if arguments['--debug']:
formatter_class=argparse.RawDescriptionHelpFormatter, jedi.set_debug_function()
description=__doc__)
parser.add_argument(
'--record', '-R', default='record.json',
help='Exceptions are recorded in here (default: %(default)s).')
parser.add_argument(
'--pdb', dest='debugger', const='pdb', action='store_const',
help='Launch pdb when error is raised.')
parser.add_argument(
'--ipdb', dest='debugger', const='ipdb', action='store_const',
help='Launch ipdb when error is raised.')
parser.add_argument(
'--pudb', dest='debugger', const='pudb', action='store_const',
help='Launch pudb when error is raised.')
parser.add_argument(
'--fs-cache', '-C', default=_unspecified,
help="""
By default, file system cache is off for reproducibility.
Pass a temporary directory to use file system cache.
It is set to ``jedi.settings.cache_directory``.
""")
self.subparsers = parser.add_subparsers() if arguments['redo'] or arguments['show']:
self.add_parser(RandomAttacker, 'random') t = TestCase.from_cache(record)
self.add_parser(RedoAttacker, 'redo') if arguments['show']:
self.add_parser(ShowRecord, 'show') t.show()
else:
return parser t.run(debugger)
elif arguments['run']:
TestCase(arguments['<operation>'], arguments['<path>'],
arguments['<line>'], arguments['<column>']).run(debugger)
else:
for _ in range(int(arguments['--maxtries'])):
t = TestCase.generate(arguments['<path>'] or '.')
if not t.run(debugger, record):
break
print('.', end='')
sys.stdout.flush()
if __name__ == '__main__': if __name__ == '__main__':
try: arguments = docopt(__doc__)
import argparse main(arguments)
except ImportError:
print('The argparse module (Python>=2.7) is needed to run sith.')
sys.exit(1)
app = AttackApp()
app.run()