diff --git a/tests/regr_test.py b/tests/regr_test.py index 6746b2bc7..f7a2c2140 100755 --- a/tests/regr_test.py +++ b/tests/regr_test.py @@ -4,12 +4,17 @@ from __future__ import annotations import argparse +import concurrent.futures import os +import queue import re import shutil import subprocess import sys import tempfile +import threading +from contextlib import ExitStack, suppress +from dataclasses import dataclass from enum import IntEnum from itertools import product from pathlib import Path @@ -24,7 +29,6 @@ from utils import ( get_mypy_req, make_venv, print_error, - print_success_msg, testcase_dir_from_package_name, ) @@ -103,18 +107,20 @@ parser.add_argument( ), ) +_PRINT_QUEUE: queue.SimpleQueue[str] = queue.SimpleQueue() + def verbose_log(msg: str) -> None: - print(colored("\n" + msg, "blue")) + _PRINT_QUEUE.put(colored(msg, "blue")) -def setup_testcase_dir(package: PackageInfo, tempdir: Path, new_test_case_dir: Path, verbosity: Verbosity) -> None: +def setup_testcase_dir(package: PackageInfo, tempdir: Path, verbosity: Verbosity) -> None: if verbosity is verbosity.VERBOSE: - verbose_log(f"Setting up testcase dir in {tempdir}") + verbose_log(f"{package.name}: Setting up testcase dir in {tempdir}") # --warn-unused-ignores doesn't work for files inside typeshed. # SO, to work around this, we copy the test_cases directory into a TemporaryDirectory, # and run the test cases inside of that. - shutil.copytree(package.test_case_directory, new_test_case_dir) + shutil.copytree(package.test_case_directory, tempdir / TEST_CASES) if package.is_stdlib: return @@ -137,16 +143,15 @@ def setup_testcase_dir(package: PackageInfo, tempdir: Path, new_test_case_dir: P shutil.copytree(Path("stubs", requirement), new_typeshed / "stubs" / requirement) if requirements.external_pkgs: - if verbosity is Verbosity.VERBOSE: - verbose_log(f"Setting up venv in {tempdir / VENV_DIR}") pip_exe = make_venv(tempdir / VENV_DIR).pip_exe - pip_command = [pip_exe, "install", get_mypy_req(), *requirements.external_pkgs] + # Use --no-cache-dir to avoid issues with concurrent read/writes to the cache + pip_command = [pip_exe, "install", get_mypy_req(), *requirements.external_pkgs, "--no-cache-dir"] if verbosity is Verbosity.VERBOSE: - verbose_log(f"{pip_command=}") + verbose_log(f"{package.name}: Setting up venv in {tempdir / VENV_DIR}. {pip_command=}\n") try: subprocess.run(pip_command, check=True, capture_output=True, text=True) except subprocess.CalledProcessError as e: - print(e.stderr) + _PRINT_QUEUE.put(f"{package.name}\n{e.stderr}") raise @@ -155,10 +160,6 @@ def run_testcases( ) -> subprocess.CompletedProcess[str]: env_vars = dict(os.environ) new_test_case_dir = tempdir / TEST_CASES - testcasedir_already_setup = new_test_case_dir.exists() and new_test_case_dir.is_dir() - - if not testcasedir_already_setup: - setup_testcase_dir(package, tempdir=tempdir, new_test_case_dir=new_test_case_dir, verbosity=verbosity) # "--enable-error-code ignore-without-code" is purposefully omitted. # See https://github.com/python/typeshed/pull/8083 @@ -202,39 +203,103 @@ def run_testcases( mypy_command = [python_exe, "-m", "mypy"] + flags if verbosity is Verbosity.VERBOSE: - verbose_log(f"{mypy_command=}") + description = f"{package.name}/{version}/{platform}" + msg = f"{description}: {mypy_command=}\n" if "MYPYPATH" in env_vars: - verbose_log(f"{env_vars['MYPYPATH']=}") + msg += f"{description}: {env_vars['MYPYPATH']=}" else: - verbose_log("MYPYPATH not set") + msg += f"{description}: MYPYPATH not set" + msg += "\n" + verbose_log(msg) return subprocess.run(mypy_command, capture_output=True, text=True, env=env_vars) -def test_testcase_directory( - package: PackageInfo, version: str, platform: str, *, verbosity: Verbosity, tempdir: Path -) -> ReturnCode: - msg = f"Running mypy --platform {platform} --python-version {version} on the " - msg += "standard library test cases..." if package.is_stdlib else f"test cases for {package.name!r}..." +@dataclass(frozen=True) +class Result: + code: int + command_run: str + stderr: str + stdout: str + test_case_dir: Path + tempdir: Path + + def print_description(self, *, verbosity: Verbosity) -> None: + if self.code: + print(f"{self.command_run}:", end=" ") + print_error("FAILURE\n") + replacements = (str(self.tempdir / TEST_CASES), str(self.test_case_dir)) + if self.stderr: + print_error(self.stderr, fix_path=replacements) + if self.stdout: + print_error(self.stdout, fix_path=replacements) + + +def test_testcase_directory(package: PackageInfo, version: str, platform: str, *, verbosity: Verbosity, tempdir: Path) -> Result: + msg = f"mypy --platform {platform} --python-version {version} on the " + msg += "standard library test cases" if package.is_stdlib else f"test cases for {package.name!r}" if verbosity > Verbosity.QUIET: - print(msg, end=" ", flush=True) + _PRINT_QUEUE.put(f"Running {msg}...") - result = run_testcases(package=package, version=version, platform=platform, tempdir=tempdir, verbosity=verbosity) + proc_info = run_testcases(package=package, version=version, platform=platform, tempdir=tempdir, verbosity=verbosity) + return Result( + code=proc_info.returncode, + command_run=msg, + stderr=proc_info.stderr, + stdout=proc_info.stdout, + test_case_dir=package.test_case_directory, + tempdir=tempdir, + ) - if result.returncode: - if verbosity is Verbosity.QUIET: - # We'll already have printed this if --verbosity QUIET wasn't passed. - # If --verbosity QUIET was passed, only print this if there were errors. - # If there are errors, the output is inscrutable if this isn't printed. - print(msg, end=" ") - print_error("failure\n") - replacements = (str(tempdir / TEST_CASES), str(package.test_case_directory)) - if result.stderr: - print_error(result.stderr, fix_path=replacements) - if result.stdout: - print_error(result.stdout, fix_path=replacements) - elif verbosity > Verbosity.QUIET: - print_success_msg() - return result.returncode + +def print_queued_messages(ev: threading.Event) -> None: + while not ev.is_set(): + with suppress(queue.Empty): + print(_PRINT_QUEUE.get(timeout=0.5), flush=True) + while True: + try: + msg = _PRINT_QUEUE.get_nowait() + except queue.Empty: + return + else: + print(msg, flush=True) + + +def concurrently_run_testcases( + stack: ExitStack, + testcase_directories: list[PackageInfo], + verbosity: Verbosity, + platforms_to_test: list[str], + versions_to_test: list[str], +) -> list[Result]: + packageinfo_to_tempdir = { + package_info: Path(stack.enter_context(tempfile.TemporaryDirectory())) for package_info in testcase_directories + } + + event = threading.Event() + printer_thread = threading.Thread(target=print_queued_messages, args=(event,)) + printer_thread.start() + + with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor: + # Each temporary directory may be used by multiple processes concurrently during the next step; + # must make sure that they're all setup correctly before starting the next step, + # in order to avoid race conditions + testcase_futures = [ + executor.submit(setup_testcase_dir, package, tempdir, verbosity) + for package, tempdir in packageinfo_to_tempdir.items() + ] + concurrent.futures.wait(testcase_futures) + + mypy_futures = [ + executor.submit(test_testcase_directory, testcase_dir, version, platform, verbosity=verbosity, tempdir=tempdir) + for (testcase_dir, tempdir), platform, version in product( + packageinfo_to_tempdir.items(), platforms_to_test, versions_to_test + ) + ] + results = [future.result() for future in mypy_futures] + + event.set() + printer_thread.join() + return results def main() -> ReturnCode: @@ -253,16 +318,23 @@ def main() -> ReturnCode: versions_to_test = args.versions_to_test or [f"3.{sys.version_info[1]}"] code = 0 - for testcase_dir in testcase_directories: - with tempfile.TemporaryDirectory() as td: - tempdir = Path(td) - for platform, version in product(platforms_to_test, versions_to_test): - this_code = test_testcase_directory(testcase_dir, version, platform, verbosity=verbosity, tempdir=tempdir) - code = max(code, this_code) + results: list[Result] | None = None + + with ExitStack() as stack: + results = concurrently_run_testcases(stack, testcase_directories, verbosity, platforms_to_test, versions_to_test) + + assert results is not None + print() + + for result in results: + result.print_description(verbosity=verbosity) + + code = max(result.code for result in results) + if code: - print_error("\nTest completed with errors") + print_error("Test completed with errors") else: - print(colored("\nTest completed successfully!", "green")) + print(colored("Test completed successfully!", "green")) return code