regr_test.py: improve several concurrency details (#11984)

This commit is contained in:
Alex Waygood
2024-05-19 14:21:26 -04:00
committed by GitHub
parent 08b9c86488
commit 347f8a96b5

View File

@@ -13,8 +13,8 @@ import subprocess
import sys
import tempfile
import threading
from collections.abc import Callable
from contextlib import ExitStack, suppress
from collections.abc import Callable, Iterator
from contextlib import ExitStack, contextmanager, suppress
from dataclasses import dataclass
from enum import IntEnum
from functools import partial
@@ -145,8 +145,12 @@ def setup_testcase_dir(package: DistributionTests, tempdir: Path, verbosity: Ver
if requirements.external_pkgs:
venv_location = str(tempdir / VENV_DIR)
subprocess.run(["uv", "venv", venv_location], check=True, capture_output=True)
# Use --no-cache-dir to avoid issues with concurrent read/writes to the cache
uv_command = ["uv", "pip", "install", get_mypy_req(), *requirements.external_pkgs, "--no-cache-dir"]
uv_command = ["uv", "pip", "install", get_mypy_req(), *requirements.external_pkgs]
if sys.platform == "win32":
# Reads/writes to the cache are threadsafe with uv generally...
# but not on old Windows versions
# https://github.com/astral-sh/uv/issues/2810
uv_command.append("--no-cache-dir")
if verbosity is Verbosity.VERBOSE:
verbose_log(f"{package.name}: Setting up venv in {venv_location}. {uv_command=}\n")
try:
@@ -309,6 +313,19 @@ def concurrently_run_testcases(
if not to_do:
return []
@contextmanager
def cleanup_threads(
event: threading.Event, printer_thread: threading.Thread, executor: concurrent.futures.ThreadPoolExecutor
) -> Iterator[None]:
try:
yield
except:
_PRINT_QUEUE.put("Shutting down worker threads...")
event.set()
printer_thread.join()
executor.shutdown(cancel_futures=True)
raise
event = threading.Event()
printer_thread = threading.Thread(target=print_queued_messages, args=(event,))
printer_thread.start()
@@ -321,10 +338,14 @@ def concurrently_run_testcases(
executor.submit(setup_testcase_dir, package, tempdir, verbosity)
for package, tempdir in packageinfo_to_tempdir.items()
]
concurrent.futures.wait(testcase_futures)
with cleanup_threads(event, printer_thread, executor):
concurrent.futures.wait(testcase_futures)
mypy_futures = [executor.submit(task) for task in to_do]
results = [future.result() for future in mypy_futures]
with cleanup_threads(event, printer_thread, executor):
results = [future.result() for future in mypy_futures]
event.set()
printer_thread.join()