mirror of
https://github.com/davidhalter/typeshed.git
synced 2025-12-07 20:54:28 +08:00
Speedup regr_test.py by running test cases concurrently (#10714)
This commit is contained in:
@@ -4,12 +4,17 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import concurrent.futures
|
||||
import os
|
||||
import queue
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import threading
|
||||
from contextlib import ExitStack, suppress
|
||||
from dataclasses import dataclass
|
||||
from enum import IntEnum
|
||||
from itertools import product
|
||||
from pathlib import Path
|
||||
@@ -24,7 +29,6 @@ from utils import (
|
||||
get_mypy_req,
|
||||
make_venv,
|
||||
print_error,
|
||||
print_success_msg,
|
||||
testcase_dir_from_package_name,
|
||||
)
|
||||
|
||||
@@ -103,18 +107,20 @@ parser.add_argument(
|
||||
),
|
||||
)
|
||||
|
||||
_PRINT_QUEUE: queue.SimpleQueue[str] = queue.SimpleQueue()
|
||||
|
||||
|
||||
def verbose_log(msg: str) -> None:
|
||||
print(colored("\n" + msg, "blue"))
|
||||
_PRINT_QUEUE.put(colored(msg, "blue"))
|
||||
|
||||
|
||||
def setup_testcase_dir(package: PackageInfo, tempdir: Path, new_test_case_dir: Path, verbosity: Verbosity) -> None:
|
||||
def setup_testcase_dir(package: PackageInfo, tempdir: Path, verbosity: Verbosity) -> None:
|
||||
if verbosity is verbosity.VERBOSE:
|
||||
verbose_log(f"Setting up testcase dir in {tempdir}")
|
||||
verbose_log(f"{package.name}: Setting up testcase dir in {tempdir}")
|
||||
# --warn-unused-ignores doesn't work for files inside typeshed.
|
||||
# SO, to work around this, we copy the test_cases directory into a TemporaryDirectory,
|
||||
# and run the test cases inside of that.
|
||||
shutil.copytree(package.test_case_directory, new_test_case_dir)
|
||||
shutil.copytree(package.test_case_directory, tempdir / TEST_CASES)
|
||||
if package.is_stdlib:
|
||||
return
|
||||
|
||||
@@ -137,16 +143,15 @@ def setup_testcase_dir(package: PackageInfo, tempdir: Path, new_test_case_dir: P
|
||||
shutil.copytree(Path("stubs", requirement), new_typeshed / "stubs" / requirement)
|
||||
|
||||
if requirements.external_pkgs:
|
||||
if verbosity is Verbosity.VERBOSE:
|
||||
verbose_log(f"Setting up venv in {tempdir / VENV_DIR}")
|
||||
pip_exe = make_venv(tempdir / VENV_DIR).pip_exe
|
||||
pip_command = [pip_exe, "install", get_mypy_req(), *requirements.external_pkgs]
|
||||
# Use --no-cache-dir to avoid issues with concurrent read/writes to the cache
|
||||
pip_command = [pip_exe, "install", get_mypy_req(), *requirements.external_pkgs, "--no-cache-dir"]
|
||||
if verbosity is Verbosity.VERBOSE:
|
||||
verbose_log(f"{pip_command=}")
|
||||
verbose_log(f"{package.name}: Setting up venv in {tempdir / VENV_DIR}. {pip_command=}\n")
|
||||
try:
|
||||
subprocess.run(pip_command, check=True, capture_output=True, text=True)
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(e.stderr)
|
||||
_PRINT_QUEUE.put(f"{package.name}\n{e.stderr}")
|
||||
raise
|
||||
|
||||
|
||||
@@ -155,10 +160,6 @@ def run_testcases(
|
||||
) -> subprocess.CompletedProcess[str]:
|
||||
env_vars = dict(os.environ)
|
||||
new_test_case_dir = tempdir / TEST_CASES
|
||||
testcasedir_already_setup = new_test_case_dir.exists() and new_test_case_dir.is_dir()
|
||||
|
||||
if not testcasedir_already_setup:
|
||||
setup_testcase_dir(package, tempdir=tempdir, new_test_case_dir=new_test_case_dir, verbosity=verbosity)
|
||||
|
||||
# "--enable-error-code ignore-without-code" is purposefully omitted.
|
||||
# See https://github.com/python/typeshed/pull/8083
|
||||
@@ -202,39 +203,103 @@ def run_testcases(
|
||||
|
||||
mypy_command = [python_exe, "-m", "mypy"] + flags
|
||||
if verbosity is Verbosity.VERBOSE:
|
||||
verbose_log(f"{mypy_command=}")
|
||||
description = f"{package.name}/{version}/{platform}"
|
||||
msg = f"{description}: {mypy_command=}\n"
|
||||
if "MYPYPATH" in env_vars:
|
||||
verbose_log(f"{env_vars['MYPYPATH']=}")
|
||||
msg += f"{description}: {env_vars['MYPYPATH']=}"
|
||||
else:
|
||||
verbose_log("MYPYPATH not set")
|
||||
msg += f"{description}: MYPYPATH not set"
|
||||
msg += "\n"
|
||||
verbose_log(msg)
|
||||
return subprocess.run(mypy_command, capture_output=True, text=True, env=env_vars)
|
||||
|
||||
|
||||
def test_testcase_directory(
|
||||
package: PackageInfo, version: str, platform: str, *, verbosity: Verbosity, tempdir: Path
|
||||
) -> ReturnCode:
|
||||
msg = f"Running mypy --platform {platform} --python-version {version} on the "
|
||||
msg += "standard library test cases..." if package.is_stdlib else f"test cases for {package.name!r}..."
|
||||
@dataclass(frozen=True)
|
||||
class Result:
|
||||
code: int
|
||||
command_run: str
|
||||
stderr: str
|
||||
stdout: str
|
||||
test_case_dir: Path
|
||||
tempdir: Path
|
||||
|
||||
def print_description(self, *, verbosity: Verbosity) -> None:
|
||||
if self.code:
|
||||
print(f"{self.command_run}:", end=" ")
|
||||
print_error("FAILURE\n")
|
||||
replacements = (str(self.tempdir / TEST_CASES), str(self.test_case_dir))
|
||||
if self.stderr:
|
||||
print_error(self.stderr, fix_path=replacements)
|
||||
if self.stdout:
|
||||
print_error(self.stdout, fix_path=replacements)
|
||||
|
||||
|
||||
def test_testcase_directory(package: PackageInfo, version: str, platform: str, *, verbosity: Verbosity, tempdir: Path) -> Result:
|
||||
msg = f"mypy --platform {platform} --python-version {version} on the "
|
||||
msg += "standard library test cases" if package.is_stdlib else f"test cases for {package.name!r}"
|
||||
if verbosity > Verbosity.QUIET:
|
||||
print(msg, end=" ", flush=True)
|
||||
_PRINT_QUEUE.put(f"Running {msg}...")
|
||||
|
||||
result = run_testcases(package=package, version=version, platform=platform, tempdir=tempdir, verbosity=verbosity)
|
||||
proc_info = run_testcases(package=package, version=version, platform=platform, tempdir=tempdir, verbosity=verbosity)
|
||||
return Result(
|
||||
code=proc_info.returncode,
|
||||
command_run=msg,
|
||||
stderr=proc_info.stderr,
|
||||
stdout=proc_info.stdout,
|
||||
test_case_dir=package.test_case_directory,
|
||||
tempdir=tempdir,
|
||||
)
|
||||
|
||||
if result.returncode:
|
||||
if verbosity is Verbosity.QUIET:
|
||||
# We'll already have printed this if --verbosity QUIET wasn't passed.
|
||||
# If --verbosity QUIET was passed, only print this if there were errors.
|
||||
# If there are errors, the output is inscrutable if this isn't printed.
|
||||
print(msg, end=" ")
|
||||
print_error("failure\n")
|
||||
replacements = (str(tempdir / TEST_CASES), str(package.test_case_directory))
|
||||
if result.stderr:
|
||||
print_error(result.stderr, fix_path=replacements)
|
||||
if result.stdout:
|
||||
print_error(result.stdout, fix_path=replacements)
|
||||
elif verbosity > Verbosity.QUIET:
|
||||
print_success_msg()
|
||||
return result.returncode
|
||||
|
||||
def print_queued_messages(ev: threading.Event) -> None:
|
||||
while not ev.is_set():
|
||||
with suppress(queue.Empty):
|
||||
print(_PRINT_QUEUE.get(timeout=0.5), flush=True)
|
||||
while True:
|
||||
try:
|
||||
msg = _PRINT_QUEUE.get_nowait()
|
||||
except queue.Empty:
|
||||
return
|
||||
else:
|
||||
print(msg, flush=True)
|
||||
|
||||
|
||||
def concurrently_run_testcases(
|
||||
stack: ExitStack,
|
||||
testcase_directories: list[PackageInfo],
|
||||
verbosity: Verbosity,
|
||||
platforms_to_test: list[str],
|
||||
versions_to_test: list[str],
|
||||
) -> list[Result]:
|
||||
packageinfo_to_tempdir = {
|
||||
package_info: Path(stack.enter_context(tempfile.TemporaryDirectory())) for package_info in testcase_directories
|
||||
}
|
||||
|
||||
event = threading.Event()
|
||||
printer_thread = threading.Thread(target=print_queued_messages, args=(event,))
|
||||
printer_thread.start()
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
|
||||
# Each temporary directory may be used by multiple processes concurrently during the next step;
|
||||
# must make sure that they're all setup correctly before starting the next step,
|
||||
# in order to avoid race conditions
|
||||
testcase_futures = [
|
||||
executor.submit(setup_testcase_dir, package, tempdir, verbosity)
|
||||
for package, tempdir in packageinfo_to_tempdir.items()
|
||||
]
|
||||
concurrent.futures.wait(testcase_futures)
|
||||
|
||||
mypy_futures = [
|
||||
executor.submit(test_testcase_directory, testcase_dir, version, platform, verbosity=verbosity, tempdir=tempdir)
|
||||
for (testcase_dir, tempdir), platform, version in product(
|
||||
packageinfo_to_tempdir.items(), platforms_to_test, versions_to_test
|
||||
)
|
||||
]
|
||||
results = [future.result() for future in mypy_futures]
|
||||
|
||||
event.set()
|
||||
printer_thread.join()
|
||||
return results
|
||||
|
||||
|
||||
def main() -> ReturnCode:
|
||||
@@ -253,16 +318,23 @@ def main() -> ReturnCode:
|
||||
versions_to_test = args.versions_to_test or [f"3.{sys.version_info[1]}"]
|
||||
|
||||
code = 0
|
||||
for testcase_dir in testcase_directories:
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
tempdir = Path(td)
|
||||
for platform, version in product(platforms_to_test, versions_to_test):
|
||||
this_code = test_testcase_directory(testcase_dir, version, platform, verbosity=verbosity, tempdir=tempdir)
|
||||
code = max(code, this_code)
|
||||
results: list[Result] | None = None
|
||||
|
||||
with ExitStack() as stack:
|
||||
results = concurrently_run_testcases(stack, testcase_directories, verbosity, platforms_to_test, versions_to_test)
|
||||
|
||||
assert results is not None
|
||||
print()
|
||||
|
||||
for result in results:
|
||||
result.print_description(verbosity=verbosity)
|
||||
|
||||
code = max(result.code for result in results)
|
||||
|
||||
if code:
|
||||
print_error("\nTest completed with errors")
|
||||
print_error("Test completed with errors")
|
||||
else:
|
||||
print(colored("\nTest completed successfully!", "green"))
|
||||
print(colored("Test completed successfully!", "green"))
|
||||
|
||||
return code
|
||||
|
||||
|
||||
Reference in New Issue
Block a user