shellous

Async Processes and Pipelines

PyPI docs CI codecov Downloads

shellous provides a concise API for running subprocesses using asyncio. It is similar to and inspired by sh.

import asyncio
from shellous import sh

async def main():
    result = await sh("echo", "hello")
    print(result)

asyncio.run(main())

Benefits

  • Run programs asynchronously in a single line.
  • Redirect stdin, stdout and stderr to files, memory buffers, async streams or loggers.
  • Iterate asynchronously over subprocess output.
  • Set timeouts and reliably cancel running processes.
  • Run a program with a pseudo-terminal (pty).
  • Use send() and expect() to manually control a subprocess.
  • Construct pipelines and use process substitution directly from Python (no shell required).
  • Runs on Linux, MacOS, FreeBSD and Windows.
  • Monitor processes being started and stopped with audit_callback API.

Requirements

  • Requires Python 3.9 or later.
  • Requires an asyncio event loop.
  • Pseudo-terminals require a Unix system.
  • Process substitution requires a Unix system with /dev/fd support.

Running a Command

The tutorial in this README uses the asyncio REPL built into Python. In these examples, >>> is the REPL prompt.

Start the asyncio REPL by typing python3 -m asyncio, and import sh from the shellous module:

>>> from shellous import sh

Here's a command that runs echo "hello, world".

>>> await sh("echo", "hello, world")
'hello, world\n'

The first argument to sh is the program name. It is followed by zero or more arguments. Each argument will be converted to a string. If an argument is a list or tuple, it is flattened recursively.

>>> await sh("echo", 1, 2, [3, 4, (5, 6)])
'1 2 3 4 5 6\n'

A command does not run until you await it. When you run a command using await, it returns the value of the standard output interpreted as a UTF-8 string. It is safe to await the same command object more than once.

Here, we create our own echo command with "-n" to omit the newline. Note, echo("abc") will run the same command as echo -n "abc".

>>> echo = sh("echo", "-n")
>>> await echo("abc")
'abc'

Commands are immutable objects that represent a program invocation: program name, arguments, environment variables, redirection operators and other settings. When you use a method to modify a Command, you are returning a new Command object. The original object is unchanged.

You can wrap your commands in a function to improve type safety:

>>> from shellous import Command
>>> def exclaim(word: str) -> Command[str]:
...   return sh("echo", "-n", f"{word}!!")
... 
>>> await exclaim("Oh")
'Oh!!'

The type hint Command[str] indicates that the command returns a str.

Arguments

Commands use positional arguments only; keyword arguments are not supported.

In most cases, shellous automatically converts Python objects passed as command arguments to str or bytes. As described above, the list and tuple types are an exception; they are recursively flattened before their elements are converted to strings.

Dicts, sets, and generator types are not supported as arguments. Their string format doesn't make sense as a command line argument.

Results

When a command completes successfully, it returns the standard output (or "" if stdout is redirected). For a more detailed response, you can specify that the command should return a Result object by using the .result modifier:

>>> await echo.result("abc")
Result(exit_code=0, output_bytes=b'abc', error_bytes=b'', cancelled=False, encoding='utf-8')

A Result object contains the command's exit_code in addition to its output. A Result is True if the command's exit_code is zero. You can access the string value of the output using the .output property:

if result := await sh.result("cat", "some-file"):
    output = result.output
