[stubsabot] Support "Removal" PRs (#14401)

This commit is contained in:
Semyon Moroz
2025-07-16 11:48:04 +00:00
committed by GitHub
parent bd17a577a5
commit 82e2e9c53a
3 changed files with 183 additions and 33 deletions
+181 -30
View File
@@ -3,6 +3,7 @@ from __future__ import annotations
import argparse
import asyncio
import calendar
import contextlib
import datetime
import enum
@@ -10,33 +11,38 @@ import functools
import io
import os
import re
import shutil
import subprocess
import sys
import tarfile
import textwrap
import urllib.parse
import zipfile
from collections.abc import Iterator, Mapping, Sequence
from collections.abc import Callable, Iterator, Mapping, Sequence
from dataclasses import dataclass, field
from http import HTTPStatus
from pathlib import Path
from typing import Annotated, Any, ClassVar, NamedTuple
from typing import Annotated, Any, ClassVar, NamedTuple, TypeVar
from typing_extensions import Self, TypeAlias
import aiohttp
import packaging.version
import tomli
import tomlkit
from packaging.specifiers import Specifier
from termcolor import colored
from tomlkit.items import String
from ts_utils.metadata import StubMetadata, read_metadata, update_metadata
from ts_utils.paths import STUBS_PATH, distribution_path
from ts_utils.metadata import NoSuchStubError, StubMetadata, metadata_path, read_metadata, update_metadata
from ts_utils.paths import PYRIGHT_CONFIG, STUBS_PATH, distribution_path
TYPESHED_OWNER = "python"
TYPESHED_API_URL = f"https://api.github.com/repos/{TYPESHED_OWNER}/typeshed"
STUBSABOT_LABEL = "bot: stubsabot"
POLICY_MONTHS_DELTA = 6
class ActionLevel(enum.IntEnum):
def __new__(cls, value: int, doc: str) -> Self:
@@ -149,6 +155,16 @@ class Obsolete:
return f"Marking {self.distribution} as obsolete since {self.obsolete_since_version!r}"
@dataclass
class Remove:
distribution: str
reason: str
links: dict[str, str]
def __str__(self) -> str:
return f"Removing {self.distribution} as {self.reason}"
@dataclass
class NoUpdate:
distribution: str
@@ -158,6 +174,38 @@ class NoUpdate:
return f"Skipping {self.distribution}: {self.reason}"
_T = TypeVar("_T")
async def with_extracted_archive(
release_to_download: PypiReleaseDownload,
*,
session: aiohttp.ClientSession,
handler: Callable[[zipfile.ZipFile | tarfile.TarFile], _T],
) -> _T:
async with session.get(release_to_download.url) as response:
body = io.BytesIO(await response.read())
packagetype = release_to_download.packagetype
if packagetype == "bdist_wheel":
assert release_to_download.filename.endswith(".whl")
with zipfile.ZipFile(body) as zf:
return handler(zf)
elif packagetype == "sdist":
# sdist defaults to `.tar.gz` on Lunix and to `.zip` on Windows:
# https://docs.python.org/3.11/distutils/sourcedist.html
if release_to_download.filename.endswith(".tar.gz"):
with tarfile.open(fileobj=body, mode="r:gz") as zf:
return handler(zf)
elif release_to_download.filename.endswith(".zip"):
with zipfile.ZipFile(body) as zf:
return handler(zf)
else:
raise AssertionError(f"Package file {release_to_download.filename!r} does not end with '.tar.gz' or '.zip'")
else:
raise AssertionError(f"Unknown package type for {release_to_download.distribution}: {packagetype!r}")
def all_py_files_in_source_are_in_py_typed_dirs(source: zipfile.ZipFile | tarfile.TarFile) -> bool:
py_typed_dirs: list[Path] = []
all_python_files: list[Path] = []
@@ -207,27 +255,7 @@ def all_py_files_in_source_are_in_py_typed_dirs(source: zipfile.ZipFile | tarfil
async def release_contains_py_typed(release_to_download: PypiReleaseDownload, *, session: aiohttp.ClientSession) -> bool:
async with session.get(release_to_download.url) as response:
body = io.BytesIO(await response.read())
packagetype = release_to_download.packagetype
if packagetype == "bdist_wheel":
assert release_to_download.filename.endswith(".whl")
with zipfile.ZipFile(body) as zf:
return all_py_files_in_source_are_in_py_typed_dirs(zf)
elif packagetype == "sdist":
# sdist defaults to `.tar.gz` on Lunix and to `.zip` on Windows:
# https://docs.python.org/3.11/distutils/sourcedist.html
if release_to_download.filename.endswith(".tar.gz"):
with tarfile.open(fileobj=body, mode="r:gz") as zf:
return all_py_files_in_source_are_in_py_typed_dirs(zf)
elif release_to_download.filename.endswith(".zip"):
with zipfile.ZipFile(body) as zf:
return all_py_files_in_source_are_in_py_typed_dirs(zf)
else:
raise AssertionError(f"Package file {release_to_download.filename!r} does not end with '.tar.gz' or '.zip'")
else:
raise AssertionError(f"Unknown package type for {release_to_download.distribution}: {packagetype!r}")
return await with_extracted_archive(release_to_download, session=session, handler=all_py_files_in_source_are_in_py_typed_dirs)
async def find_first_release_with_py_typed(pypi_info: PypiInfo, *, session: aiohttp.ClientSession) -> PypiReleaseDownload | None:
@@ -470,12 +498,94 @@ async def analyze_diff(
return DiffAnalysis(py_files=py_files, py_files_stubbed_in_typeshed=py_files_stubbed_in_typeshed)
async def determine_action(distribution: str, session: aiohttp.ClientSession) -> Update | NoUpdate | Obsolete:
def _add_months(date: datetime.date, months: int) -> datetime.date:
month = date.month - 1 + months
year = date.year + month // 12
month = month % 12 + 1
day = min(date.day, calendar.monthrange(year, month)[1])
return datetime.date(year, month, day)
def obsolete_more_than_6_months(distribution: str) -> bool:
try:
with metadata_path(distribution).open("rb") as file:
data = tomlkit.load(file)
except FileNotFoundError:
raise NoSuchStubError(f"Typeshed has no stubs for {distribution!r}!") from None
obsolete_since = data["obsolete_since"]
if not obsolete_since:
return False
assert type(obsolete_since) is String
comment: str | None = obsolete_since.trivia.comment
if not comment:
return False
release_date_string = comment.removeprefix("# Released on ")
release_date = datetime.date.fromisoformat(release_date_string)
remove_date = _add_months(release_date, POLICY_MONTHS_DELTA)
today = datetime.datetime.now(tz=datetime.timezone.utc).date()
return remove_date >= today
def parse_no_longer_updated_from_archive(source: zipfile.ZipFile | tarfile.TarFile) -> bool:
if isinstance(source, zipfile.ZipFile):
try:
file = source.open("METADATA.toml", "r")
except KeyError:
return False
else:
try:
tarinfo = source.getmember("METADATA.toml")
file = source.extractfile(tarinfo) # type: ignore[assignment]
if file is None:
return False
except KeyError:
return False
with file as f:
toml_data: dict[str, object] = tomli.load(f)
no_longer_updated = toml_data.get("no_longer_updated", False)
assert type(no_longer_updated) is bool
return bool(no_longer_updated)
async def has_no_longer_updated_release(release_to_download: PypiReleaseDownload, *, session: aiohttp.ClientSession) -> bool:
"""
Return `True` if the `no_longer_updated` field exists and the value is
`True` in the `METADATA.toml` file of latest `types-{distribution}` pypi release.
"""
return await with_extracted_archive(release_to_download, session=session, handler=parse_no_longer_updated_from_archive)
async def determine_action(distribution: str, session: aiohttp.ClientSession) -> Update | NoUpdate | Obsolete | Remove:
stub_info = read_metadata(distribution)
if stub_info.is_obsolete:
return NoUpdate(stub_info.distribution, "obsolete")
if obsolete_more_than_6_months(stub_info.distribution):
pypi_info = await fetch_pypi_info(f"types-{stub_info.distribution}", session)
latest_release = pypi_info.get_latest_release()
links = {
"Typeshed release": f"{pypi_info.pypi_root}",
"Typeshed stubs": f"https://github.com/{TYPESHED_OWNER}/typeshed/tree/main/stubs/{stub_info.distribution}",
}
return Remove(stub_info.distribution, reason="older than 6 months", links=links)
else:
return NoUpdate(stub_info.distribution, "obsolete")
if stub_info.no_longer_updated:
return NoUpdate(stub_info.distribution, "no longer updated")
pypi_info = await fetch_pypi_info(f"types-{stub_info.distribution}", session)
latest_release = pypi_info.get_latest_release()
if await has_no_longer_updated_release(latest_release, session=session):
links = {
"Typeshed release": f"{pypi_info.pypi_root}",
"Typeshed stubs": f"https://github.com/{TYPESHED_OWNER}/typeshed/tree/main/stubs/{stub_info.distribution}",
}
return Remove(stub_info.distribution, reason="no longer updated", links=links)
else:
return NoUpdate(stub_info.distribution, "no longer updated")
pypi_info = await fetch_pypi_info(stub_info.distribution, session)
latest_release = pypi_info.get_latest_release()
@@ -683,6 +793,22 @@ def get_update_pr_body(update: Update, metadata: Mapping[str, Any]) -> str:
return body
def remove_stubs(distribution: str) -> None:
stub_path = distribution_path(distribution)
target_path_prefix = f'"stubs/{distribution}'
if stub_path.exists() and stub_path.is_dir():
shutil.rmtree(stub_path)
with PYRIGHT_CONFIG.open("r", encoding="UTF-8") as f:
lines = f.readlines()
lines = [line for line in lines if not line.lstrip().startswith(target_path_prefix)]
with PYRIGHT_CONFIG.open("w", encoding="UTF-8") as f:
f.writelines(lines)
async def suggest_typeshed_update(update: Update, session: aiohttp.ClientSession, action_level: ActionLevel) -> None:
if action_level <= ActionLevel.nothing:
return
@@ -729,6 +855,28 @@ async def suggest_typeshed_obsolete(obsolete: Obsolete, session: aiohttp.ClientS
await create_or_update_pull_request(title=title, body=body, branch_name=branch_name, session=session)
async def suggest_typeshed_remove(remove: Remove, session: aiohttp.ClientSession, action_level: ActionLevel) -> None:
if action_level <= ActionLevel.nothing:
return
title = f"[stubsabot] Remove {remove.distribution} as {remove.reason}"
async with _repo_lock:
branch_name = f"{BRANCH_PREFIX}/{normalize(remove.distribution)}"
subprocess.check_call(["git", "checkout", "-B", branch_name, "origin/main"])
remove_stubs(remove.distribution)
body = "\n".join(f"{k}: {v}" for k, v in remove.links.items())
subprocess.check_call(["git", "commit", "--all", "-m", f"{title}\n\n{body}"])
if action_level <= ActionLevel.local:
return
if not latest_commit_is_different_to_last_commit_on_origin(branch_name):
print(f"No pushing to origin required: origin/{branch_name} exists and requires no changes!")
return
somewhat_safe_force_push(branch_name)
if action_level <= ActionLevel.fork:
return
await create_or_update_pull_request(title=title, body=body, branch_name=branch_name, session=session)
async def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument(
@@ -803,10 +951,13 @@ async def main() -> None:
if isinstance(update, Update):
await suggest_typeshed_update(update, session, action_level=args.action_level)
continue
# Redundant, but keeping for extra runtime validation
if isinstance(update, Obsolete): # pyright: ignore[reportUnnecessaryIsInstance]
if isinstance(update, Obsolete):
await suggest_typeshed_obsolete(update, session, action_level=args.action_level)
continue
# Redundant, but keeping for extra runtime validation
if isinstance(update, Remove): # pyright: ignore[reportUnnecessaryIsInstance]
await suggest_typeshed_remove(update, session, action_level=args.action_level)
continue
except RemoteConflictError as e:
print(colored(f"... but ran into {type(e).__qualname__}: {e}", "red"))
continue