mirror of
https://github.com/davidhalter/django-stubs.git
synced 2025-12-07 04:34:29 +08:00
245 lines
11 KiB
Python
245 lines
11 KiB
Python
import os
|
|
import posixpath
|
|
import re
|
|
import sys
|
|
import tempfile
|
|
from typing import Any, Optional, Iterator, Dict, List, Tuple, Set
|
|
|
|
import pytest
|
|
from mypy.test.config import test_temp_dir
|
|
from mypy.test.data import DataDrivenTestCase, DataSuite, add_test_name_suffix, parse_test_data, \
|
|
expand_errors, expand_variables, fix_win_path
|
|
|
|
|
|
def parse_test_case(case: 'DataDrivenTestCase') -> None:
|
|
"""Parse and prepare a single case from suite with test case descriptions.
|
|
|
|
This method is part of the setup phase, just before the test case is run.
|
|
"""
|
|
test_items = parse_test_data(case.data, case.name)
|
|
base_path = case.suite.base_path
|
|
if case.suite.native_sep:
|
|
join = os.path.join
|
|
else:
|
|
join = posixpath.join # type: ignore
|
|
|
|
out_section_missing = case.suite.required_out_section
|
|
|
|
files = [] # type: List[Tuple[str, str]] # path and contents
|
|
output_files = [] # type: List[Tuple[str, str]] # path and contents for output files
|
|
output = [] # type: List[str] # Regular output errors
|
|
output2 = {} # type: Dict[int, List[str]] # Output errors for incremental, runs 2+
|
|
deleted_paths = {} # type: Dict[int, Set[str]] # from run number of paths
|
|
stale_modules = {} # type: Dict[int, Set[str]] # from run number to module names
|
|
rechecked_modules = {} # type: Dict[ int, Set[str]] # from run number module names
|
|
triggered = [] # type: List[str] # Active triggers (one line per incremental step)
|
|
|
|
# Process the parsed items. Each item has a header of form [id args],
|
|
# optionally followed by lines of text.
|
|
item = first_item = test_items[0]
|
|
for item in test_items[1:]:
|
|
if item.id == 'file' or item.id == 'outfile':
|
|
# Record an extra file needed for the test case.
|
|
assert item.arg is not None
|
|
contents = expand_variables('\n'.join(item.data))
|
|
file_entry = (join(base_path, item.arg), contents)
|
|
if item.id == 'file':
|
|
files.append(file_entry)
|
|
else:
|
|
output_files.append(file_entry)
|
|
elif item.id in ('builtins', 'builtins_py2'):
|
|
# Use an alternative stub file for the builtins module.
|
|
assert item.arg is not None
|
|
mpath = join(os.path.dirname(case.file), item.arg)
|
|
fnam = 'builtins.pyi' if item.id == 'builtins' else '__builtin__.pyi'
|
|
with open(mpath) as f:
|
|
files.append((join(base_path, fnam), f.read()))
|
|
elif item.id == 'typing':
|
|
# Use an alternative stub file for the typing module.
|
|
assert item.arg is not None
|
|
src_path = join(os.path.dirname(case.file), item.arg)
|
|
with open(src_path) as f:
|
|
files.append((join(base_path, 'typing.pyi'), f.read()))
|
|
elif re.match(r'stale[0-9]*$', item.id):
|
|
passnum = 1 if item.id == 'stale' else int(item.id[len('stale'):])
|
|
assert passnum > 0
|
|
modules = (set() if item.arg is None else {t.strip() for t in item.arg.split(',')})
|
|
stale_modules[passnum] = modules
|
|
elif re.match(r'rechecked[0-9]*$', item.id):
|
|
passnum = 1 if item.id == 'rechecked' else int(item.id[len('rechecked'):])
|
|
assert passnum > 0
|
|
modules = (set() if item.arg is None else {t.strip() for t in item.arg.split(',')})
|
|
rechecked_modules[passnum] = modules
|
|
elif item.id == 'delete':
|
|
# File to delete during a multi-step test case
|
|
assert item.arg is not None
|
|
m = re.match(r'(.*)\.([0-9]+)$', item.arg)
|
|
assert m, 'Invalid delete section: {}'.format(item.arg)
|
|
num = int(m.group(2))
|
|
assert num >= 2, "Can't delete during step {}".format(num)
|
|
full = join(base_path, m.group(1))
|
|
deleted_paths.setdefault(num, set()).add(full)
|
|
elif re.match(r'out[0-9]*$', item.id):
|
|
tmp_output = [expand_variables(line) for line in item.data]
|
|
if os.path.sep == '\\':
|
|
tmp_output = [fix_win_path(line) for line in tmp_output]
|
|
if item.id == 'out' or item.id == 'out1':
|
|
output = tmp_output
|
|
else:
|
|
passnum = int(item.id[len('out'):])
|
|
assert passnum > 1
|
|
output2[passnum] = tmp_output
|
|
out_section_missing = False
|
|
elif item.id == 'triggered' and item.arg is None:
|
|
triggered = item.data
|
|
elif item.id == 'env':
|
|
env_vars_to_set = item.arg
|
|
for env in env_vars_to_set.split(';'):
|
|
try:
|
|
name, value = env.split('=')
|
|
os.environ[name] = value
|
|
except ValueError:
|
|
continue
|
|
else:
|
|
raise ValueError(
|
|
'Invalid section header {} in {} at line {}'.format(
|
|
item.id, case.file, item.line))
|
|
|
|
if out_section_missing:
|
|
raise ValueError(
|
|
'{}, line {}: Required output section not found'.format(
|
|
case.file, first_item.line))
|
|
|
|
for passnum in stale_modules.keys():
|
|
if passnum not in rechecked_modules:
|
|
# If the set of rechecked modules isn't specified, make it the same as the set
|
|
# of modules with a stale public interface.
|
|
rechecked_modules[passnum] = stale_modules[passnum]
|
|
if (passnum in stale_modules
|
|
and passnum in rechecked_modules
|
|
and not stale_modules[passnum].issubset(rechecked_modules[passnum])):
|
|
raise ValueError(
|
|
('Stale modules after pass {} must be a subset of rechecked '
|
|
'modules ({}:{})').format(passnum, case.file, first_item.line))
|
|
|
|
input = first_item.data
|
|
expand_errors(input, output, 'main')
|
|
for file_path, contents in files:
|
|
expand_errors(contents.split('\n'), output, file_path)
|
|
|
|
case.input = input
|
|
case.output = output
|
|
case.output2 = output2
|
|
case.lastline = item.line
|
|
case.files = files
|
|
case.output_files = output_files
|
|
case.expected_stale_modules = stale_modules
|
|
case.expected_rechecked_modules = rechecked_modules
|
|
case.deleted_paths = deleted_paths
|
|
case.triggered = triggered or []
|
|
|
|
|
|
class DjangoDataDrivenTestCase(DataDrivenTestCase):
|
|
def setup(self) -> None:
|
|
self.old_environ = os.environ.copy()
|
|
|
|
parse_test_case(case=self)
|
|
self.old_cwd = os.getcwd()
|
|
|
|
self.tmpdir = tempfile.TemporaryDirectory(prefix='mypy-test-')
|
|
|
|
os.chdir(self.tmpdir.name)
|
|
os.mkdir(test_temp_dir)
|
|
encountered_files = set()
|
|
self.clean_up = []
|
|
for paths in self.deleted_paths.values():
|
|
for path in paths:
|
|
self.clean_up.append((False, path))
|
|
encountered_files.add(path)
|
|
for path, content in self.files:
|
|
dir = os.path.dirname(path)
|
|
for d in self.add_dirs(dir):
|
|
self.clean_up.append((True, d))
|
|
with open(path, 'w') as f:
|
|
f.write(content)
|
|
if path not in encountered_files:
|
|
self.clean_up.append((False, path))
|
|
encountered_files.add(path)
|
|
if re.search(r'\.[2-9]$', path):
|
|
# Make sure new files introduced in the second and later runs are accounted for
|
|
renamed_path = path[:-2]
|
|
if renamed_path not in encountered_files:
|
|
encountered_files.add(renamed_path)
|
|
self.clean_up.append((False, renamed_path))
|
|
for path, _ in self.output_files:
|
|
# Create directories for expected output and mark them to be cleaned up at the end
|
|
# of the test case.
|
|
dir = os.path.dirname(path)
|
|
for d in self.add_dirs(dir):
|
|
self.clean_up.append((True, d))
|
|
self.clean_up.append((False, path))
|
|
|
|
sys.path.insert(0, os.path.join(self.tmpdir.name, 'tmp'))
|
|
|
|
def teardown(self):
|
|
if hasattr(self, 'old_environ'):
|
|
os.environ = self.old_environ
|
|
super().teardown()
|
|
|
|
|
|
def split_test_cases(parent: 'DataSuiteCollector', suite: 'DataSuite',
|
|
file: str) -> Iterator[DjangoDataDrivenTestCase]:
|
|
"""Iterate over raw test cases in file, at collection time, ignoring sub items.
|
|
|
|
The collection phase is slow, so any heavy processing should be deferred to after
|
|
uninteresting tests are filtered (when using -k PATTERN switch).
|
|
"""
|
|
with open(file, encoding='utf-8') as f:
|
|
data = f.read()
|
|
cases = re.split(r'^\[case ([a-zA-Z_0-9]+)'
|
|
r'(-writescache)?'
|
|
r'(-only_when_cache|-only_when_nocache)?'
|
|
r'(-skip)?'
|
|
r'\][ \t]*$\n', data,
|
|
flags=re.DOTALL | re.MULTILINE)
|
|
line_no = cases[0].count('\n') + 1
|
|
|
|
for i in range(1, len(cases), 5):
|
|
name, writescache, only_when, skip, data = cases[i:i + 5]
|
|
yield DjangoDataDrivenTestCase(parent, suite, file,
|
|
name=add_test_name_suffix(name, suite.test_name_suffix),
|
|
writescache=bool(writescache),
|
|
only_when=only_when,
|
|
skip=bool(skip),
|
|
data=data,
|
|
line=line_no)
|
|
line_no += data.count('\n') + 1
|
|
|
|
|
|
class DataSuiteCollector(pytest.Class): # type: ignore # inheriting from Any
|
|
def collect(self) -> Iterator[pytest.Item]: # type: ignore
|
|
"""Called by pytest on each of the object returned from pytest_pycollect_makeitem"""
|
|
|
|
# obj is the object for which pytest_pycollect_makeitem returned self.
|
|
suite = self.obj # type: DataSuite
|
|
for f in suite.files:
|
|
yield from split_test_cases(self, suite, os.path.join(suite.data_prefix, f))
|
|
|
|
|
|
# This function name is special to pytest. See
|
|
# http://doc.pytest.org/en/latest/writing_plugins.html#collection-hooks
|
|
def pytest_pycollect_makeitem(collector: Any, name: str,
|
|
obj: object) -> 'Optional[Any]':
|
|
"""Called by pytest on each object in modules configured in conftest.py files.
|
|
|
|
collector is pytest.Collector, returns Optional[pytest.Class]
|
|
"""
|
|
if isinstance(obj, type):
|
|
# Only classes derived from DataSuite contain test cases, not the DataSuite class itself
|
|
if issubclass(obj, DataSuite) and obj is not DataSuite:
|
|
# Non-None result means this obj is a test case.
|
|
# The collect method of the returned DataSuiteCollector instance will be called later,
|
|
# with self.obj being obj.
|
|
return DataSuiteCollector(name, parent=collector)
|
|
return None
|