else:
    print(f"Command failed with exit_code={result.exit_code})

You can retrieve the string value of the standard error using the .error property. (By default, only the first 1024 bytes of standard error is stored.)

If a command was terminated by a signal, the exit_code will be the negative signal number.

The return value of sh.result("cmd", ...) uses the type hint Command[Result].

ResultError

If you are not using the .result modifier and a command fails, it raises a ResultError exception:

>>> await sh("cat", "does_not_exist")
Traceback (most recent call last):
  ...
shellous.result.ResultError: Result(exit_code=1, output_bytes=b'', error_bytes=b'cat: does_not_exist: No such file or directory\n', cancelled=False, encoding='utf-8')

The ResultError exception contains a Result object with the exit_code and the first 1024 bytes of standard error.

In some cases, you want to ignore certain exit code values. That is, you want to treat them as if they are normal. To do this, you can set the exit_codes option:

>>> await sh("cat", "does_not_exist").set(exit_codes={0,1})
''

If there is a problem launching a process, shellous can also raise a separate FileNotFoundError or PermissionError exception.

Async For

Using await to run a command collects the entire output of the command in memory before returning it. You can also iterate over the output lines as they arrive using async for.

>>> [line async for line in echo("hi\n", "there")]
['hi\n', ' there']

Use an async for loop when you want to examine the stream of output from a command, line by line. For example, suppose you want to run tail on a log file.

async for line in sh("tail", "-f", "/var/log/syslog"):
    if "ERROR" in line:
        print(line.rstrip())

Async With

You can use a command as an asynchronous context manager. There are two ways to run a program using a context manager: a low-level API and a high-level API.

Byte-by-Byte (Low Level)

Use async with directly when you need byte-by-byte control over the individual streams: stdin, stdout and stderr. To control a standard stream, you must tell shellous to "capture" it (For more on this, see Redirection.)

cmd = sh("cat").stdin(sh.CAPTURE).stdout(sh.CAPTURE)
async with cmd as run:
    run.stdin.write(b"abc")
    run.stdin.close()
    print(await run.stdout.readline())

result = run.result()

The streams run.stdout and run.stderr are asyncio.StreamReader objects. The stream run.stdin is an asyncio.StreamWriter object. If we didn't specify that stdin/stdout are sh.CAPTURE, the streams run.stdin and run.stdout would be None.

The return value of run.result() is a Result object. Depending on the command settings, this function may raise a ResultError on a non-zero exit code.

:warning: When reading or writing individual streams, you are responsible for managing reads and writes so they don't deadlock. You may use run.create_task to schedule a concurrent task.

You can also use async with to run a server. When you do so, you must tell the server to stop using run.cancel(). Otherwise, the context manager will wait forever for the process to exit.

async with sh("some-server") as run:
    # Send commands to the server here...
    # Manually signal the server to stop.
    run.cancel()

Prompt with Send/Expect (High Level API)

Use the prompt() method to control a process using send and expect. The prompt() method returns an asynchronous context manager (the Prompt class) that facilitates reading and writing strings and matching regular expressions.

cmd = sh("cat").set(pty=True)

async with cmd.prompt() as client:
  await client.send("abc")
  output, _ = await client.expect("\r\n")
  print(output)

The Prompt API automatically captures stdin and stdout.

Here is another example of controlling a bash co-process running in a docker container.

async def list_packages():
    "Run bash in an ubuntu docker container and list packages."
    bash_prompt = re.compile("root@[0-9a-f]+:/[^#]*# ")
    cmd = sh("docker", "run", "-it", "--rm", "-e", "TERM=dumb", "ubuntu")

    async with cmd.set(pty=True).prompt(bash_prompt, timeout=3) as cli:
        # Read up to first prompt.
        await cli.expect()

        # Disable echo. The `command()` method combines send *and* expect methods.
        await cli.command("stty -echo")

        # Return list of packages.
        result = await cli.command("apt-cache pkgnames")
        return result.strip().split("\r\n")

    # You can check the result object's exit code. You can only
    # access `cli.result` outside the `async with` block.
    assert cli.result.exit_code == 0

The prompt() API does not raise a ResultError when a command exits with an error status. Typically, you'll see an EOFError when you were expecting to read a response. You can check the exit status by retrieving the Prompt's result property outside of the async with block.

Redirection

shellous supports the redirection operators | and >>. They work similar to how they work in the unix shell. Shellous does not support use of < or > for redirection. Instead, replace these with |.

To redirect to or from a file, use a pathlib.Path object. Alternatively, you can redirect input/output to a StringIO object, an open file, a Logger, or use a special redirection constant like sh.DEVNULL.

:warning: When combining the redirect operators with await, you must use parentheses; await has higher precedence than | and >>.

Redirecting Standard Input

To redirect standard input, use the pipe operator | with the argument on the left-side. Here is an example that passes the string "abc" as standard input.

>>> cmd = "abc" | sh("wc", "-c")
>>> await cmd
'       3\n'

To read input from a file, use a Path object from pathlib.

>>> from pathlib import Path
>>> cmd = Path("LICENSE") | sh("wc", "-l")
>>> await cmd
'     201\n'

Shellous supports different STDIN behavior when using different Python types.

Python Type Behavior as STDIN
str Read input from string object.
bytes, bytearray Read input from bytes object.
Path Read input from file specified by Path.
File, StringIO, ByteIO Read input from open file object.
int Read input from existing file descriptor.
asyncio.StreamReader Read input from StreamReader.
sh.DEVNULL Read input from /dev/null.
sh.INHERIT Read input from existing sys.stdin.
sh.CAPTURE You will write to stdin interactively.

Redirecting Standard Output

To redirect standard output, use the pipe operator | with the argument on the right-side. Here is an example that writes to a temporary file.

>>> output_file = Path("/tmp/output_file")
>>> cmd = sh("echo", "abc") | output_file
>>> await cmd
''
>>> output_file.read_bytes()
b'abc\n'

To redirect standard output with append, use the >> operator.

>>> cmd = sh("echo", "def") >> output_file
>>> await cmd
''
>>> output_file.read_bytes()
b'abc\ndef\n'

Shellous supports different STDOUT behavior when using different Python types.

Python Type Behavior as STDOUT/STDERR
Path Write output to the file path specified by Path.
bytearray Write output to a mutable byte array.
File, StringIO, ByteIO Write output to an open file object.
int Write output to existing file descriptor at its current position. â—†
logging.Logger Log each line of output. â—†
asyncio.StreamWriter Write output to StreamWriter. â—†
sh.CAPTURE Capture output for async with. â—†
sh.DEVNULL Write output to /dev/null. â—†
sh.INHERIT Write output to existing sys.stdout or sys.stderr. â—†

â—† For these types, there is no difference between using | and >>.

Shellous does not support redirecting standard output/error to a plain str or bytes object. If you intend to redirect output to a file, you must use a pathlib.Path object.

Redirecting Standard Error

By default, the first 1024 bytes read from standard error are stored in the Result object. Any further bytes are discarded. You can change the 1024 byte limit using the error_limit option.

To redirect standard error, use the stderr method. Standard error supports the same Python types as standard output. To append, set append=True in the stderr method.

To redirect stderr to the same place as stdout, use the sh.STDOUT constant. If you also redirect stdout to sh.DEVNULL, you will only receive the standard error.

>>> cmd = sh("cat", "does_not_exist").stderr(sh.STDOUT)
>>> await cmd.set(exit_codes={0,1})
'cat: does_not_exist: No such file or directory\n'

To redirect standard error to the hosting program's sys.stderr, use the sh.INHERIT redirect option.

>>> cmd = sh("cat", "does_not_exist").stderr(sh.INHERIT)
>>> await cmd
cat: does_not_exist: No such file or directory
Traceback (most recent call last):
  ...
shellous.result.ResultError: Result(exit_code=1, output_bytes=b'', error_bytes=b'', cancelled=False, encoding='utf-8')

If you redirect stderr, it will no longer be stored in the Result object, and the error_limit option will not apply.

Default Redirections

For regular commands, the default redirections are:

  • Standard input is read from the empty string ("").
  • Standard out is buffered and stored in the Result object (BUFFER).
  • First 1024 bytes of standard error is buffered and stored in the Result object (BUFFER).

However, the default redirections are adjusted when using a pseudo-terminal (pty):

  • Standard input is captured and ignored (CAPTURE).
  • Standard out is buffered and stored in the Result object (BUFFER).
  • Standard error is redirected to standard output (STDOUT).

When you use the Prompt API, the standard input and standard output are automatically redirected to CAPTURE.

Pipelines

You can create a pipeline by combining commands using the | operator. A pipeline feeds the standard out of one process into the next process as standard input. Here is the shellous equivalent to the bash command: ls | grep README

>>> pipe = sh("ls") | sh("grep", "README")
>>> await pipe
'README.md\n'

A pipeline returns a Result if the last command in the pipeline has the .result modifier. To set other options like encoding for a Pipeline, set them on the last command.

>>> pipe = sh("ls") | sh("grep", "README").result
>>> await pipe
Result(exit_code=0, output_bytes=b'README.md\n', error_bytes=b'', cancelled=False, encoding='utf-8')

Error reporting for a pipeline is implemented similar to using the -o pipefail shell option.

Pipelines support the same await/async for/async with operations that work on a single command, including the Prompt API.

>>> [line.strip() async for line in pipe]
['README.md']

Process Substitution (Unix Only)

You can pass a shell command as an argument to another. Here is the shellous equivalent to the bash command: grep README <(ls).

>>> cmd = sh("grep", "README", sh("ls"))
>>> await cmd
'README.md\n'

Use .writable to write to a command instead.

>>> buf = bytearray()
>>> cmd = sh("ls") | sh("tee", sh("grep", "README").writable | buf) | sh.DEVNULL
>>> await cmd
''
>>> buf
bytearray(b'README.md\n')

The above example is equivalent to ls | tee >(grep README > buf) > /dev/null.

Timeouts

You can specify a timeout using the timeout option. If the timeout expires, shellous will raise a TimeoutError.

>>> await sh("sleep", 60).set(timeout=0.1)
Traceback (most recent call last):
  ...
TimeoutError

Timeouts are just a special case of cancellation. When a command is cancelled, shellous terminates the running process and raises a CancelledError.

>>> t = asyncio.create_task(sh("sleep", 60).coro())
>>> t.cancel()
True
>>> await t
Traceback (most recent call last):
  ...
CancelledError

By default, shellous will send a SIGTERM signal to the process to tell it to exit. If the process does not exit within 3 seconds, shellous will send a SIGKILL signal. You can change these defaults with the cancel_signal and cancel_timeout settings. A command is not considered fully cancelled until the process exits.

Pseudo-Terminal Support (Unix Only)

To run a command through a pseudo-terminal, set the pty option to True.

>>> await sh("echo", "in a pty").set(pty=True)
'in a pty\r\n'

Alternatively, you can pass a pty function to configure the tty mode and size.

>>> ls = sh("ls").set(pty=shellous.cooked(cols=40, rows=10, echo=False))
>>> await ls("README.md", "CHANGELOG.md")
'CHANGELOG.md\tREADME.md\r\n'

Shellous provides three built-in helper functions: shellous.cooked(), shellous.raw() and shellous.cbreak().

Context Objects

You can store shared command settings in an immutable context object (CmdContext). To create a new context object, specify your changes to the default context sh:

>>> auditor = lambda phase, info: print(phase, info["runner"].name)
>>> sh_audit = sh.set(audit_callback=auditor)

Now all commands created with sh_audit will log their progress using the audit callback.

>>> await sh_audit("echo", "goodbye")
start echo
stop echo
'goodbye\n'

You can also create a context object that specifies all return values are Result objects.

>>> rsh = sh.result
>>> await rsh("echo", "whatever")
Result(exit_code=0, output_bytes=b'whatever\n', error_bytes=b'', cancelled=False, encoding='utf-8')

Options

Both Command and CmdContext support options to control their runtime behavior. Some of these options (timeout, pty, audit_callback, and exit_codes) have been described above. See the shellous.Options class for more information.

You can retrieve an option from cmd with cmd.options.<option>. For example, use cmd.options.encoding to obtain the encoding:

>>> cmd = sh("echo").set(encoding="latin1")
>>> cmd.options.encoding
'latin1'

Command and CmdContext use the .set() method to specify most options:

Option Description
path Search path to use instead of the PATH environment variable. (Default=None)
env Additional environment variables to pass to the command. (Default={})
inherit_env True if command should inherit the environment variables from the current process. (Default=True)
encoding Text encoding of input/output streams. You can specify an error handling scheme by including it after a space, e.g. "ascii backslashreplace". (Default="utf-8 strict")
exit_codes Set of exit codes that do not raise a ResultError. (Default={0})
timeout Timeout in seconds to wait before cancelling the process. (Default=None)
cancel_timeout Timeout in seconds to wait for a cancelled process to exit before forcefully terminating it. (Default=3s)
cancel_signal The signal sent to a process when it is cancelled. (Default=SIGTERM)
alt_name Alternate name for the process used for debug logging. (Default=None)
pass_fds Additional file descriptors to pass to the process. (Default={})
pass_fds_close True if descriptors in pass_fds should be closed after the child process is launched. (Default=False)
pty Used to allocate a pseudo-terminal (PTY). (Default=False)
close_fds True if process should close all file descriptors when it starts. This setting defaults to False to align with posix_spawn requirements. (Default=False)
audit_callback Provide function to audit stages of process execution. (Default=None)
coerce_arg Provide function to coerce Command arguments to strings when str() is not sufficient. For example, you can provide your own function that converts a dictionary argument to a sequence of strings. (Default=None)
error_limit Maximum number of initial bytes of STDERR to store in Result object. (Default=1024)
read_buffer_limit Maximum number of bytes to read when looking for a separator. (Default=65536)

env

Use the env() method to add to the list of environment variables. The env() method supports keyword parameters. You can call env() more than once and the effect is additive.

>>> cmd = sh("echo").env(ENV1="a", ENV2="b").env(ENV2=3)
>>> cmd.options.env
{'ENV1': 'a', 'ENV2': '3'}

Use the env option with set() when you want to replace all the environment variables.

input, output, error

When you apply a redirection operator to a Command or CmdContext, the redirection targets are also stored in the Options object. To change these, use the .stdin(), .stdout(), or .stderr() methods or the redirection operator |.

Option Description
input The redirection target for standard input.
input_close True if standard input should be closed after the process is launched.
output The redirection target for standard output.
output_append True if standard output should be open for append.
output_close True if standard output should be closed after the process is launched.
error The redirection target for standard error.
error_append True if standard error should be open for append.
error_close True if standard error should be closed after the process is launched.

Type Checking

Shellous fully supports PEP 484 type hints.

Commands

Commands are generic on the return type, either str or Result. You will specify the type of a command object as Command[str] or Command[Result].

Use the result modifier to obtain a Command[Result] from a Command[str].

from shellous import sh, Command, Result

cmd1: Command[str] = sh("echo", "abc")
# When you `await cmd1`, the result is a `str` object.

cmd2: Command[Result] = sh.result("echo", "abc")
# When you `await cmd2`, the result is a `Result` object.

CmdContext

The CmdContext class is also generic on either str or Result.

from shellous import sh, CmdContext, Result

sh1: CmdContext[str] = sh.set(path="/bin:/usr/bin")
# When you use `sh1` to create commands, it produces `Command[str]` object with the given path.

sh2: CmdContext[Result] = sh.result.set(path="/bin:/usr/bin")
# When you use `sh2` to create commands, it produces `Command[Result]` objects with the given path.

Logging

For verbose logging, shellous supports a SHELLOUS_TRACE environment variable. Set the value of SHELLOUS_TRACE to a comma-delimited list of options:

  • detail: Enables detailed logging used to trace the steps of running a command.

  • prompt: Enables logging in the Prompt class when controlling a program using send/expect.

  • all: Enables all logging options.

Shellous uses the built-in Python logging module. After enabling these options, the shellous logger will display log messages at the INFO level.

Without these options enabled, Shellous generates almost no log messages.

 1"""
 2.. include:: ../README.md
 3"""
 4
 5# pylint: disable=cyclic-import
 6# pyright: reportUnusedImport=false
 7
 8__version__ = "0.40.0"
 9
10import sys
11import warnings
12
13from .command import AuditEventInfo, CmdContext, Command, Options
14from .pipeline import Pipeline
15from .prompt import Prompt
16from .pty_util import cbreak, cooked, raw
17from .result import Result, ResultError
18from .runner import PipeRunner, Runner
19
20if sys.version_info[:3] in [(3, 10, 9), (3, 11, 1)]:
21    # Warn about these specific Python releases: 3.10.9 and 3.11.1
22    # These releases have a known race condition.
23    warnings.warn(  # pragma: no cover
24        "Python 3.10.9 and Python 3.11.1 are unreliable with respect to "
25        + "asyncio subprocesses. Consider a newer Python release: 3.10.10+ "
26        + "or 3.11.2+. (https://github.com/python/cpython/issues/100133)",
27        RuntimeWarning,
28    )
29
30
31sh: CmdContext[str] = CmdContext()
32"""`sh` is the default command context (`CmdContext`).
33
34Use `sh` to create commands or new command contexts.
35
36```python
37from shellous import sh
38result = await sh("echo", "hello")
39```
40"""
41
42__all__ = [
43    "sh",
44    "CmdContext",
45    "Command",
46    "Options",
47    "Pipeline",
48    "Prompt",
49    "cbreak",
50    "cooked",
51    "raw",
52    "Result",
53    "ResultError",
54    "Runner",
55    "PipeRunner",
56    "AuditEventInfo",
57]
sh: CmdContext[str] = CmdContext(options=Options(path=None, inherit_env=True, input=<Redirect.DEFAULT: -20>, input_close=False, output=<Redirect.DEFAULT: -20>, output_append=False, output_close=False, error=<Redirect.DEFAULT: -20>, error_append=False, error_close=False, error_limit=1024, encoding='utf-8', _return_result=False, _catch_cancelled_error=False, exit_codes=None, timeout=None, cancel_timeout=3.0, cancel_signal=<Signals.SIGTERM: 15>, alt_name=None, pass_fds=(), pass_fds_close=False, _writable=False, _start_new_session=False, _preexec_fn=None, pty=False, close_fds=True, audit_callback=None, coerce_arg=None, read_buffer_limit=None))

sh is the default command context (CmdContext).

Use sh to create commands or new command contexts.

from shellous import sh
result = await sh("echo", "hello")
@dataclass(frozen=True)
class CmdContext(typing.Generic[~_RT]):
282@dataclass(frozen=True)
283class CmdContext(Generic[_RT]):
284    """Concrete class for an immutable execution context."""
285
286    CAPTURE: ClassVar[Redirect] = Redirect.CAPTURE
287    "Capture and read/write stream manually."
288
289    DEVNULL: ClassVar[Redirect] = Redirect.DEVNULL
290    "Redirect to /dev/null."
291
292    INHERIT: ClassVar[Redirect] = Redirect.INHERIT
293    "Redirect to same place as existing stdin/stderr/stderr."
294
295    STDOUT: ClassVar[Redirect] = Redirect.STDOUT
296    "Redirect stderr to same place as stdout."
297
298    BUFFER: ClassVar[Redirect] = Redirect.BUFFER
299    "Redirect output to a buffer in the Result object. This is the default for stdout/stderr."
300
301    options: Options = field(default_factory=Options)
302    "Default command options."
303
304    def stdin(
305        self,
306        input_: Any,
307        *,
308        close: bool = False,
309    ) -> "CmdContext[_RT]":
310        "Return new context with updated `input` settings."
311        new_options = self.options.set_stdin(input_, close)
312        return CmdContext(new_options)
313
314    def stdout(
315        self,
316        output: Any,
317        *,
318        append: bool = False,
319        close: bool = False,
320    ) -> "CmdContext[_RT]":
321        "Return new context with updated `output` settings."
322        new_options = self.options.set_stdout(output, append, close)
323        return CmdContext(new_options)
324
325    def stderr(
326        self,
327        error: Any,
328        *,
329        append: bool = False,
330        close: bool = False,
331    ) -> "CmdContext[_RT]":
332        "Return new context with updated `error` settings."
333        new_options = self.options.set_stderr(error, append, close)
334        return CmdContext(new_options)
335
336    def env(self, **kwds: Any) -> "CmdContext[_RT]":
337        """Return new context with augmented environment."""
338        new_options = self.options.add_env(kwds)
339        return CmdContext(new_options)
340
341    def set(  # pylint: disable=unused-argument, too-many-locals, too-many-arguments
342        self,
343        *,
344        path: Unset[Optional[str]] = _UNSET,
345        env: Unset[dict[str, Any]] = _UNSET,
346        inherit_env: Unset[bool] = _UNSET,
347        encoding: Unset[str] = _UNSET,
348        _return_result: Unset[bool] = _UNSET,
349        _catch_cancelled_error: Unset[bool] = _UNSET,
350        exit_codes: Unset[Optional[Container[int]]] = _UNSET,
351        timeout: Unset[Optional[float]] = _UNSET,
352        cancel_timeout: Unset[float] = _UNSET,
353        cancel_signal: Unset[Optional[signal.Signals]] = _UNSET,
354        alt_name: Unset[Optional[str]] = _UNSET,
355        pass_fds: Unset[Iterable[int]] = _UNSET,
356        pass_fds_close: Unset[bool] = _UNSET,
357        _writable: Unset[bool] = _UNSET,
358        _start_new_session: Unset[bool] = _UNSET,
359        _preexec_fn: Unset[_PreexecFnT] = _UNSET,
360        pty: Unset[PtyAdapterOrBool] = _UNSET,
361        close_fds: Unset[bool] = _UNSET,
362        audit_callback: Unset[_AuditFnT] = _UNSET,
363        coerce_arg: Unset[_CoerceArgFnT] = _UNSET,
364        error_limit: Unset[Optional[int]] = _UNSET,
365    ) -> "CmdContext[_RT]":
366        """Return new context with custom options set.
367
368        See `Command.set` for option reference.
369        """
370        kwargs = locals()
371        del kwargs["self"]
372        if not encoding:
373            raise TypeError("invalid encoding")
374        return CmdContext(self.options.set(kwargs))
375
376    def __call__(self, *args: Any) -> "Command[_RT]":
377        "Construct a new command."
378        return Command(coerce(args, self.options.coerce_arg), self.options)
379
380    @property
381    def result(self) -> "CmdContext[shellous.Result]":
382        "Set `_return_result` and `exit_codes`."
383        return cast(
384            CmdContext[shellous.Result],
385            self.set(
386                _return_result=True,
387                exit_codes=range(-255, 256),
388            ),
389        )
390
391    def find_command(self, name: str) -> Optional[Path]:
392        """Find the command with the given name and return its filesystem path.
393
394        Return None if the command name is not found in the search path.
395
396        Use the `path` variable specified by the context if set. Otherwise, the
397        default behavior is to use the `PATH` environment variable with a
398        fallback to the value of `os.defpath`.
399        """
400        result = self.options.which(name)
401        if not result:
402            return None
403        return Path(result)

Concrete class for an immutable execution context.

CmdContext(options: Options = <factory>)
CAPTURE: ClassVar[shellous.redirect.Redirect] = <Redirect.CAPTURE: -10>

Capture and read/write stream manually.

DEVNULL: ClassVar[shellous.redirect.Redirect] = <Redirect.DEVNULL: -3>

Redirect to /dev/null.

INHERIT: ClassVar[shellous.redirect.Redirect] = <Redirect.INHERIT: -11>

Redirect to same place as existing stdin/stderr/stderr.

STDOUT: ClassVar[shellous.redirect.Redirect] = <Redirect.STDOUT: -2>

Redirect stderr to same place as stdout.

BUFFER: ClassVar[shellous.redirect.Redirect] = <Redirect.BUFFER: -12>

Redirect output to a buffer in the Result object. This is the default for stdout/stderr.

options: Options

Default command options.

def stdin( self, input_: Any, *, close: bool = False) -> CmdContext[~_RT]:
304    def stdin(
305        self,
306        input_: Any,
307        *,
308        close: bool = False,
309    ) -> "CmdContext[_RT]":
310        "Return new context with updated `input` settings."
311        new_options = self.options.set_stdin(input_, close)
312        return CmdContext(new_options)

Return new context with updated input settings.

def stdout( self, output: Any, *, append: bool = False, close: bool = False) -> CmdContext[~_RT]:
314    def stdout(
315        self,
316        output: Any,
317        *,
318        append: bool = False,
319        close: bool = False,
320    ) -> "CmdContext[_RT]":
321        "Return new context with updated `output` settings."
322        new_options = self.options.set_stdout(output, append, close)
323        return CmdContext(new_options)

Return new context with updated output settings.

def stderr( self, error: Any, *, append: bool = False, close: bool = False) -> CmdContext[~_RT]:
325    def stderr(
326        self,
327        error: Any,
328        *,
329        append: bool = False,
330        close: bool = False,
331    ) -> "CmdContext[_RT]":
332        "Return new context with updated `error` settings."
333        new_options = self.options.set_stderr(error, append, close)
334        return CmdContext(new_options)

Return new context with updated error settings.

def env(self, **kwds: Any) -> CmdContext[~_RT]:
336    def env(self, **kwds: Any) -> "CmdContext[_RT]":
337        """Return new context with augmented environment."""
338        new_options = self.options.add_env(kwds)
339        return CmdContext(new_options)

Return new context with augmented environment.

def set( self, *, path: Union[str, NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, env: Union[dict[str, Any], shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, inherit_env: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, encoding: Union[str, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, _return_result: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, _catch_cancelled_error: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, exit_codes: Union[Container[int], NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, timeout: Union[float, NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, cancel_timeout: Union[float, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, cancel_signal: Union[signal.Signals, NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, alt_name: Union[str, NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, pass_fds: Union[Iterable[int], shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, pass_fds_close: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, _writable: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, _start_new_session: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, _preexec_fn: Union[Callable[[], NoneType], NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, pty: Union[Callable[[int], NoneType], bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, close_fds: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, audit_callback: Union[Callable[[str, AuditEventInfo], NoneType], NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, coerce_arg: Union[Callable[[Any], Any], NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, error_limit: Union[int, NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>) -> CmdContext[~_RT]:
341    def set(  # pylint: disable=unused-argument, too-many-locals, too-many-arguments
342        self,
343        *,
344        path: Unset[Optional[str]] = _UNSET,
345        env: Unset[dict[str, Any]] = _UNSET,
346        inherit_env: Unset[bool] = _UNSET,
347        encoding: Unset[str] = _UNSET,
348        _return_result: Unset[bool] = _UNSET,
349        _catch_cancelled_error: Unset[bool] = _UNSET,
350        exit_codes: Unset[Optional[Container[int]]] = _UNSET,
351        timeout: Unset[Optional[float]] = _UNSET,
352        cancel_timeout: Unset[float] = _UNSET,
353        cancel_signal: Unset[Optional[signal.Signals]] = _UNSET,
354        alt_name: Unset[Optional[str]] = _UNSET,
355        pass_fds: Unset[Iterable[int]] = _UNSET,
356        pass_fds_close: Unset[bool] = _UNSET,
357        _writable: Unset[bool] = _UNSET,
358        _start_new_session: Unset[bool] = _UNSET,
359        _preexec_fn: Unset[_PreexecFnT] = _UNSET,
360        pty: Unset[PtyAdapterOrBool] = _UNSET,
361        close_fds: Unset[bool] = _UNSET,
362        audit_callback: Unset[_AuditFnT] = _UNSET,
363        coerce_arg: Unset[_CoerceArgFnT] = _UNSET,
364        error_limit: Unset[Optional[int]] = _UNSET,
365    ) -> "CmdContext[_RT]":
366        """Return new context with custom options set.
367
368        See `Command.set` for option reference.
369        """
370        kwargs = locals()
371        del kwargs["self"]
372        if not encoding:
373            raise TypeError("invalid encoding")
374        return CmdContext(self.options.set(kwargs))

Return new context with custom options set.

See Command.set for option reference.

result: CmdContext[Result]
380    @property
381    def result(self) -> "CmdContext[shellous.Result]":
382        "Set `_return_result` and `exit_codes`."
383        return cast(
384            CmdContext[shellous.Result],
385            self.set(
386                _return_result=True,
387                exit_codes=range(-255, 256),
388            ),
389        )

Set _return_result and exit_codes.

def find_command(self, name: str) -> Optional[pathlib.Path]:
391    def find_command(self, name: str) -> Optional[Path]:
392        """Find the command with the given name and return its filesystem path.
393
394        Return None if the command name is not found in the search path.
395
396        Use the `path` variable specified by the context if set. Otherwise, the
397        default behavior is to use the `PATH` environment variable with a
398        fallback to the value of `os.defpath`.
399        """
400        result = self.options.which(name)
401        if not result:
402            return None
403        return Path(result)

Find the command with the given name and return its filesystem path.

Return None if the command name is not found in the search path.

Use the path variable specified by the context if set. Otherwise, the default behavior is to use the PATH environment variable with a fallback to the value of os.defpath.

@dataclass(frozen=True)
class Command(typing.Generic[~_RT]):
406@dataclass(frozen=True)
407class Command(Generic[_RT]):
408    """A Command instance is lightweight and immutable object that specifies the
409    arguments and options used to run a program. Commands do not do anything
410    until they are awaited.
411
412    Commands are always created by a CmdContext.
413
414    ```
415    from shellous import sh
416
417    # Create a new command from the context.
418    echo = sh("echo", "hello, world")
419
420    # Run the command.
421    result = await echo
422    ```
423    """
424
425    args: "tuple[Union[str, bytes, os.PathLike[Any], Command[Any], shellous.Pipeline[Any]], ...]"
426    "Command arguments including the program name as first argument."
427
428    options: Options
429    "Command options."
430
431    def __post_init__(self) -> None:
432        "Validate the command."
433        if len(self.args) == 0:
434            raise ValueError("Command must include program name")
435
436    @property
437    def name(self) -> str:
438        """Returns the name of the program being run.
439
440        Names longer than 31 characters are truncated. If `alt_name` option
441        is set, return that instead.
442        """
443        if self.options.alt_name:
444            return self.options.alt_name
445        name = str(self.args[0])
446        if len(name) > 31:
447            return f"...{name[-31:]}"
448        return name
449
450    def stdin(self, input_: Any, *, close: bool = False) -> "Command[_RT]":
451        "Pass `input` to command's standard input."
452        new_options = self.options.set_stdin(input_, close)
453        return Command(self.args, new_options)
454
455    def stdout(
456        self,
457        output: Any,
458        *,
459        append: bool = False,
460        close: bool = False,
461    ) -> "Command[_RT]":
462        "Redirect standard output to `output`."
463        new_options = self.options.set_stdout(output, append, close)
464        return Command(self.args, new_options)
465
466    def stderr(
467        self,
468        error: Any,
469        *,
470        append: bool = False,
471        close: bool = False,
472    ) -> "Command[_RT]":
473        "Redirect standard error to `error`."
474        new_options = self.options.set_stderr(error, append, close)
475        return Command(self.args, new_options)
476
477    def env(self, **kwds: Any) -> "Command[_RT]":
478        """Return new command with augmented environment.
479
480        The changes to the environment variables made by this method are
481        additive. For example, calling `cmd.env(A=1).env(B=2)` produces a
482        command with the environment set to `{"A": "1", "B": "2"}`.
483
484        To clear the environment, use the `cmd.set(env={})` method.
485        """
486        new_options = self.options.add_env(kwds)
487        return Command(self.args, new_options)
488
489    def set(  # pylint: disable=unused-argument, too-many-locals, too-many-arguments
490        self,
491        *,
492        path: Unset[Optional[str]] = _UNSET,
493        env: Unset[dict[str, Any]] = _UNSET,
494        inherit_env: Unset[bool] = _UNSET,
495        encoding: Unset[str] = _UNSET,
496        _return_result: Unset[bool] = _UNSET,
497        _catch_cancelled_error: Unset[bool] = _UNSET,
498        exit_codes: Unset[Optional[Container[int]]] = _UNSET,
499        timeout: Unset[Optional[float]] = _UNSET,
500        cancel_timeout: Unset[float] = _UNSET,
501        cancel_signal: Unset[Optional[signal.Signals]] = _UNSET,
502        alt_name: Unset[Optional[str]] = _UNSET,
503        pass_fds: Unset[Iterable[int]] = _UNSET,
504        pass_fds_close: Unset[bool] = _UNSET,
505        _writable: Unset[bool] = _UNSET,
506        _start_new_session: Unset[bool] = _UNSET,
507        _preexec_fn: Unset[_PreexecFnT] = _UNSET,
508        pty: Unset[PtyAdapterOrBool] = _UNSET,
509        close_fds: Unset[bool] = _UNSET,
510        audit_callback: Unset[_AuditFnT] = _UNSET,
511        coerce_arg: Unset[_CoerceArgFnT] = _UNSET,
512        error_limit: Unset[Optional[int]] = _UNSET,
513        read_buffer_limit: Unset[Optional[int]] = _UNSET,
514    ) -> "Command[_RT]":
515        """Return new command with custom options set.
516
517        **path** (str | None) default=None<br>
518        Search path for locating command executable. By default, `path` is None
519        which causes shellous to rely on the `PATH` environment variable.
520
521        **env** (dict[str, str]) default={}<br>
522        Set the environment variables for the subprocess. If `inherit_env` is
523        True, the subprocess will also inherit the environment variables
524        specified by the parent process.
525
526        Using `set(env=...)` will replace all environment variables using the
527        dictionary argument. You can also use the `env(...)` method to modify
528        the existing environment incrementally.
529
530        **inherit_env** (bool) default=True<br>
531        Subprocess should inherit the parent process environment. If this is
532        False, the subprocess will only have environment variables specified
533        by `Command.env`. If `inherit_env` is True, the parent process
534        environment is augmented/overridden by any variables specified in
535        `Command.env`.
536
537        **encoding** (str) default="utf-8"<br>
538        String encoding to use for subprocess input/output. To specify `errors`,
539        append it after a space. For example, use "utf-8 replace" to specify
540        "utf-8" with errors "replace".
541
542        **_return_result** (bool) default=False<br>
543        When True, return a `Result` object instead of the standard output.
544        Private API -- use the `result` modifier instead.
545
546        **_catch_cancelled_error** (bool) default=False<br>
547        When True, raise a `ResultError` when the command is cancelled.
548        Private API -- used internally by PipeRunner.
549
550        **exit_codes** (set[int] | None) default=None<br>
551        Set of allowed exit codes that will not raise a `ResultError`. By default,
552        `exit_codes` is `None` which indicates that 0 is the only valid exit
553        status. Any other exit status will raise a `ResultError`. In addition to
554        sets of integers, you can use a `range` object, e.g. `range(256)` for
555        any positive exit status.
556
557        **timeout** (float | None) default=None<br>
558        Timeout in seconds to wait before we cancel the process. The timer
559        begins immediately after the process is launched. This differs from
560        using `asyncio.wait_for` which includes the process launch time also.
561        If timeout is None (the default), there is no timeout.
562
563        **cancel_timeout** (float) default=3.0 seconds<br>
564        Timeout in seconds to wait for a process to exit after sending it a
565        `cancel_signal`. If the process does not exit after waiting for
566        `cancel_timeout` seconds, we send a kill signal to the process.
567
568        **cancel_signal** (signals.Signal | None) default=signal.SIGTERM<br>
569        Signal sent to a process when it is cancelled. If `cancel_signal` is
570        None, send a `SIGKILL` on Unix and `SIGTERM` (TerminateProcess) on
571        Windows.
572
573        **alt_name** (str| None) default=None<br>
574        Alternative name of the command displayed in logs. Used to resolve
575        ambiguity when the actual command name is a scripting language.
576
577        **pass_fds** (Iterable[int]) default=()<br>
578        Specify open file descriptors to pass to the subprocess.
579
580        **pass_fds_close** (bool) default=False<br>
581        Close the file descriptors in `pass_fds` immediately in the current
582        process immediately after launching the subprocess.
583
584        **_writable** (bool) default=False<br>
585        Used to indicate process substitution is writing.
586        Private API -- use the `writable` modifier instead.
587
588        **_start_new_session** (bool) default=False<br>
589        Private API -- provided for testing purposes only.
590
591        **_preexec_fn** (Callable() | None) default=None<br>
592        Private API -- provided for testing purposes only.
593
594        **pty** (bool | Callable(int)) default=False<br>
595        If True, use a pseudo-terminal (pty) to control the child process.
596        If `pty` is set to a callable, the function must take one int argument
597        for the child side of the pty. The function is called to set the child
598        pty's termios settings before spawning the subprocess.
599
600        shellous provides three utility functions: `shellous.cooked`,
601        `shellous.raw` and `shellous.cbreak` that can be used as arguments to
602        the `pty` option.
603
604        **close_fds** (bool) default=True<br>
605        Close all unnecessary file descriptors in the child process. This
606        defaults to True to align with the default behavior of the subprocess
607        module.
608
609        **audit_callback** (Callable(phase, info) | None) default=None<br>
610        Specify a function to call as the command execution goes through its
611        lifecycle. `audit_callback` is a function called with two arguments,
612        *phase* and *info*.
613
614        *phase* can be one of three values:
615
616            "start": The process is about to start.
617
618            "stop": The process finally stopped.
619
620            "signal": The process is being sent a signal.
621
622        *info* is a dictionary providing more information for the callback. The
623        following keys are currently defined:
624
625            "runner" (Runner): Reference to the Runner object.
626
627            "failure" (str): When phase is "stop", optional string with the
628            name of the exception from launching the process.
629
630            "signal" (str): When phase is "signal", the signal name/number
631            sent to the process, e.g. "SIGTERM".
632
633        The primary use case for `audit_callback` is measuring how long each
634        command takes to run and exporting this information to a metrics
635        framework like Prometheus.
636
637        **coerce_arg** (Callable(arg) | None) default=None<br>
638        Specify a function to call on each command line argument. This function
639        can specify how to coerce unsupported argument types (e.g. dict) to
640        a sequence of strings. This function should return the original value
641        unchanged if there is no conversion needed.
642
643        **error_limit** (int | None) default=1024<br>
644        Specify the number of bytes to store when redirecting STDERR to BUFFER.
645        After reading up to `error_limit` bytes, shellous will continue to
646        read from stderr, but will not store any additional bytes. Setting
647        `error_limit` only affects the internal BUFFER; it has no effect when
648        using other redirection types.
649
650        **read_buffer_limit** (int | None) default=65536<br>
651        Specify the maximum number of bytes to read from a stdout/stderr stream
652        when looking for a separator. Increase this value if you expect the
653        subprocess to emit extremely long lines. If this value is too small,
654        you may get the error: "Separator is not found".
655
656        """
657        kwargs = locals()
658        del kwargs["self"]
659        if not encoding:
660            raise TypeError("invalid encoding")
661        return Command(self.args, self.options.set(kwargs))
662
663    def _replace_args(self, new_args: Sequence[Any]) -> "Command[_RT]":
664        """Return new command with arguments replaced by `new_args`.
665
666        Arguments are NOT type-checked by the context. Program name must be the
667        exact same object.
668        """
669        assert new_args
670        assert new_args[0] is self.args[0]
671        return Command(tuple(new_args), self.options)
672
673    def coro(
674        self,
675        *,
676        _run_future: Optional[asyncio.Future[Runner]] = None,
677    ) -> Coroutine[Any, Any, _RT]:
678        "Return coroutine object to run awaitable."
679        return cast(
680            Coroutine[Any, Any, _RT],
681            Runner.run_command(self, _run_future=_run_future),
682        )
683
684    @contextlib.asynccontextmanager
685    async def prompt(
686        self,
687        prompt: Union[str, list[str], re.Pattern[str], None] = None,
688        *,
689        timeout: Optional[float] = None,
690        normalize_newlines: bool = False,
691    ) -> AsyncGenerator[Prompt, None]:
692        """Run command using the send/expect API.
693
694        This method should be called using `async with`. It returns a `Prompt`
695        object with send() and expect() methods.
696
697        You can optionally set a default `prompt`. This is used by `expect()`
698        when you don't provide another value.
699
700        Use the `timeout` parameter to set the default timeout for operations.
701
702        Set `normalize_newlines` to True to convert incoming CR and CR-LF to LF.
703        This conversion is done before matching with `expect()`. This option
704        does not affect strings sent with `send()`.
705        """
706        cmd = self.stdin(Redirect.CAPTURE).stdout(Redirect.CAPTURE)
707
708        cli = None
709        try:
710            async with Runner(cmd) as run:
711                cli = Prompt(
712                    run,
713                    default_prompt=prompt,
714                    default_timeout=timeout,
715                    normalize_newlines=normalize_newlines,
716                )
717                yield cli
718                cli.close()
719        finally:
720            if cli is not None:
721                cli._finish_()  # pyright: ignore[reportPrivateUsage]
722
723    def __await__(self) -> "Generator[Any, None, _RT]":
724        "Run process and return the standard output."
725        return self.coro().__await__()
726
727    async def __aenter__(self) -> Runner:
728        "Enter the async context manager."
729        return await context_aenter(self, Runner(self))
730
731    async def __aexit__(
732        self,
733        exc_type: Optional[type[BaseException]],
734        exc_value: Optional[BaseException],
735        exc_tb: Optional[TracebackType],
736    ) -> Optional[bool]:
737        "Exit the async context manager."
738        return await context_aexit(self, exc_type, exc_value, exc_tb)
739
740    def __aiter__(self) -> AsyncIterator[str]:
741        "Return async iterator to iterate over output lines."
742        return aiter_preflight(self)._readlines()
743
744    async def _readlines(self) -> AsyncIterator[str]:
745        "Async generator to iterate over lines."
746        async with Runner(self) as run:
747            async for line in run:
748                yield line
749
750    def __call__(self, *args: Any) -> "Command[_RT]":
751        "Apply more arguments to the end of the command."
752        if not args:
753            return self
754        new_args = self.args + coerce(args, self.options.coerce_arg)
755        return Command(new_args, self.options)
756
757    def __str__(self) -> str:
758        """Return string representation for command.
759
760        Display the full name of the command only. Don't include arguments or
761        environment variables.
762        """
763        return str(self.args[0])
764
765    @overload
766    def __or__(self, rhs: StdoutType) -> "Command[_RT]": ...  # pragma: no cover
767
768    @overload
769    def __or__(
770        self, rhs: "Command[str]"
771    ) -> "shellous.Pipeline[str]": ...  # pragma: no cover
772
773    @overload
774    def __or__(
775        self,
776        rhs: "Command[shellous.Result]",
777    ) -> "shellous.Pipeline[shellous.Result]": ...  # pragma: no cover
778
779    def __or__(self, rhs: Any) -> Any:
780        "Bitwise or operator is used to build pipelines."
781        if isinstance(rhs, STDOUT_TYPES):
782            return self.stdout(rhs)
783        return shellous.Pipeline.create(self) | rhs
784
785    def __ror__(self, lhs: StdinType) -> "Command[_RT]":
786        "Bitwise or operator is used to build pipelines."
787        if isinstance(lhs, STDIN_TYPES):  # pyright: ignore[reportUnnecessaryIsInstance]
788            return self.stdin(lhs)
789        return NotImplemented
790
791    def __rshift__(self, rhs: StdoutType) -> "Command[_RT]":
792        "Right shift operator is used to build pipelines."
793        if isinstance(
794            rhs, STDOUT_TYPES
795        ):  # pyright: ignore[reportUnnecessaryIsInstance]
796            return self.stdout(rhs, append=True)
797        return NotImplemented
798
799    @property
800    def writable(self) -> "Command[_RT]":
801        "Set `writable` to True."
802        return self.set(_writable=True)
803
804    @property
805    def result(self) -> "Command[shellous.Result]":
806        "Set `_return_result` and `exit_codes`."
807        return cast(
808            Command[shellous.Result],
809            self.set(_return_result=True, exit_codes=range(-255, 256)),
810        )

A Command instance is lightweight and immutable object that specifies the arguments and options used to run a program. Commands do not do anything until they are awaited.

Commands are always created by a CmdContext.

from shellous import sh

# Create a new command from the context.
echo = sh("echo", "hello, world")

# Run the command.
result = await echo
Command( args: tuple[typing.Union[str, bytes, os.PathLike[typing.Any], Command[typing.Any], Pipeline[typing.Any]], ...], options: Options)
args: tuple[typing.Union[str, bytes, os.PathLike[typing.Any], Command[typing.Any], Pipeline[typing.Any]], ...]

Command arguments including the program name as first argument.

options: Options

Command options.

name: str
436    @property
437    def name(self) -> str:
438        """Returns the name of the program being run.
439
440        Names longer than 31 characters are truncated. If `alt_name` option
441        is set, return that instead.
442        """
443        if self.options.alt_name:
444            return self.options.alt_name
445        name = str(self.args[0])
446        if len(name) > 31:
447            return f"...{name[-31:]}"
448        return name

Returns the name of the program being run.

Names longer than 31 characters are truncated. If alt_name option is set, return that instead.

def stdin( self, input_: Any, *, close: bool = False) -> Command[~_RT]:
450    def stdin(self, input_: Any, *, close: bool = False) -> "Command[_RT]":
451        "Pass `input` to command's standard input."
452        new_options = self.options.set_stdin(input_, close)
453        return Command(self.args, new_options)

Pass input to command's standard input.

def stdout( self, output: Any, *, append: bool = False, close: bool = False) -> Command[~_RT]:
455    def stdout(
456        self,
457        output: Any,
458        *,
459        append: bool = False,
460        close: bool = False,
461    ) -> "Command[_RT]":
462        "Redirect standard output to `output`."
463        new_options = self.options.set_stdout(output, append, close)
464        return Command(self.args, new_options)

Redirect standard output to output.

def stderr( self, error: Any, *, append: bool = False, close: bool = False) -> Command[~_RT]:
466    def stderr(
467        self,
468        error: Any,
469        *,
470        append: bool = False,
471        close: bool = False,
472    ) -> "Command[_RT]":
473        "Redirect standard error to `error`."
474        new_options = self.options.set_stderr(error, append, close)
475        return Command(self.args, new_options)

Redirect standard error to error.

def env(self, **kwds: Any) -> Command[~_RT]:
477    def env(self, **kwds: Any) -> "Command[_RT]":
478        """Return new command with augmented environment.
479
480        The changes to the environment variables made by this method are
481        additive. For example, calling `cmd.env(A=1).env(B=2)` produces a
482        command with the environment set to `{"A": "1", "B": "2"}`.
483
484        To clear the environment, use the `cmd.set(env={})` method.
485        """
486        new_options = self.options.add_env(kwds)
487        return Command(self.args, new_options)

Return new command with augmented environment.

The changes to the environment variables made by this method are additive. For example, calling cmd.env(A=1).env(B=2) produces a command with the environment set to {"A": "1", "B": "2"}.

To clear the environment, use the cmd.set(env={}) method.

def set( self, *, path: Union[str, NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, env: Union[dict[str, Any], shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, inherit_env: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, encoding: Union[str, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, _return_result: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, _catch_cancelled_error: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, exit_codes: Union[Container[int], NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, timeout: Union[float, NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, cancel_timeout: Union[float, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, cancel_signal: Union[signal.Signals, NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, alt_name: Union[str, NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, pass_fds: Union[Iterable[int], shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, pass_fds_close: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, _writable: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, _start_new_session: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, _preexec_fn: Union[Callable[[], NoneType], NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, pty: Union[Callable[[int], NoneType], bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, close_fds: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, audit_callback: Union[Callable[[str, AuditEventInfo], NoneType], NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, coerce_arg: Union[Callable[[Any], Any], NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, error_limit: Union[int, NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, read_buffer_limit: Union[int, NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>) -> Command[~_RT]:
489    def set(  # pylint: disable=unused-argument, too-many-locals, too-many-arguments
490        self,
491        *,
492        path: Unset[Optional[str]] = _UNSET,
493        env: Unset[dict[str, Any]] = _UNSET,
494        inherit_env: Unset[bool] = _UNSET,
495        encoding: Unset[str] = _UNSET,
496        _return_result: Unset[bool] = _UNSET,
497        _catch_cancelled_error: Unset[bool] = _UNSET,
498        exit_codes: Unset[Optional[Container[int]]] = _UNSET,
499        timeout: Unset[Optional[float]] = _UNSET,
500        cancel_timeout: Unset[float] = _UNSET,
501        cancel_signal: Unset[Optional[signal.Signals]] = _UNSET,
502        alt_name: Unset[Optional[str]] = _UNSET,
503        pass_fds: Unset[Iterable[int]] = _UNSET,
504        pass_fds_close: Unset[bool] = _UNSET,
505        _writable: Unset[bool] = _UNSET,
506        _start_new_session: Unset[bool] = _UNSET,
507        _preexec_fn: Unset[_PreexecFnT] = _UNSET,
508        pty: Unset[PtyAdapterOrBool] = _UNSET,
509        close_fds: Unset[bool] = _UNSET,
510        audit_callback: Unset[_AuditFnT] = _UNSET,
511        coerce_arg: Unset[_CoerceArgFnT] = _UNSET,
512        error_limit: Unset[Optional[int]] = _UNSET,
513        read_buffer_limit: Unset[Optional[int]] = _UNSET,
514    ) -> "Command[_RT]":
515        """Return new command with custom options set.
516
517        **path** (str | None) default=None<br>
518        Search path for locating command executable. By default, `path` is None
519        which causes shellous to rely on the `PATH` environment variable.
520
521        **env** (dict[str, str]) default={}<br>
522        Set the environment variables for the subprocess. If `inherit_env` is
523        True, the subprocess will also inherit the environment variables
524        specified by the parent process.
525
526        Using `set(env=...)` will replace all environment variables using the
527        dictionary argument. You can also use the `env(...)` method to modify
528        the existing environment incrementally.
529
530        **inherit_env** (bool) default=True<br>
531        Subprocess should inherit the parent process environment. If this is
532        False, the subprocess will only have environment variables specified
533        by `Command.env`. If `inherit_env` is True, the parent process
534        environment is augmented/overridden by any variables specified in
535        `Command.env`.
536
537        **encoding** (str) default="utf-8"<br>
538        String encoding to use for subprocess input/output. To specify `errors`,
539        append it after a space. For example, use "utf-8 replace" to specify
540        "utf-8" with errors "replace".
541
542        **_return_result** (bool) default=False<br>
543        When True, return a `Result` object instead of the standard output.
544        Private API -- use the `result` modifier instead.
545
546        **_catch_cancelled_error** (bool) default=False<br>
547        When True, raise a `ResultError` when the command is cancelled.
548        Private API -- used internally by PipeRunner.
549
550        **exit_codes** (set[int] | None) default=None<br>
551        Set of allowed exit codes that will not raise a `ResultError`. By default,
552        `exit_codes` is `None` which indicates that 0 is the only valid exit
553        status. Any other exit status will raise a `ResultError`. In addition to
554        sets of integers, you can use a `range` object, e.g. `range(256)` for
555        any positive exit status.
556
557        **timeout** (float | None) default=None<br>
558        Timeout in seconds to wait before we cancel the process. The timer
559        begins immediately after the process is launched. This differs from
560        using `asyncio.wait_for` which includes the process launch time also.
561        If timeout is None (the default), there is no timeout.
562
563        **cancel_timeout** (float) default=3.0 seconds<br>
564        Timeout in seconds to wait for a process to exit after sending it a
565        `cancel_signal`. If the process does not exit after waiting for
566        `cancel_timeout` seconds, we send a kill signal to the process.
567
568        **cancel_signal** (signals.Signal | None) default=signal.SIGTERM<br>
569        Signal sent to a process when it is cancelled. If `cancel_signal` is
570        None, send a `SIGKILL` on Unix and `SIGTERM` (TerminateProcess) on
571        Windows.
572
573        **alt_name** (str| None) default=None<br>
574        Alternative name of the command displayed in logs. Used to resolve
575        ambiguity when the actual command name is a scripting language.
576
577        **pass_fds** (Iterable[int]) default=()<br>
578        Specify open file descriptors to pass to the subprocess.
579
580        **pass_fds_close** (bool) default=False<br>
581        Close the file descriptors in `pass_fds` immediately in the current
582        process immediately after launching the subprocess.
583
584        **_writable** (bool) default=False<br>
585        Used to indicate process substitution is writing.
586        Private API -- use the `writable` modifier instead.
587
588        **_start_new_session** (bool) default=False<br>
589        Private API -- provided for testing purposes only.
590
591        **_preexec_fn** (Callable() | None) default=None<br>
592        Private API -- provided for testing purposes only.
593
594        **pty** (bool | Callable(int)) default=False<br>
595        If True, use a pseudo-terminal (pty) to control the child process.
596        If `pty` is set to a callable, the function must take one int argument
597        for the child side of the pty. The function is called to set the child
598        pty's termios settings before spawning the subprocess.
599
600        shellous provides three utility functions: `shellous.cooked`,
601        `shellous.raw` and `shellous.cbreak` that can be used as arguments to
602        the `pty` option.
603
604        **close_fds** (bool) default=True<br>
605        Close all unnecessary file descriptors in the child process. This
606        defaults to True to align with the default behavior of the subprocess
607        module.
608
609        **audit_callback** (Callable(phase, info) | None) default=None<br>
610        Specify a function to call as the command execution goes through its
611        lifecycle. `audit_callback` is a function called with two arguments,
612        *phase* and *info*.
613
614        *phase* can be one of three values:
615
616            "start": The process is about to start.
617
618            "stop": The process finally stopped.
619
620            "signal": The process is being sent a signal.
621
622        *info* is a dictionary providing more information for the callback. The
623        following keys are currently defined:
624
625            "runner" (Runner): Reference to the Runner object.
626
627            "failure" (str): When phase is "stop", optional string with the
628            name of the exception from launching the process.
629
630            "signal" (str): When phase is "signal", the signal name/number
631            sent to the process, e.g. "SIGTERM".
632
633        The primary use case for `audit_callback` is measuring how long each
634        command takes to run and exporting this information to a metrics
635        framework like Prometheus.
636
637        **coerce_arg** (Callable(arg) | None) default=None<br>
638        Specify a function to call on each command line argument. This function
639        can specify how to coerce unsupported argument types (e.g. dict) to
640        a sequence of strings. This function should return the original value
641        unchanged if there is no conversion needed.
642
643        **error_limit** (int | None) default=1024<br>
644        Specify the number of bytes to store when redirecting STDERR to BUFFER.
645        After reading up to `error_limit` bytes, shellous will continue to
646        read from stderr, but will not store any additional bytes. Setting
647        `error_limit` only affects the internal BUFFER; it has no effect when
648        using other redirection types.
649
650        **read_buffer_limit** (int | None) default=65536<br>
651        Specify the maximum number of bytes to read from a stdout/stderr stream
652        when looking for a separator. Increase this value if you expect the
653        subprocess to emit extremely long lines. If this value is too small,
654        you may get the error: "Separator is not found".
655
656        """
657        kwargs = locals()
658        del kwargs["self"]
659        if not encoding:
660            raise TypeError("invalid encoding")
661        return Command(self.args, self.options.set(kwargs))

Return new command with custom options set.

path (str | None) default=None
Search path for locating command executable. By default, path is None which causes shellous to rely on the PATH environment variable.

env (dict[str, str]) default={}
Set the environment variables for the subprocess. If inherit_env is True, the subprocess will also inherit the environment variables specified by the parent process.

Using set(env=...) will replace all environment variables using the dictionary argument. You can also use the env(...) method to modify the existing environment incrementally.

inherit_env (bool) default=True
Subprocess should inherit the parent process environment. If this is False, the subprocess will only have environment variables specified by Command.env. If inherit_env is True, the parent process environment is augmented/overridden by any variables specified in Command.env.

encoding (str) default="utf-8"
String encoding to use for subprocess input/output. To specify errors, append it after a space. For example, use "utf-8 replace" to specify "utf-8" with errors "replace".

_return_result (bool) default=False
When True, return a Result object instead of the standard output. Private API -- use the result modifier instead.

_catch_cancelled_error (bool) default=False
When True, raise a ResultError when the command is cancelled. Private API -- used internally by PipeRunner.

exit_codes (set[int] | None) default=None
Set of allowed exit codes that will not raise a ResultError. By default, exit_codes is None which indicates that 0 is the only valid exit status. Any other exit status will raise a ResultError. In addition to sets of integers, you can use a range object, e.g. range(256) for any positive exit status.

timeout (float | None) default=None
Timeout in seconds to wait before we cancel the process. The timer begins immediately after the process is launched. This differs from using asyncio.wait_for which includes the process launch time also. If timeout is None (the default), there is no timeout.

cancel_timeout (float) default=3.0 seconds
Timeout in seconds to wait for a process to exit after sending it a cancel_signal. If the process does not exit after waiting for cancel_timeout seconds, we send a kill signal to the process.

cancel_signal (signals.Signal | None) default=signal.SIGTERM
Signal sent to a process when it is cancelled. If cancel_signal is None, send a SIGKILL on Unix and SIGTERM (TerminateProcess) on Windows.

alt_name (str| None) default=None
Alternative name of the command displayed in logs. Used to resolve ambiguity when the actual command name is a scripting language.

pass_fds (Iterable[int]) default=()
Specify open file descriptors to pass to the subprocess.

pass_fds_close (bool) default=False
Close the file descriptors in pass_fds immediately in the current process immediately after launching the subprocess.

_writable (bool) default=False
Used to indicate process substitution is writing. Private API -- use the writable modifier instead.

_start_new_session (bool) default=False
Private API -- provided for testing purposes only.

_preexec_fn (Callable() | None) default=None
Private API -- provided for testing purposes only.

pty (bool | Callable(int)) default=False
If True, use a pseudo-terminal (pty) to control the child process. If pty is set to a callable, the function must take one int argument for the child side of the pty. The function is called to set the child pty's termios settings before spawning the subprocess.

shellous provides three utility functions: shellous.cooked, shellous.raw and shellous.cbreak that can be used as arguments to the pty option.

close_fds (bool) default=True
Close all unnecessary file descriptors in the child process. This defaults to True to align with the default behavior of the subprocess module.

audit_callback (Callable(phase, info) | None) default=None
Specify a function to call as the command execution goes through its lifecycle. audit_callback is a function called with two arguments, phase and info.

phase can be one of three values:

"start": The process is about to start.

"stop": The process finally stopped.

"signal": The process is being sent a signal.

info is a dictionary providing more information for the callback. The following keys are currently defined:

"runner" (Runner): Reference to the Runner object.

"failure" (str): When phase is "stop", optional string with the
name of the exception from launching the process.

"signal" (str): When phase is "signal", the signal name/number
sent to the process, e.g. "SIGTERM".

The primary use case for audit_callback is measuring how long each command takes to run and exporting this information to a metrics framework like Prometheus.

coerce_arg (Callable(arg) | None) default=None
Specify a function to call on each command line argument. This function can specify how to coerce unsupported argument types (e.g. dict) to a sequence of strings. This function should return the original value unchanged if there is no conversion needed.

error_limit (int | None) default=1024
Specify the number of bytes to store when redirecting STDERR to BUFFER. After reading up to error_limit bytes, shellous will continue to read from stderr, but will not store any additional bytes. Setting error_limit only affects the internal BUFFER; it has no effect when using other redirection types.

read_buffer_limit (int | None) default=65536
Specify the maximum number of bytes to read from a stdout/stderr stream when looking for a separator. Increase this value if you expect the subprocess to emit extremely long lines. If this value is too small, you may get the error: "Separator is not found".

def coro( self, *, _run_future: Optional[_asyncio.Future[Runner]] = None) -> Coroutine[Any, Any, ~_RT]:
673    def coro(
674        self,
675        *,
676        _run_future: Optional[asyncio.Future[Runner]] = None,
677    ) -> Coroutine[Any, Any, _RT]:
678        "Return coroutine object to run awaitable."
679        return cast(
680            Coroutine[Any, Any, _RT],
681            Runner.run_command(self, _run_future=_run_future),
682        )

Return coroutine object to run awaitable.

@contextlib.asynccontextmanager
async def prompt( self, prompt: Union[str, list[str], re.Pattern[str], NoneType] = None, *, timeout: Optional[float] = None, normalize_newlines: bool = False) -> AsyncGenerator[Prompt, NoneType]:
684    @contextlib.asynccontextmanager
685    async def prompt(
686        self,
687        prompt: Union[str, list[str], re.Pattern[str], None] = None,
688        *,
689        timeout: Optional[float] = None,
690        normalize_newlines: bool = False,
691    ) -> AsyncGenerator[Prompt, None]:
692        """Run command using the send/expect API.
693
694        This method should be called using `async with`. It returns a `Prompt`
695        object with send() and expect() methods.
696
697        You can optionally set a default `prompt`. This is used by `expect()`
698        when you don't provide another value.
699
700        Use the `timeout` parameter to set the default timeout for operations.
701
702        Set `normalize_newlines` to True to convert incoming CR and CR-LF to LF.
703        This conversion is done before matching with `expect()`. This option
704        does not affect strings sent with `send()`.
705        """
706        cmd = self.stdin(Redirect.CAPTURE).stdout(Redirect.CAPTURE)
707
708        cli = None
709        try:
710            async with Runner(cmd) as run:
711                cli = Prompt(
712                    run,
713                    default_prompt=prompt,
714                    default_timeout=timeout,
715                    normalize_newlines=normalize_newlines,
716                )
717                yield cli
718                cli.close()
719        finally:
720            if cli is not None:
721                cli._finish_()  # pyright: ignore[reportPrivateUsage]

Run command using the send/expect API.

This method should be called using async with. It returns a Prompt object with send() and expect() methods.

You can optionally set a default prompt. This is used by expect() when you don't provide another value.

Use the timeout parameter to set the default timeout for operations.

Set normalize_newlines to True to convert incoming CR and CR-LF to LF. This conversion is done before matching with expect(). This option does not affect strings sent with send().

def __await__(self) -> Generator[Any, NoneType, ~_RT]:
723    def __await__(self) -> "Generator[Any, None, _RT]":
724        "Run process and return the standard output."
725        return self.coro().__await__()

Run process and return the standard output.

async def __aenter__(self) -> Runner:
727    async def __aenter__(self) -> Runner:
728        "Enter the async context manager."
729        return await context_aenter(self, Runner(self))

Enter the async context manager.

def __aiter__(self) -> AsyncIterator[str]:
740    def __aiter__(self) -> AsyncIterator[str]:
741        "Return async iterator to iterate over output lines."
742        return aiter_preflight(self)._readlines()

Return async iterator to iterate over output lines.

writable: Command[~_RT]
799    @property
800    def writable(self) -> "Command[_RT]":
801        "Set `writable` to True."
802        return self.set(_writable=True)

Set writable to True.

result: Command[Result]
804    @property
805    def result(self) -> "Command[shellous.Result]":
806        "Set `_return_result` and `exit_codes`."
807        return cast(
808            Command[shellous.Result],
809            self.set(_return_result=True, exit_codes=range(-255, 256)),
810        )

Set _return_result and exit_codes.

@dataclass(frozen=True)
class Options:
 95@dataclass(frozen=True)
 96class Options:  # pylint: disable=too-many-instance-attributes
 97    "Concrete class for per-command options."
 98
 99    path: Optional[str] = None
100    "Optional search path to use instead of PATH environment variable."
101
102    env: Optional[EnvironmentDict] = field(default=None, repr=False)
103    "Additional environment variables for command."
104
105    inherit_env: bool = True
106    "True if subprocess should inherit the current environment variables."
107
108    input: _RedirectT = Redirect.DEFAULT
109    "Input object to bind to stdin."
110
111    input_close: bool = False
112    "True if input object should be closed after subprocess launch."
113
114    output: _RedirectT = Redirect.DEFAULT
115    "Output object to bind to stdout."
116
117    output_append: bool = False
118    "True if output object should be opened in append mode."
119
120    output_close: bool = False
121    "True if output object should be closed after subprocess launch."
122
123    error: _RedirectT = Redirect.DEFAULT
124    "Error object to bind to stderr."
125
126    error_append: bool = False
127    "True if error object should be opened in append mode."
128
129    error_close: bool = False
130    "True if error object should be closed after subprocess launch."
131
132    error_limit: Optional[int] = DEFAULT_ERROR_LIMIT
133    "Bytes of stderr to buffer in memory (`sh.BUFFER`). None means unlimited."
134
135    encoding: str = "utf-8"
136    "Specifies encoding of input/output."
137
138    _return_result: bool = False
139    "True if we should return `Result` object instead of the output text/bytes."
140
141    _catch_cancelled_error: bool = False
142    "True if we should raise `ResultError` after clean up from cancelled task."
143
144    exit_codes: Optional[Container[int]] = None
145    "Set of exit codes that do not raise a `ResultError`. None means {0}."
146
147    timeout: Optional[float] = None
148    "Timeout in seconds that we wait before cancelling the process."
149
150    cancel_timeout: float = 3.0
151    "Timeout in seconds that we wait for a cancelled process to terminate."
152
153    cancel_signal: Optional[signal.Signals] = signal.SIGTERM
154    "The signal sent to terminate a cancelled process."
155
156    alt_name: Optional[str] = None
157    "Alternate name for the command to use when logging."
158
159    pass_fds: Iterable[int] = ()
160    "File descriptors to pass to the command."
161
162    pass_fds_close: bool = False
163    "True if pass_fds should be closed after subprocess launch."
164
165    _writable: bool = False
166    "True if using process substitution in write mode."
167
168    _start_new_session: bool = False
169    "True if child process should start a new session with `setsid` call."
170
171    _preexec_fn: _PreexecFnT = None
172    "Function to call in child process after fork from parent."
173
174    pty: PtyAdapterOrBool = False
175    "True if child process should be controlled using a pseudo-terminal (pty)."
176
177    close_fds: bool = True
178    "True if child process should close all file descriptors."
179
180    audit_callback: _AuditFnT = None
181    "Function called to audit stages of process execution."
182
183    coerce_arg: _CoerceArgFnT = None
184    "Function called to coerce top level arguments."
185
186    read_buffer_limit: Optional[int] = None
187    "Maximum number of bytes to read when looking for a separator."
188
189    def runtime_env(self) -> Optional[dict[str, str]]:
190        "@private Return our `env` merged with the global environment."
191        if self.inherit_env:
192            if not self.env:
193                return None
194            return os.environ | self.env
195
196        if self.env:
197            return dict(self.env)  # make copy of dict
198        return {}
199
200    def set_stdin(self, input_: Any, close: bool) -> "Options":
201        "@private Return new options with `input` configured."
202        if input_ is None:
203            raise TypeError("invalid stdin")
204
205        if input_ == Redirect.STDOUT:
206            raise ValueError("STDOUT is only supported by stderr")
207
208        return dataclasses.replace(
209            self,
210            input=input_,
211            input_close=close,
212        )
213
214    def set_stdout(self, output: Any, append: bool, close: bool) -> "Options":
215        "@private Return new options with `output` configured."
216        if output is None:
217            raise TypeError("invalid stdout")
218
219        if output == Redirect.STDOUT:
220            raise ValueError("STDOUT is only supported by stderr")
221
222        return dataclasses.replace(
223            self,
224            output=output,
225            output_append=append,
226            output_close=close,
227        )
228
229    def set_stderr(self, error: Any, append: bool, close: bool) -> "Options":
230        "@private Return new options with `error` configured."
231        if error is None:
232            raise TypeError("invalid stderr")
233
234        return dataclasses.replace(
235            self,
236            error=error,
237            error_append=append,
238            error_close=close,
239        )
240
241    def add_env(self, updates: dict[str, Any]) -> "Options":
242        "@private Return new options with augmented environment."
243        new_env = EnvironmentDict(self.env, updates)
244        return dataclasses.replace(self, env=new_env)
245
246    def set(self, kwds: dict[str, Any]) -> "Options":
247        """@private Return new options with given properties updated.
248
249        See `Command.set` for option reference.
250        """
251        kwds = {key: value for key, value in kwds.items() if value is not _UNSET}
252        if "env" in kwds:
253            # The "env" property is stored as an `EnvironmentDict`.
254            new_env = kwds["env"]
255            if new_env:
256                kwds["env"] = EnvironmentDict(None, new_env)
257            else:
258                kwds["env"] = None
259        return dataclasses.replace(self, **kwds)
260
261    @overload
262    def which(self, name: bytes) -> Optional[bytes]:
263        "@private Find the command with the given name and return its path."
264
265    @overload
266    def which(
267        self, name: Union[str, os.PathLike[Any]]
268    ) -> Optional[Union[str, os.PathLike[Any]]]:
269        "@private Find the command with the given name and return its path."
270
271    def which(
272        self, name: Union[str, bytes, os.PathLike[Any]]
273    ) -> Optional[Union[str, bytes, os.PathLike[Any]]]:
274        "@private Find the command with the given name and return its path."
275        return shutil.which(name, path=self.path)

Concrete class for per-command options.

Options( path: Optional[str] = None, env: Optional[shellous.util.EnvironmentDict] = None, inherit_env: bool = True, input: Any = <Redirect.DEFAULT: -20>, input_close: bool = False, output: Any = <Redirect.DEFAULT: -20>, output_append: bool = False, output_close: bool = False, error: Any = <Redirect.DEFAULT: -20>, error_append: bool = False, error_close: bool = False, error_limit: Optional[int] = 1024, encoding: str = 'utf-8', _return_result: bool = False, _catch_cancelled_error: bool = False, exit_codes: Optional[Container[int]] = None, timeout: Optional[float] = None, cancel_timeout: float = 3.0, cancel_signal: Optional[signal.Signals] = <Signals.SIGTERM: 15>, alt_name: Optional[str] = None, pass_fds: Iterable[int] = (), pass_fds_close: bool = False, _writable: bool = False, _start_new_session: bool = False, _preexec_fn: Optional[Callable[[], NoneType]] = None, pty: Union[Callable[[int], NoneType], bool] = False, close_fds: bool = True, audit_callback: Optional[Callable[[str, AuditEventInfo], NoneType]] = None, coerce_arg: Optional[Callable[[Any], Any]] = None, read_buffer_limit: Optional[int] = None)
path: Optional[str] = None

Optional search path to use instead of PATH environment variable.

env: Optional[shellous.util.EnvironmentDict] = None

Additional environment variables for command.

inherit_env: bool = True

True if subprocess should inherit the current environment variables.

input: Any = <Redirect.DEFAULT: -20>

Input object to bind to stdin.

input_close: bool = False

True if input object should be closed after subprocess launch.

output: Any = <Redirect.DEFAULT: -20>

Output object to bind to stdout.

output_append: bool = False

True if output object should be opened in append mode.

output_close: bool = False

True if output object should be closed after subprocess launch.

error: Any = <Redirect.DEFAULT: -20>

Error object to bind to stderr.

error_append: bool = False

True if error object should be opened in append mode.

error_close: bool = False

True if error object should be closed after subprocess launch.

error_limit: Optional[int] = 1024

Bytes of stderr to buffer in memory (sh.BUFFER). None means unlimited.

encoding: str = 'utf-8'

Specifies encoding of input/output.

exit_codes: Optional[Container[int]] = None

Set of exit codes that do not raise a ResultError. None means {0}.

timeout: Optional[float] = None

Timeout in seconds that we wait before cancelling the process.

cancel_timeout: float = 3.0

Timeout in seconds that we wait for a cancelled process to terminate.

cancel_signal: Optional[signal.Signals] = <Signals.SIGTERM: 15>

The signal sent to terminate a cancelled process.

alt_name: Optional[str] = None

Alternate name for the command to use when logging.

pass_fds: Iterable[int] = ()

File descriptors to pass to the command.

pass_fds_close: bool = False

True if pass_fds should be closed after subprocess launch.

pty: Union[Callable[[int], NoneType], bool] = False

True if child process should be controlled using a pseudo-terminal (pty).

close_fds: bool = True

True if child process should close all file descriptors.

audit_callback: Optional[Callable[[str, AuditEventInfo], NoneType]] = None

Function called to audit stages of process execution.

coerce_arg: Optional[Callable[[Any], Any]] = None

Function called to coerce top level arguments.

read_buffer_limit: Optional[int] = None

Maximum number of bytes to read when looking for a separator.

@dataclass(frozen=True)
class Pipeline(typing.Generic[~_RT]):
 41@dataclass(frozen=True)
 42class Pipeline(Generic[_RT]):
 43    "A Pipeline is a sequence of commands."
 44
 45    commands: tuple[shellous.Command[_RT], ...] = ()
 46
 47    @staticmethod
 48    def create(*commands: shellous.Command[_T]) -> "Pipeline[_T]":
 49        "Create a new Pipeline."
 50        return Pipeline(commands)
 51
 52    def __post_init__(self) -> None:
 53        "Validate the pipeline."
 54        if len(self.commands) == 0:
 55            raise ValueError("Pipeline must include at least one command")
 56
 57    @property
 58    def name(self) -> str:
 59        "Return the name of the pipeline."
 60        return "|".join(cmd.name for cmd in self.commands)
 61
 62    @property
 63    def options(self) -> shellous.Options:
 64        "Return the last command's options."
 65        return self.commands[-1].options
 66
 67    def stdin(self, input_: Any, *, close: bool = False) -> "Pipeline[_RT]":
 68        "Set stdin on the first command of the pipeline."
 69        new_first = self.commands[0].stdin(input_, close=close)
 70        new_commands = (new_first, *self.commands[1:])
 71        return dataclasses.replace(self, commands=new_commands)
 72
 73    def stdout(
 74        self,
 75        output: Any,
 76        *,
 77        append: bool = False,
 78        close: bool = False,
 79    ) -> "Pipeline[_RT]":
 80        "Set stdout on the last command of the pipeline."
 81        new_last = self.commands[-1].stdout(output, append=append, close=close)
 82        new_commands = (*self.commands[0:-1], new_last)
 83        return dataclasses.replace(self, commands=new_commands)
 84
 85    def stderr(
 86        self,
 87        error: Any,
 88        *,
 89        append: bool = False,
 90        close: bool = False,
 91    ) -> "Pipeline[_RT]":
 92        "Set stderr on the last command of the pipeline."
 93        new_last = self.commands[-1].stderr(error, append=append, close=close)
 94        new_commands = (*self.commands[0:-1], new_last)
 95        return dataclasses.replace(self, commands=new_commands)
 96
 97    def _set(self, **kwds: Any):
 98        "Set options on last command of the pipeline."
 99        new_last = self.commands[-1].set(**kwds)
100        new_commands = (*self.commands[0:-1], new_last)
101        return dataclasses.replace(self, commands=new_commands)
102
103    def coro(self) -> Coroutine[Any, Any, _RT]:
104        "Return coroutine object for pipeline."
105        return cast(Coroutine[Any, Any, _RT], PipeRunner.run_pipeline(self))
106
107    @contextlib.asynccontextmanager
108    async def prompt(
109        self,
110        prompt: Union[str, list[str], re.Pattern[str], None] = None,
111        *,
112        timeout: Optional[float] = None,
113        normalize_newlines: bool = False,
114    ) -> AsyncGenerator[Prompt, None]:
115        """Run pipeline using the send/expect API.
116
117        This method should be called using `async with`. It returns a `Prompt`
118        object with send() and expect() methods.
119
120        You can optionally set a default `prompt`. This is used by `expect()`
121        when you don't provide another value.
122
123        Use the `timeout` parameter to set the default timeout for operations.
124
125        Set `normalize_newlines` to True to convert incoming CR and CR-LF to LF.
126        This conversion is done before matching with `expect()`. This option
127        does not affect strings sent with `send()`.
128        """
129        cmd = self.stdin(Redirect.CAPTURE).stdout(Redirect.CAPTURE)
130
131        cli = None
132        try:
133            async with PipeRunner(cmd, capturing=True) as run:
134                cli = Prompt(
135                    run,
136                    default_prompt=prompt,
137                    default_timeout=timeout,
138                    normalize_newlines=normalize_newlines,
139                )
140                yield cli
141                cli.close()
142        finally:
143            if cli is not None:
144                cli._finish_()  # pyright: ignore[reportPrivateUsage]
145
146    def _add(self, item: Union["shellous.Command[Any]", "Pipeline[Any]"]):
147        if isinstance(item, shellous.Command):
148            return dataclasses.replace(self, commands=(*self.commands, item))
149        return dataclasses.replace(
150            self,
151            commands=self.commands + item.commands,
152        )
153
154    def __len__(self) -> int:
155        "Return number of commands in pipe."
156        return len(self.commands)
157
158    def __getitem__(self, key: int) -> shellous.Command[Any]:
159        "Return specified command by index."
160        return self.commands[key]
161
162    def __call__(self, *args: Any) -> "Pipeline[_RT]":
163        if args:
164            raise TypeError("Calling pipeline with 1 or more arguments.")
165        return self
166
167    @overload
168    def __or__(
169        self, rhs: "Union[shellous.Command[shellous.Result], Pipeline[shellous.Result]]"
170    ) -> "Pipeline[shellous.Result]": ...  # pragma: no cover
171
172    @overload
173    def __or__(
174        self, rhs: "Union[shellous.Command[str], Pipeline[str]]"
175    ) -> "Pipeline[str]": ...  # pragma: no cover
176
177    @overload
178    def __or__(self, rhs: StdoutType) -> "Pipeline[_RT]": ...  # pragma: no cover
179
180    def __or__(self, rhs: Any) -> "Pipeline[Any]":
181        if isinstance(rhs, (shellous.Command, Pipeline)):
182            return self._add(rhs)  # pyright: ignore[reportUnknownArgumentType]
183        if isinstance(rhs, STDOUT_TYPES):
184            return self.stdout(rhs)
185        if isinstance(rhs, (str, bytes)):
186            raise TypeError(
187                f"{type(rhs)!r} unsupported for | output (Use 'pathlib.Path')"
188            )
189        return NotImplemented
190
191    def __ror__(self, lhs: StdinType) -> "Pipeline[_RT]":
192        if isinstance(lhs, STDIN_TYPES):  # pyright: ignore[reportUnnecessaryIsInstance]
193            return self.stdin(lhs)
194        return NotImplemented
195
196    def __rshift__(self, rhs: StdoutType) -> "Pipeline[_RT]":
197        if isinstance(
198            rhs, STDOUT_TYPES
199        ):  # pyright: ignore[reportUnnecessaryIsInstance]
200            return self.stdout(rhs, append=True)
201        if isinstance(rhs, (str, bytes)):
202            raise TypeError(
203                f"{type(rhs)!r} unsupported for >> output (Use 'pathlib.Path')"
204            )
205        return NotImplemented
206
207    @property
208    def writable(self) -> "Pipeline[_RT]":
209        "Set writable=True option on last command of pipeline."
210        return self._set(_writable=True)
211
212    @property
213    def result(self) -> "Pipeline[shellous.Result]":
214        "Set `_return_result` and `exit_codes`."
215        return cast(
216            Pipeline[shellous.Result],
217            self._set(_return_result=True, exit_codes=range(-255, 256)),
218        )
219
220    def __await__(self) -> "Generator[Any, None, _RT]":
221        return self.coro().__await__()  # FP pylint: disable=no-member
222
223    async def __aenter__(self) -> PipeRunner:
224        "Enter the async context manager."
225        return await context_aenter(self, PipeRunner(self, capturing=True))
226
227    async def __aexit__(
228        self,
229        exc_type: Optional[type[BaseException]],
230        exc_value: Optional[BaseException],
231        exc_tb: Optional[TracebackType],
232    ) -> Optional[bool]:
233        "Exit the async context manager."
234        return await context_aexit(self, exc_type, exc_value, exc_tb)
235
236    def __aiter__(self) -> AsyncIterator[str]:
237        "Return async iterator to iterate over output lines."
238        return aiter_preflight(self)._readlines()
239
240    async def _readlines(self):
241        "Async generator to iterate over lines."
242        async with PipeRunner(self, capturing=True) as run:
243            async for line in run:
244                yield line

A Pipeline is a sequence of commands.

Pipeline(commands: tuple[Command[~_RT], ...] = ())
commands: tuple[Command[~_RT], ...] = ()
@staticmethod
def create( *commands: Command[~_T]) -> Pipeline[~_T]:
47    @staticmethod
48    def create(*commands: shellous.Command[_T]) -> "Pipeline[_T]":
49        "Create a new Pipeline."
50        return Pipeline(commands)

Create a new Pipeline.

name: str
57    @property
58    def name(self) -> str:
59        "Return the name of the pipeline."
60        return "|".join(cmd.name for cmd in self.commands)

Return the name of the pipeline.

options: Options
62    @property
63    def options(self) -> shellous.Options:
64        "Return the last command's options."
65        return self.commands[-1].options

Return the last command's options.

def stdin( self, input_: Any, *, close: bool = False) -> Pipeline[~_RT]:
67    def stdin(self, input_: Any, *, close: bool = False) -> "Pipeline[_RT]":
68        "Set stdin on the first command of the pipeline."
69        new_first = self.commands[0].stdin(input_, close=close)
70        new_commands = (new_first, *self.commands[1:])
71        return dataclasses.replace(self, commands=new_commands)

Set stdin on the first command of the pipeline.

def stdout( self, output: Any, *, append: bool = False, close: bool = False) -> Pipeline[~_RT]:
73    def stdout(
74        self,
75        output: Any,
76        *,
77        append: bool = False,
78        close: bool = False,
79    ) -> "Pipeline[_RT]":
80        "Set stdout on the last command of the pipeline."
81        new_last = self.commands[-1].stdout(output, append=append, close=close)
82        new_commands = (*self.commands[0:-1], new_last)
83        return dataclasses.replace(self, commands=new_commands)

Set stdout on the last command of the pipeline.

def stderr( self, error: Any, *, append: bool = False, close: bool = False) -> Pipeline[~_RT]:
85    def stderr(
86        self,
87        error: Any,
88        *,
89        append: bool = False,
90        close: bool = False,
91    ) -> "Pipeline[_RT]":
92        "Set stderr on the last command of the pipeline."
93        new_last = self.commands[-1].stderr(error, append=append, close=close)
94        new_commands = (*self.commands[0:-1], new_last)
95        return dataclasses.replace(self, commands=new_commands)

Set stderr on the last command of the pipeline.

def coro(self) -> Coroutine[Any, Any, ~_RT]:
103    def coro(self) -> Coroutine[Any, Any, _RT]:
104        "Return coroutine object for pipeline."
105        return cast(Coroutine[Any, Any, _RT], PipeRunner.run_pipeline(self))

Return coroutine object for pipeline.

@contextlib.asynccontextmanager
async def prompt( self, prompt: Union[str, list[str], re.Pattern[str], NoneType] = None, *, timeout: Optional[float] = None, normalize_newlines: bool = False) -> AsyncGenerator[Prompt, NoneType]:
107    @contextlib.asynccontextmanager
108    async def prompt(
109        self,
110        prompt: Union[str, list[str], re.Pattern[str], None] = None,
111        *,
112        timeout: Optional[float] = None,
113        normalize_newlines: bool = False,
114    ) -> AsyncGenerator[Prompt, None]:
115        """Run pipeline using the send/expect API.
116
117        This method should be called using `async with`. It returns a `Prompt`
118        object with send() and expect() methods.
119
120        You can optionally set a default `prompt`. This is used by `expect()`
121        when you don't provide another value.
122
123        Use the `timeout` parameter to set the default timeout for operations.
124
125        Set `normalize_newlines` to True to convert incoming CR and CR-LF to LF.
126        This conversion is done before matching with `expect()`. This option
127        does not affect strings sent with `send()`.
128        """
129        cmd = self.stdin(Redirect.CAPTURE).stdout(Redirect.CAPTURE)
130
131        cli = None
132        try:
133            async with PipeRunner(cmd, capturing=True) as run:
134                cli = Prompt(
135                    run,
136                    default_prompt=prompt,
137                    default_timeout=timeout,
138                    normalize_newlines=normalize_newlines,
139                )
140                yield cli
141                cli.close()
142        finally:
143            if cli is not None:
144                cli._finish_()  # pyright: ignore[reportPrivateUsage]

Run pipeline using the send/expect API.

This method should be called using async with. It returns a Prompt object with send() and expect() methods.

You can optionally set a default prompt. This is used by expect() when you don't provide another value.

Use the timeout parameter to set the default timeout for operations.

Set normalize_newlines to True to convert incoming CR and CR-LF to LF. This conversion is done before matching with expect(). This option does not affect strings sent with send().

def __len__(self) -> int:
154    def __len__(self) -> int:
155        "Return number of commands in pipe."
156        return len(self.commands)

Return number of commands in pipe.

def __getitem__(self, key: int) -> Command[typing.Any]:
158    def __getitem__(self, key: int) -> shellous.Command[Any]:
159        "Return specified command by index."
160        return self.commands[key]

Return specified command by index.

writable: Pipeline[~_RT]
207    @property
208    def writable(self) -> "Pipeline[_RT]":
209        "Set writable=True option on last command of pipeline."
210        return self._set(_writable=True)

Set writable=True option on last command of pipeline.

result: Pipeline[Result]
212    @property
213    def result(self) -> "Pipeline[shellous.Result]":
214        "Set `_return_result` and `exit_codes`."
215        return cast(
216            Pipeline[shellous.Result],
217            self._set(_return_result=True, exit_codes=range(-255, 256)),
218        )

Set _return_result and exit_codes.

def __await__(self) -> Generator[Any, NoneType, ~_RT]:
220    def __await__(self) -> "Generator[Any, None, _RT]":
221        return self.coro().__await__()  # FP pylint: disable=no-member
async def __aenter__(self) -> PipeRunner:
223    async def __aenter__(self) -> PipeRunner:
224        "Enter the async context manager."
225        return await context_aenter(self, PipeRunner(self, capturing=True))

Enter the async context manager.

def __aiter__(self) -> AsyncIterator[str]:
236    def __aiter__(self) -> AsyncIterator[str]:
237        "Return async iterator to iterate over output lines."
238        return aiter_preflight(self)._readlines()

Return async iterator to iterate over output lines.

class Prompt:
 24class Prompt:
 25    """Utility class to help with an interactive prompt session.
 26
 27    When you are controlling a co-process you will usually "send" some
 28    text to it, and then "expect" a response. The "expect" operation can use
 29    a regular expression to match different types of responses.
 30
 31    Create a new `Prompt` instance using the `prompt()` API.
 32
 33    ```
 34    # In this example, we are using a default prompt of "??? ".
 35    # Setting the PS1 environment variable tells the shell to use this as
 36    # the shell prompt.
 37    cmd = sh("sh").env(PS1="??? ").set(pty=True)
 38
 39    async with cmd.prompt("??? ", timeout=3.0) as cli:
 40        # Turn off terminal echo.
 41        cli.echo = False
 42
 43        # Wait for greeting and initial prompt. Calling expect() with no
 44        # argument will match the default prompt "??? ".
 45        greeting, _ = await cli.expect()
 46
 47        # Send a command and wait for the response.
 48        await cli.send("echo hello")
 49        answer, _ = await cli.expect()
 50        assert answer == "hello\\r\\n"
 51    ```
 52    """
 53
 54    _runner: Union[Runner, PipeRunner]
 55    _encoding: str
 56    _default_prompt: Optional[re.Pattern[str]]
 57    _default_timeout: Optional[float]
 58    _normalize_newlines: bool
 59    _chunk_size: int
 60    _decoder: codecs.IncrementalDecoder
 61    _pending: str = ""
 62    _at_eof: bool = False
 63    _result: Optional[Result] = None
 64
 65    def __init__(
 66        self,
 67        runner: Runner,
 68        *,
 69        default_prompt: Union[str, list[str], re.Pattern[str], None] = None,
 70        default_timeout: Optional[float] = None,
 71        normalize_newlines: bool = False,
 72        _chunk_size: int = _DEFAULT_CHUNK_SIZE,
 73    ):
 74        assert runner.stdin is not None
 75        assert runner.stdout is not None
 76
 77        if isinstance(default_prompt, (str, list)):
 78            default_prompt = _regex_compile_exact(default_prompt)
 79
 80        self._runner = runner
 81        self._encoding = runner.options.encoding
 82        self._default_prompt = default_prompt
 83        self._default_timeout = default_timeout
 84        self._normalize_newlines = normalize_newlines
 85        self._chunk_size = _chunk_size
 86        self._decoder = _make_decoder(self._encoding, normalize_newlines)
 87
 88        if LOG_PROMPT:
 89            LOGGER.info(
 90                "Prompt[pid=%s]: --- BEGIN --- name=%r",
 91                self._runner.pid,
 92                self._runner.name,
 93            )
 94
 95    @property
 96    def at_eof(self) -> bool:
 97        "True if the prompt reader is at the end of file."
 98        return self._at_eof and not self._pending
 99
100    @property
101    def pending(self) -> str:
102        """Characters that still remain in the `pending` buffer."""
103        return self._pending
104
105    @property
106    def echo(self) -> bool:
107        """True if PTY is in echo mode.
108
109        If the runner is not using a PTY, always return False.
110
111        When the process is using a PTY, you can enable/disable terminal echo
112        mode by setting the `echo` property to True/False.
113        """
114        if not isinstance(self._runner, Runner) or self._runner.pty_fd is None:
115            return False
116        return pty_util.get_term_echo(self._runner.pty_fd)
117
118    @echo.setter
119    def echo(self, value: bool) -> None:
120        """Set echo mode for the PTY.
121
122        Raise an error if the runner is not using a PTY.
123        """
124        if not isinstance(self._runner, Runner) or self._runner.pty_fd is None:
125            raise RuntimeError("Cannot set echo mode. Not running in a PTY.")
126        pty_util.set_term_echo(self._runner.pty_fd, value)
127
128    @property
129    def result(self) -> Result:
130        """The `Result` of the co-process when it exited.
131
132        You can only retrieve this property *after* the `async with` block
133        exits where the co-process is running:
134
135        ```
136        async with cmd.prompt() as cli:
137            ...
138        # Access `cli.result` here.
139        ```
140
141        Inside the `async with` block, raise a RuntimeError because the process
142        has not exited yet.
143        """
144        if self._result is None:
145            raise RuntimeError("Prompt process is still running")
146        return self._result
147
148    async def send(
149        self,
150        text: Union[bytes, str],
151        *,
152        end: Optional[str] = _DEFAULT_LINE_END,
153        no_echo: bool = False,
154        timeout: Optional[float] = None,
155    ) -> None:
156        """Write some `text` to co-process standard input and append a newline.
157
158        The `text` parameter is the string that you want to write. Use a `bytes`
159        object to send some raw bytes (e.g. to the terminal driver).
160
161        The default line ending is "\n". Use the `end` parameter to change the
162        line ending. To omit the line ending entirely, specify `end=None`.
163
164        Set `no_echo` to True when you are writing a password. When you set
165        `no_echo` to True, the `send` method will wait for terminal
166        echo mode to be disabled before writing the text. If shellous logging
167        is enabled, the sensitive information will **not** be logged.
168
169        Use the `timeout` parameter to override the default timeout. Normally,
170        data is delivered immediately and this method returns making a trip
171        through the event loop. However, there are situations where the
172        co-process input pipeline fills and we have to wait for it to
173        drain.
174
175        When this method needs to wait for the co-process input pipe to drain,
176        this method will concurrently read from the output pipe into the pending
177        buffer. This is necessary to avoid a deadlock situation where everything
178        stops because neither process can make progress.
179        """
180        if end is None:
181            end = ""
182
183        if no_echo:
184            await self._wait_no_echo()
185
186        if isinstance(text, bytes):
187            data = text + encode_bytes(end, self._encoding)
188        else:
189            data = encode_bytes(text + end, self._encoding)
190
191        stdin = self._runner.stdin
192        assert stdin is not None
193        stdin.write(data)
194
195        if LOG_PROMPT:
196            self._log_send(data, no_echo)
197
198        # Drain our write to stdin.
199        cancelled, ex = await harvest_results(
200            self._drain(stdin),
201            timeout=timeout or self._default_timeout,
202        )
203        if cancelled:
204            raise asyncio.CancelledError()
205        if isinstance(ex[0], Exception):
206            raise ex[0]
207
208    async def expect(
209        self,
210        prompt: Union[str, list[str], re.Pattern[str], None] = None,
211        *,
212        timeout: Optional[float] = None,
213    ) -> tuple[str, re.Match[str]]:
214        """Read from co-process standard output until `prompt` pattern matches.
215
216        Returns a 2-tuple of (output, match) where `output` is the text *before*
217        the prompt pattern and `match` is a `re.Match` object for the prompt
218        text itself.
219
220        If `expect` reaches EOF or a timeout occurs before the prompt pattern
221        matches, it raises an `EOFError` or `asyncio.TimeoutError`. The unread
222        characters will be available in the `pending` buffer.
223
224        After this method returns, there may still be characters read from stdout
225        that remain in the `pending` buffer. These are the characters *after* the
226        prompt pattern. You can examine these using the `pending` property.
227        Subsequent calls to expect will examine this buffer first before
228        reading new data from the output pipe.
229
230        By default, this method will use the default `timeout` for the `Prompt`
231        object if one is set. You can use the `timeout` parameter to specify
232        a custom timeout in seconds.
233
234        Prompt Patterns
235        ~~~~~~~~~~~~~~~
236
237        The `expect()` method supports matching fixed strings and regular
238        expressions. The type of the `prompt` parameter determines the type of
239        search.
240
241        No argument or `None`:
242            Use the default prompt pattern. If there is no default prompt,
243            raise a TypeError.
244        `str`:
245            Match this string exactly.
246        `list[str]`:
247            Match one of these strings exactly.
248        `re.Pattern[str]`:
249            Match the given regular expression.
250
251        When matching a regular expression, only a single Pattern object is
252        supported. To match multiple regular expressions, combine them into a
253        single regular expression using *alternation* syntax (|).
254
255        The `expect()` method returns a 2-tuple (output, match). The `match`
256        is the result of the regular expression search (re.Match). If you
257        specify your prompt as a string or list of strings, it is still compiled
258        into a regular expression that produces an `re.Match` object. You can
259        examine the `match` object to determine the prompt value found.
260
261        This method conducts a regular expression search on streaming data. The
262        `expect()` method reads a new chunk of data into the `pending` buffer
263        and then searches it. You must be careful in writing a regular
264        expression so that the search is agnostic to how the incoming chunks of
265        data arrive. Consider including a boundary condition at the end of your pattern.
266        For example, instead of searching for the open-ended pattern`[a-z]+`,
267        search for the pattern `[a-z]+[^a-z]` which ends with a non-letter
268        character.
269
270        Examples
271        ~~~~~~~~
272
273        Expect an exact string:
274
275        ```
276        await cli.expect("ftp> ")
277        await cli.send(command)
278        response, _ = await cli.expect("ftp> ")
279        ```
280
281        Expect a choice of strings:
282
283        ```
284        _, m = await cli.expect(["Login: ", "Password: ", "ftp> "])
285        match m[0]:
286            case "Login: ":
287                await cli.send(login)
288            case "Password: ":
289                await cli.send(password)
290            case "ftp> ":
291                await cli.send(command)
292        ```
293
294        Read until EOF:
295
296        ```
297        data = await cli.read_all()
298        ```
299
300        Read the contents of the `pending` buffer without filling the buffer
301        with any new data from the co-process pipe:
302
303        ```
304        data = await cli.read_pending()
305        ```
306        """
307        if prompt is None:
308            prompt = self._default_prompt
309            if prompt is None:
310                raise TypeError("prompt is required when default prompt is not set")
311        elif isinstance(prompt, (str, list)):
312            prompt = _regex_compile_exact(prompt)
313
314        if self._pending:
315            result = self._search_pending(prompt)
316            if result is not None:
317                return result
318
319        if self._at_eof:
320            raise EOFError("Prompt has reached EOF")
321
322        cancelled, (result,) = await harvest_results(
323            self._read_to_pattern(prompt),
324            timeout=timeout or self._default_timeout,
325        )
326        if cancelled:
327            raise asyncio.CancelledError()
328        if isinstance(result, Exception):
329            raise result
330
331        return result
332
333    async def read_all(
334        self,
335        *,
336        timeout: Optional[float] = None,
337    ) -> str:
338        """Read from co-process output until EOF.
339
340        If we are already at EOF, return "".
341        """
342        if not self._at_eof:
343            cancelled, (result,) = await harvest_results(
344                self._read_some(tag="@read_all"),
345                timeout=timeout or self._default_timeout,
346            )
347            if cancelled:
348                raise asyncio.CancelledError()
349            if isinstance(result, Exception):
350                raise result
351
352        return self.read_pending()
353
354    def read_pending(self) -> str:
355        """Read the contents of the pending buffer and empty it.
356
357        This method does not fill the pending buffer with any new data from the
358        co-process output pipe. If the pending buffer is already empty, return
359        "".
360        """
361        result = self._pending
362        self._pending = ""
363        return result
364
365    async def command(
366        self,
367        text: str,
368        *,
369        end: str = _DEFAULT_LINE_END,
370        no_echo: bool = False,
371        prompt: Union[str, re.Pattern[str], None] = None,
372        timeout: Optional[float] = None,
373        allow_eof: bool = False,
374    ) -> str:
375        """Send some text to the co-process and return the response.
376
377        This method is equivalent to calling send() following by expect().
378        However, the return value is simpler; `command()` does not return the
379        `re.Match` object.
380
381        If you call this method *after* the co-process output pipe has already
382        returned EOF, raise `EOFError`.
383
384        If `allow_eof` is True, this method will read data up to EOF instead of
385        raising an EOFError.
386        """
387        if self._at_eof:
388            raise EOFError("Prompt has reached EOF")
389
390        await self.send(text, end=end, no_echo=no_echo, timeout=timeout)
391        try:
392            result, _ = await self.expect(prompt, timeout=timeout)
393        except EOFError:
394            if not allow_eof:
395                raise
396            result = self.read_pending()
397
398        return result
399
400    def close(self) -> None:
401        "Close stdin to end the prompt session."
402        stdin = self._runner.stdin
403        assert stdin is not None
404
405        if isinstance(self._runner, Runner) and self._runner.pty_eof:
406            # Write EOF twice; once to end the current line, and the second
407            # time to signal the end.
408            stdin.write(self._runner.pty_eof * 2)
409            if LOG_PROMPT:
410                LOGGER.info("Prompt[pid=%s] send: [[EOF]]", self._runner.pid)
411
412        else:
413            stdin.close()
414            if LOG_PROMPT:
415                LOGGER.info("Prompt[pid=%s] close", self._runner.pid)
416
417    def _finish_(self) -> None:
418        "Internal method called when process exits to fetch the `Result` and cache it."
419        self._result = self._runner.result(check=False)
420        if LOG_PROMPT:
421            LOGGER.info(
422                "Prompt[pid=%s]: --- END --- result=%r",
423                self._runner.pid,
424                self._result,
425            )
426
427    async def _read_to_pattern(
428        self,
429        pattern: re.Pattern[str],
430    ) -> tuple[str, re.Match[str]]:
431        """Read text up to part that matches the pattern.
432
433        Returns 2-tuple with (text, match).
434        """
435        stdout = self._runner.stdout
436        assert stdout is not None
437        assert self._chunk_size > 0
438
439        while not self._at_eof:
440            _prev_len = len(self._pending)  # debug check
441
442            try:
443                # Read chunk and check for EOF.
444                chunk = await stdout.read(self._chunk_size)
445                if not chunk:
446                    self._at_eof = True
447            except asyncio.CancelledError:
448                if LOG_PROMPT:
449                    LOGGER.info(
450                        "Prompt[pid=%s] receive cancelled: pending=%r",
451                        self._runner.pid,
452                        self._pending,
453                    )
454                raise
455
456            if LOG_PROMPT:
457                self._log_receive(chunk)
458
459            # Decode eligible bytes into our buffer.
460            data = self._decoder.decode(chunk, final=self._at_eof)
461            if not data and not self._at_eof:
462                continue
463            self._pending += data
464
465            result = self._search_pending(pattern)
466            if result is not None:
467                return result
468
469            assert self._at_eof or len(self._pending) > _prev_len  # debug check
470
471        raise EOFError("Prompt has reached EOF")
472
473    def _search_pending(
474        self,
475        pattern: re.Pattern[str],
476    ) -> Optional[tuple[str, re.Match[str]]]:
477        """Search our `pending` buffer for the pattern.
478
479        If we find a match, we return the data up to the portion that matched
480        and leave the trailing data in the `pending` buffer. This method returns
481        (result, match) or None if there is no match.
482        """
483        found = pattern.search(self._pending)
484        if found:
485            result = self._pending[0 : found.start(0)]
486            self._pending = self._pending[found.end(0) :]
487            if LOG_PROMPT:
488                LOGGER.info(
489                    "Prompt[pid=%s] found: %r [%s CHARS PENDING]",
490                    self._runner.pid,
491                    found,
492                    len(self._pending),
493                )
494            return (result, found)
495
496        return None
497
498    async def _drain(self, stream: asyncio.StreamWriter) -> None:
499        "Drain stream while reading into buffer concurrently."
500        read_task = asyncio.create_task(
501            self._read_some(tag="@drain", concurrent_cancel=True)
502        )
503        try:
504            await stream.drain()
505
506        finally:
507            if not read_task.done():
508                read_task.cancel()
509                await read_task
510
511    async def _read_some(
512        self,
513        *,
514        tag: str = "",
515        concurrent_cancel: bool = False,
516    ) -> None:
517        "Read into `pending` buffer until cancelled or EOF."
518        stdout = self._runner.stdout
519        assert stdout is not None
520        assert self._chunk_size > 0
521
522        while not self._at_eof:
523            # Yield time to other tasks; read() doesn't yield as long as there
524            # is data to read. We need to provide a cancel point when this
525            # method is called during `drain`.
526            if concurrent_cancel:
527                await asyncio.sleep(0)
528
529            # Read chunk and check for EOF.
530            chunk = await stdout.read(self._chunk_size)
531            if not chunk:
532                self._at_eof = True
533
534            if LOG_PROMPT:
535                self._log_receive(chunk, tag)
536
537            # Decode eligible bytes into our buffer.
538            data = self._decoder.decode(chunk, final=self._at_eof)
539            self._pending += data
540
541    async def _wait_no_echo(self):
542        "Wait for terminal echo mode to be disabled."
543        if LOG_PROMPT:
544            LOGGER.info("Prompt[pid=%s] wait: no_echo", self._runner.pid)
545
546        for _ in range(4 * 30):
547            if not self.echo:
548                break
549            await asyncio.sleep(0.25)
550        else:
551            raise RuntimeError("Timed out: Terminal echo mode remains enabled.")
552
553    def _log_send(self, data: bytes, no_echo: bool):
554        "Log data as it is being sent."
555        pid = self._runner.pid
556
557        if no_echo:
558            LOGGER.info("Prompt[pid=%s] send: [[HIDDEN]]", pid)
559        else:
560            data_len = len(data)
561            if data_len > _LOG_LIMIT:
562                LOGGER.info(
563                    "Prompt[pid=%s] send: [%d B] %r...%r",
564                    pid,
565                    data_len,
566                    data[: _LOG_LIMIT - _LOG_LIMIT_END],
567                    data[-_LOG_LIMIT_END:],
568                )
569            else:
570                LOGGER.info(
571                    "Prompt[pid=%s] send: [%d B] %r",
572                    pid,
573                    data_len,
574                    data,
575                )
576
577    def _log_receive(self, data: bytes, tag: str = ""):
578        "Log data as it is being received."
579        pid = self._runner.pid
580        data_len = len(data)
581
582        if data_len > _LOG_LIMIT:
583            LOGGER.info(
584                "Prompt[pid=%s] receive%s: [%d B] %r...%r",
585                pid,
586                tag,
587                data_len,
588                data[: _LOG_LIMIT - _LOG_LIMIT_END],
589                data[-_LOG_LIMIT_END:],
590            )
591        else:
592            LOGGER.info(
593                "Prompt[pid=%s] receive%s: [%d B] %r",
594                pid,
595                tag,
596                data_len,
597                data,
598            )

Utility class to help with an interactive prompt session.

When you are controlling a co-process you will usually "send" some text to it, and then "expect" a response. The "expect" operation can use a regular expression to match different types of responses.

Create a new Prompt instance using the prompt() API.

# In this example, we are using a default prompt of "??? ".
# Setting the PS1 environment variable tells the shell to use this as
# the shell prompt.
cmd = sh("sh").env(PS1="??? ").set(pty=True)

async with cmd.prompt("??? ", timeout=3.0) as cli:
    # Turn off terminal echo.
    cli.echo = False

    # Wait for greeting and initial prompt. Calling expect() with no
    # argument will match the default prompt "??? ".
    greeting, _ = await cli.expect()

    # Send a command and wait for the response.
    await cli.send("echo hello")
    answer, _ = await cli.expect()
    assert answer == "hello\r\n"
Prompt( runner: Runner, *, default_prompt: Union[str, list[str], re.Pattern[str], NoneType] = None, default_timeout: Optional[float] = None, normalize_newlines: bool = False, _chunk_size: int = 16384)
65    def __init__(
66        self,
67        runner: Runner,
68        *,
69        default_prompt: Union[str, list[str], re.Pattern[str], None] = None,
70        default_timeout: Optional[float] = None,
71        normalize_newlines: bool = False,
72        _chunk_size: int = _DEFAULT_CHUNK_SIZE,
73    ):
74        assert runner.stdin is not None
75        assert runner.stdout is not None
76
77        if isinstance(default_prompt, (str, list)):
78            default_prompt = _regex_compile_exact(default_prompt)
79
80        self._runner = runner
81        self._encoding = runner.options.encoding
82        self._default_prompt = default_prompt
83        self._default_timeout = default_timeout
84        self._normalize_newlines = normalize_newlines
85        self._chunk_size = _chunk_size
86        self._decoder = _make_decoder(self._encoding, normalize_newlines)
87
88        if LOG_PROMPT:
89            LOGGER.info(
90                "Prompt[pid=%s]: --- BEGIN --- name=%r",
91                self._runner.pid,
92                self._runner.name,
93            )
at_eof: bool
95    @property
96    def at_eof(self) -> bool:
97        "True if the prompt reader is at the end of file."
98        return self._at_eof and not self._pending

True if the prompt reader is at the end of file.

pending: str
100    @property
101    def pending(self) -> str:
102        """Characters that still remain in the `pending` buffer."""
103        return self._pending

Characters that still remain in the pending buffer.

echo: bool
105    @property
106    def echo(self) -> bool:
107        """True if PTY is in echo mode.
108
109        If the runner is not using a PTY, always return False.
110
111        When the process is using a PTY, you can enable/disable terminal echo
112        mode by setting the `echo` property to True/False.
113        """
114        if not isinstance(self._runner, Runner) or self._runner.pty_fd is None:
115            return False
116        return pty_util.get_term_echo(self._runner.pty_fd)

True if PTY is in echo mode.

If the runner is not using a PTY, always return False.

When the process is using a PTY, you can enable/disable terminal echo mode by setting the echo property to True/False.

result: Result
128    @property
129    def result(self) -> Result:
130        """The `Result` of the co-process when it exited.
131
132        You can only retrieve this property *after* the `async with` block
133        exits where the co-process is running:
134
135        ```
136        async with cmd.prompt() as cli:
137            ...
138        # Access `cli.result` here.
139        ```
140
141        Inside the `async with` block, raise a RuntimeError because the process
142        has not exited yet.
143        """
144        if self._result is None:
145            raise RuntimeError("Prompt process is still running")
146        return self._result

The Result of the co-process when it exited.

You can only retrieve this property after the async with block exits where the co-process is running:

async with cmd.prompt() as cli:
    ...
# Access `cli.result` here.

Inside the async with block, raise a RuntimeError because the process has not exited yet.

async def send( self, text: Union[bytes, str], *, end: Optional[str] = '\n', no_echo: bool = False, timeout: Optional[float] = None) -> None:
148    async def send(
149        self,
150        text: Union[bytes, str],
151        *,
152        end: Optional[str] = _DEFAULT_LINE_END,
153        no_echo: bool = False,
154        timeout: Optional[float] = None,
155    ) -> None:
156        """Write some `text` to co-process standard input and append a newline.
157
158        The `text` parameter is the string that you want to write. Use a `bytes`
159        object to send some raw bytes (e.g. to the terminal driver).
160
161        The default line ending is "\n". Use the `end` parameter to change the
162        line ending. To omit the line ending entirely, specify `end=None`.
163
164        Set `no_echo` to True when you are writing a password. When you set
165        `no_echo` to True, the `send` method will wait for terminal
166        echo mode to be disabled before writing the text. If shellous logging
167        is enabled, the sensitive information will **not** be logged.
168
169        Use the `timeout` parameter to override the default timeout. Normally,
170        data is delivered immediately and this method returns making a trip
171        through the event loop. However, there are situations where the
172        co-process input pipeline fills and we have to wait for it to
173        drain.
174
175        When this method needs to wait for the co-process input pipe to drain,
176        this method will concurrently read from the output pipe into the pending
177        buffer. This is necessary to avoid a deadlock situation where everything
178        stops because neither process can make progress.
179        """
180        if end is None:
181            end = ""
182
183        if no_echo:
184            await self._wait_no_echo()
185
186        if isinstance(text, bytes):
187            data = text + encode_bytes(end, self._encoding)
188        else:
189            data = encode_bytes(text + end, self._encoding)
190
191        stdin = self._runner.stdin
192        assert stdin is not None
193        stdin.write(data)
194
195        if LOG_PROMPT:
196            self._log_send(data, no_echo)
197
198        # Drain our write to stdin.
199        cancelled, ex = await harvest_results(
200            self._drain(stdin),
201            timeout=timeout or self._default_timeout,
202        )
203        if cancelled:
204            raise asyncio.CancelledError()
205        if isinstance(ex[0], Exception):
206            raise ex[0]

Write some text to co-process standard input and append a newline.

    The `text` parameter is the string that you want to write. Use a `bytes`
    object to send some raw bytes (e.g. to the terminal driver).

    The default line ending is "

". Use the end parameter to change the line ending. To omit the line ending entirely, specify end=None.

    Set `no_echo` to True when you are writing a password. When you set
    `no_echo` to True, the `send` method will wait for terminal
    echo mode to be disabled before writing the text. If shellous logging
    is enabled, the sensitive information will **not** be logged.

    Use the `timeout` parameter to override the default timeout. Normally,
    data is delivered immediately and this method returns making a trip
    through the event loop. However, there are situations where the
    co-process input pipeline fills and we have to wait for it to
    drain.

    When this method needs to wait for the co-process input pipe to drain,
    this method will concurrently read from the output pipe into the pending
    buffer. This is necessary to avoid a deadlock situation where everything
    stops because neither process can make progress.
async def expect( self, prompt: Union[str, list[str], re.Pattern[str], NoneType] = None, *, timeout: Optional[float] = None) -> tuple[str, re.Match[str]]:
208    async def expect(
209        self,
210        prompt: Union[str, list[str], re.Pattern[str], None] = None,
211        *,
212        timeout: Optional[float] = None,
213    ) -> tuple[str, re.Match[str]]:
214        """Read from co-process standard output until `prompt` pattern matches.
215
216        Returns a 2-tuple of (output, match) where `output` is the text *before*
217        the prompt pattern and `match` is a `re.Match` object for the prompt
218        text itself.
219
220        If `expect` reaches EOF or a timeout occurs before the prompt pattern
221        matches, it raises an `EOFError` or `asyncio.TimeoutError`. The unread
222        characters will be available in the `pending` buffer.
223
224        After this method returns, there may still be characters read from stdout
225        that remain in the `pending` buffer. These are the characters *after* the
226        prompt pattern. You can examine these using the `pending` property.
227        Subsequent calls to expect will examine this buffer first before
228        reading new data from the output pipe.
229
230        By default, this method will use the default `timeout` for the `Prompt`
231        object if one is set. You can use the `timeout` parameter to specify
232        a custom timeout in seconds.
233
234        Prompt Patterns
235        ~~~~~~~~~~~~~~~
236
237        The `expect()` method supports matching fixed strings and regular
238        expressions. The type of the `prompt` parameter determines the type of
239        search.
240
241        No argument or `None`:
242            Use the default prompt pattern. If there is no default prompt,
243            raise a TypeError.
244        `str`:
245            Match this string exactly.
246        `list[str]`:
247            Match one of these strings exactly.
248        `re.Pattern[str]`:
249            Match the given regular expression.
250
251        When matching a regular expression, only a single Pattern object is
252        supported. To match multiple regular expressions, combine them into a
253        single regular expression using *alternation* syntax (|).
254
255        The `expect()` method returns a 2-tuple (output, match). The `match`
256        is the result of the regular expression search (re.Match). If you
257        specify your prompt as a string or list of strings, it is still compiled
258        into a regular expression that produces an `re.Match` object. You can
259        examine the `match` object to determine the prompt value found.
260
261        This method conducts a regular expression search on streaming data. The
262        `expect()` method reads a new chunk of data into the `pending` buffer
263        and then searches it. You must be careful in writing a regular
264        expression so that the search is agnostic to how the incoming chunks of
265        data arrive. Consider including a boundary condition at the end of your pattern.
266        For example, instead of searching for the open-ended pattern`[a-z]+`,
267        search for the pattern `[a-z]+[^a-z]` which ends with a non-letter
268        character.
269
270        Examples
271        ~~~~~~~~
272
273        Expect an exact string:
274
275        ```
276        await cli.expect("ftp> ")
277        await cli.send(command)
278        response, _ = await cli.expect("ftp> ")
279        ```
280
281        Expect a choice of strings:
282
283        ```
284        _, m = await cli.expect(["Login: ", "Password: ", "ftp> "])
285        match m[0]:
286            case "Login: ":
287                await cli.send(login)
288            case "Password: ":
289                await cli.send(password)
290            case "ftp> ":
291                await cli.send(command)
292        ```
293
294        Read until EOF:
295
296        ```
297        data = await cli.read_all()
298        ```
299
300        Read the contents of the `pending` buffer without filling the buffer
301        with any new data from the co-process pipe:
302
303        ```
304        data = await cli.read_pending()
305        ```
306        """
307        if prompt is None:
308            prompt = self._default_prompt
309            if prompt is None:
310                raise TypeError("prompt is required when default prompt is not set")
311        elif isinstance(prompt, (str, list)):
312            prompt = _regex_compile_exact(prompt)
313
314        if self._pending:
315            result = self._search_pending(prompt)
316            if result is not None:
317                return result
318
319        if self._at_eof:
320            raise EOFError("Prompt has reached EOF")
321
322        cancelled, (result,) = await harvest_results(
323            self._read_to_pattern(prompt),
324            timeout=timeout or self._default_timeout,
325        )
326        if cancelled:
327            raise asyncio.CancelledError()
328        if isinstance(result, Exception):
329            raise result
330
331        return result

Read from co-process standard output until prompt pattern matches.

Returns a 2-tuple of (output, match) where output is the text before the prompt pattern and match is a re.Match object for the prompt text itself.

If expect reaches EOF or a timeout occurs before the prompt pattern matches, it raises an EOFError or asyncio.TimeoutError. The unread characters will be available in the pending buffer.

After this method returns, there may still be characters read from stdout that remain in the pending buffer. These are the characters after the prompt pattern. You can examine these using the pending property. Subsequent calls to expect will examine this buffer first before reading new data from the output pipe.

By default, this method will use the default timeout for the Prompt object if one is set. You can use the timeout parameter to specify a custom timeout in seconds.

Prompt Patterns ~~~

The expect() method supports matching fixed strings and regular expressions. The type of the prompt parameter determines the type of search.

No argument or None: Use the default prompt pattern. If there is no default prompt, raise a TypeError. str: Match this string exactly. list[str]: Match one of these strings exactly. re.Pattern[str]: Match the given regular expression.

When matching a regular expression, only a single Pattern object is supported. To match multiple regular expressions, combine them into a single regular expression using alternation syntax (|).

The expect() method returns a 2-tuple (output, match). The match is the result of the regular expression search (re.Match). If you specify your prompt as a string or list of strings, it is still compiled into a regular expression that produces an re.Match object. You can examine the match object to determine the prompt value found.

This method conducts a regular expression search on streaming data. The expect() method reads a new chunk of data into the pending buffer and then searches it. You must be careful in writing a regular expression so that the search is agnostic to how the incoming chunks of data arrive. Consider including a boundary condition at the end of your pattern. For example, instead of searching for the open-ended pattern[a-z]+, search for the pattern [a-z]+[^a-z] which ends with a non-letter character.

Examples ~~~~

Expect an exact string:

await cli.expect("ftp> ")
await cli.send(command)
response, _ = await cli.expect("ftp> ")

Expect a choice of strings:

_, m = await cli.expect(["Login: ", "Password: ", "ftp> "])
match m[0]:
    case "Login: ":
        await cli.send(login)
    case "Password: ":
        await cli.send(password)
    case "ftp> ":
        await cli.send(command)

Read until EOF:

data = await cli.read_all()

Read the contents of the pending buffer without filling the buffer with any new data from the co-process pipe:

data = await cli.read_pending()
async def read_all(self, *, timeout: Optional[float] = None) -> str:
333    async def read_all(
334        self,
335        *,
336        timeout: Optional[float] = None,
337    ) -> str:
338        """Read from co-process output until EOF.
339
340        If we are already at EOF, return "".
341        """
342        if not self._at_eof:
343            cancelled, (result,) = await harvest_results(
344                self._read_some(tag="@read_all"),
345                timeout=timeout or self._default_timeout,
346            )
347            if cancelled:
348                raise asyncio.CancelledError()
349            if isinstance(result, Exception):
350                raise result
351
352        return self.read_pending()

Read from co-process output until EOF.

If we are already at EOF, return "".

def read_pending(self) -> str:
354    def read_pending(self) -> str:
355        """Read the contents of the pending buffer and empty it.
356
357        This method does not fill the pending buffer with any new data from the
358        co-process output pipe. If the pending buffer is already empty, return
359        "".
360        """
361        result = self._pending
362        self._pending = ""
363        return result

Read the contents of the pending buffer and empty it.

This method does not fill the pending buffer with any new data from the co-process output pipe. If the pending buffer is already empty, return "".

async def command( self, text: str, *, end: str = '\n', no_echo: bool = False, prompt: Union[str, re.Pattern[str], NoneType] = None, timeout: Optional[float] = None, allow_eof: bool = False) -> str:
365    async def command(
366        self,
367        text: str,
368        *,
369        end: str = _DEFAULT_LINE_END,
370        no_echo: bool = False,
371        prompt: Union[str, re.Pattern[str], None] = None,
372        timeout: Optional[float] = None,
373        allow_eof: bool = False,
374    ) -> str:
375        """Send some text to the co-process and return the response.
376
377        This method is equivalent to calling send() following by expect().
378        However, the return value is simpler; `command()` does not return the
379        `re.Match` object.
380
381        If you call this method *after* the co-process output pipe has already
382        returned EOF, raise `EOFError`.
383
384        If `allow_eof` is True, this method will read data up to EOF instead of
385        raising an EOFError.
386        """
387        if self._at_eof:
388            raise EOFError("Prompt has reached EOF")
389
390        await self.send(text, end=end, no_echo=no_echo, timeout=timeout)
391        try:
392            result, _ = await self.expect(prompt, timeout=timeout)
393        except EOFError:
394            if not allow_eof:
395                raise
396            result = self.read_pending()
397
398        return result

Send some text to the co-process and return the response.

This method is equivalent to calling send() following by expect(). However, the return value is simpler; command() does not return the re.Match object.

If you call this method after the co-process output pipe has already returned EOF, raise EOFError.

If allow_eof is True, this method will read data up to EOF instead of raising an EOFError.

def close(self) -> None:
400    def close(self) -> None:
401        "Close stdin to end the prompt session."
402        stdin = self._runner.stdin
403        assert stdin is not None
404
405        if isinstance(self._runner, Runner) and self._runner.pty_eof:
406            # Write EOF twice; once to end the current line, and the second
407            # time to signal the end.
408            stdin.write(self._runner.pty_eof * 2)
409            if LOG_PROMPT:
410                LOGGER.info("Prompt[pid=%s] send: [[EOF]]", self._runner.pid)
411
412        else:
413            stdin.close()
414            if LOG_PROMPT:
415                LOGGER.info("Prompt[pid=%s] close", self._runner.pid)

Close stdin to end the prompt session.

def cbreak(rows: int = 0, cols: int = 0) -> Callable[[int], NoneType]:
200def cbreak(rows: int = 0, cols: int = 0) -> PtyAdapter:
201    "Return a function that sets PtyOptions.child_fd to cbreak mode."
202
203    def _pty_set_cbreak(fdesc: int):
204        tty.setcbreak(fdesc)
205        if rows or cols:
206            _set_term_size(fdesc, rows, cols)
207        assert _get_eof(fdesc) == b""
208
209    return _pty_set_cbreak

Return a function that sets PtyOptions.child_fd to cbreak mode.

def cooked( rows: int = 0, cols: int = 0, echo: bool = True) -> Callable[[int], NoneType]:
212def cooked(rows: int = 0, cols: int = 0, echo: bool = True) -> PtyAdapter:
213    "Return a function that leaves PtyOptions.child_fd in cooked mode."
214
215    def _pty_set_canonical(fdesc: int):
216        if rows or cols:
217            _set_term_size(fdesc, rows, cols)
218        if not echo:
219            set_term_echo(fdesc, False)
220        assert _get_eof(fdesc) == b"\x04"
221
222    return _pty_set_canonical

Return a function that leaves PtyOptions.child_fd in cooked mode.

def raw(rows: int = 0, cols: int = 0) -> Callable[[int], NoneType]:
188def raw(rows: int = 0, cols: int = 0) -> PtyAdapter:
189    "Return a function that sets PtyOptions.child_fd to raw mode."
190
191    def _pty_set_raw(fdesc: int):
192        tty.setraw(fdesc)
193        if rows or cols:
194            _set_term_size(fdesc, rows, cols)
195        assert _get_eof(fdesc) == b""
196
197    return _pty_set_raw

Return a function that sets PtyOptions.child_fd to raw mode.

@dataclass(frozen=True, **_KW_ONLY)
class Result:
26@dataclass(frozen=True, **_KW_ONLY)
27class Result:
28    "Concrete class for the result of a Command."
29
30    exit_code: int
31    "Command's exit status. If < 0, this is a negated signal number."
32
33    output_bytes: bytes
34    "Output of command as bytes. May be None if there is no output."
35
36    error_bytes: bytes
37    "Limited standard error from command if not redirected."
38
39    cancelled: bool
40    "Command was cancelled."
41
42    encoding: str
43    "Output encoding."
44
45    @property
46    def output(self) -> str:
47        "Output of command as a string."
48        return decode_bytes(self.output_bytes, self.encoding)
49
50    @property
51    def error(self) -> str:
52        "Error from command as a string (if it is not redirected)."
53        return decode_bytes(self.error_bytes, self.encoding)
54
55    @property
56    def exit_signal(self) -> Optional[signal.Signals]:
57        "Signal that caused the command to exit, or None if no signal."
58        if self.exit_code >= 0:
59            return None
60        return signal.Signals(-self.exit_code)
61
62    def __bool__(self) -> bool:
63        "Return true if exit_code is 0."
64        return self.exit_code == 0

Concrete class for the result of a Command.

Result( *, exit_code: int, output_bytes: bytes, error_bytes: bytes, cancelled: bool, encoding: str)
exit_code: int

Command's exit status. If < 0, this is a negated signal number.

output_bytes: bytes

Output of command as bytes. May be None if there is no output.

error_bytes: bytes

Limited standard error from command if not redirected.

cancelled: bool

Command was cancelled.

encoding: str

Output encoding.

output: str
45    @property
46    def output(self) -> str:
47        "Output of command as a string."
48        return decode_bytes(self.output_bytes, self.encoding)

Output of command as a string.

error: str
50    @property
51    def error(self) -> str:
52        "Error from command as a string (if it is not redirected)."
53        return decode_bytes(self.error_bytes, self.encoding)

Error from command as a string (if it is not redirected).

exit_signal: Optional[signal.Signals]
55    @property
56    def exit_signal(self) -> Optional[signal.Signals]:
57        "Signal that caused the command to exit, or None if no signal."
58        if self.exit_code >= 0:
59            return None
60        return signal.Signals(-self.exit_code)

Signal that caused the command to exit, or None if no signal.

class ResultError(builtins.Exception):
14class ResultError(Exception):
15    "Represents a non-zero exit status."
16
17    @property
18    def result(self) -> "shellous.Result":
19        "Result of the command."
20        return self.args[0]

Represents a non-zero exit status.

result: Result
17    @property
18    def result(self) -> "shellous.Result":
19        "Result of the command."
20        return self.args[0]

Result of the command.

class Runner:
386class Runner:
387    """Runner is an asynchronous context manager that runs a command.
388
389    ```
390    async with Runner(cmd) as run:
391        # process streams: run.stdin, run.stdout, run.stderr (if not None)
392    result = run.result()
393    ```
394    """
395
396    stdin: Optional[asyncio.StreamWriter] = None
397    "Process standard input."
398
399    stdout: Optional[asyncio.StreamReader] = None
400    "Process standard output."
401
402    stderr: Optional[asyncio.StreamReader] = None
403    "Process standard error."
404
405    _options: _RunOptions
406    _tasks: list[asyncio.Task[Any]]
407    _proc: Optional["asyncio.subprocess.Process"] = None
408    _cancelled: bool = False
409    _timer: Optional[asyncio.TimerHandle] = None
410    _timed_out: bool = False
411    _last_signal: Optional[int] = None
412
413    def __init__(self, command: "shellous.Command[Any]"):
414        self._options = _RunOptions(command)
415        self._tasks = []
416
417    @property
418    def name(self) -> str:
419        "Return name of process being run."
420        return self.command.name
421
422    @property
423    def options(self) -> "shellous.Options":
424        "Return options for process being run."
425        return self.command.options
426
427    @property
428    def command(self) -> "shellous.Command[Any]":
429        "Return the command being run."
430        return self._options.command
431
432    @property
433    def pid(self) -> Optional[int]:
434        "Return the command's process ID."
435        if not self._proc:
436            return None
437        return self._proc.pid
438
439    @property
440    def returncode(self) -> Optional[int]:
441        "Process's exit code."
442        if not self._proc:
443            if self._cancelled:
444                # The process was cancelled before starting.
445                return CANCELLED_EXIT_CODE
446            return None
447        code = self._proc.returncode
448        if code == _UNKNOWN_EXIT_CODE and self._last_signal is not None:
449            # After sending a signal, `waitpid` may fail to locate the child
450            # process. In this case, map the status to the last signal we sent.
451            # For more on this, see https://github.com/python/cpython/issues/87744
452            return -self._last_signal  # pylint: disable=invalid-unary-operand-type
453        return code
454
455    @property
456    def cancelled(self) -> bool:
457        "Return True if the command was cancelled."
458        return self._cancelled
459
460    @property
461    def pty_fd(self) -> Optional[int]:
462        """The file descriptor used to communicate with the child PTY process.
463
464        Returns None if the process is not using a PTY.
465        """
466        pty_fds = self._options.pty_fds
467        if pty_fds is not None:
468            return pty_fds.parent_fd
469        return None
470
471    @property
472    def pty_eof(self) -> Optional[bytes]:
473        """Byte sequence used to indicate EOF when written to the PTY child.
474
475        Returns None if process is not using a PTY.
476        """
477        pty_fds = self._options.pty_fds
478        if pty_fds is not None:
479            return pty_fds.eof
480        return None
481
482    def result(self, *, check: bool = True) -> Result:
483        "Check process exit code and raise a ResultError if necessary."
484        code = self.returncode
485        if code is None:
486            raise TypeError("Runner.result(): Process has not exited")
487
488        result = Result(
489            exit_code=code,
490            output_bytes=bytes(self._options.output_bytes or b""),
491            error_bytes=bytes(self._options.error_bytes or b""),
492            cancelled=self._cancelled,
493            encoding=self._options.encoding,
494        )
495
496        if not check:
497            return result
498
499        return check_result(
500            result,
501            self.command.options,
502            self._cancelled,
503            self._timed_out,
504        )
505
506    def add_task(
507        self,
508        coro: Coroutine[Any, Any, _T],
509        tag: str = "",
510    ) -> asyncio.Task[_T]:
511        "Add a background task."
512        task_name = f"{self.name}#{tag}"
513        task = asyncio.create_task(coro, name=task_name)
514        self._tasks.append(task)
515        return task
516
517    def send_signal(self, sig: int) -> None:
518        "Send an arbitrary signal to the process if it is running."
519        if self.returncode is None:
520            self._signal(sig)
521
522    def cancel(self) -> None:
523        "Cancel the running process if it is running."
524        if self.returncode is None:
525            self._signal(self.command.options.cancel_signal)
526
527    def _is_bsd_pty(self) -> bool:
528        "Return true if we're running a pty on BSD."
529        return BSD_DERIVED and bool(self._options.pty_fds)
530
531    @log_method(LOG_DETAIL)
532    async def _wait(self) -> None:
533        "Normal wait for background I/O tasks and process to finish."
534        assert self._proc
535
536        try:
537            if self._tasks:
538                await harvest(*self._tasks, trustee=self)
539            if self._is_bsd_pty():
540                await self._waiter()
541
542        except asyncio.CancelledError:
543            LOGGER.debug("Runner.wait cancelled %r", self)
544            self._set_cancelled()
545            self._tasks.clear()  # all tasks were cancelled
546            await self._kill()
547
548        except Exception as ex:
549            LOGGER.debug("Runner.wait exited with error %r ex=%r", self, ex)
550            self._tasks.clear()  # all tasks were cancelled
551            await self._kill()
552            raise  # re-raise exception
553
554    @log_method(LOG_DETAIL)
555    async def _wait_pid(self):
556        "Manually poll `waitpid` until process finishes."
557        assert self._is_bsd_pty()
558
559        while True:
560            assert self._proc is not None  # (pyright)
561
562            if poll_wait_pid(self._proc):
563                break
564            await asyncio.sleep(0.025)
565
566    @log_method(LOG_DETAIL)
567    async def _kill(self):
568        "Kill process and wait for it to finish."
569        assert self._proc
570
571        cancel_timeout = self.command.options.cancel_timeout
572        cancel_signal = self.command.options.cancel_signal
573
574        try:
575            # If not already done, send cancel signal.
576            if self._proc.returncode is None:
577                self._signal(cancel_signal)
578
579            if self._tasks:
580                await harvest(*self._tasks, timeout=cancel_timeout, trustee=self)
581
582            if self._proc.returncode is None:
583                await harvest(self._waiter(), timeout=cancel_timeout, trustee=self)
584
585        except (asyncio.CancelledError, asyncio.TimeoutError) as ex:
586            LOGGER.warning("Runner.kill %r (ex)=%r", self, ex)
587            if _is_cancelled(ex):
588                self._set_cancelled()
589            await self._kill_wait()
590
591        except (Exception, GeneratorExit) as ex:
592            LOGGER.warning("Runner.kill %r ex=%r", self, ex)
593            await self._kill_wait()
594            raise
595
596    def _signal(self, sig: Optional[int]):
597        "Send a signal to the process."
598        assert self._proc is not None  # (pyright)
599
600        if LOG_DETAIL:
601            LOGGER.debug("Runner.signal %r signal=%r", self, sig)
602        self._audit_callback("signal", signal=sig)
603
604        if sig is None:
605            self._proc.kill()
606        else:
607            self._proc.send_signal(sig)
608            self._last_signal = sig
609
610    @log_method(LOG_DETAIL)
611    async def _kill_wait(self):
612        "Wait for killed process to exit."
613        assert self._proc
614
615        # Check if process is already done.
616        if self._proc.returncode is not None:
617            return
618
619        try:
620            self._signal(None)
621            await harvest(self._waiter(), timeout=_KILL_TIMEOUT, trustee=self)
622        except asyncio.TimeoutError as ex:
623            # Manually check if the process is still running.
624            if poll_wait_pid(self._proc):
625                LOGGER.warning("%r process reaped manually %r", self, self._proc)
626            else:
627                LOGGER.error("%r failed to kill process %r", self, self._proc)
628                raise RuntimeError(f"Unable to kill process {self._proc!r}") from ex
629
630    @log_method(LOG_DETAIL)
631    async def __aenter__(self):
632        "Set up redirections and launch subprocess."
633        self._audit_callback("start")
634        try:
635            return await self._start()
636        except BaseException as ex:
637            self._stop_timer()  # failsafe just in case
638            self._audit_callback("stop", failure=type(ex).__name__)
639            raise
640        finally:
641            if self._cancelled and self.command.options._catch_cancelled_error:
642                # Raises ResultError instead of CancelledError.
643                self.result()
644
645    @log_method(LOG_DETAIL)
646    async def _start(self):
647        "Set up redirections and launch subprocess."
648        # assert self._proc is None
649        assert not self._tasks
650
651        try:
652            # Set up subprocess arguments and launch subprocess.
653            with self._options as opts:
654                await self._subprocess_spawn(opts)
655
656            assert self._proc is not None
657            stdin = self._proc.stdin
658            stdout = self._proc.stdout
659            stderr = self._proc.stderr
660
661            # Assign pty streams.
662            if opts.pty_fds:
663                assert (stdin, stdout) == (None, None)
664                stdin, stdout = opts.pty_fds.writer, opts.pty_fds.reader
665
666            if stderr is not None:
667                limit = None
668                if opts.error_bytes is not None:
669                    error = opts.error_bytes
670                    limit = opts.command.options.error_limit
671                elif opts.is_stderr_only:
672                    assert stdout is None
673                    assert opts.output_bytes is not None
674                    error = opts.output_bytes
675                else:
676                    error = opts.command.options.error
677                stderr = self._setup_output_sink(
678                    stderr, error, opts.encoding, "stderr", limit
679                )
680
681            if stdout is not None:
682                limit = None
683                if opts.output_bytes is not None:
684                    output = opts.output_bytes
685                else:
686                    output = opts.command.options.output
687                stdout = self._setup_output_sink(
688                    stdout, output, opts.encoding, "stdout", limit
689                )
690
691            if stdin is not None:
692                stdin = self._setup_input_source(stdin, opts)
693
694        except (Exception, asyncio.CancelledError) as ex:
695            LOGGER.debug("Runner._start %r ex=%r", self, ex)
696            if _is_cancelled(ex):
697                self._set_cancelled()
698            if self._proc:
699                await self._kill()
700            raise
701
702        # Make final streams available. These may be different from `self.proc`
703        # versions.
704        self.stdin = stdin
705        self.stdout = stdout
706        self.stderr = stderr
707
708        # Add a task to monitor for when the process finishes.
709        if not self._is_bsd_pty():
710            self.add_task(self._waiter(), "waiter")
711
712        # Set a timer to cancel the current task after a timeout.
713        self._start_timer(self.command.options.timeout)
714
715        return self
716
717    @log_method(LOG_DETAIL)
718    async def _subprocess_spawn(self, opts: _RunOptions):
719        "Start the subprocess."
720        assert self._proc is None
721
722        # Second half of pty setup.
723        if opts.pty_fds:
724            opts.pty_fds = await opts.pty_fds.open_streams()
725
726        # Check for task cancellation and yield right before exec'ing. If the
727        # current task is already cancelled, this will raise a CancelledError,
728        # and we save ourselves the work of launching and immediately killing
729        # a process.
730        await asyncio.sleep(0)
731
732        # Launch the subprocess (always completes even if cancelled).
733        await uninterrupted(self._subprocess_exec(opts))
734
735        # Launch the process substitution commands (if any).
736        for cmd in opts.subcmds:
737            self.add_task(cmd.coro(), "procsub")
738
739    @log_method(LOG_DETAIL)
740    async def _subprocess_exec(self, opts: _RunOptions):
741        "Start the subprocess and assign to `self.proc`."
742        with log_timer("asyncio.create_subprocess_exec"):
743            sys.audit(EVENT_SHELLOUS_EXEC, opts.pos_args[0])
744            with pty_util.set_ignore_child_watcher(
745                BSD_DERIVED and opts.pty_fds is not None
746            ):
747                self._proc = await asyncio.create_subprocess_exec(
748                    *opts.pos_args,
749                    **opts.kwd_args,
750                )
751
752    @log_method(LOG_DETAIL)
753    async def _waiter(self):
754        "Run task that waits for process to exit."
755        assert self._proc is not None  # (pyright)
756
757        try:
758            if self._is_bsd_pty():
759                await self._wait_pid()
760            else:
761                await self._proc.wait()
762        finally:
763            self._stop_timer()
764
765    def _set_cancelled(self):
766        "Set the cancelled flag, and cancel any inflight timers."
767        self._cancelled = True
768        self._stop_timer()
769
770    def _start_timer(self, timeout: Optional[float]):
771        "Start an optional timer to cancel the process if `timeout` desired."
772        assert self._timer is None
773        if timeout is not None:
774            loop = asyncio.get_running_loop()
775            task = asyncio.current_task()
776            assert task is not None
777            self._timer = loop.call_later(
778                timeout,
779                self._set_timer_expired,
780                task,
781            )
782
783    def _set_timer_expired(self, main_task: asyncio.Task[Any]):
784        "Set a flag when the timer expires and cancel the main task."
785        self._timed_out = True
786        self._timer = None
787        main_task.cancel()
788
789    def _stop_timer(self):
790        if self._timer:
791            self._timer.cancel()
792            self._timer = None
793
794    def _setup_input_source(
795        self,
796        stream: asyncio.StreamWriter,
797        opts: _RunOptions,
798    ):
799        "Set up a task to read from custom input source."
800        tag = "stdin"
801        eof = opts.pty_fds.eof if opts.pty_fds else None
802
803        if opts.input_bytes is not None:
804            self.add_task(redir.write_stream(opts.input_bytes, stream, eof), tag)
805            return None
806
807        source = opts.command.options.input
808
809        if isinstance(source, asyncio.StreamReader):
810            self.add_task(redir.write_reader(source, stream, eof), tag)
811            return None
812
813        if isinstance(source, io.BytesIO):
814            self.add_task(redir.write_stream(source.getvalue(), stream, eof), tag)
815            return None
816
817        if isinstance(source, io.StringIO):
818            input_bytes = encode_bytes(source.getvalue(), opts.encoding)
819            self.add_task(redir.write_stream(input_bytes, stream, eof), tag)
820            return None
821
822        return stream
823
824    def _setup_output_sink(
825        self,
826        stream: asyncio.StreamReader,
827        sink: Any,
828        encoding: str,
829        tag: str,
830        limit: Optional[int] = None,
831    ) -> Optional[asyncio.StreamReader]:
832        "Set up a task to write to custom output sink."
833        if isinstance(sink, io.StringIO):
834            self.add_task(redir.copy_stringio(stream, sink, encoding), tag)
835            return None
836
837        if isinstance(sink, io.BytesIO):
838            self.add_task(redir.copy_bytesio(stream, sink), tag)
839            return None
840
841        if isinstance(sink, bytearray):
842            # N.B. `limit` is only supported for bytearray.
843            if limit is not None:
844                self.add_task(redir.copy_bytearray_limit(stream, sink, limit), tag)
845            else:
846                self.add_task(redir.copy_bytearray(stream, sink), tag)
847            return None
848
849        if isinstance(sink, Logger):
850            self.add_task(redir.copy_logger(stream, sink, encoding), tag)
851            return None
852
853        if isinstance(sink, asyncio.StreamWriter):
854            self.add_task(redir.copy_streamwriter(stream, sink), tag)
855            return None
856
857        return stream
858
859    @log_method(LOG_DETAIL)
860    async def __aexit__(
861        self,
862        _exc_type: Union[type[BaseException], None],
863        exc_value: Union[BaseException, None],
864        _exc_tb: Optional[TracebackType],
865    ):
866        "Wait for process to exit and handle cancellation."
867        suppress = False
868        try:
869            suppress = await self._finish(exc_value)
870        except asyncio.CancelledError:
871            LOGGER.debug("Runner cancelled inside _finish %r", self)
872            self._set_cancelled()
873        finally:
874            self._stop_timer()  # failsafe just in case
875            self._audit_callback("stop")
876        # If `timeout` expired, raise TimeoutError rather than CancelledError.
877        if (
878            self._cancelled
879            and self._timed_out
880            and not self.command.options._catch_cancelled_error
881        ):
882            raise asyncio.TimeoutError()
883        return suppress
884
885    @log_method(LOG_DETAIL)
886    async def _finish(self, exc_value: Union[BaseException, None]):
887        "Finish the run. Return True only if `exc_value` should be suppressed."
888        assert self._proc
889
890        try:
891            if exc_value is not None:
892                if _is_cancelled(exc_value):
893                    self._set_cancelled()
894                await self._kill()
895                return self._cancelled
896
897            await self._wait()
898            return False
899
900        finally:
901            await self._close()
902
903    @log_method(LOG_DETAIL)
904    async def _close(self):
905        "Make sure that our resources are properly closed."
906        assert self._proc is not None
907
908        if self._options.pty_fds:
909            self._options.pty_fds.close()
910
911        # Make sure the transport is closed (for asyncio and uvloop).
912        self._proc._transport.close()  # pyright: ignore
913
914        # _close can be called when unwinding exceptions. We need to handle
915        # the case that the process has not exited yet.
916        if self._proc.returncode is None:
917            LOGGER.critical("Runner._close process still running %r", self._proc)
918            return
919
920        try:
921            # Make sure that original stdin is properly closed. `wait_closed`
922            # will raise a BrokenPipeError if not all input was properly written.
923            if self._proc.stdin is not None:
924                self._proc.stdin.close()
925                await harvest(
926                    self._proc.stdin.wait_closed(),
927                    timeout=_CLOSE_TIMEOUT,
928                    cancel_finish=True,  # finish `wait_closed` if cancelled
929                    trustee=self,
930                )
931
932        except asyncio.TimeoutError:
933            LOGGER.critical("Runner._close %r timeout stdin=%r", self, self._proc.stdin)
934
935    def _audit_callback(
936        self,
937        phase: str,
938        *,
939        failure: str = "",
940        signal: Optional[int] = None,
941    ):
942        "Call `audit_callback` if there is one."
943        callback = self.command.options.audit_callback
944        if callback:
945            sig = _signame(signal) if phase == "signal" else ""
946            info: shellous.AuditEventInfo = {
947                "runner": self,
948                "failure": failure,
949                "signal": sig,
950            }
951            callback(phase, info)
952
953    def __repr__(self) -> str:
954        "Return string representation of Runner."
955        cancelled = " cancelled" if self._cancelled else ""
956        if self._proc:
957            procinfo = f" pid={self._proc.pid} exit_code={self.returncode}"
958        else:
959            procinfo = " pid=None"
960        return f"<Runner {self.name!r}{cancelled}{procinfo}>"
961
962    async def _readlines(self):
963        "Iterate over lines in stdout/stderr"
964        stream = self.stdout or self.stderr
965        if stream:
966            async for line in redir.read_lines(stream, self._options.encoding):
967                yield line
968
969    def __aiter__(self) -> AsyncIterator[str]:
970        "Return asynchronous iterator over stdout/stderr."
971        return self._readlines()
972
973    @staticmethod
974    async def run_command(
975        command: "shellous.Command[Any]",
976        *,
977        _run_future: Optional[asyncio.Future["Runner"]] = None,
978    ) -> Union[str, Result]:
979        "Run a command. This is the main entry point for Runner."
980        if not _run_future and _is_multiple_capture(command):
981            LOGGER.warning("run_command: multiple capture requires 'async with'")
982            _cleanup(command)
983            raise ValueError("multiple capture requires 'async with'")
984
985        async with Runner(command) as run:
986            if _run_future is not None:
987                # Return streams to caller in another task.
988                _run_future.set_result(run)
989
990        result = run.result()
991        if command.options._return_result:
992            return result
993        return result.output

Runner is an asynchronous context manager that runs a command.

async with Runner(cmd) as run:
    # process streams: run.stdin, run.stdout, run.stderr (if not None)
result = run.result()
Runner(command: Command[typing.Any])
413    def __init__(self, command: "shellous.Command[Any]"):
414        self._options = _RunOptions(command)
415        self._tasks = []
stdin: Optional[asyncio.streams.StreamWriter] = None

Process standard input.

stdout: Optional[asyncio.streams.StreamReader] = None

Process standard output.

stderr: Optional[asyncio.streams.StreamReader] = None

Process standard error.

name: str
417    @property
418    def name(self) -> str:
419        "Return name of process being run."
420        return self.command.name

Return name of process being run.

options: Options
422    @property
423    def options(self) -> "shellous.Options":
424        "Return options for process being run."
425        return self.command.options

Return options for process being run.

command: Command[typing.Any]
427    @property
428    def command(self) -> "shellous.Command[Any]":
429        "Return the command being run."
430        return self._options.command

Return the command being run.

pid: Optional[int]
432    @property
433    def pid(self) -> Optional[int]:
434        "Return the command's process ID."
435        if not self._proc:
436            return None
437        return self._proc.pid

Return the command's process ID.

returncode: Optional[int]
439    @property
440    def returncode(self) -> Optional[int]:
441        "Process's exit code."
442        if not self._proc:
443            if self._cancelled:
444                # The process was cancelled before starting.
445                return CANCELLED_EXIT_CODE
446            return None
447        code = self._proc.returncode
448        if code == _UNKNOWN_EXIT_CODE and self._last_signal is not None:
449            # After sending a signal, `waitpid` may fail to locate the child
450            # process. In this case, map the status to the last signal we sent.
451            # For more on this, see https://github.com/python/cpython/issues/87744
452            return -self._last_signal  # pylint: disable=invalid-unary-operand-type
453        return code

Process's exit code.

cancelled: bool
455    @property
456    def cancelled(self) -> bool:
457        "Return True if the command was cancelled."
458        return self._cancelled

Return True if the command was cancelled.

pty_fd: Optional[int]
460    @property
461    def pty_fd(self) -> Optional[int]:
462        """The file descriptor used to communicate with the child PTY process.
463
464        Returns None if the process is not using a PTY.
465        """
466        pty_fds = self._options.pty_fds
467        if pty_fds is not None:
468            return pty_fds.parent_fd
469        return None

The file descriptor used to communicate with the child PTY process.

Returns None if the process is not using a PTY.

pty_eof: Optional[bytes]
471    @property
472    def pty_eof(self) -> Optional[bytes]:
473        """Byte sequence used to indicate EOF when written to the PTY child.
474
475        Returns None if process is not using a PTY.
476        """
477        pty_fds = self._options.pty_fds
478        if pty_fds is not None:
479            return pty_fds.eof
480        return None

Byte sequence used to indicate EOF when written to the PTY child.

Returns None if process is not using a PTY.

def result(self, *, check: bool = True) -> Result:
482    def result(self, *, check: bool = True) -> Result:
483        "Check process exit code and raise a ResultError if necessary."
484        code = self.returncode
485        if code is None:
486            raise TypeError("Runner.result(): Process has not exited")
487
488        result = Result(
489            exit_code=code,
490            output_bytes=bytes(self._options.output_bytes or b""),
491            error_bytes=bytes(self._options.error_bytes or b""),
492            cancelled=self._cancelled,
493            encoding=self._options.encoding,
494        )
495
496        if not check:
497            return result
498
499        return check_result(
500            result,
501            self.command.options,
502            self._cancelled,
503            self._timed_out,
504        )

Check process exit code and raise a ResultError if necessary.

def add_task( self, coro: Coroutine[Any, Any, ~_T], tag: str = '') -> _asyncio.Task[~_T]:
506    def add_task(
507        self,
508        coro: Coroutine[Any, Any, _T],
509        tag: str = "",
510    ) -> asyncio.Task[_T]:
511        "Add a background task."
512        task_name = f"{self.name}#{tag}"
513        task = asyncio.create_task(coro, name=task_name)
514        self._tasks.append(task)
515        return task

Add a background task.

def send_signal(self, sig: int) -> None:
517    def send_signal(self, sig: int) -> None:
518        "Send an arbitrary signal to the process if it is running."
519        if self.returncode is None:
520            self._signal(sig)

Send an arbitrary signal to the process if it is running.

def cancel(self) -> None:
522    def cancel(self) -> None:
523        "Cancel the running process if it is running."
524        if self.returncode is None:
525            self._signal(self.command.options.cancel_signal)

Cancel the running process if it is running.

@log_method(LOG_DETAIL)
async def __aenter__(self):
630    @log_method(LOG_DETAIL)
631    async def __aenter__(self):
632        "Set up redirections and launch subprocess."
633        self._audit_callback("start")
634        try:
635            return await self._start()
636        except BaseException as ex:
637            self._stop_timer()  # failsafe just in case
638            self._audit_callback("stop", failure=type(ex).__name__)
639            raise
640        finally:
641            if self._cancelled and self.command.options._catch_cancelled_error:
642                # Raises ResultError instead of CancelledError.
643                self.result()

Set up redirections and launch subprocess.

def __aiter__(self) -> AsyncIterator[str]:
969    def __aiter__(self) -> AsyncIterator[str]:
970        "Return asynchronous iterator over stdout/stderr."
971        return self._readlines()

Return asynchronous iterator over stdout/stderr.

@staticmethod
async def run_command( command: Command[typing.Any], *, _run_future: Optional[_asyncio.Future[Runner]] = None) -> Union[str, Result]:
973    @staticmethod
974    async def run_command(
975        command: "shellous.Command[Any]",
976        *,
977        _run_future: Optional[asyncio.Future["Runner"]] = None,
978    ) -> Union[str, Result]:
979        "Run a command. This is the main entry point for Runner."
980        if not _run_future and _is_multiple_capture(command):
981            LOGGER.warning("run_command: multiple capture requires 'async with'")
982            _cleanup(command)
983            raise ValueError("multiple capture requires 'async with'")
984
985        async with Runner(command) as run:
986            if _run_future is not None:
987                # Return streams to caller in another task.
988                _run_future.set_result(run)
989
990        result = run.result()
991        if command.options._return_result:
992            return result
993        return result.output

Run a command. This is the main entry point for Runner.

class PipeRunner:
 996class PipeRunner:
 997    """PipeRunner is an asynchronous context manager that runs a pipeline.
 998
 999    ```
1000    async with pipe.run() as run:
1001        # process run.stdin, run.stdout, run.stderr (if not None)
1002    result = run.result()
1003    ```
1004    """
1005
1006    stdin: Optional[asyncio.StreamWriter] = None
1007    "Pipeline standard input."
1008
1009    stdout: Optional[asyncio.StreamReader] = None
1010    "Pipeline standard output."
1011
1012    stderr: Optional[asyncio.StreamReader] = None
1013    "Pipeline standard error."
1014
1015    _pipe: "shellous.Pipeline[Any]"
1016    _capturing: bool
1017    _tasks: list[asyncio.Task[Any]]
1018    _encoding: str
1019    _cancelled: bool = False
1020    _results: Optional[list[Union[BaseException, Result]]] = None
1021    _pid: int = -1
1022
1023    def __init__(self, pipe: "shellous.Pipeline[Any]", *, capturing: bool):
1024        """`capturing=True` indicates we are within an `async with` block and
1025        client needs to access `stdin` and `stderr` streams.
1026        """
1027        assert len(pipe.commands) > 1
1028
1029        self._pipe = pipe
1030        self._cancelled = False
1031        self._tasks = []
1032        self._capturing = capturing
1033        self._encoding = pipe.options.encoding
1034
1035    @property
1036    def name(self) -> str:
1037        "Return name of the pipeline."
1038        return self._pipe.name
1039
1040    @property
1041    def options(self) -> "shellous.Options":
1042        """Return options for pipeline being run.
1043
1044        These are the options for the last command in the pipeline.
1045        """
1046        return self._pipe.options
1047
1048    @property
1049    def pid(self) -> Optional[int]:
1050        """Return the process ID for the first command in the pipeline.
1051
1052        The PID is only available when `capturing=True`.
1053        """
1054        if self._pid < 0:
1055            return None
1056        return self._pid
1057
1058    def result(self, *, check: bool = True) -> Result:
1059        "Return `Result` object for PipeRunner."
1060        assert self._results is not None
1061
1062        result = convert_result_list(self._results, self._cancelled)
1063        if not check:
1064            return result
1065
1066        return check_result(result, self._pipe.options, self._cancelled)
1067
1068    def add_task(
1069        self,
1070        coro: Coroutine[Any, Any, _T],
1071        tag: str = "",
1072    ) -> asyncio.Task[_T]:
1073        "Add a background task."
1074        task_name = f"{self.name}#{tag}"
1075        task = asyncio.create_task(coro, name=task_name)
1076        self._tasks.append(task)
1077        return task
1078
1079    @log_method(LOG_DETAIL)
1080    async def _wait(self, *, kill: bool = False):
1081        "Wait for pipeline to finish."
1082        assert self._results is None
1083
1084        if kill:
1085            LOGGER.debug("PipeRunner.wait killing pipe %r", self)
1086            for task in self._tasks:
1087                task.cancel()
1088
1089        cancelled, self._results = await harvest_results(*self._tasks, trustee=self)
1090        if cancelled:
1091            self._cancelled = True
1092        self._tasks.clear()  # clear all tasks when done
1093
1094    @log_method(LOG_DETAIL)
1095    async def __aenter__(self) -> "PipeRunner":
1096        "Set up redirections and launch pipeline."
1097        try:
1098            return await self._start()
1099        except (Exception, asyncio.CancelledError) as ex:
1100            LOGGER.warning("PipeRunner enter %r ex=%r", self, ex)
1101            if _is_cancelled(ex):
1102                self._cancelled = True
1103            await self._wait(kill=True)
1104            raise
1105
1106    @log_method(LOG_DETAIL)
1107    async def __aexit__(
1108        self,
1109        _exc_type: Union[type[BaseException], None],
1110        exc_value: Union[BaseException, None],
1111        _exc_tb: Optional[TracebackType],
1112    ):
1113        "Wait for pipeline to exit and handle cancellation."
1114        suppress = False
1115        try:
1116            suppress = await self._finish(exc_value)
1117        except asyncio.CancelledError:
1118            LOGGER.warning("PipeRunner cancelled inside _finish %r", self)
1119            self._cancelled = True
1120        return suppress
1121
1122    @log_method(LOG_DETAIL)
1123    async def _finish(self, exc_value: Optional[BaseException]) -> bool:
1124        "Wait for pipeline to exit and handle cancellation."
1125        if exc_value is not None:
1126            LOGGER.warning("PipeRunner._finish exc_value=%r", exc_value)
1127            if _is_cancelled(exc_value):
1128                self._cancelled = True
1129            await self._wait(kill=True)
1130            return self._cancelled
1131
1132        await self._wait()
1133        return False
1134
1135    @log_method(LOG_DETAIL)
1136    async def _start(self):
1137        "Set up redirection and launch pipeline."
1138        open_fds: list[int] = []
1139
1140        try:
1141            stdin = None
1142            stdout = None
1143            stderr = None
1144
1145            cmds = self._setup_pipeline(open_fds)
1146
1147            if self._capturing:
1148                stdin, stdout, stderr = await self._setup_capturing(cmds)
1149            else:
1150                for cmd in cmds:
1151                    self.add_task(cmd.coro())
1152
1153            self.stdin = stdin
1154            self.stdout = stdout
1155            self.stderr = stderr
1156
1157            return self
1158
1159        except BaseException:  # pylint: disable=broad-except
1160            # Clean up after any exception *including* CancelledError.
1161            close_fds(open_fds)
1162            raise
1163
1164    def _setup_pipeline(self, open_fds: list[int]):
1165        """Return the pipeline stitched together with pipe fd's.
1166
1167        Each created open file descriptor is added to `open_fds` so it can
1168        be closed if there's an exception later.
1169        """
1170        cmds = list(self._pipe.commands)
1171
1172        cmd_count = len(cmds)
1173        for i in range(cmd_count - 1):
1174            read_fd, write_fd = os.pipe()
1175            open_fds.extend((read_fd, write_fd))
1176
1177            cmds[i] = cmds[i].stdout(write_fd, close=True)
1178            cmds[i + 1] = cmds[i + 1].stdin(read_fd, close=True)
1179
1180        for i in range(cmd_count):
1181            cmds[i] = cmds[i].set(_return_result=True, _catch_cancelled_error=True)
1182
1183        return cmds
1184
1185    @log_method(LOG_DETAIL)
1186    async def _setup_capturing(self, cmds: "list[shellous.Command[Any]]"):
1187        """Set up capturing and return (stdin, stdout, stderr) streams."""
1188        loop = asyncio.get_event_loop()
1189        first_fut = loop.create_future()
1190        last_fut = loop.create_future()
1191
1192        first_coro = cmds[0].coro(_run_future=first_fut)
1193        last_coro = cmds[-1].coro(_run_future=last_fut)
1194        middle_coros = [cmd.coro() for cmd in cmds[1:-1]]
1195
1196        # Tag each task name with the index of the command in the pipe.
1197        self.add_task(first_coro, "0")
1198        for i, coro in enumerate(middle_coros):
1199            self.add_task(coro, str(i + 1))
1200        self.add_task(last_coro, str(len(cmds) - 1))
1201
1202        # When capturing, we need the first and last commands in the
1203        # pipe to signal when they are ready.
1204        first_ready, last_ready = await asyncio.gather(first_fut, last_fut)
1205
1206        stdin, stdout, stderr = (
1207            first_ready.stdin,
1208            last_ready.stdout,
1209            last_ready.stderr,
1210        )
1211        self._pid = first_ready.pid
1212
1213        return (stdin, stdout, stderr)
1214
1215    def __repr__(self) -> str:
1216        "Return string representation of PipeRunner."
1217        cancelled_info = ""
1218        if self._cancelled:
1219            cancelled_info = " cancelled"
1220        result_info = ""
1221        if self._results:
1222            result_info = f" results={self._results!r}"
1223        return f"<PipeRunner {self.name!r}{cancelled_info}{result_info}>"
1224
1225    async def _readlines(self) -> AsyncIterator[str]:
1226        "Iterate over lines in stdout/stderr"
1227        stream = self.stdout or self.stderr
1228        if stream:
1229            async for line in redir.read_lines(stream, self._encoding):
1230                yield line
1231
1232    def __aiter__(self) -> AsyncIterator[str]:
1233        "Return asynchronous iterator over stdout/stderr."
1234        return self._readlines()
1235
1236    @staticmethod
1237    async def run_pipeline(pipe: "shellous.Pipeline[Any]") -> Union[str, Result]:
1238        "Run a pipeline. This is the main entry point for PipeRunner."
1239        run = PipeRunner(pipe, capturing=False)
1240        async with run:
1241            pass
1242
1243        result = run.result()
1244        if pipe.options._return_result:
1245            return result
1246        return result.output

PipeRunner is an asynchronous context manager that runs a pipeline.

async with pipe.run() as run:
    # process run.stdin, run.stdout, run.stderr (if not None)
result = run.result()
PipeRunner(pipe: Pipeline[typing.Any], *, capturing: bool)
1023    def __init__(self, pipe: "shellous.Pipeline[Any]", *, capturing: bool):
1024        """`capturing=True` indicates we are within an `async with` block and
1025        client needs to access `stdin` and `stderr` streams.
1026        """
1027        assert len(pipe.commands) > 1
1028
1029        self._pipe = pipe
1030        self._cancelled = False
1031        self._tasks = []
1032        self._capturing = capturing
1033        self._encoding = pipe.options.encoding

capturing=True indicates we are within an async with block and client needs to access stdin and stderr streams.

stdin: Optional[asyncio.streams.StreamWriter] = None

Pipeline standard input.

stdout: Optional[asyncio.streams.StreamReader] = None

Pipeline standard output.

stderr: Optional[asyncio.streams.StreamReader] = None

Pipeline standard error.

name: str
1035    @property
1036    def name(self) -> str:
1037        "Return name of the pipeline."
1038        return self._pipe.name

Return name of the pipeline.

options: Options
1040    @property
1041    def options(self) -> "shellous.Options":
1042        """Return options for pipeline being run.
1043
1044        These are the options for the last command in the pipeline.
1045        """
1046        return self._pipe.options

Return options for pipeline being run.

These are the options for the last command in the pipeline.

pid: Optional[int]
1048    @property
1049    def pid(self) -> Optional[int]:
1050        """Return the process ID for the first command in the pipeline.
1051
1052        The PID is only available when `capturing=True`.
1053        """
1054        if self._pid < 0:
1055            return None
1056        return self._pid

Return the process ID for the first command in the pipeline.

The PID is only available when capturing=True.

def result(self, *, check: bool = True) -> Result:
1058    def result(self, *, check: bool = True) -> Result:
1059        "Return `Result` object for PipeRunner."
1060        assert self._results is not None
1061
1062        result = convert_result_list(self._results, self._cancelled)
1063        if not check:
1064            return result
1065
1066        return check_result(result, self._pipe.options, self._cancelled)

Return Result object for PipeRunner.

def add_task( self, coro: Coroutine[Any, Any, ~_T], tag: str = '') -> _asyncio.Task[~_T]:
1068    def add_task(
1069        self,
1070        coro: Coroutine[Any, Any, _T],
1071        tag: str = "",
1072    ) -> asyncio.Task[_T]:
1073        "Add a background task."
1074        task_name = f"{self.name}#{tag}"
1075        task = asyncio.create_task(coro, name=task_name)
1076        self._tasks.append(task)
1077        return task

Add a background task.

@log_method(LOG_DETAIL)
async def __aenter__(self) -> PipeRunner:
1094    @log_method(LOG_DETAIL)
1095    async def __aenter__(self) -> "PipeRunner":
1096        "Set up redirections and launch pipeline."
1097        try:
1098            return await self._start()
1099        except (Exception, asyncio.CancelledError) as ex:
1100            LOGGER.warning("PipeRunner enter %r ex=%r", self, ex)
1101            if _is_cancelled(ex):
1102                self._cancelled = True
1103            await self._wait(kill=True)
1104            raise

Set up redirections and launch pipeline.

def __aiter__(self) -> AsyncIterator[str]:
1232    def __aiter__(self) -> AsyncIterator[str]:
1233        "Return asynchronous iterator over stdout/stderr."
1234        return self._readlines()

Return asynchronous iterator over stdout/stderr.

@staticmethod
async def run_pipeline( pipe: Pipeline[typing.Any]) -> Union[str, Result]:
1236    @staticmethod
1237    async def run_pipeline(pipe: "shellous.Pipeline[Any]") -> Union[str, Result]:
1238        "Run a pipeline. This is the main entry point for PipeRunner."
1239        run = PipeRunner(pipe, capturing=False)
1240        async with run:
1241            pass
1242
1243        result = run.result()
1244        if pipe.options._return_result:
1245            return result
1246        return result.output

Run a pipeline. This is the main entry point for PipeRunner.

class AuditEventInfo(typing.TypedDict):
75class AuditEventInfo(TypedDict):
76    """Info attached to each audit callback event.
77
78    See `audit_callback` in `Command.set` for more information.
79    """
80
81    runner: Runner
82    "Reference to the Runner object."
83
84    failure: str
85    "When phase is 'stop', the name of the exception from starting the process."
86
87    signal: str
88    "When phase is 'signal', the signal name/number sent to the process."

Info attached to each audit callback event.

See audit_callback in Command.set for more information.

runner: Runner

Reference to the Runner object.

failure: str

When phase is 'stop', the name of the exception from starting the process.

signal: str

When phase is 'signal', the signal name/number sent to the process.