mirror of
https://github.com/davidhalter/typeshed.git
synced 2025-12-08 21:14:48 +08:00
Improve asyncio.subprocess stubs (#5327)
This commit is contained in:
@@ -1,13 +1,14 @@
|
||||
import subprocess
|
||||
import sys
|
||||
from _typeshed import AnyPath
|
||||
from asyncio import events, protocols, streams, transports
|
||||
from typing import IO, Any, Optional, Tuple, Union
|
||||
from typing import IO, Any, Callable, Optional, Tuple, Union
|
||||
from typing_extensions import Literal
|
||||
|
||||
if sys.version_info >= (3, 8):
|
||||
from os import PathLike
|
||||
|
||||
_ExecArg = Union[str, bytes, PathLike[str], PathLike[bytes]]
|
||||
_ExecArg = AnyPath
|
||||
else:
|
||||
_ExecArg = Union[str, bytes] # Union used instead of AnyStr due to mypy issue #1236
|
||||
_ExecArg = Union[str, bytes]
|
||||
|
||||
PIPE: int
|
||||
STDOUT: int
|
||||
@@ -41,12 +42,30 @@ class Process:
|
||||
|
||||
if sys.version_info >= (3, 10):
|
||||
async def create_subprocess_shell(
|
||||
cmd: Union[str, bytes], # Union used instead of AnyStr due to mypy issue #1236
|
||||
cmd: Union[str, bytes],
|
||||
stdin: Union[int, IO[Any], None] = ...,
|
||||
stdout: Union[int, IO[Any], None] = ...,
|
||||
stderr: Union[int, IO[Any], None] = ...,
|
||||
limit: int = ...,
|
||||
**kwds: Any,
|
||||
*,
|
||||
# These parameters are forced to these values by BaseEventLoop.subprocess_shell
|
||||
universal_newlines: Literal[False] = ...,
|
||||
shell: Literal[True] = ...,
|
||||
bufsize: Literal[0] = ...,
|
||||
encoding: None = ...,
|
||||
errors: None = ...,
|
||||
text: Literal[False, None] = ...,
|
||||
# These parameters are taken by subprocess.Popen, which this ultimately delegates to
|
||||
executable: Optional[AnyPath] = ...,
|
||||
preexec_fn: Optional[Callable[[], Any]] = ...,
|
||||
close_fds: bool = ...,
|
||||
cwd: Optional[AnyPath] = ...,
|
||||
env: Optional[subprocess._ENV] = ...,
|
||||
startupinfo: Optional[Any] = ...,
|
||||
creationflags: int = ...,
|
||||
restore_signals: bool = ...,
|
||||
start_new_session: bool = ...,
|
||||
pass_fds: Any = ...,
|
||||
) -> Process: ...
|
||||
async def create_subprocess_exec(
|
||||
program: _ExecArg,
|
||||
@@ -55,18 +74,53 @@ if sys.version_info >= (3, 10):
|
||||
stdout: Union[int, IO[Any], None] = ...,
|
||||
stderr: Union[int, IO[Any], None] = ...,
|
||||
limit: int = ...,
|
||||
**kwds: Any,
|
||||
# These parameters are forced to these values by BaseEventLoop.subprocess_shell
|
||||
universal_newlines: Literal[False] = ...,
|
||||
shell: Literal[True] = ...,
|
||||
bufsize: Literal[0] = ...,
|
||||
encoding: None = ...,
|
||||
errors: None = ...,
|
||||
# These parameters are taken by subprocess.Popen, which this ultimately delegates to
|
||||
text: Optional[bool] = ...,
|
||||
executable: Optional[AnyPath] = ...,
|
||||
preexec_fn: Optional[Callable[[], Any]] = ...,
|
||||
close_fds: bool = ...,
|
||||
cwd: Optional[AnyPath] = ...,
|
||||
env: Optional[subprocess._ENV] = ...,
|
||||
startupinfo: Optional[Any] = ...,
|
||||
creationflags: int = ...,
|
||||
restore_signals: bool = ...,
|
||||
start_new_session: bool = ...,
|
||||
pass_fds: Any = ...,
|
||||
) -> Process: ...
|
||||
|
||||
else:
|
||||
async def create_subprocess_shell(
|
||||
cmd: Union[str, bytes], # Union used instead of AnyStr due to mypy issue #1236
|
||||
cmd: Union[str, bytes],
|
||||
stdin: Union[int, IO[Any], None] = ...,
|
||||
stdout: Union[int, IO[Any], None] = ...,
|
||||
stderr: Union[int, IO[Any], None] = ...,
|
||||
loop: Optional[events.AbstractEventLoop] = ...,
|
||||
limit: int = ...,
|
||||
**kwds: Any,
|
||||
*,
|
||||
# These parameters are forced to these values by BaseEventLoop.subprocess_shell
|
||||
universal_newlines: Literal[False] = ...,
|
||||
shell: Literal[True] = ...,
|
||||
bufsize: Literal[0] = ...,
|
||||
encoding: None = ...,
|
||||
errors: None = ...,
|
||||
text: Literal[False, None] = ...,
|
||||
# These parameters are taken by subprocess.Popen, which this ultimately delegates to
|
||||
executable: Optional[AnyPath] = ...,
|
||||
preexec_fn: Optional[Callable[[], Any]] = ...,
|
||||
close_fds: bool = ...,
|
||||
cwd: Optional[AnyPath] = ...,
|
||||
env: Optional[subprocess._ENV] = ...,
|
||||
startupinfo: Optional[Any] = ...,
|
||||
creationflags: int = ...,
|
||||
restore_signals: bool = ...,
|
||||
start_new_session: bool = ...,
|
||||
pass_fds: Any = ...,
|
||||
) -> Process: ...
|
||||
async def create_subprocess_exec(
|
||||
program: _ExecArg,
|
||||
@@ -76,5 +130,22 @@ else:
|
||||
stderr: Union[int, IO[Any], None] = ...,
|
||||
loop: Optional[events.AbstractEventLoop] = ...,
|
||||
limit: int = ...,
|
||||
**kwds: Any,
|
||||
# These parameters are forced to these values by BaseEventLoop.subprocess_shell
|
||||
universal_newlines: Literal[False] = ...,
|
||||
shell: Literal[True] = ...,
|
||||
bufsize: Literal[0] = ...,
|
||||
encoding: None = ...,
|
||||
errors: None = ...,
|
||||
# These parameters are taken by subprocess.Popen, which this ultimately delegates to
|
||||
text: Optional[bool] = ...,
|
||||
executable: Optional[AnyPath] = ...,
|
||||
preexec_fn: Optional[Callable[[], Any]] = ...,
|
||||
close_fds: bool = ...,
|
||||
cwd: Optional[AnyPath] = ...,
|
||||
env: Optional[subprocess._ENV] = ...,
|
||||
startupinfo: Optional[Any] = ...,
|
||||
creationflags: int = ...,
|
||||
restore_signals: bool = ...,
|
||||
start_new_session: bool = ...,
|
||||
pass_fds: Any = ...,
|
||||
) -> Process: ...
|
||||
|
||||
Reference in New Issue
Block a user