shellous

Async Processes and Pipelines

shellous provides a concise API for running subprocesses using asyncio. It is similar to and inspired by sh.

import asyncio
from shellous import sh

async def main():
    result = await sh("echo", "hello")
    print(result)

asyncio.run(main())

Benefits

  • Run programs asynchronously in a single line.
  • Redirect stdin, stdout and stderr to files, memory buffers, async streams or loggers.
  • Iterate asynchronously over subprocess output.
  • Set timeouts and reliably cancel running processes.
  • Run a program with a pseudo-terminal (pty).
  • Use send() and expect() to manually control a subprocess.
  • Construct pipelines and use process substitution directly from Python (no shell required).
  • Run on Linux, macOS, FreeBSD and Windows.
  • Monitor processes being started and stopped with the audit_callback API.

Requirements

  • Requires Python 3.9 or later.
  • Requires an asyncio event loop.
  • Pseudo-terminals require a Unix system.
  • Process substitution requires a Unix system with /dev/fd support.

Running a Command

The tutorial in this README uses the asyncio REPL built into Python. In these examples, >>> is the REPL prompt.

Start the asyncio REPL by typing python3 -m asyncio, and import sh from the shellous module:

>>> from shellous import sh

Here's a command that runs echo "hello, world".

>>> await sh("echo", "hello, world")
'hello, world\n'

The first argument to sh is the program name. It is followed by zero or more arguments. Each argument will be converted to a string. If an argument is a list or tuple, it is flattened recursively.

>>> await sh("echo", 1, 2, [3, 4, (5, 6)])
'1 2 3 4 5 6\n'
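
The flattening rule can be pictured with a small helper. This is only a minimal sketch of the behavior described above, not shellous's actual implementation; `flatten_args` is a hypothetical name, and the sketch simply applies `str()` to every leaf value.

```python
from typing import Any, Iterable


def flatten_args(args: Iterable[Any]) -> list[str]:
    """Recursively flatten lists/tuples and convert each leaf to str."""
    flat: list[str] = []
    for arg in args:
        if isinstance(arg, (list, tuple)):
            # Nested sequences are expanded in place, depth first.
            flat.extend(flatten_args(arg))
        else:
            flat.append(str(arg))
    return flat


print(flatten_args(["echo", 1, 2, [3, 4, (5, 6)]]))
# ['echo', '1', '2', '3', '4', '5', '6']
```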

A command does not run until you await it. When you run a command using await, it returns the value of the standard output interpreted as a UTF-8 string. It is safe to await the same command object more than once.

Here, we create our own echo command with "-n" to omit the newline. Note that echo("abc") runs the same command as echo -n "abc".

>>> echo = sh("echo", "-n")
>>> await echo("abc")
'abc'

Commands are immutable objects that represent a program invocation: program name, arguments, environment variables, redirection operators and other settings. A method that "modifies" a Command actually returns a new Command object. The original object is unchanged.
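
The same "modify by copying" pattern can be sketched with a frozen dataclass. `MiniCommand` below is a hypothetical stand-in used only for illustration; it is not the real Command class and supports far fewer settings.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class MiniCommand:
    """Hypothetical, simplified stand-in for an immutable command."""

    args: tuple
    timeout: Optional[float] = None

    def set(self, **changes) -> "MiniCommand":
        # replace() builds a new instance; `self` is never mutated.
        return replace(self, **changes)


base = MiniCommand(("echo", "-n"))
timed = base.set(timeout=5.0)
print(base.timeout, timed.timeout)  # None 5.0
```

Because the original object never changes, a command like `base` can be shared and reused safely as a template.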

You can wrap your commands in a function to improve type safety:

>>> from shellous import Command
>>> def exclaim(word: str) -> Command[str]:
...   return sh("echo", "-n", f"{word}!!")
... 
>>> await exclaim("Oh")
'Oh!!'

The type hint Command[str] indicates that the command returns a str.

Arguments

Commands use positional arguments only; keyword arguments are not supported.

In most cases, shellous automatically converts Python objects passed as command arguments to str or bytes. As described above, the list and tuple types are an exception; they are recursively flattened before their elements are converted to strings.

Dicts, sets, and generator types are not supported as arguments. Their string format doesn't make sense as a command line argument.
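
If you do want to pass a dict, the coerce_arg option (listed under Options) accepts a function that converts such arguments yourself. The sketch below shows what such a function might look like. `coerce_dict` is a hypothetical name, and the assumption that unhandled values should be returned unchanged is ours; check the shellous.Options documentation for the exact contract.

```python
from typing import Any


def coerce_dict(arg: Any) -> Any:
    """Turn {"name": value} into ["--name", "value", ...]; pass other types through."""
    if isinstance(arg, dict):
        return [item for key, value in arg.items() for item in (f"--{key}", str(value))]
    return arg


print(coerce_dict({"depth": 1, "branch": "main"}))
# ['--depth', '1', '--branch', 'main']
```

A function like this would be installed on a context with sh.set(coerce_arg=coerce_dict).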

Results

When a command completes successfully, it returns the standard output (or "" if stdout is redirected). For a more detailed response, you can specify that the command should return a Result object by using the .result modifier:

>>> await echo.result("abc")
Result(exit_code=0, output_bytes=b'abc', error_bytes=b'', cancelled=False, encoding='utf-8')

A Result object contains the command's exit_code in addition to its output. A Result is True if the command's exit_code is zero. You can access the string value of the output using the .output property:

if result := await sh.result("cat", "some-file"):
    output = result.output
else:
    print(f"Command failed with exit_code={result.exit_code}")

You can retrieve the string value of the standard error using the .error property. (By default, only the first 1024 bytes of standard error are stored.)

If a command was terminated by a signal, the exit_code will be the negative signal number.
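
To make the truthiness and negative-signal conventions concrete, here is a minimal sketch. `MiniResult` and `describe_exit` are hypothetical names for illustration; the real Result class has more fields and behavior.

```python
import signal
from dataclasses import dataclass


@dataclass(frozen=True)
class MiniResult:
    """Hypothetical, simplified stand-in for shellous.Result."""

    exit_code: int
    output_bytes: bytes = b""
    encoding: str = "utf-8"

    def __bool__(self) -> bool:
        # A result is truthy only when the process exited with status 0.
        return self.exit_code == 0

    @property
    def output(self) -> str:
        return self.output_bytes.decode(self.encoding)


def describe_exit(exit_code: int) -> str:
    """Explain an exit code, treating negative values as signal numbers."""
    if exit_code < 0:
        # Raises ValueError if the number is not a known signal.
        return f"killed by {signal.Signals(-exit_code).name}"
    return f"exited with status {exit_code}"


print(describe_exit(-signal.SIGTERM))  # killed by SIGTERM
```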

The return value of sh.result("cmd", ...) uses the type hint Command[Result].

ResultError

If you are not using the .result modifier and a command fails, it raises a ResultError exception:

>>> await sh("cat", "does_not_exist")
Traceback (most recent call last):
  ...
shellous.result.ResultError: Result(exit_code=1, output_bytes=b'', error_bytes=b'cat: does_not_exist: No such file or directory\n', cancelled=False, encoding='utf-8')

The ResultError exception contains a Result object with the exit_code and the first 1024 bytes of standard error.

In some cases, you want to ignore certain exit code values. That is, you want to treat them as if they are normal. To do this, you can set the exit_codes option:

>>> await sh("cat", "does_not_exist").set(exit_codes={0,1})
''

If there is a problem launching a process, shellous can also raise a separate FileNotFoundError or PermissionError exception.
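
The exit_codes check amounts to a membership test on a container of integers. The sketch below is an assumption about the general shape of that check, not the library's code; `CommandFailed` and `check_exit` are hypothetical names.

```python
from collections.abc import Container


class CommandFailed(Exception):
    """Hypothetical stand-in for ResultError."""


def check_exit(exit_code: int, exit_codes: Container[int] = frozenset({0})) -> int:
    """Return exit_code if it is acceptable; otherwise raise CommandFailed."""
    if exit_code not in exit_codes:
        raise CommandFailed(exit_code)
    return exit_code


check_exit(1, {0, 1})                 # accepted, like .set(exit_codes={0,1})
check_exit(-15, range(-255, 256))     # any container works, including a range
```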

Async For

Using await to run a command collects the entire output of the command in memory before returning it. You can also iterate over the output lines as they arrive using async for.

>>> [line async for line in echo("hi\n", "there")]
['hi\n', ' there']

Use an async for loop when you want to examine the stream of output from a command, line by line. For example, suppose you want to run tail on a log file.

async for line in sh("tail", "-f", "/var/log/syslog"):
    if "ERROR" in line:
        print(line.rstrip())
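
Line-by-line iteration means buffering incoming chunks and yielding each line as soon as it is complete. The following sketch illustrates that idea with plain asyncio; `iter_lines` and `fake_output` are hypothetical helpers, and the simulated chunks stand in for real process output.

```python
import asyncio
from typing import AsyncIterator


async def iter_lines(chunks: AsyncIterator[bytes], encoding: str = "utf-8") -> AsyncIterator[str]:
    """Yield decoded lines as soon as each one is complete."""
    pending = b""
    async for chunk in chunks:
        pending += chunk
        # Emit every complete line currently in the buffer.
        while (pos := pending.find(b"\n")) >= 0:
            yield pending[: pos + 1].decode(encoding)
            pending = pending[pos + 1 :]
    if pending:
        # The final line may lack a trailing newline.
        yield pending.decode(encoding)


async def fake_output() -> AsyncIterator[bytes]:
    """Simulated process output arriving in arbitrary chunks."""
    for chunk in (b"hi", b"\n the", b"re"):
        await asyncio.sleep(0)
        yield chunk


async def collect() -> list[str]:
    return [line async for line in iter_lines(fake_output())]


print(asyncio.run(collect()))  # ['hi\n', ' there']
```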

Async With

You can use a command as an asynchronous context manager. There are two ways to run a program using a context manager: a low-level API and a high-level API.

Byte-by-Byte (Low Level)

Use async with directly when you need byte-by-byte control over the individual streams: stdin, stdout and stderr. To control a standard stream, you must tell shellous to "capture" it. (For more on this, see Redirection.)

cmd = sh("cat").stdin(sh.CAPTURE).stdout(sh.CAPTURE)
async with cmd as run:
    run.stdin.write(b"abc")
    run.stdin.close()
    print(await run.stdout.readline())

result = run.result()

The streams run.stdout and run.stderr are asyncio.StreamReader objects. The stream run.stdin is an asyncio.StreamWriter object. If we didn't specify that stdin/stdout are sh.CAPTURE, the streams run.stdin and run.stdout would be None.

The return value of run.result() is a Result object. Depending on the command settings, this function may raise a ResultError on a non-zero exit code.

Warning: When reading or writing individual streams, you are responsible for managing reads and writes so they don't deadlock. You may use run.create_task to schedule a concurrent task.
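
The deadlock risk is not specific to shellous, so it can be shown with the standard library alone. In this sketch (plain asyncio.subprocess, not the shellous API), the child echoes while it reads; writing a large input and only then reading could stall once the pipes fill up, so the write runs in its own task. `roundtrip` and `CHILD` are names chosen for this example.

```python
import asyncio
import sys

# Child process: upper-case stdin line by line, so it writes while it reads.
CHILD = "import sys\nfor line in sys.stdin:\n    sys.stdout.write(line.upper())"


async def roundtrip(data: bytes) -> bytes:
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", CHILD,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )

    async def feed() -> None:
        # Write in a separate task so reading can proceed concurrently.
        proc.stdin.write(data)
        await proc.stdin.drain()
        proc.stdin.close()

    writer = asyncio.create_task(feed())
    output = await proc.stdout.read()  # read until EOF
    await writer
    await proc.wait()
    return output
```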

You can also use async with to run a server. When you do so, you must tell the server to stop using run.cancel(). Otherwise, the context manager will wait forever for the process to exit.

async with sh("some-server") as run:
    # Send commands to the server here...
    # Manually signal the server to stop.
    run.cancel()

Prompt with Send/Expect (High Level API)

Use the prompt() method to control a process using send and expect. The prompt() method returns an asynchronous context manager (the Prompt class) that facilitates reading and writing strings and matching regular expressions.

cmd = sh("cat").set(pty=True)

async with cmd.prompt() as client:
  await client.send("abc")
  output, _ = await client.expect("\r\n")
  print(output)

The Prompt API automatically captures stdin and stdout.

Here is another example of controlling a bash co-process running in a docker container.

import re
from shellous import sh

async def list_packages():
    "Run bash in an ubuntu docker container and list packages."
    bash_prompt = re.compile("root@[0-9a-f]+:/[^#]*# ")
    cmd = sh("docker", "run", "-it", "--rm", "-e", "TERM=dumb", "ubuntu")

    async with cmd.set(pty=True).prompt(bash_prompt, timeout=3) as cli:
        # Read up to first prompt.
        await cli.expect()

        # Disable echo. The `command()` method combines send *and* expect methods.
        await cli.command("stty -echo")

        # Collect the list of packages.
        result = await cli.command("apt-cache pkgnames")
        packages = result.strip().split("\r\n")

    # You can check the result object's exit code. You can only
    # access `cli.result` outside the `async with` block.
    assert cli.result.exit_code == 0
    return packages

The prompt() API does not raise a ResultError when a command exits with an error status. Typically, you'll see an EOFError when you were expecting to read a response. You can check the exit status by retrieving the Prompt's result property outside of the async with block.
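
The core of any send/expect loop is a buffer that accumulates received text and splits it at the first match of a pattern. The sketch below illustrates that idea only; `ExpectBuffer` is a hypothetical class and is not how the Prompt class is implemented.

```python
import re
from typing import Optional


class ExpectBuffer:
    """Accumulate text and split it at the first match of a pattern."""

    def __init__(self) -> None:
        self._pending = ""

    def feed(self, text: str) -> None:
        """Append newly received text."""
        self._pending += text

    def expect(self, pattern: str) -> Optional[tuple[str, re.Match]]:
        """Return (text before the match, match), or None if nothing matches yet."""
        match = re.search(pattern, self._pending)
        if match is None:
            return None  # a real client would wait for more input here
        before = self._pending[: match.start()]
        # Keep whatever follows the match for the next call.
        self._pending = self._pending[match.end():]
        return before, match


buf = ExpectBuffer()
buf.feed("abc\r\nrest")
output, _ = buf.expect(r"\r\n")
print(output)  # abc
```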

Redirection

shellous supports the redirection operators | and >>. They work similarly to their counterparts in the Unix shell. Shellous does not support the use of < or > for redirection. Instead, replace these with |.

To redirect to or from a file, use a pathlib.Path object. Alternatively, you can redirect input/output to a StringIO object, an open file, a Logger, or use a special redirection constant like sh.DEVNULL.

Warning: When combining the redirect operators with await, you must use parentheses; await has higher precedence than | and >>.
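
Python's own parser can confirm this precedence. The sketch below parses `await a | b` inside an async function (the names `a` and `b` are placeholders) and inspects the resulting syntax tree.

```python
import ast

source = "async def f():\n    return await a | b\n"
expr = ast.parse(source).body[0].body[0].value

# The top-level node is the `|` operation; `await` applies to `a` alone.
print(type(expr).__name__, type(expr.left).__name__)  # BinOp Await
```

In other words, `await a | b` means `(await a) | b`, which is why the pipeline has to be wrapped as `await (a | b)`.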

Redirecting Standard Input

To redirect standard input, use the pipe operator | with the argument on the left side. Here is an example that passes the string "abc" as standard input.

>>> cmd = "abc" | sh("wc", "-c")
>>> await cmd
'       3\n'

To read input from a file, use a Path object from pathlib.

>>> from pathlib import Path
>>> cmd = Path("LICENSE") | sh("wc", "-l")
>>> await cmd
'     201\n'

Shellous supports different STDIN behavior when using different Python types.

| Python Type | Behavior as STDIN |
| --- | --- |
| str | Read input from string object. |
| bytes, bytearray | Read input from bytes object. |
| Path | Read input from file specified by Path. |
| File, StringIO, BytesIO | Read input from open file object. |
| int | Read input from existing file descriptor. |
| asyncio.StreamReader | Read input from StreamReader. |
| sh.DEVNULL | Read input from /dev/null. |
| sh.INHERIT | Read input from existing sys.stdin. |
| sh.CAPTURE | You will write to stdin interactively. |

Redirecting Standard Output

To redirect standard output, use the pipe operator | with the argument on the right side. Here is an example that writes to a temporary file.

>>> output_file = Path("/tmp/output_file")
>>> cmd = sh("echo", "abc") | output_file
>>> await cmd
''
>>> output_file.read_bytes()
b'abc\n'

To redirect standard output with append, use the >> operator.

>>> cmd = sh("echo", "def") >> output_file
>>> await cmd
''
>>> output_file.read_bytes()
b'abc\ndef\n'

Shellous supports different STDOUT behavior when using different Python types.

| Python Type | Behavior as STDOUT/STDERR |
| --- | --- |
| Path | Write output to the file path specified by Path. |
| bytearray | Write output to a mutable byte array. |
| File, StringIO, BytesIO | Write output to an open file object. |
| int | Write output to existing file descriptor at its current position. ◆ |
| logging.Logger | Log each line of output. ◆ |
| asyncio.StreamWriter | Write output to StreamWriter. ◆ |
| sh.CAPTURE | Capture output for async with. ◆ |
| sh.DEVNULL | Write output to /dev/null. ◆ |
| sh.INHERIT | Write output to existing sys.stdout or sys.stderr. ◆ |

◆ For these types, there is no difference between using | and >>.

Shellous does not support redirecting standard output/error to a plain str or bytes object. If you intend to redirect output to a file, you must use a pathlib.Path object.

Redirecting Standard Error

By default, the first 1024 bytes read from standard error are stored in the Result object. Any further bytes are discarded. You can change the 1024 byte limit using the error_limit option.
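
A buffer with this "keep the first N bytes, discard the rest" behavior can be sketched in a few lines. `LimitedBuffer` is a hypothetical class for illustration; it does not reflect how shellous stores standard error internally.

```python
class LimitedBuffer:
    """Keep at most `limit` leading bytes; silently drop the rest."""

    def __init__(self, limit: int = 1024) -> None:
        self.limit = limit
        self.data = bytearray()

    def write(self, chunk: bytes) -> None:
        room = self.limit - len(self.data)
        if room > 0:
            # Store only as much of the chunk as still fits.
            self.data += chunk[:room]


buf = LimitedBuffer(limit=4)
buf.write(b"abc")
buf.write(b"defg")
print(bytes(buf.data))  # b'abcd'
```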

To redirect standard error, use the stderr method. Standard error supports the same Python types as standard output. To append, set append=True in the stderr method.

To redirect stderr to the same place as stdout, use the sh.STDOUT constant. If you also redirect stdout to sh.DEVNULL, you will only receive the standard error.

>>> cmd = sh("cat", "does_not_exist").stderr(sh.STDOUT)
>>> await cmd.set(exit_codes={0,1})
'cat: does_not_exist: No such file or directory\n'

To redirect standard error to the hosting program's sys.stderr, use the sh.INHERIT redirect option.

>>> cmd = sh("cat", "does_not_exist").stderr(sh.INHERIT)
>>> await cmd
cat: does_not_exist: No such file or directory
Traceback (most recent call last):
  ...
shellous.result.ResultError: Result(exit_code=1, output_bytes=b'', error_bytes=b'', cancelled=False, encoding='utf-8')

If you redirect stderr, it will no longer be stored in the Result object, and the error_limit option will not apply.

Default Redirections

For regular commands, the default redirections are:

  • Standard input is read from the empty string ("").
  • Standard out is buffered and stored in the Result object (BUFFER).
  • The first 1024 bytes of standard error are buffered and stored in the Result object (BUFFER).

However, the default redirections are adjusted when using a pseudo-terminal (pty):

  • Standard input is captured and ignored (CAPTURE).
  • Standard out is buffered and stored in the Result object (BUFFER).
  • Standard error is redirected to standard output (STDOUT).

When you use the Prompt API, the standard input and standard output are automatically redirected to CAPTURE.

Pipelines

You can create a pipeline by combining commands using the | operator. A pipeline feeds the standard out of one process into the next process as standard input. Here is the shellous equivalent to the bash command: ls | grep README

>>> pipe = sh("ls") | sh("grep", "README")
>>> await pipe
'README.md\n'

A pipeline returns a Result if the last command in the pipeline has the .result modifier. To set other options like encoding for a Pipeline, set them on the last command.

>>> pipe = sh("ls") | sh("grep", "README").result
>>> await pipe
Result(exit_code=0, output_bytes=b'README.md\n', error_bytes=b'', cancelled=False, encoding='utf-8')

Error reporting for a pipeline is implemented similarly to the -o pipefail shell option.
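
For reference, bash's pipefail rule is that a pipeline's status is the status of the last (rightmost) command that exited non-zero, or zero if every command succeeded. The sketch below expresses that rule; `pipefail_status` is a hypothetical helper, and the way shellous chooses which failure to report may differ in detail.

```python
def pipefail_status(exit_codes: list[int]) -> int:
    """Return the rightmost non-zero exit code, or 0 if every command succeeded."""
    for code in reversed(exit_codes):
        if code != 0:
            return code
    return 0


print(pipefail_status([1, 0]))     # 1: the first command failed
print(pipefail_status([2, 0, 3]))  # 3: the rightmost failure wins
```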

Pipelines support the same await/async for/async with operations that work on a single command, including the Prompt API.

>>> [line.strip() async for line in pipe]
['README.md']

Process Substitution (Unix Only)

You can pass a shell command as an argument to another. Here is the shellous equivalent to the bash command: grep README <(ls).

>>> cmd = sh("grep", "README", sh("ls"))
>>> await cmd
'README.md\n'

Use .writable to write to a command instead.

>>> buf = bytearray()
>>> cmd = sh("ls") | sh("tee", sh("grep", "README").writable | buf) | sh.DEVNULL
>>> await cmd
''
>>> buf
bytearray(b'README.md\n')

The above example is equivalent to ls | tee >(grep README > buf) > /dev/null.

Timeouts

You can specify a timeout using the timeout option. If the timeout expires, shellous will raise a TimeoutError.

>>> await sh("sleep", 60).set(timeout=0.1)
Traceback (most recent call last):
  ...
TimeoutError

Timeouts are just a special case of cancellation. When a command is cancelled, shellous terminates the running process and raises a CancelledError.

>>> t = asyncio.create_task(sh("sleep", 60).coro())
>>> t.cancel()
True
>>> await t
Traceback (most recent call last):
  ...
CancelledError

By default, shellous will send a SIGTERM signal to the process to tell it to exit. If the process does not exit within 3 seconds, shellous will send a SIGKILL signal. You can change these defaults with the cancel_signal and cancel_timeout settings. A command is not considered fully cancelled until the process exits.
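
The terminate-then-kill escalation can be sketched with the standard library's asyncio.subprocess. This is an illustration of the general technique under the defaults described above, not shellous's own code; `stop_process` and `demo` are hypothetical names.

```python
import asyncio
import sys


async def stop_process(proc: asyncio.subprocess.Process, cancel_timeout: float = 3.0) -> int:
    """Ask the process to exit; force-kill it if it ignores the request."""
    proc.terminate()  # sends SIGTERM on Unix
    try:
        return await asyncio.wait_for(proc.wait(), cancel_timeout)
    except asyncio.TimeoutError:
        proc.kill()  # sends SIGKILL on Unix
        return await proc.wait()


async def demo() -> int:
    # Start a child that would otherwise sleep for a minute.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import time; time.sleep(60)"
    )
    return await stop_process(proc)
```

On Unix, the value returned by `demo()` is the negative signal number, matching the exit_code convention described under Results.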

Pseudo-Terminal Support (Unix Only)

To run a command through a pseudo-terminal, set the pty option to True.

>>> await sh("echo", "in a pty").set(pty=True)
'in a pty\r\n'

Alternatively, you can pass a pty function to configure the tty mode and size.

>>> import shellous
>>> ls = sh("ls").set(pty=shellous.cooked(cols=40, rows=10, echo=False))
>>> await ls("README.md", "CHANGELOG.md")
'CHANGELOG.md\tREADME.md\r\n'

Shellous provides three built-in helper functions: shellous.cooked(), shellous.raw() and shellous.cbreak().

Context Objects

You can store shared command settings in an immutable context object (CmdContext). To create a new context object, specify your changes to the default context sh:

>>> auditor = lambda phase, info: print(phase, info["runner"].name)
>>> sh_audit = sh.set(audit_callback=auditor)

Now all commands created with sh_audit will log their progress using the audit callback.

>>> await sh_audit("echo", "goodbye")
start echo
stop echo
'goodbye\n'

You can also create a context object that specifies all return values are Result objects.

>>> rsh = sh.result
>>> await rsh("echo", "whatever")
Result(exit_code=0, output_bytes=b'whatever\n', error_bytes=b'', cancelled=False, encoding='utf-8')

Options

Both Command and CmdContext support options to control their runtime behavior. Some of these options (timeout, pty, audit_callback, and exit_codes) have been described above. See the shellous.Options class for more information.

You can retrieve an option from cmd with cmd.options.<option>. For example, use cmd.options.encoding to obtain the encoding:

>>> cmd = sh("echo").set(encoding="latin1")
>>> cmd.options.encoding
'latin1'

Command and CmdContext use the .set() method to specify most options:

| Option | Description |
| --- | --- |
| path | Search path to use instead of the PATH environment variable. (Default=None) |
| env | Additional environment variables to pass to the command. (Default={}) |
| inherit_env | True if command should inherit the environment variables from the current process. (Default=True) |
| encoding | Text encoding of input/output streams. You can specify an error handling scheme by including it after a space, e.g. "ascii backslashreplace". (Default="utf-8 strict") |
| exit_codes | Set of exit codes that do not raise a ResultError. (Default={0}) |
| timeout | Timeout in seconds to wait before cancelling the process. (Default=None) |
| cancel_timeout | Timeout in seconds to wait for a cancelled process to exit before forcefully terminating it. (Default=3s) |
| cancel_signal | The signal sent to a process when it is cancelled. (Default=SIGTERM) |
| alt_name | Alternate name for the process used for debug logging. (Default=None) |
| pass_fds | Additional file descriptors to pass to the process. (Default={}) |
| pass_fds_close | True if descriptors in pass_fds should be closed after the child process is launched. (Default=False) |
| pty | Used to allocate a pseudo-terminal (PTY). (Default=False) |
| close_fds | True if process should close all file descriptors when it starts. This setting defaults to False to align with posix_spawn requirements. (Default=False) |
| audit_callback | Provide function to audit stages of process execution. (Default=None) |
| coerce_arg | Provide function to coerce Command arguments to strings when str() is not sufficient. For example, you can provide your own function that converts a dictionary argument to a sequence of strings. (Default=None) |
| error_limit | Maximum number of initial bytes of STDERR to store in Result object. (Default=1024) |
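
The encoding option's "codec, space, error scheme" format maps naturally onto the two arguments of bytes.decode(). The sketch below shows one way to interpret such a string; `split_encoding` and `decode_output` are hypothetical helpers, not part of shellous.

```python
def split_encoding(spec: str) -> tuple[str, str]:
    """Split 'codec [errors]' into (codec, errors); errors defaults to 'strict'."""
    codec, _, errors = spec.partition(" ")
    return codec, errors or "strict"


def decode_output(data: bytes, spec: str = "utf-8") -> str:
    """Decode process output according to an encoding spec string."""
    codec, errors = split_encoding(spec)
    return data.decode(codec, errors)


print(split_encoding("ascii backslashreplace"))  # ('ascii', 'backslashreplace')
print(decode_output(b"caf\xe9", "ascii backslashreplace"))  # caf\xe9
```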

env

Use the env() method to add to the list of environment variables. The env() method supports keyword parameters. You can call env() more than once and the effect is additive.

>>> cmd = sh("echo").env(ENV1="a", ENV2="b").env(ENV2=3)
>>> cmd.options.env
{'ENV1': 'a', 'ENV2': '3'}

Use the env option with set() when you want to replace all the environment variables.
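
The additive behavior of env() and the role of inherit_env can be pictured as dictionary merges. This is a sketch under assumptions: `add_env` and `build_environment` are hypothetical helpers, and the exact way shellous combines these settings when launching a process is not shown in this document.

```python
import os
from typing import Any, Mapping


def add_env(current: Mapping[str, str], **updates: Any) -> dict[str, str]:
    """Return a new mapping with `updates` layered over `current`, values as str."""
    return {**current, **{key: str(value) for key, value in updates.items()}}


def build_environment(extra: Mapping[str, str], inherit_env: bool = True) -> dict[str, str]:
    """Combine the parent environment (if inherited) with the extra variables."""
    base = dict(os.environ) if inherit_env else {}
    return {**base, **extra}


env = add_env(add_env({}, ENV1="a", ENV2="b"), ENV2=3)
print(env)  # {'ENV1': 'a', 'ENV2': '3'}
```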

input, output, error

When you apply a redirection operator to a Command or CmdContext, the redirection targets are also stored in the Options object. To change these, use the .stdin(), .stdout(), or .stderr() methods or the redirection operator |.

| Option | Description |
| --- | --- |
| input | The redirection target for standard input. |
| input_close | True if standard input should be closed after the process is launched. |
| output | The redirection target for standard output. |
| output_append | True if standard output should be open for append. |
| output_close | True if standard output should be closed after the process is launched. |
| error | The redirection target for standard error. |
| error_append | True if standard error should be open for append. |
| error_close | True if standard error should be closed after the process is launched. |

Type Checking

Shellous fully supports PEP 484 type hints.

Commands

Commands are generic on the return type, either str or Result. You will specify the type of a command object as Command[str] or Command[Result].

Use the result modifier to obtain a Command[Result] from a Command[str].

from shellous import sh, Command, Result

cmd1: Command[str] = sh("echo", "abc")
# When you `await cmd1`, the result is a `str` object.

cmd2: Command[Result] = sh.result("echo", "abc")
# When you `await cmd2`, the result is a `Result` object.

CmdContext

The CmdContext class is also generic on either str or Result.

from shellous import sh, CmdContext, Result

sh1: CmdContext[str] = sh.set(path="/bin:/usr/bin")
# When you use `sh1` to create commands, it produces `Command[str]` object with the given path.

sh2: CmdContext[Result] = sh.result.set(path="/bin:/usr/bin")
# When you use `sh2` to create commands, it produces `Command[Result]` objects with the given path.

Logging

For verbose logging, shellous supports a SHELLOUS_TRACE environment variable. Set the value of SHELLOUS_TRACE to a comma-delimited list of options:

  • detail: Enables detailed logging used to trace the steps of running a command.

  • prompt: Enables logging in the Prompt class when controlling a program using send/expect.

  • all: Enables all logging options.

Shellous uses the built-in Python logging module. After enabling these options, the shellous logger will display log messages at the INFO level.

Without these options enabled, Shellous generates almost no log messages.

"""
.. include:: ../README.md
"""

# pylint: disable=cyclic-import
# pyright: reportUnusedImport=false

__version__ = "0.38.0"

import sys
import warnings

from .command import AuditEventInfo, CmdContext, Command, Options
from .pipeline import Pipeline
from .prompt import Prompt
from .pty_util import cbreak, cooked, raw
from .result import Result, ResultError
from .runner import PipeRunner, Runner

if sys.version_info[:3] in [(3, 10, 9), (3, 11, 1)]:
    # Warn about these specific Python releases: 3.10.9 and 3.11.1
    # These releases have a known race condition.
    warnings.warn(  # pragma: no cover
        "Python 3.10.9 and Python 3.11.1 are unreliable with respect to "
        + "asyncio subprocesses. Consider a newer Python release: 3.10.10+ "
        + "or 3.11.2+. (https://github.com/python/cpython/issues/100133)",
        RuntimeWarning,
    )


sh: CmdContext[str] = CmdContext()
"""`sh` is the default command context (`CmdContext`).

Use `sh` to create commands or new command contexts.

```python
from shellous import sh
result = await sh("echo", "hello")
```
"""

__all__ = [
    "sh",
    "CmdContext",
    "Command",
    "Options",
    "Pipeline",
    "Prompt",
    "cbreak",
    "cooked",
    "raw",
    "Result",
    "ResultError",
    "Runner",
    "PipeRunner",
    "AuditEventInfo",
]
sh: CmdContext[str] = CmdContext(options=Options(path=None, inherit_env=True, input=<Redirect.DEFAULT: -20>, input_close=False, output=<Redirect.DEFAULT: -20>, output_append=False, output_close=False, error=<Redirect.DEFAULT: -20>, error_append=False, error_close=False, error_limit=1024, encoding='utf-8', _return_result=False, _catch_cancelled_error=False, exit_codes=None, timeout=None, cancel_timeout=3.0, cancel_signal=<Signals.SIGTERM: 15>, alt_name=None, pass_fds=(), pass_fds_close=False, _writable=False, _start_new_session=False, _preexec_fn=None, pty=False, close_fds=True, audit_callback=None, coerce_arg=None))

sh is the default command context (CmdContext).

Use sh to create commands or new command contexts.

from shellous import sh
result = await sh("echo", "hello")
@dataclass(frozen=True)
class CmdContext(typing.Generic[~_RT]):
@dataclass(frozen=True)
class CmdContext(Generic[_RT]):
    """Concrete class for an immutable execution context."""

    CAPTURE: ClassVar[Redirect] = Redirect.CAPTURE
    "Capture and read/write stream manually."

    DEVNULL: ClassVar[Redirect] = Redirect.DEVNULL
    "Redirect to /dev/null."

    INHERIT: ClassVar[Redirect] = Redirect.INHERIT
    "Redirect to same place as existing stdin/stdout/stderr."

    STDOUT: ClassVar[Redirect] = Redirect.STDOUT
    "Redirect stderr to same place as stdout."

    BUFFER: ClassVar[Redirect] = Redirect.BUFFER
    "Redirect output to a buffer in the Result object. This is the default for stdout/stderr."

    options: Options = field(default_factory=Options)
    "Default command options."

    def stdin(
        self,
        input_: Any,
        *,
        close: bool = False,
    ) -> "CmdContext[_RT]":
        "Return new context with updated `input` settings."
        new_options = self.options.set_stdin(input_, close)
        return CmdContext(new_options)

    def stdout(
        self,
        output: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "CmdContext[_RT]":
        "Return new context with updated `output` settings."
        new_options = self.options.set_stdout(output, append, close)
        return CmdContext(new_options)

    def stderr(
        self,
        error: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "CmdContext[_RT]":
        "Return new context with updated `error` settings."
        new_options = self.options.set_stderr(error, append, close)
        return CmdContext(new_options)

    def env(self, **kwds: Any) -> "CmdContext[_RT]":
        """Return new context with augmented environment."""
        new_options = self.options.add_env(kwds)
        return CmdContext(new_options)

    def set(  # pylint: disable=unused-argument, too-many-locals, too-many-arguments
        self,
        *,
        path: Unset[Optional[str]] = _UNSET,
        env: Unset[dict[str, Any]] = _UNSET,
        inherit_env: Unset[bool] = _UNSET,
        encoding: Unset[str] = _UNSET,
        _return_result: Unset[bool] = _UNSET,
        _catch_cancelled_error: Unset[bool] = _UNSET,
        exit_codes: Unset[Optional[Container[int]]] = _UNSET,
        timeout: Unset[Optional[float]] = _UNSET,
        cancel_timeout: Unset[float] = _UNSET,
        cancel_signal: Unset[Optional[signal.Signals]] = _UNSET,
        alt_name: Unset[Optional[str]] = _UNSET,
        pass_fds: Unset[Iterable[int]] = _UNSET,
        pass_fds_close: Unset[bool] = _UNSET,
        _writable: Unset[bool] = _UNSET,
        _start_new_session: Unset[bool] = _UNSET,
        _preexec_fn: Unset[_PreexecFnT] = _UNSET,
        pty: Unset[PtyAdapterOrBool] = _UNSET,
        close_fds: Unset[bool] = _UNSET,
        audit_callback: Unset[_AuditFnT] = _UNSET,
        coerce_arg: Unset[_CoerceArgFnT] = _UNSET,
        error_limit: Unset[Optional[int]] = _UNSET,
    ) -> "CmdContext[_RT]":
        """Return new context with custom options set.

        See `Command.set` for option reference.
        """
        kwargs = locals()
        del kwargs["self"]
        if not encoding:
            raise TypeError("invalid encoding")
        return CmdContext(self.options.set(kwargs))

    def __call__(self, *args: Any) -> "Command[_RT]":
        "Construct a new command."
        return Command(coerce(args, self.options.coerce_arg), self.options)

    @property
    def result(self) -> "CmdContext[shellous.Result]":
        "Set `_return_result` and `exit_codes`."
        return cast(
            CmdContext[shellous.Result],
            self.set(
                _return_result=True,
                exit_codes=range(-255, 256),
            ),
        )

    def find_command(self, name: str) -> Optional[Path]:
        """Find the command with the given name and return its filesystem path.

        Return None if the command name is not found in the search path.

        Use the `path` variable specified by the context if set. Otherwise, the
        default behavior is to use the `PATH` environment variable with a
        fallback to the value of `os.defpath`.
        """
        result = self.options.which(name)
        if not result:
            return None
        return Path(result)
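
The lookup rule in the find_command docstring (an explicit search path if given, otherwise PATH with a fallback to os.defpath) matches what the standard library's shutil.which() does. The sketch below shows that behavior in isolation; the standalone `find_command` function here is hypothetical, and whether shellous's Options.which() uses shutil.which() internally is not shown in this listing.

```python
import shutil
from pathlib import Path
from typing import Optional


def find_command(name: str, path: Optional[str] = None) -> Optional[Path]:
    """Look up `name` on `path`, or on PATH/os.defpath when `path` is None."""
    found = shutil.which(name, path=path)
    return Path(found) if found else None


print(find_command("no-such-command-3f9a1c"))  # None
```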

Concrete class for an immutable execution context.

CmdContext(options: Options = <factory>)
CAPTURE: ClassVar[shellous.redirect.Redirect] = <Redirect.CAPTURE: -10>

Capture and read/write stream manually.

DEVNULL: ClassVar[shellous.redirect.Redirect] = <Redirect.DEVNULL: -3>

Redirect to /dev/null.

INHERIT: ClassVar[shellous.redirect.Redirect] = <Redirect.INHERIT: -11>

Redirect to same place as existing stdin/stdout/stderr.

STDOUT: ClassVar[shellous.redirect.Redirect] = <Redirect.STDOUT: -2>

Redirect stderr to same place as stdout.

BUFFER: ClassVar[shellous.redirect.Redirect] = <Redirect.BUFFER: -12>

Redirect output to a buffer in the Result object. This is the default for stdout/stderr.

options: Options

Default command options.

def stdin( self, input_: Any, *, close: bool = False) -> CmdContext[~_RT]:

Return new context with updated input settings.

def stdout( self, output: Any, *, append: bool = False, close: bool = False) -> CmdContext[~_RT]:

Return new context with updated output settings.

def stderr( self, error: Any, *, append: bool = False, close: bool = False) -> CmdContext[~_RT]:

Return new context with updated error settings.

def env(self, **kwds: Any) -> CmdContext[~_RT]:

Return new context with augmented environment.

def set( self, *, path: Union[str, NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, env: Union[dict[str, Any], shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, inherit_env: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, encoding: Union[str, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, _return_result: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, _catch_cancelled_error: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, exit_codes: Union[Container[int], NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, timeout: Union[float, NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, cancel_timeout: Union[float, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, cancel_signal: Union[signal.Signals, NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, alt_name: Union[str, NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, pass_fds: Union[Iterable[int], shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, pass_fds_close: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, _writable: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, _start_new_session: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, _preexec_fn: Union[Callable[[], NoneType], NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, pty: Union[Callable[[int], NoneType], bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, close_fds: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, audit_callback: Union[Callable[[str, AuditEventInfo], NoneType], NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, coerce_arg: Union[Callable[[Any], Any], NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, error_limit: Union[int, NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>) -> CmdContext[~_RT]:
    def set(  # pylint: disable=unused-argument, too-many-locals, too-many-arguments
        self,
        *,
        path: Unset[Optional[str]] = _UNSET,
        env: Unset[dict[str, Any]] = _UNSET,
        inherit_env: Unset[bool] = _UNSET,
        encoding: Unset[str] = _UNSET,
        _return_result: Unset[bool] = _UNSET,
        _catch_cancelled_error: Unset[bool] = _UNSET,
        exit_codes: Unset[Optional[Container[int]]] = _UNSET,
        timeout: Unset[Optional[float]] = _UNSET,
        cancel_timeout: Unset[float] = _UNSET,
        cancel_signal: Unset[Optional[signal.Signals]] = _UNSET,
        alt_name: Unset[Optional[str]] = _UNSET,
        pass_fds: Unset[Iterable[int]] = _UNSET,
        pass_fds_close: Unset[bool] = _UNSET,
        _writable: Unset[bool] = _UNSET,
        _start_new_session: Unset[bool] = _UNSET,
        _preexec_fn: Unset[_PreexecFnT] = _UNSET,
        pty: Unset[PtyAdapterOrBool] = _UNSET,
        close_fds: Unset[bool] = _UNSET,
        audit_callback: Unset[_AuditFnT] = _UNSET,
        coerce_arg: Unset[_CoerceArgFnT] = _UNSET,
        error_limit: Unset[Optional[int]] = _UNSET,
    ) -> "CmdContext[_RT]":
        """Return new context with custom options set.

        See `Command.set` for option reference.
        """
        kwargs = locals()
        del kwargs["self"]
        if not encoding:
            raise TypeError("invalid encoding")
        return CmdContext(self.options.set(kwargs))

Return new context with custom options set.

See Command.set for option reference.
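
The source above defaults every keyword to a sentinel so that "not passed" can be told apart from "passed as None". A minimal stand-alone sketch of that pattern follows. The names `_Unset`, `Options` and `with_options` are hypothetical and chosen for illustration; this is not the shellous implementation.

```python
import enum
from dataclasses import dataclass, replace
from typing import Optional, Union


class _Unset(enum.Enum):
    """Sentinel meaning 'the caller did not pass this keyword'."""

    UNSET = 1


_UNSET = _Unset.UNSET


@dataclass(frozen=True)
class Options:
    """Hypothetical, simplified stand-in for an options object."""

    timeout: Optional[float] = None
    encoding: str = "utf-8"

    def with_options(
        self,
        *,
        timeout: Union[Optional[float], _Unset] = _UNSET,
        encoding: Union[str, _Unset] = _UNSET,
    ) -> "Options":
        # Snapshot the keyword arguments, then drop the ones left unset.
        kwargs = dict(locals())
        del kwargs["self"]
        changes = {k: v for k, v in kwargs.items() if v is not _UNSET}
        # The dataclass is frozen, so build a modified copy.
        return replace(self, **changes)


base = Options(timeout=5.0)
print(base.with_options(encoding="latin-1"))  # timeout keeps its old value
print(base.with_options(timeout=None))        # None is a real value here
```

Because the sentinel is a distinct object, `timeout=None` can mean "no timeout" while omitting the keyword leaves the existing value untouched.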

result: CmdContext[Result]
    @property
    def result(self) -> "CmdContext[shellous.Result]":
        "Set `_return_result` and `exit_codes`."
        return cast(
            CmdContext[shellous.Result],
            self.set(
                _return_result=True,
                exit_codes=range(-255, 256),
            ),
        )

Set _return_result and exit_codes.
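
As a small illustration of why the source uses `range(-255, 256)`: asyncio reports a process killed by signal N with a return code of -N, so the range has to cover negative values as well as ordinary exit statuses. The snippet below only checks range membership and does not call shellous.

```python
import signal

# The same range that the `result` property passes as `exit_codes`.
accepted = range(-255, 256)

print(0 in accepted)                # ordinary success
print(1 in accepted)                # ordinary failure
print(-signal.SIGTERM in accepted)  # terminated by SIGTERM
print(256 in accepted)              # outside the accepted range
```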

def find_command(self, name: str) -> Optional[pathlib.Path]:
    def find_command(self, name: str) -> Optional[Path]:
        """Find the command with the given name and return its filesystem path.

        Return None if the command name is not found in the search path.

        Use the `path` variable specified by the context if set. Otherwise, the
        default behavior is to use the `PATH` environment variable with a
        fallback to the value of `os.defpath`.
        """
        result = self.options.which(name)
        if not result:
            return None
        return Path(result)

Find the command with the given name and return its filesystem path.

Return None if the command name is not found in the search path.

Use the path variable specified by the context if set. Otherwise, the default behavior is to use the PATH environment variable with a fallback to the value of os.defpath.
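
The lookup order described above can be sketched with the standard library alone. This is an approximation built on `shutil.which`; the stand-alone function and its `path` parameter are assumptions made for illustration, and the behavior of shellous's own `Options.which` may differ in detail.

```python
import os
import shutil
from pathlib import Path
from typing import Optional


def find_command(name: str, path: Optional[str] = None) -> Optional[Path]:
    """Return the filesystem path of `name`, or None if it is not found."""
    # Prefer the explicit search path; otherwise use PATH, then os.defpath.
    search_path = path if path is not None else os.environ.get("PATH", os.defpath)
    found = shutil.which(name, path=search_path)
    return Path(found) if found else None


# A name that should not exist on any search path.
print(find_command("no-such-program-for-this-sketch"))
```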

@dataclass(frozen=True)
class Command(typing.Generic[~_RT]):
@dataclass(frozen=True)
class Command(Generic[_RT]):
    """A Command instance is a lightweight and immutable object that specifies the
    arguments and options used to run a program. Commands do not do anything
    until they are awaited.

    Commands are always created by a CmdContext.

    ```
    from shellous import sh

    # Create a new command from the context.
    echo = sh("echo", "hello, world")

    # Run the command.
    result = await echo
    ```
    """

    args: "tuple[Union[str, bytes, os.PathLike[Any], Command[Any], shellous.Pipeline[Any]], ...]"
    "Command arguments including the program name as first argument."

    options: Options
    "Command options."

    def __post_init__(self) -> None:
        "Validate the command."
        if len(self.args) == 0:
            raise ValueError("Command must include program name")

    @property
    def name(self) -> str:
        """Returns the name of the program being run.

        Names longer than 31 characters are truncated. If `alt_name` option
        is set, return that instead.
        """
        if self.options.alt_name:
            return self.options.alt_name
        name = str(self.args[0])
        if len(name) > 31:
            return f"...{name[-31:]}"
        return name

    def stdin(self, input_: Any, *, close: bool = False) -> "Command[_RT]":
        "Pass `input_` to the command's standard input."
        new_options = self.options.set_stdin(input_, close)
        return Command(self.args, new_options)

    def stdout(
        self,
        output: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "Command[_RT]":
        "Redirect standard output to `output`."
        new_options = self.options.set_stdout(output, append, close)
        return Command(self.args, new_options)

    def stderr(
        self,
        error: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "Command[_RT]":
        "Redirect standard error to `error`."
        new_options = self.options.set_stderr(error, append, close)
        return Command(self.args, new_options)

    def env(self, **kwds: Any) -> "Command[_RT]":
        """Return new command with augmented environment.

        The changes to the environment variables made by this method are
        additive. For example, calling `cmd.env(A=1).env(B=2)` produces a
        command with the environment set to `{"A": "1", "B": "2"}`.

        To clear the environment, use the `cmd.set(env={})` method.
        """
        new_options = self.options.add_env(kwds)
        return Command(self.args, new_options)

    def set(  # pylint: disable=unused-argument, too-many-locals, too-many-arguments
        self,
        *,
        path: Unset[Optional[str]] = _UNSET,
        env: Unset[dict[str, Any]] = _UNSET,
        inherit_env: Unset[bool] = _UNSET,
        encoding: Unset[str] = _UNSET,
        _return_result: Unset[bool] = _UNSET,
        _catch_cancelled_error: Unset[bool] = _UNSET,
        exit_codes: Unset[Optional[Container[int]]] = _UNSET,
        timeout: Unset[Optional[float]] = _UNSET,
        cancel_timeout: Unset[float] = _UNSET,
        cancel_signal: Unset[Optional[signal.Signals]] = _UNSET,
        alt_name: Unset[Optional[str]] = _UNSET,
        pass_fds: Unset[Iterable[int]] = _UNSET,
        pass_fds_close: Unset[bool] = _UNSET,
        _writable: Unset[bool] = _UNSET,
        _start_new_session: Unset[bool] = _UNSET,
        _preexec_fn: Unset[_PreexecFnT] = _UNSET,
        pty: Unset[PtyAdapterOrBool] = _UNSET,
        close_fds: Unset[bool] = _UNSET,
        audit_callback: Unset[_AuditFnT] = _UNSET,
        coerce_arg: Unset[_CoerceArgFnT] = _UNSET,
        error_limit: Unset[Optional[int]] = _UNSET,
    ) -> "Command[_RT]":
        """Return new command with custom options set.

        **path** (str | None) default=None<br>
        Search path for locating command executable. By default, `path` is None
        which causes shellous to rely on the `PATH` environment variable.

        **env** (dict[str, str]) default={}<br>
        Set the environment variables for the subprocess. If `inherit_env` is
        True, the subprocess will also inherit the environment variables
        specified by the parent process.

        Using `set(env=...)` will replace all environment variables using the
        dictionary argument. You can also use the `env(...)` method to modify
        the existing environment incrementally.

        **inherit_env** (bool) default=True<br>
        Subprocess should inherit the parent process environment. If this is
        False, the subprocess will only have environment variables specified
        by `Command.env`. If `inherit_env` is True, the parent process
        environment is augmented/overridden by any variables specified in
        `Command.env`.

        **encoding** (str) default="utf-8"<br>
        String encoding to use for subprocess input/output. To specify `errors`,
        append it after a space. For example, use "utf-8 replace" to specify
        "utf-8" with errors "replace".

        **_return_result** (bool) default=False<br>
        When True, return a `Result` object instead of the standard output.
        Private API -- use the `result` modifier instead.

        **_catch_cancelled_error** (bool) default=False<br>
        When True, raise a `ResultError` when the command is cancelled.
        Private API -- used internally by PipeRunner.

        **exit_codes** (set[int] | None) default=None<br>
        Set of allowed exit codes that will not raise a `ResultError`. By default,
        `exit_codes` is `None` which indicates that 0 is the only valid exit
        status. Any other exit status will raise a `ResultError`. In addition to
        sets of integers, you can use a `range` object, e.g. `range(256)` for
        any non-negative exit status.

        **timeout** (float | None) default=None<br>
        Timeout in seconds to wait before we cancel the process. The timer
        begins immediately after the process is launched. This differs from
        using `asyncio.wait_for` which includes the process launch time also.
        If timeout is None (the default), there is no timeout.

        **cancel_timeout** (float) default=3.0 seconds<br>
        Timeout in seconds to wait for a process to exit after sending it a
        `cancel_signal`. If the process does not exit after waiting for
        `cancel_timeout` seconds, we send a kill signal to the process.

        **cancel_signal** (signal.Signals | None) default=signal.SIGTERM<br>
        Signal sent to a process when it is cancelled. If `cancel_signal` is
        None, send a `SIGKILL` on Unix and `SIGTERM` (TerminateProcess) on
        Windows.

        **alt_name** (str | None) default=None<br>
        Alternative name of the command displayed in logs. Used to resolve
        ambiguity when the actual command name is a scripting language.

        **pass_fds** (Iterable[int]) default=()<br>
        Specify open file descriptors to pass to the subprocess.

        **pass_fds_close** (bool) default=False<br>
        Close the file descriptors in `pass_fds` in the current process
        immediately after launching the subprocess.

        **_writable** (bool) default=False<br>
        Used to indicate process substitution is writing.
        Private API -- use the `writable` modifier instead.

        **_start_new_session** (bool) default=False<br>
        Private API -- provided for testing purposes only.

        **_preexec_fn** (Callable() | None) default=None<br>
        Private API -- provided for testing purposes only.

        **pty** (bool | Callable(int)) default=False<br>
        If True, use a pseudo-terminal (pty) to control the child process.
        If `pty` is set to a callable, the function must take one int argument
        for the child side of the pty. The function is called to set the child
        pty's termios settings before spawning the subprocess.

        shellous provides three utility functions: `shellous.cooked`,
        `shellous.raw` and `shellous.cbreak` that can be used as arguments to
        the `pty` option.

        **close_fds** (bool) default=True<br>
        Close all unnecessary file descriptors in the child process. This
        defaults to True to align with the default behavior of the subprocess
        module.

        **audit_callback** (Callable(phase, info) | None) default=None<br>
        Specify a function to call as the command execution goes through its
        lifecycle. `audit_callback` is a function called with two arguments,
        *phase* and *info*.

        *phase* can be one of three values:

            "start": The process is about to start.

            "stop": The process finally stopped.

            "signal": The process is being sent a signal.

        *info* is a dictionary providing more information for the callback. The
        following keys are currently defined:

            "runner" (Runner): Reference to the Runner object.

            "failure" (str): When phase is "stop", optional string with the
            name of the exception from launching the process.

            "signal" (str): When phase is "signal", the signal name/number
            sent to the process, e.g. "SIGTERM".

        The primary use case for `audit_callback` is measuring how long each
        command takes to run and exporting this information to a metrics
        framework like Prometheus.

        **coerce_arg** (Callable(arg) | None) default=None<br>
        Specify a function to call on each command line argument. This function
        can specify how to coerce unsupported argument types (e.g. dict) to
        a sequence of strings. This function should return the original value
        unchanged if there is no conversion needed.

        **error_limit** (int | None) default=1024<br>
        Specify the number of bytes to store when redirecting STDERR to BUFFER.
        After reading up to `error_limit` bytes, shellous will continue to
        read from stderr, but will not store any additional bytes. Setting
        `error_limit` only affects the internal BUFFER; it has no effect when
        using other redirection types.

        """
        kwargs = locals()
        del kwargs["self"]
        if not encoding:
            raise TypeError("invalid encoding")
        return Command(self.args, self.options.set(kwargs))

    def _replace_args(self, new_args: Sequence[Any]) -> "Command[_RT]":
        """Return new command with arguments replaced by `new_args`.

        Arguments are NOT type-checked by the context. Program name must be the
        exact same object.
        """
        assert new_args
        assert new_args[0] is self.args[0]
        return Command(tuple(new_args), self.options)

    def coro(
        self,
        *,
        _run_future: Optional[asyncio.Future[Runner]] = None,
    ) -> Coroutine[Any, Any, _RT]:
        "Return coroutine object to run awaitable."
        return cast(
            Coroutine[Any, Any, _RT],
            Runner.run_command(self, _run_future=_run_future),
        )

    @contextlib.asynccontextmanager
    async def prompt(
        self,
        prompt: Union[str, list[str], re.Pattern[str], None] = None,
        *,
        timeout: Optional[float] = None,
        normalize_newlines: bool = False,
    ) -> AsyncIterator[Prompt]:
        """Run command using the send/expect API.

        This method should be called using `async with`. It returns a `Prompt`
        object with send() and expect() methods.

        You can optionally set a default `prompt`. This is used by `expect()`
        when you don't provide another value.

        Use the `timeout` parameter to set the default timeout for operations.

        Set `normalize_newlines` to True to convert incoming CR and CR-LF to LF.
        This conversion is done before matching with `expect()`. This option
        does not affect strings sent with `send()`.
        """
        cmd = self.stdin(Redirect.CAPTURE).stdout(Redirect.CAPTURE)

        cli = None
        try:
            async with Runner(cmd) as run:
                cli = Prompt(
                    run,
                    default_prompt=prompt,
                    default_timeout=timeout,
                    normalize_newlines=normalize_newlines,
                )
                yield cli
                cli.close()
        finally:
            if cli is not None:
                cli._finish_()  # pyright: ignore[reportPrivateUsage]

    def __await__(self) -> "Generator[Any, None, _RT]":
        "Run process and return the standard output."
        return self.coro().__await__()

    async def __aenter__(self) -> Runner:
        "Enter the async context manager."
        return await context_aenter(self, Runner(self))

    async def __aexit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_value: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Optional[bool]:
        "Exit the async context manager."
        return await context_aexit(self, exc_type, exc_value, exc_tb)

    def __aiter__(self) -> AsyncIterator[str]:
        "Return async iterator to iterate over output lines."
        return aiter_preflight(self)._readlines()

    async def _readlines(self) -> AsyncIterator[str]:
        "Async generator to iterate over lines."
        async with Runner(self) as run:
            async for line in run:
                yield line

    def __call__(self, *args: Any) -> "Command[_RT]":
        "Apply more arguments to the end of the command."
        if not args:
            return self
        new_args = self.args + coerce(args, self.options.coerce_arg)
        return Command(new_args, self.options)

    def __str__(self) -> str:
        """Return string representation for command.

        Display the full name of the command only. Don't include arguments or
        environment variables.
        """
        return str(self.args[0])

    @overload
    def __or__(self, rhs: StdoutType) -> "Command[_RT]": ...  # pragma: no cover

    @overload
    def __or__(
        self, rhs: "Command[str]"
    ) -> "shellous.Pipeline[str]": ...  # pragma: no cover

    @overload
    def __or__(
        self,
        rhs: "Command[shellous.Result]",
    ) -> "shellous.Pipeline[shellous.Result]": ...  # pragma: no cover

    def __or__(self, rhs: Any) -> Any:
        "Bitwise or operator is used to build pipelines."
        if isinstance(rhs, STDOUT_TYPES):
            return self.stdout(rhs)
        return shellous.Pipeline.create(self) | rhs

    def __ror__(self, lhs: StdinType) -> "Command[_RT]":
        "Bitwise or operator is used to build pipelines."
        if isinstance(lhs, STDIN_TYPES):  # pyright: ignore[reportUnnecessaryIsInstance]
            return self.stdin(lhs)
        return NotImplemented

    def __rshift__(self, rhs: StdoutType) -> "Command[_RT]":
        "Right shift operator is used to append to standard output."
        if isinstance(
            rhs, STDOUT_TYPES
        ):  # pyright: ignore[reportUnnecessaryIsInstance]
            return self.stdout(rhs, append=True)
        return NotImplemented

    @property
    def writable(self) -> "Command[_RT]":
        "Set `_writable` to True."
        return self.set(_writable=True)

    @property
    def result(self) -> "Command[shellous.Result]":
        "Set `_return_result` and `exit_codes`."
        return cast(
            Command[shellous.Result],
            self.set(_return_result=True, exit_codes=range(-255, 256)),
        )

A Command instance is a lightweight and immutable object that specifies the arguments and options used to run a program. Commands do not do anything until they are awaited.

Commands are always created by a CmdContext.

from shellous import sh

# Create a new command from the context.
echo = sh("echo", "hello, world")

# Run the command.
result = await echo
Command( args: tuple[typing.Union[str, bytes, os.PathLike[typing.Any], Command[typing.Any], Pipeline[typing.Any]], ...], options: Options)
args: tuple[typing.Union[str, bytes, os.PathLike[typing.Any], Command[typing.Any], Pipeline[typing.Any]], ...]

Command arguments including the program name as first argument.

options: Options

Command options.

name: str
    @property
    def name(self) -> str:
        """Returns the name of the program being run.

        Names longer than 31 characters are truncated. If `alt_name` option
        is set, return that instead.
        """
        if self.options.alt_name:
            return self.options.alt_name
        name = str(self.args[0])
        if len(name) > 31:
            return f"...{name[-31:]}"
        return name

Returns the name of the program being run.

Names longer than 31 characters are truncated. If alt_name option is set, return that instead.
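
The truncation rule can be tried in isolation. The sketch below mirrors the logic of the `name` property as a free function; `display_name` and its `limit` parameter are hypothetical names used only for illustration.

```python
from typing import Optional


def display_name(program: str, alt_name: Optional[str] = None, limit: int = 31) -> str:
    """Mirror the `name` property: prefer alt_name, else shorten long names."""
    if alt_name:
        return alt_name
    if len(program) > limit:
        # Keep the last `limit` characters and mark the cut with "...".
        return f"...{program[-limit:]}"
    return program


print(display_name("echo"))
print(display_name("/usr/local/bin/some-very-long-program-name"))
print(display_name("python3", alt_name="my-script"))
```

Note that truncation keeps the tail of the name, so the file name of a long path stays visible, and that the "..." prefix makes the returned string up to 34 characters long.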

def stdin( self, input_: Any, *, close: bool = False) -> Command[~_RT]:
    def stdin(self, input_: Any, *, close: bool = False) -> "Command[_RT]":
        "Pass `input_` to the command's standard input."
        new_options = self.options.set_stdin(input_, close)
        return Command(self.args, new_options)

Pass input_ to the command's standard input.

def stdout( self, output: Any, *, append: bool = False, close: bool = False) -> Command[~_RT]:
    def stdout(
        self,
        output: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "Command[_RT]":
        "Redirect standard output to `output`."
        new_options = self.options.set_stdout(output, append, close)
        return Command(self.args, new_options)

Redirect standard output to output.

def stderr( self, error: Any, *, append: bool = False, close: bool = False) -> Command[~_RT]:
    def stderr(
        self,
        error: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "Command[_RT]":
        "Redirect standard error to `error`."
        new_options = self.options.set_stderr(error, append, close)
        return Command(self.args, new_options)

Redirect standard error to error.

def env(self, **kwds: Any) -> Command[~_RT]:
    def env(self, **kwds: Any) -> "Command[_RT]":
        """Return new command with augmented environment.

        The changes to the environment variables made by this method are
        additive. For example, calling `cmd.env(A=1).env(B=2)` produces a
        command with the environment set to `{"A": "1", "B": "2"}`.

        To clear the environment, use the `cmd.set(env={})` method.
        """
        new_options = self.options.add_env(kwds)
        return Command(self.args, new_options)

Return new command with augmented environment.

The changes to the environment variables made by this method are additive. For example, calling cmd.env(A=1).env(B=2) produces a command with the environment set to {"A": "1", "B": "2"}.

To clear the environment, use the cmd.set(env={}) method.
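
The difference between the additive `env(...)` and the replacing `set(env=...)` can be sketched with a small immutable class. `Cmd` and its `env_vars` field are hypothetical stand-ins written for this illustration; they are not shellous classes, and the real option handling lives in `Options`.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class Cmd:
    """Hypothetical stand-in for an immutable command object."""

    env_vars: dict = field(default_factory=dict)

    def env(self, **kwds: Any) -> "Cmd":
        # Merge into a copy of the current environment; values become strings.
        merged = {**self.env_vars, **{k: str(v) for k, v in kwds.items()}}
        return Cmd(merged)

    def set(self, *, env: dict) -> "Cmd":
        # Replace the environment wholesale instead of merging.
        return Cmd({k: str(v) for k, v in env.items()})


cmd = Cmd().env(A=1).env(B=2)
print(cmd.env_vars)               # {'A': '1', 'B': '2'}
print(cmd.set(env={}).env_vars)   # {}
```

Each call returns a new object, so the original command is never modified.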

def set( self, *, path: Union[str, NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, env: Union[dict[str, Any], shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, inherit_env: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, encoding: Union[str, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, _return_result: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, _catch_cancelled_error: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, exit_codes: Union[Container[int], NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, timeout: Union[float, NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, cancel_timeout: Union[float, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, cancel_signal: Union[signal.Signals, NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, alt_name: Union[str, NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, pass_fds: Union[Iterable[int], shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, pass_fds_close: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, _writable: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, _start_new_session: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, _preexec_fn: Union[Callable[[], NoneType], NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, pty: Union[Callable[[int], NoneType], bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, close_fds: Union[bool, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, audit_callback: Union[Callable[[str, AuditEventInfo], NoneType], NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, coerce_arg: Union[Callable[[Any], Any], NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>, error_limit: Union[int, NoneType, shellous.command._UnsetEnum] = <_UnsetEnum.UNSET: 1>) -> Command[~_RT]:
    def set(  # pylint: disable=unused-argument, too-many-locals, too-many-arguments
        self,
        *,
        path: Unset[Optional[str]] = _UNSET,
        env: Unset[dict[str, Any]] = _UNSET,
        inherit_env: Unset[bool] = _UNSET,
        encoding: Unset[str] = _UNSET,
        _return_result: Unset[bool] = _UNSET,
        _catch_cancelled_error: Unset[bool] = _UNSET,
        exit_codes: Unset[Optional[Container[int]]] = _UNSET,
        timeout: Unset[Optional[float]] = _UNSET,
        cancel_timeout: Unset[float] = _UNSET,
        cancel_signal: Unset[Optional[signal.Signals]] = _UNSET,
        alt_name: Unset[Optional[str]] = _UNSET,
        pass_fds: Unset[Iterable[int]] = _UNSET,
        pass_fds_close: Unset[bool] = _UNSET,
        _writable: Unset[bool] = _UNSET,
        _start_new_session: Unset[bool] = _UNSET,
        _preexec_fn: Unset[_PreexecFnT] = _UNSET,
        pty: Unset[PtyAdapterOrBool] = _UNSET,
        close_fds: Unset[bool] = _UNSET,
        audit_callback: Unset[_AuditFnT] = _UNSET,
        coerce_arg: Unset[_CoerceArgFnT] = _UNSET,
        error_limit: Unset[Optional[int]] = _UNSET,
    ) -> "Command[_RT]":
        """Return new command with custom options set.

        **path** (str | None) default=None<br>
        Search path for locating command executable. By default, `path` is None
        which causes shellous to rely on the `PATH` environment variable.

        **env** (dict[str, str]) default={}<br>
        Set the environment variables for the subprocess. If `inherit_env` is
        True, the subprocess will also inherit the environment variables
        specified by the parent process.

        Using `set(env=...)` will replace all environment variables using the
        dictionary argument. You can also use the `env(...)` method to modify
        the existing environment incrementally.

        **inherit_env** (bool) default=True<br>
        Subprocess should inherit the parent process environment. If this is
        False, the subprocess will only have environment variables specified
        by `Command.env`. If `inherit_env` is True, the parent process
        environment is augmented/overridden by any variables specified in
        `Command.env`.

        **encoding** (str) default="utf-8"<br>
        String encoding to use for subprocess input/output. To specify `errors`,
        append it after a space. For example, use "utf-8 replace" to specify
        "utf-8" with errors "replace".

        **_return_result** (bool) default=False<br>
        When True, return a `Result` object instead of the standard output.
        Private API -- use the `result` modifier instead.

        **_catch_cancelled_error** (bool) default=False<br>
        When True, raise a `ResultError` when the command is cancelled.
        Private API -- used internally by PipeRunner.

        **exit_codes** (set[int] | None) default=None<br>
        Set of allowed exit codes that will not raise a `ResultError`. By default,
        `exit_codes` is `None` which indicates that 0 is the only valid exit
        status. Any other exit status will raise a `ResultError`. In addition to
        sets of integers, you can use a `range` object, e.g. `range(256)` for
        any non-negative exit status.

        **timeout** (float | None) default=None<br>
        Timeout in seconds to wait before we cancel the process. The timer
        begins immediately after the process is launched. This differs from
        using `asyncio.wait_for` which includes the process launch time also.
        If timeout is None (the default), there is no timeout.

        **cancel_timeout** (float) default=3.0 seconds<br>
        Timeout in seconds to wait for a process to exit after sending it a
        `cancel_signal`. If the process does not exit after waiting for
        `cancel_timeout` seconds, we send a kill signal to the process.

        **cancel_signal** (signal.Signals | None) default=signal.SIGTERM<br>
        Signal sent to a process when it is cancelled. If `cancel_signal` is
        None, send a `SIGKILL` on Unix and `SIGTERM` (TerminateProcess) on
        Windows.

        **alt_name** (str | None) default=None<br>
        Alternative name of the command displayed in logs. Used to resolve
        ambiguity when the actual command name is a scripting language.

        **pass_fds** (Iterable[int]) default=()<br>
        Specify open file descriptors to pass to the subprocess.

        **pass_fds_close** (bool) default=False<br>
        Close the file descriptors in `pass_fds` in the current process
        immediately after launching the subprocess.

        **_writable** (bool) default=False<br>
        Used to indicate process substitution is writing.
        Private API -- use the `writable` modifier instead.

        **_start_new_session** (bool) default=False<br>
        Private API -- provided for testing purposes only.

        **_preexec_fn** (Callable() | None) default=None<br>
        Private API -- provided for testing purposes only.

        **pty** (bool | Callable(int)) default=False<br>
        If True, use a pseudo-terminal (pty) to control the child process.
        If `pty` is set to a callable, the function must take one int argument
        for the child side of the pty. The function is called to set the child
        pty's termios settings before spawning the subprocess.

        shellous provides three utility functions: `shellous.cooked`,
        `shellous.raw` and `shellous.cbreak` that can be used as arguments to
        the `pty` option.

        **close_fds** (bool) default=True<br>
        Close all unnecessary file descriptors in the child process. This
        defaults to True to align with the default behavior of the subprocess
        module.

        **audit_callback** (Callable(phase, info) | None) default=None<br>
        Specify a function to call as the command execution goes through its
        lifecycle. `audit_callback` is a function called with two arguments,
        *phase* and *info*.

        *phase* can be one of three values:

            "start": The process is about to start.

            "stop": The process finally stopped.

            "signal": The process is being sent a signal.

        *info* is a dictionary providing more information for the callback. The
        following keys are currently defined:

            "runner" (Runner): Reference to the Runner object.

            "failure" (str): When phase is "stop", optional string with the
            name of the exception from launching the process.

            "signal" (str): When phase is "signal", the signal name/number
            sent to the process, e.g. "SIGTERM".

        The primary use case for `audit_callback` is measuring how long each
        command takes to run and exporting this information to a metrics
        framework like Prometheus.

        **coerce_arg** (Callable(arg) | None) default=None<br>
        Specify a function to call on each command line argument. This function
        can specify how to coerce unsupported argument types (e.g. dict) to
        a sequence of strings. This function should return the original value
        unchanged if there is no conversion needed.

        **error_limit** (int | None) default=1024<br>
        Specify the number of bytes to store when redirecting STDERR to BUFFER.
        After reading up to `error_limit` bytes, shellous will continue to
        read from stderr, but will not store any additional bytes. Setting
        `error_limit` only affects the internal BUFFER; it has no effect when
        using other redirection types.

        """
        kwargs = locals()
        del kwargs["self"]
        if not encoding:
            raise TypeError("invalid encoding")
        return Command(self.args, self.options.set(kwargs))

Return new command with custom options set.

path (str | None) default=None
Search path for locating command executable. By default, path is None which causes shellous to rely on the PATH environment variable.

env (dict[str, str]) default={}
Set the environment variables for the subprocess. If inherit_env is True, the subprocess will also inherit the environment variables specified by the parent process.

Using set(env=...) will replace all environment variables using the dictionary argument. You can also use the env(...) method to modify the existing environment incrementally.

inherit_env (bool) default=True
Subprocess should inherit the parent process environment. If this is False, the subprocess will only have environment variables specified by Command.env. If inherit_env is True, the parent process environment is augmented/overridden by any variables specified in Command.env.
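The interaction between env and inherit_env can be sketched with plain dictionaries. The helper below is hypothetical (it is not part of shellous) and takes the parent environment as an explicit argument so the result is easy to inspect; it follows the merging rules described above.

```python
from typing import Optional


def merged_env(
    parent: dict[str, str],
    env: Optional[dict[str, str]],
    inherit_env: bool,
) -> Optional[dict[str, str]]:
    """Hypothetical helper: compute the environment a subprocess would see.

    Returning None means "pass the parent environment through untouched".
    """
    if inherit_env:
        if not env:
            return None
        # Parent variables first, then overrides from `env`.
        return {**parent, **env}
    # Without inheritance, only the explicitly set variables remain.
    return dict(env) if env else {}


parent = {"PATH": "/usr/bin", "LANG": "C"}
print(merged_env(parent, {"LANG": "en_US.UTF-8"}, inherit_env=True))
print(merged_env(parent, {"LANG": "en_US.UTF-8"}, inherit_env=False))
```

With inherit_env=True the child keeps PATH and sees the overridden LANG; with inherit_env=False it sees LANG only.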

encoding (str) default="utf-8"
String encoding to use for subprocess input/output. To specify errors, append it after a space. For example, use "utf-8 replace" to specify "utf-8" with errors "replace".
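As a rough illustration of the "encoding errors" convention, the sketch below splits such a string and passes both parts to bytes.decode(). The helper name split_encoding is hypothetical, and falling back to "strict" when no error handler is given is an assumption based on Python's own default, not a statement about shellous internals.

```python
def split_encoding(spec: str) -> tuple[str, str]:
    """Hypothetical helper: split "utf-8 replace" into (encoding, errors)."""
    name, _, errors = spec.partition(" ")
    # "strict" is Python's default error handler for bytes.decode().
    return name, errors or "strict"


encoding, errors = split_encoding("utf-8 replace")
raw = b"caf\xe9\n"  # Latin-1 bytes, not valid UTF-8
print(repr(raw.decode(encoding, errors)))
```

With the "replace" handler, the invalid byte is decoded as U+FFFD instead of raising UnicodeDecodeError.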

_return_result (bool) default=False
When True, return a Result object instead of the standard output. Private API -- use the result modifier instead.

_catch_cancelled_error (bool) default=False
When True, raise a ResultError when the command is cancelled. Private API -- used internally by PipeRunner.

exit_codes (set[int] | None) default=None
Set of allowed exit codes that will not raise a ResultError. By default, exit_codes is None which indicates that 0 is the only valid exit status. Any other exit status will raise a ResultError. In addition to sets of integers, you can use a range object, e.g. range(256) to accept any exit status from 0 to 255.
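Because exit_codes only needs to support membership tests, sets and range objects behave the same way. The check can be sketched as follows; is_allowed is a hypothetical helper used for illustration, not a shellous function.

```python
from typing import Container, Optional


def is_allowed(exit_code: int, exit_codes: Optional[Container[int]]) -> bool:
    """Hypothetical helper: True if `exit_code` would not count as an error."""
    if exit_codes is None:
        # None means that 0 is the only accepted exit status.
        return exit_code == 0
    return exit_code in exit_codes


print(is_allowed(1, None))           # False
print(is_allowed(1, {0, 1}))         # True
print(is_allowed(255, range(256)))   # True
print(is_allowed(-15, range(256)))   # False
```

The last call shows that range(256) does not cover negative values.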

timeout (float | None) default=None
Timeout in seconds to wait before we cancel the process. The timer begins immediately after the process is launched. This differs from asyncio.wait_for, which also counts the time taken to launch the process. If timeout is None (the default), there is no timeout.

cancel_timeout (float) default=3.0 seconds
Timeout in seconds to wait for a process to exit after sending it a cancel_signal. If the process does not exit after waiting for cancel_timeout seconds, we send a kill signal to the process.

cancel_signal (signal.Signals | None) default=signal.SIGTERM
Signal sent to a process when it is cancelled. If cancel_signal is None, send a SIGKILL on Unix and SIGTERM (TerminateProcess) on Windows.
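The way timeout, cancel_signal and cancel_timeout fit together can be sketched with the standard library alone. This is a simplified model of the escalation described above, not the shellous implementation; run_with_timeout is a hypothetical helper and it always uses terminate() as the cancel signal.

```python
import asyncio
import sys


async def run_with_timeout(args, timeout, cancel_timeout):
    """Hypothetical helper: run `args`, escalating from terminate() to kill()."""
    proc = await asyncio.create_subprocess_exec(*args)
    # The timer starts only after the process has been launched.
    try:
        return await asyncio.wait_for(proc.wait(), timeout)
    except asyncio.TimeoutError:
        proc.terminate()  # SIGTERM on Unix, TerminateProcess on Windows
    try:
        return await asyncio.wait_for(proc.wait(), cancel_timeout)
    except asyncio.TimeoutError:
        proc.kill()  # last resort once the grace period has elapsed
        return await proc.wait()


sleeper = [sys.executable, "-c", "import time; time.sleep(30)"]
returncode = asyncio.run(run_with_timeout(sleeper, timeout=0.5, cancel_timeout=3.0))
print(returncode)
```

The printed exit status is platform dependent; on Unix, a process ended by a signal reports the negated signal number.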

alt_name (str | None) default=None
Alternative name of the command displayed in logs. Used to resolve ambiguity when the actual command name is a scripting language.

pass_fds (Iterable[int]) default=()
Specify open file descriptors to pass to the subprocess.

pass_fds_close (bool) default=False
Close the file descriptors in pass_fds in the current process immediately after launching the subprocess.

_writable (bool) default=False
Used to indicate process substitution is writing. Private API -- use the writable modifier instead.

_start_new_session (bool) default=False
Private API -- provided for testing purposes only.

_preexec_fn (Callable() | None) default=None
Private API -- provided for testing purposes only.

pty (bool | Callable(int)) default=False
If True, use a pseudo-terminal (pty) to control the child process. If pty is set to a callable, the function must take one int argument for the child side of the pty. The function is called to set the child pty's termios settings before spawning the subprocess.

shellous provides three utility functions: shellous.cooked, shellous.raw and shellous.cbreak that can be used as arguments to the pty option.

close_fds (bool) default=True
Close all unnecessary file descriptors in the child process. This defaults to True to align with the default behavior of the subprocess module.

audit_callback (Callable(phase, info) | None) default=None
Specify a function to call as the command execution goes through its lifecycle. audit_callback is a function called with two arguments, phase and info.

phase can be one of three values:

"start": The process is about to start.

"stop": The process finally stopped.

"signal": The process is being sent a signal.

info is a dictionary providing more information for the callback. The following keys are currently defined:

"runner" (Runner): Reference to the Runner object.

"failure" (str): When phase is "stop", optional string with the name of the exception raised while launching the process.

"signal" (str): When phase is "signal", the signal name/number sent to the process, e.g. "SIGTERM".

The primary use case for audit_callback is measuring how long each command takes to run and exporting this information to a metrics framework like Prometheus.

coerce_arg (Callable(arg) | None) default=None
Specify a function to call on each command line argument. This function can specify how to coerce unsupported argument types (e.g. dict) to a sequence of strings. This function should return the original value unchanged if there is no conversion needed.
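For example, a coerce_arg function that expands a dict into command line flags could be written as below. The "--key=value" convention is only an assumption made for this sketch; any mapping to a sequence of strings would satisfy the contract described above.

```python
from typing import Any


def coerce_arg(arg: Any) -> Any:
    """Turn a dict into ["--key=value", ...]; leave everything else alone."""
    if isinstance(arg, dict):
        return [f"--{key}={value}" for key, value in arg.items()]
    # Supported types are returned unchanged.
    return arg


print(coerce_arg({"depth": 1, "branch": "main"}))
print(coerce_arg("status"))
```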

error_limit (int | None) default=1024
Specify the number of bytes to store when redirecting STDERR to BUFFER. After reading up to error_limit bytes, shellous will continue to read from stderr, but will not store any additional bytes. Setting error_limit only affects the internal BUFFER; it has no effect when using other redirection types.
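The "keep reading, stop storing" behavior can be modeled with an asyncio.StreamReader. The sketch below is illustrative only; read_limited is a hypothetical helper and the 4096-byte chunk size is arbitrary.

```python
import asyncio
from typing import Optional


async def read_limited(stream: asyncio.StreamReader, limit: Optional[int]) -> bytes:
    """Hypothetical helper: drain `stream`, keeping at most `limit` bytes."""
    kept = bytearray()
    while chunk := await stream.read(4096):
        if limit is None:
            kept += chunk
        elif len(kept) < limit:
            kept += chunk[: limit - len(kept)]
        # Chunks beyond the limit are still read, then discarded, so the
        # writer never blocks on a full pipe.
    return bytes(kept)


async def demo() -> bytes:
    stream = asyncio.StreamReader()
    stream.feed_data(b"x" * 5000)
    stream.feed_eof()
    return await read_limited(stream, limit=1024)


print(len(asyncio.run(demo())))  # 1024
```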

def coro( self, *, _run_future: Optional[_asyncio.Future[Runner]] = None) -> Coroutine[Any, Any, ~_RT]:
    def coro(
        self,
        *,
        _run_future: Optional[asyncio.Future[Runner]] = None,
    ) -> Coroutine[Any, Any, _RT]:
        "Return coroutine object to run awaitable."
        return cast(
            Coroutine[Any, Any, _RT],
            Runner.run_command(self, _run_future=_run_future),
        )

Return coroutine object to run awaitable.

@contextlib.asynccontextmanager
async def prompt( self, prompt: Union[str, list[str], re.Pattern[str], NoneType] = None, *, timeout: Optional[float] = None, normalize_newlines: bool = False) -> AsyncIterator[Prompt]:
    @contextlib.asynccontextmanager
    async def prompt(
        self,
        prompt: Union[str, list[str], re.Pattern[str], None] = None,
        *,
        timeout: Optional[float] = None,
        normalize_newlines: bool = False,
    ) -> AsyncIterator[Prompt]:
        """Run command using the send/expect API.

        This method should be called using `async with`. It returns a `Prompt`
        object with send() and expect() methods.

        You can optionally set a default `prompt`. This is used by `expect()`
        when you don't provide another value.

        Use the `timeout` parameter to set the default timeout for operations.

        Set `normalize_newlines` to True to convert incoming CR and CR-LF to LF.
        This conversion is done before matching with `expect()`. This option
        does not affect strings sent with `send()`.
        """
        cmd = self.stdin(Redirect.CAPTURE).stdout(Redirect.CAPTURE)

        cli = None
        try:
            async with Runner(cmd) as run:
                cli = Prompt(
                    run,
                    default_prompt=prompt,
                    default_timeout=timeout,
                    normalize_newlines=normalize_newlines,
                )
                yield cli
                cli.close()
        finally:
            if cli is not None:
                cli._finish_()  # pyright: ignore[reportPrivateUsage]

Run command using the send/expect API.

This method should be called using async with. It returns a Prompt object with send() and expect() methods.

You can optionally set a default prompt. This is used by expect() when you don't provide another value.

Use the timeout parameter to set the default timeout for operations.

Set normalize_newlines to True to convert incoming CR and CR-LF to LF. This conversion is done before matching with expect(). This option does not affect strings sent with send().
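The newline conversion itself is easy to picture. The function below is a hypothetical stand-in that applies the same CR and CR-LF to LF rule to a string; it is not the code shellous uses.

```python
import re


def normalize_newlines(text: str) -> str:
    """Hypothetical helper: convert CR-LF and bare CR to LF."""
    return re.sub(r"\r\n?", "\n", text)


print(repr(normalize_newlines("login:\r\npassword:\r")))  # 'login:\npassword:\n'
```

This matters mostly for pty sessions, where output lines commonly end in CR-LF.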

def __await__(self) -> Generator[Any, NoneType, ~_RT]:
    def __await__(self) -> "Generator[Any, None, _RT]":
        "Run process and return the standard output."
        return self.coro().__await__()

Run process and return the standard output.

async def __aenter__(self) -> Runner:
    async def __aenter__(self) -> Runner:
        "Enter the async context manager."
        return await context_aenter(self, Runner(self))

Enter the async context manager.

def __aiter__(self) -> AsyncIterator[str]:
    def __aiter__(self) -> AsyncIterator[str]:
        "Return async iterator to iterate over output lines."
        return aiter_preflight(self)._readlines()

Return async iterator to iterate over output lines.

writable: Command[~_RT]
    @property
    def writable(self) -> "Command[_RT]":
        "Set `writable` to True."
        return self.set(_writable=True)

Set writable to True.

result: Command[Result]
    @property
    def result(self) -> "Command[shellous.Result]":
        "Set `_return_result` and `exit_codes`."
        return cast(
            Command[shellous.Result],
            self.set(_return_result=True, exit_codes=range(-255, 256)),
        )

Set _return_result and exit_codes.

@dataclass(frozen=True)
class Options:
@dataclass(frozen=True)
class Options:  # pylint: disable=too-many-instance-attributes
    "Concrete class for per-command options."

    path: Optional[str] = None
    "Optional search path to use instead of PATH environment variable."

    env: Optional[EnvironmentDict] = field(default=None, repr=False)
    "Additional environment variables for command."

    inherit_env: bool = True
    "True if subprocess should inherit the current environment variables."

    input: _RedirectT = Redirect.DEFAULT
    "Input object to bind to stdin."

    input_close: bool = False
    "True if input object should be closed after subprocess launch."

    output: _RedirectT = Redirect.DEFAULT
    "Output object to bind to stdout."

    output_append: bool = False
    "True if output object should be opened in append mode."

    output_close: bool = False
    "True if output object should be closed after subprocess launch."

    error: _RedirectT = Redirect.DEFAULT
    "Error object to bind to stderr."

    error_append: bool = False
    "True if error object should be opened in append mode."

    error_close: bool = False
    "True if error object should be closed after subprocess launch."

    error_limit: Optional[int] = DEFAULT_ERROR_LIMIT
    "Bytes of stderr to buffer in memory (`sh.BUFFER`). None means unlimited."

    encoding: str = "utf-8"
    "Specifies encoding of input/output."

    _return_result: bool = False
    "True if we should return `Result` object instead of the output text/bytes."

    _catch_cancelled_error: bool = False
    "True if we should raise `ResultError` after clean up from cancelled task."

    exit_codes: Optional[Container[int]] = None
    "Set of exit codes that do not raise a `ResultError`. None means {0}."

    timeout: Optional[float] = None
    "Timeout in seconds that we wait before cancelling the process."

    cancel_timeout: float = 3.0
    "Timeout in seconds that we wait for a cancelled process to terminate."

    cancel_signal: Optional[signal.Signals] = signal.SIGTERM
    "The signal sent to terminate a cancelled process."

    alt_name: Optional[str] = None
    "Alternate name for the command to use when logging."

    pass_fds: Iterable[int] = ()
    "File descriptors to pass to the command."

    pass_fds_close: bool = False
    "True if pass_fds should be closed after subprocess launch."

    _writable: bool = False
    "True if using process substitution in write mode."

    _start_new_session: bool = False
    "True if child process should start a new session with `setsid` call."

    _preexec_fn: _PreexecFnT = None
    "Function to call in child process after fork from parent."

    pty: PtyAdapterOrBool = False
    "True if child process should be controlled using a pseudo-terminal (pty)."

    close_fds: bool = True
    "True if child process should close all file descriptors."

    audit_callback: _AuditFnT = None
    "Function called to audit stages of process execution."

    coerce_arg: _CoerceArgFnT = None
    "Function called to coerce top level arguments."

    def runtime_env(self) -> Optional[dict[str, str]]:
        "@private Return our `env` merged with the global environment."
        if self.inherit_env:
            if not self.env:
                return None
            return os.environ | self.env

        if self.env:
            return dict(self.env)  # make copy of dict
        return {}

    def set_stdin(self, input_: Any, close: bool) -> "Options":
        "@private Return new options with `input` configured."
        if input_ is None:
            raise TypeError("invalid stdin")

        if input_ == Redirect.STDOUT:
            raise ValueError("STDOUT is only supported by stderr")

        return dataclasses.replace(
            self,
            input=input_,
            input_close=close,
        )

    def set_stdout(self, output: Any, append: bool, close: bool) -> "Options":
        "@private Return new options with `output` configured."
        if output is None:
            raise TypeError("invalid stdout")

        if output == Redirect.STDOUT:
            raise ValueError("STDOUT is only supported by stderr")

        return dataclasses.replace(
            self,
            output=output,
            output_append=append,
            output_close=close,
        )

    def set_stderr(self, error: Any, append: bool, close: bool) -> "Options":
        "@private Return new options with `error` configured."
        if error is None:
            raise TypeError("invalid stderr")

        return dataclasses.replace(
            self,
            error=error,
            error_append=append,
            error_close=close,
        )

    def add_env(self, updates: dict[str, Any]) -> "Options":
        "@private Return new options with augmented environment."
        new_env = EnvironmentDict(self.env, updates)
        return dataclasses.replace(self, env=new_env)

    def set(self, kwds: dict[str, Any]) -> "Options":
        """@private Return new options with given properties updated.

        See `Command.set` for option reference.
        """
        kwds = {key: value for key, value in kwds.items() if value is not _UNSET}
        if "env" in kwds:
            # The "env" property is stored as an `EnvironmentDict`.
            new_env = kwds["env"]
            if new_env:
                kwds["env"] = EnvironmentDict(None, new_env)
            else:
                kwds["env"] = None
        return dataclasses.replace(self, **kwds)

    @overload
    def which(self, name: bytes) -> Optional[bytes]:
        "@private Find the command with the given name and return its path."

    @overload
    def which(
        self, name: Union[str, os.PathLike[Any]]
    ) -> Optional[Union[str, os.PathLike[Any]]]:
        "@private Find the command with the given name and return its path."

    def which(
        self, name: Union[str, bytes, os.PathLike[Any]]
    ) -> Optional[Union[str, bytes, os.PathLike[Any]]]:
        "@private Find the command with the given name and return its path."
        return shutil.which(name, path=self.path)

Concrete class for per-command options.

Options( path: Optional[str] = None, env: Optional[shellous.util.EnvironmentDict] = None, inherit_env: bool = True, input: Any = <Redirect.DEFAULT: -20>, input_close: bool = False, output: Any = <Redirect.DEFAULT: -20>, output_append: bool = False, output_close: bool = False, error: Any = <Redirect.DEFAULT: -20>, error_append: bool = False, error_close: bool = False, error_limit: Optional[int] = 1024, encoding: str = 'utf-8', _return_result: bool = False, _catch_cancelled_error: bool = False, exit_codes: Optional[Container[int]] = None, timeout: Optional[float] = None, cancel_timeout: float = 3.0, cancel_signal: Optional[signal.Signals] = <Signals.SIGTERM: 15>, alt_name: Optional[str] = None, pass_fds: Iterable[int] = (), pass_fds_close: bool = False, _writable: bool = False, _start_new_session: bool = False, _preexec_fn: Optional[Callable[[], NoneType]] = None, pty: Union[Callable[[int], NoneType], bool] = False, close_fds: bool = True, audit_callback: Optional[Callable[[str, AuditEventInfo], NoneType]] = None, coerce_arg: Optional[Callable[[Any], Any]] = None)
path: Optional[str] = None

Optional search path to use instead of PATH environment variable.

env: Optional[shellous.util.EnvironmentDict] = None

Additional environment variables for command.

inherit_env: bool = True

True if subprocess should inherit the current environment variables.

input: Any = <Redirect.DEFAULT: -20>

Input object to bind to stdin.

input_close: bool = False

True if input object should be closed after subprocess launch.

output: Any = <Redirect.DEFAULT: -20>

Output object to bind to stdout.

output_append: bool = False

True if output object should be opened in append mode.

output_close: bool = False

True if output object should be closed after subprocess launch.

error: Any = <Redirect.DEFAULT: -20>

Error object to bind to stderr.

error_append: bool = False

True if error object should be opened in append mode.

error_close: bool = False

True if error object should be closed after subprocess launch.

error_limit: Optional[int] = 1024

Bytes of stderr to buffer in memory (sh.BUFFER). None means unlimited.

encoding: str = 'utf-8'

Specifies encoding of input/output.

exit_codes: Optional[Container[int]] = None

Set of exit codes that do not raise a ResultError. None means {0}.

timeout: Optional[float] = None

Timeout in seconds that we wait before cancelling the process.

cancel_timeout: float = 3.0

Timeout in seconds that we wait for a cancelled process to terminate.

cancel_signal: Optional[signal.Signals] = <Signals.SIGTERM: 15>

The signal sent to terminate a cancelled process.

alt_name: Optional[str] = None

Alternate name for the command to use when logging.

pass_fds: Iterable[int] = ()

File descriptors to pass to the command.

pass_fds_close: bool = False

True if pass_fds should be closed after subprocess launch.

pty: Union[Callable[[int], NoneType], bool] = False

True if child process should be controlled using a pseudo-terminal (pty).

close_fds: bool = True

True if child process should close all file descriptors.

audit_callback: Optional[Callable[[str, AuditEventInfo], NoneType]] = None

Function called to audit stages of process execution.

coerce_arg: Optional[Callable[[Any], Any]] = None

Function called to coerce top level arguments.

@dataclass(frozen=True)
class Pipeline(typing.Generic[~_RT]):
@dataclass(frozen=True)
class Pipeline(Generic[_RT]):
    "A Pipeline is a sequence of commands."

    commands: tuple[shellous.Command[_RT], ...] = ()

    @staticmethod
    def create(*commands: shellous.Command[_T]) -> "Pipeline[_T]":
        "Create a new Pipeline."
        return Pipeline(commands)

    def __post_init__(self) -> None:
        "Validate the pipeline."
        if len(self.commands) == 0:
            raise ValueError("Pipeline must include at least one command")

    @property
    def name(self) -> str:
        "Return the name of the pipeline."
        return "|".join(cmd.name for cmd in self.commands)

    @property
    def options(self) -> shellous.Options:
        "Return the last command's options."
        return self.commands[-1].options

    def stdin(self, input_: Any, *, close: bool = False) -> "Pipeline[_RT]":
        "Set stdin on the first command of the pipeline."
        new_first = self.commands[0].stdin(input_, close=close)
        new_commands = (new_first,) + self.commands[1:]
        return dataclasses.replace(self, commands=new_commands)

    def stdout(
        self,
        output: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "Pipeline[_RT]":
        "Set stdout on the last command of the pipeline."
        new_last = self.commands[-1].stdout(output, append=append, close=close)
        new_commands = self.commands[0:-1] + (new_last,)
        return dataclasses.replace(self, commands=new_commands)

    def stderr(
        self,
        error: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "Pipeline[_RT]":
        "Set stderr on the last command of the pipeline."
        new_last = self.commands[-1].stderr(error, append=append, close=close)
        new_commands = self.commands[0:-1] + (new_last,)
        return dataclasses.replace(self, commands=new_commands)

    def _set(self, **kwds: Any):
        "Set options on last command of the pipeline."
        new_last = self.commands[-1].set(**kwds)
        new_commands = self.commands[0:-1] + (new_last,)
        return dataclasses.replace(self, commands=new_commands)

    def coro(self) -> Coroutine[Any, Any, _RT]:
        "Return coroutine object for pipeline."
        return cast(Coroutine[Any, Any, _RT], PipeRunner.run_pipeline(self))

    @contextlib.asynccontextmanager
    async def prompt(
        self,
        prompt: Union[str, list[str], re.Pattern[str], None] = None,
        *,
        timeout: Optional[float] = None,
        normalize_newlines: bool = False,
    ) -> AsyncIterator[Prompt]:
        """Run pipeline using the send/expect API.

        This method should be called using `async with`. It returns a `Prompt`
        object with send() and expect() methods.

        You can optionally set a default `prompt`. This is used by `expect()`
        when you don't provide another value.

        Use the `timeout` parameter to set the default timeout for operations.

        Set `normalize_newlines` to True to convert incoming CR and CR-LF to LF.
        This conversion is done before matching with `expect()`. This option
        does not affect strings sent with `send()`.
        """
        cmd = self.stdin(Redirect.CAPTURE).stdout(Redirect.CAPTURE)

        cli = None
        try:
            async with PipeRunner(cmd, capturing=True) as run:
                cli = Prompt(
                    run,
                    default_prompt=prompt,
                    default_timeout=timeout,
                    normalize_newlines=normalize_newlines,
                )
                yield cli
                cli.close()
        finally:
            if cli is not None:
                cli._finish_()  # pyright: ignore[reportPrivateUsage]

    def _add(self, item: Union["shellous.Command[Any]", "Pipeline[Any]"]):
        if isinstance(item, shellous.Command):
            return dataclasses.replace(self, commands=(*self.commands, item))
        return dataclasses.replace(
            self,
            commands=self.commands + item.commands,
        )

    def __len__(self) -> int:
        "Return number of commands in pipe."
        return len(self.commands)

    def __getitem__(self, key: int) -> shellous.Command[Any]:
        "Return specified command by index."
        return self.commands[key]

    def __call__(self, *args: Any) -> "Pipeline[_RT]":
        if args:
            raise TypeError("Calling pipeline with 1 or more arguments.")
        return self

    @overload
    def __or__(
        self, rhs: "Union[shellous.Command[shellous.Result], Pipeline[shellous.Result]]"
    ) -> "Pipeline[shellous.Result]": ...  # pragma: no cover

    @overload
    def __or__(
        self, rhs: "Union[shellous.Command[str], Pipeline[str]]"
    ) -> "Pipeline[str]": ...  # pragma: no cover

    @overload
    def __or__(self, rhs: StdoutType) -> "Pipeline[_RT]": ...  # pragma: no cover

    def __or__(self, rhs: Any) -> "Pipeline[Any]":
        if isinstance(rhs, (shellous.Command, Pipeline)):
            return self._add(rhs)  # pyright: ignore[reportUnknownArgumentType]
        if isinstance(rhs, STDOUT_TYPES):
            return self.stdout(rhs)
        if isinstance(rhs, (str, bytes)):
            raise TypeError(
                f"{type(rhs)!r} unsupported for | output (Use 'pathlib.Path')"
            )
        return NotImplemented

    def __ror__(self, lhs: StdinType) -> "Pipeline[_RT]":
        if isinstance(lhs, STDIN_TYPES):  # pyright: ignore[reportUnnecessaryIsInstance]
            return self.stdin(lhs)
        return NotImplemented

    def __rshift__(self, rhs: StdoutType) -> "Pipeline[_RT]":
        if isinstance(
            rhs, STDOUT_TYPES
        ):  # pyright: ignore[reportUnnecessaryIsInstance]
            return self.stdout(rhs, append=True)
        if isinstance(rhs, (str, bytes)):
            raise TypeError(
                f"{type(rhs)!r} unsupported for >> output (Use 'pathlib.Path')"
            )
        return NotImplemented

    @property
    def writable(self) -> "Pipeline[_RT]":
        "Set writable=True option on last command of pipeline."
        return self._set(_writable=True)

    @property
    def result(self) -> "Pipeline[shellous.Result]":
        "Set `_return_result` and `exit_codes`."
        return cast(
            Pipeline[shellous.Result],
            self._set(_return_result=True, exit_codes=range(-255, 256)),
        )

    def __await__(self) -> "Generator[Any, None, _RT]":
        return self.coro().__await__()  # FP pylint: disable=no-member

    async def __aenter__(self) -> PipeRunner:
        "Enter the async context manager."
        return await context_aenter(self, PipeRunner(self, capturing=True))

    async def __aexit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_value: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Optional[bool]:
        "Exit the async context manager."
        return await context_aexit(self, exc_type, exc_value, exc_tb)

    def __aiter__(self) -> AsyncIterator[str]:
        "Return async iterator to iterate over output lines."
        return aiter_preflight(self)._readlines()

    async def _readlines(self):
        "Async generator to iterate over lines."
        async with PipeRunner(self, capturing=True) as run:
            async for line in run:
                yield line

A Pipeline is a sequence of commands.
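The immutable, operator-based construction shown in the source above can be modeled in a few lines. Cmd and Pipe below are toy stand-ins invented for this sketch (they run nothing and are not shellous classes); they only show how `|` can return a new frozen object each time instead of mutating the left-hand side.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Cmd:
    """Toy stand-in for a command; it only carries a name."""

    name: str

    def __or__(self, rhs):
        if isinstance(rhs, Cmd):
            return Pipe((self, rhs))
        return NotImplemented


@dataclass(frozen=True)
class Pipe:
    """Toy stand-in for a pipeline: an immutable tuple of commands."""

    commands: tuple

    def __or__(self, rhs):
        if isinstance(rhs, Cmd):
            return replace(self, commands=(*self.commands, rhs))
        if isinstance(rhs, Pipe):
            return replace(self, commands=self.commands + rhs.commands)
        return NotImplemented

    @property
    def name(self) -> str:
        return "|".join(cmd.name for cmd in self.commands)


two = Cmd("ls") | Cmd("grep")
three = two | Cmd("wc")
print(three.name)  # ls|grep|wc
print(two.name)    # ls|grep
```

Because every step returns a new object, `two` is unchanged after `three` is built, so a partial pipeline can be reused safely.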

Pipeline(commands: tuple[Command[~_RT], ...] = ())
commands: tuple[Command[~_RT], ...] = ()
@staticmethod
def create( *commands: Command[~_T]) -> Pipeline[~_T]:
    @staticmethod
    def create(*commands: shellous.Command[_T]) -> "Pipeline[_T]":
        "Create a new Pipeline."
        return Pipeline(commands)

Create a new Pipeline.

name: str
    @property
    def name(self) -> str:
        "Return the name of the pipeline."
        return "|".join(cmd.name for cmd in self.commands)

Return the name of the pipeline.

options: Options
    @property
    def options(self) -> shellous.Options:
        "Return the last command's options."
        return self.commands[-1].options

Return the last command's options.

def stdin( self, input_: Any, *, close: bool = False) -> Pipeline[~_RT]:
    def stdin(self, input_: Any, *, close: bool = False) -> "Pipeline[_RT]":
        "Set stdin on the first command of the pipeline."
        new_first = self.commands[0].stdin(input_, close=close)
        new_commands = (new_first,) + self.commands[1:]
        return dataclasses.replace(self, commands=new_commands)

Set stdin on the first command of the pipeline.

def stdout( self, output: Any, *, append: bool = False, close: bool = False) -> Pipeline[~_RT]:
    def stdout(
        self,
        output: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "Pipeline[_RT]":
        "Set stdout on the last command of the pipeline."
        new_last = self.commands[-1].stdout(output, append=append, close=close)
        new_commands = self.commands[0:-1] + (new_last,)
        return dataclasses.replace(self, commands=new_commands)

Set stdout on the last command of the pipeline.

def stderr( self, error: Any, *, append: bool = False, close: bool = False) -> Pipeline[~_RT]:
    def stderr(
        self,
        error: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "Pipeline[_RT]":
        "Set stderr on the last command of the pipeline."
        new_last = self.commands[-1].stderr(error, append=append, close=close)
        new_commands = self.commands[0:-1] + (new_last,)
        return dataclasses.replace(self, commands=new_commands)

Set stderr on the last command of the pipeline.

def coro(self) -> Coroutine[Any, Any, ~_RT]:
    def coro(self) -> Coroutine[Any, Any, _RT]:
        "Return coroutine object for pipeline."
        return cast(Coroutine[Any, Any, _RT], PipeRunner.run_pipeline(self))

Return coroutine object for pipeline.

@contextlib.asynccontextmanager
async def prompt( self, prompt: Union[str, list[str], re.Pattern[str], NoneType] = None, *, timeout: Optional[float] = None, normalize_newlines: bool = False) -> AsyncIterator[Prompt]:
    @contextlib.asynccontextmanager
    async def prompt(
        self,
        prompt: Union[str, list[str], re.Pattern[str], None] = None,
        *,
        timeout: Optional[float] = None,
        normalize_newlines: bool = False,
    ) -> AsyncIterator[Prompt]:
        """Run pipeline using the send/expect API.

        This method should be called using `async with`. It returns a `Prompt`
        object with send() and expect() methods.

        You can optionally set a default `prompt`. This is used by `expect()`
        when you don't provide another value.

        Use the `timeout` parameter to set the default timeout for operations.

        Set `normalize_newlines` to True to convert incoming CR and CR-LF to LF.
        This conversion is done before matching with `expect()`. This option
        does not affect strings sent with `send()`.
        """
        cmd = self.stdin(Redirect.CAPTURE).stdout(Redirect.CAPTURE)

        cli = None
        try:
            async with PipeRunner(cmd, capturing=True) as run:
                cli = Prompt(
                    run,
                    default_prompt=prompt,
                    default_timeout=timeout,
                    normalize_newlines=normalize_newlines,
                )
                yield cli
                cli.close()
        finally:
            if cli is not None:
                cli._finish_()  # pyright: ignore[reportPrivateUsage]

Run pipeline using the send/expect API.

This method should be called using async with. It returns a Prompt object with send() and expect() methods.

You can optionally set a default prompt. This is used by expect() when you don't provide another value.

Use the timeout parameter to set the default timeout for operations.

Set normalize_newlines to True to convert incoming CR and CR-LF to LF. This conversion is done before matching with expect(). This option does not affect strings sent with send().
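To make the `normalize_newlines` behavior concrete, here is a sketch of newline normalization on chunked input using only the standard library. It assumes an incremental decoder wrapped in `io.IncrementalNewlineDecoder`; `make_decoder` and `decode_chunks` are hypothetical helpers written for this illustration and are not claimed to match the internal `_make_decoder` function.

```python
import codecs
import io


def make_decoder(encoding: str, normalize_newlines: bool):
    "Hypothetical helper: incremental decoder with optional newline translation."
    decoder = codecs.getincrementaldecoder(encoding)()
    if normalize_newlines:
        # translate=True rewrites "\r\n" and lone "\r" as "\n". A trailing
        # "\r" is held back until the next chunk shows whether "\n" follows.
        decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
    return decoder


def decode_chunks(chunks, normalize_newlines: bool) -> str:
    "Decode a list of byte chunks as if they arrived one at a time."
    decoder = make_decoder("utf-8", normalize_newlines)
    parts = []
    for index, chunk in enumerate(chunks):
        final = index == len(chunks) - 1
        parts.append(decoder.decode(chunk, final=final))
    return "".join(parts)


# A CR-LF pair and a multi-byte character are both split across chunks.
chunks = [b"hello\r", b"\nworld\rfoo\r\n", b"caf\xc3", b"\xa9"]
normalized = decode_chunks(chunks, normalize_newlines=True)
raw = decode_chunks(chunks, normalize_newlines=False)
```

With normalization enabled, a pattern such as `"\n"` matches regardless of whether the co-process (or its PTY) emitted CR, CR-LF or LF.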

def __len__(self) -> int:
    def __len__(self) -> int:
        "Return number of commands in pipe."
        return len(self.commands)

Return number of commands in pipe.

def __getitem__(self, key: int) -> Command[typing.Any]:
    def __getitem__(self, key: int) -> shellous.Command[Any]:
        "Return specified command by index."
        return self.commands[key]

Return specified command by index.

writable: Pipeline[~_RT]
    @property
    def writable(self) -> "Pipeline[_RT]":
        "Set writable=True option on last command of pipeline."
        return self._set(_writable=True)

Set writable=True option on last command of pipeline.

result: Pipeline[Result]
    @property
    def result(self) -> "Pipeline[shellous.Result]":
        "Set `_return_result` and `exit_codes`."
        return cast(
            Pipeline[shellous.Result],
            self._set(_return_result=True, exit_codes=range(-255, 256)),
        )

Set _return_result and exit_codes.

def __await__(self) -> Generator[Any, NoneType, ~_RT]:
    def __await__(self) -> "Generator[Any, None, _RT]":
        return self.coro().__await__()  # FP pylint: disable=no-member

async def __aenter__(self) -> PipeRunner:
    async def __aenter__(self) -> PipeRunner:
        "Enter the async context manager."
        return await context_aenter(self, PipeRunner(self, capturing=True))

Enter the async context manager.

def __aiter__(self) -> AsyncIterator[str]:
    def __aiter__(self) -> AsyncIterator[str]:
        "Return async iterator to iterate over output lines."
        return aiter_preflight(self)._readlines()

Return async iterator to iterate over output lines.

class Prompt:
class Prompt:
    """Utility class to help with an interactive prompt session.

    When you are controlling a co-process you will usually "send" some
    text to it, and then "expect" a response. The "expect" operation can use
    a regular expression to match different types of responses.

    Create a new `Prompt` instance using the `prompt()` API.

    ```
    # In this example, we are using a default prompt of "??? ".
    # Setting the PS1 environment variable tells the shell to use this as
    # the shell prompt.
    cmd = sh("sh").env(PS1="??? ").set(pty=True)

    async with cmd.prompt("??? ", timeout=3.0) as cli:
        # Turn off terminal echo.
        cli.echo = False

        # Wait for greeting and initial prompt. Calling expect() with no
        # argument will match the default prompt "??? ".
        greeting, _ = await cli.expect()

        # Send a command and wait for the response.
        await cli.send("echo hello")
        answer, _ = await cli.expect()
        assert answer == "hello\\r\\n"
    ```
    """

    _runner: Union[Runner, PipeRunner]
    _encoding: str
    _default_prompt: Optional[re.Pattern[str]]
    _default_timeout: Optional[float]
    _normalize_newlines: bool
    _chunk_size: int
    _decoder: codecs.IncrementalDecoder
    _pending: str = ""
    _at_eof: bool = False
    _result: Optional[Result] = None

    def __init__(
        self,
        runner: Runner,
        *,
        default_prompt: Union[str, list[str], re.Pattern[str], None] = None,
        default_timeout: Optional[float] = None,
        normalize_newlines: bool = False,
        _chunk_size: int = _DEFAULT_CHUNK_SIZE,
    ):
        assert runner.stdin is not None
        assert runner.stdout is not None

        if isinstance(default_prompt, (str, list)):
            default_prompt = _regex_compile_exact(default_prompt)

        self._runner = runner
        self._encoding = runner.options.encoding
        self._default_prompt = default_prompt
        self._default_timeout = default_timeout
        self._normalize_newlines = normalize_newlines
        self._chunk_size = _chunk_size
        self._decoder = _make_decoder(self._encoding, normalize_newlines)

        if LOG_PROMPT:
            LOGGER.info(
                "Prompt[pid=%s]: --- BEGIN --- name=%r",
                self._runner.pid,
                self._runner.name,
            )

    @property
    def at_eof(self) -> bool:
        "True if the prompt reader is at the end of file."
        return self._at_eof and not self._pending

    @property
    def pending(self) -> str:
        """Characters that still remain in the `pending` buffer."""
        return self._pending

    @property
    def echo(self) -> bool:
        """True if PTY is in echo mode.

        If the runner is not using a PTY, always return False.

        When the process is using a PTY, you can enable/disable terminal echo
        mode by setting the `echo` property to True/False.
        """
        if not isinstance(self._runner, Runner) or self._runner.pty_fd is None:
            return False
        return pty_util.get_term_echo(self._runner.pty_fd)

    @echo.setter
    def echo(self, value: bool) -> None:
        """Set echo mode for the PTY.

        Raise an error if the runner is not using a PTY.
        """
        if not isinstance(self._runner, Runner) or self._runner.pty_fd is None:
            raise RuntimeError("Cannot set echo mode. Not running in a PTY.")
        pty_util.set_term_echo(self._runner.pty_fd, value)

    @property
    def result(self) -> Result:
        """The `Result` of the co-process when it exited.

        You can only retrieve this property *after* the `async with` block
        exits where the co-process is running:

        ```
        async with cmd.prompt() as cli:
            ...
        # Access `cli.result` here.
        ```

        Inside the `async with` block, raise a RuntimeError because the process
        has not exited yet.
        """
        if self._result is None:
            raise RuntimeError("Prompt process is still running")
        return self._result

    async def send(
        self,
        text: Union[bytes, str],
        *,
        end: Optional[str] = _DEFAULT_LINE_END,
        no_echo: bool = False,
        timeout: Optional[float] = None,
    ) -> None:
        """Write some `text` to co-process standard input and append a newline.

        The `text` parameter is the string that you want to write. Use a `bytes`
        object to send some raw bytes (e.g. to the terminal driver).

        The default line ending is "\n". Use the `end` parameter to change the
        line ending. To omit the line ending entirely, specify `end=None`.

        Set `no_echo` to True when you are writing a password. When you set
        `no_echo` to True, the `send` method will wait for terminal
        echo mode to be disabled before writing the text. If shellous logging
        is enabled, the sensitive information will **not** be logged.

        Use the `timeout` parameter to override the default timeout. Normally,
        data is delivered immediately and this method returns making a trip
        through the event loop. However, there are situations where the
        co-process input pipeline fills and we have to wait for it to
        drain.

        When this method needs to wait for the co-process input pipe to drain,
        this method will concurrently read from the output pipe into the pending
        buffer. This is necessary to avoid a deadlock situation where everything
        stops because neither process can make progress.
        """
        if end is None:
            end = ""

        if no_echo:
            await self._wait_no_echo()

        if isinstance(text, bytes):
            data = text + encode_bytes(end, self._encoding)
        else:
            data = encode_bytes(text + end, self._encoding)

        stdin = self._runner.stdin
        assert stdin is not None
        stdin.write(data)

        if LOG_PROMPT:
            self._log_send(data, no_echo)

        # Drain our write to stdin.
        cancelled, ex = await harvest_results(
            self._drain(stdin),
            timeout=timeout or self._default_timeout,
        )
        if cancelled:
            raise asyncio.CancelledError()
        if isinstance(ex[0], Exception):
            raise ex[0]

    async def expect(
        self,
        prompt: Union[str, list[str], re.Pattern[str], None] = None,
        *,
        timeout: Optional[float] = None,
    ) -> tuple[str, re.Match[str]]:
        """Read from co-process standard output until `prompt` pattern matches.

        Returns a 2-tuple of (output, match) where `output` is the text *before*
        the prompt pattern and `match` is a `re.Match` object for the prompt
        text itself.

        If `expect` reaches EOF or a timeout occurs before the prompt pattern
        matches, it raises an `EOFError` or `asyncio.TimeoutError`. The unread
        characters will be available in the `pending` buffer.

        After this method returns, there may still be characters read from stdout
        that remain in the `pending` buffer. These are the characters *after* the
        prompt pattern. You can examine these using the `pending` property.
        Subsequent calls to expect will examine this buffer first before
        reading new data from the output pipe.

        By default, this method will use the default `timeout` for the `Prompt`
        object if one is set. You can use the `timeout` parameter to specify
        a custom timeout in seconds.

        Prompt Patterns
        ~~~~~~~~~~~~~~~

        The `expect()` method supports matching fixed strings and regular
        expressions. The type of the `prompt` parameter determines the type of
        search.

        No argument or `None`:
            Use the default prompt pattern. If there is no default prompt,
            raise a TypeError.
        `str`:
            Match this string exactly.
        `list[str]`:
            Match one of these strings exactly.
        `re.Pattern[str]`:
            Match the given regular expression.

        When matching a regular expression, only a single Pattern object is
        supported. To match multiple regular expressions, combine them into a
        single regular expression using *alternation* syntax (|).

        The `expect()` method returns a 2-tuple (output, match). The `match`
        is the result of the regular expression search (re.Match). If you
        specify your prompt as a string or list of strings, it is still compiled
        into a regular expression that produces an `re.Match` object. You can
        examine the `match` object to determine the prompt value found.

        This method conducts a regular expression search on streaming data. The
        `expect()` method reads a new chunk of data into the `pending` buffer
        and then searches it. You must be careful in writing a regular
        expression so that the search is agnostic to how the incoming chunks of
        data arrive. Consider including a boundary condition at the end of your
        pattern. For example, instead of searching for the open-ended pattern
        `[a-z]+`, search for the pattern `[a-z]+[^a-z]` which ends with a
        non-letter character.

        Examples
        ~~~~~~~~

        Expect an exact string:

        ```
        await cli.expect("ftp> ")
        await cli.send(command)
        response, _ = await cli.expect("ftp> ")
        ```

        Expect a choice of strings:

        ```
        _, m = await cli.expect(["Login: ", "Password: ", "ftp> "])
        match m[0]:
            case "Login: ":
                await cli.send(login)
            case "Password: ":
                await cli.send(password)
            case "ftp> ":
                await cli.send(command)
        ```

        Read until EOF:

        ```
        data = await cli.read_all()
        ```

        Read the contents of the `pending` buffer without filling the buffer
        with any new data from the co-process pipe:

        ```
        data = await cli.read_pending()
        ```
        """
        if prompt is None:
            prompt = self._default_prompt
            if prompt is None:
                raise TypeError("prompt is required when default prompt is not set")
        elif isinstance(prompt, (str, list)):
            prompt = _regex_compile_exact(prompt)

        if self._pending:
            result = self._search_pending(prompt)
            if result is not None:
                return result

        if self._at_eof:
            raise EOFError("Prompt has reached EOF")

        cancelled, (result,) = await harvest_results(
            self._read_to_pattern(prompt),
            timeout=timeout or self._default_timeout,
        )
        if cancelled:
            raise asyncio.CancelledError()
        if isinstance(result, Exception):
            raise result

        return result

    async def read_all(
        self,
        *,
        timeout: Optional[float] = None,
    ) -> str:
        """Read from co-process output until EOF.

        If we are already at EOF, return "".
        """
        if not self._at_eof:
            cancelled, (result,) = await harvest_results(
                self._read_some(tag="@read_all"),
                timeout=timeout or self._default_timeout,
            )
            if cancelled:
                raise asyncio.CancelledError()
            if isinstance(result, Exception):
                raise result

        return self.read_pending()

    def read_pending(self) -> str:
        """Read the contents of the pending buffer and empty it.

        This method does not fill the pending buffer with any new data from the
        co-process output pipe. If the pending buffer is already empty, return
        "".
        """
        result = self._pending
        self._pending = ""
        return result

    async def command(
        self,
        text: str,
        *,
        end: str = _DEFAULT_LINE_END,
        no_echo: bool = False,
        prompt: Union[str, re.Pattern[str], None] = None,
        timeout: Optional[float] = None,
        allow_eof: bool = False,
    ) -> str:
        """Send some text to the co-process and return the response.

        This method is equivalent to calling send() followed by expect().
        However, the return value is simpler; `command()` does not return the
        `re.Match` object.

        If you call this method *after* the co-process output pipe has already
        returned EOF, raise `EOFError`.

        If `allow_eof` is True, this method will read data up to EOF instead of
        raising an EOFError.
        """
        if self._at_eof:
            raise EOFError("Prompt has reached EOF")

        await self.send(text, end=end, no_echo=no_echo, timeout=timeout)
        try:
            result, _ = await self.expect(prompt, timeout=timeout)
        except EOFError:
            if not allow_eof:
                raise
            result = self.read_pending()

        return result

    def close(self) -> None:
        "Close stdin to end the prompt session."
        stdin = self._runner.stdin
        assert stdin is not None

        if isinstance(self._runner, Runner) and self._runner.pty_eof:
            # Write EOF twice; once to end the current line, and the second
            # time to signal the end.
            stdin.write(self._runner.pty_eof * 2)
            if LOG_PROMPT:
                LOGGER.info("Prompt[pid=%s] send: [[EOF]]", self._runner.pid)

        else:
            stdin.close()
            if LOG_PROMPT:
                LOGGER.info("Prompt[pid=%s] close", self._runner.pid)

    def _finish_(self) -> None:
        "Internal method called when process exits to fetch the `Result` and cache it."
        self._result = self._runner.result(check=False)
        if LOG_PROMPT:
            LOGGER.info(
                "Prompt[pid=%s]: --- END --- result=%r",
                self._runner.pid,
                self._result,
            )

    async def _read_to_pattern(
        self,
        pattern: re.Pattern[str],
    ) -> tuple[str, re.Match[str]]:
        """Read text up to part that matches the pattern.

        Returns 2-tuple with (text, match).
        """
        stdout = self._runner.stdout
        assert stdout is not None
        assert self._chunk_size > 0

        while not self._at_eof:
            _prev_len = len(self._pending)  # debug check

            try:
                # Read chunk and check for EOF.
                chunk = await stdout.read(self._chunk_size)
                if not chunk:
                    self._at_eof = True
            except asyncio.CancelledError:
                if LOG_PROMPT:
                    LOGGER.info(
                        "Prompt[pid=%s] receive cancelled: pending=%r",
                        self._runner.pid,
                        self._pending,
                    )
                raise

            if LOG_PROMPT:
                self._log_receive(chunk)

            # Decode eligible bytes into our buffer.
            data = self._decoder.decode(chunk, final=self._at_eof)
            if not data and not self._at_eof:
                continue
            self._pending += data

            result = self._search_pending(pattern)
            if result is not None:
                return result

            assert self._at_eof or len(self._pending) > _prev_len  # debug check

        raise EOFError("Prompt has reached EOF")

    def _search_pending(
        self,
        pattern: re.Pattern[str],
    ) -> Optional[tuple[str, re.Match[str]]]:
        """Search our `pending` buffer for the pattern.

        If we find a match, we return the data up to the portion that matched
        and leave the trailing data in the `pending` buffer. This method returns
        (result, match) or None if there is no match.
        """
        found = pattern.search(self._pending)
        if found:
            result = self._pending[0 : found.start(0)]
            self._pending = self._pending[found.end(0) :]
            if LOG_PROMPT:
                LOGGER.info(
                    "Prompt[pid=%s] found: %r [%s CHARS PENDING]",
                    self._runner.pid,
                    found,
                    len(self._pending),
                )
            return (result, found)

        return None

    async def _drain(self, stream: asyncio.StreamWriter) -> None:
        "Drain stream while reading into buffer concurrently."
        read_task = asyncio.create_task(
            self._read_some(tag="@drain", concurrent_cancel=True)
        )
        try:
            await stream.drain()

        finally:
            if not read_task.done():
                read_task.cancel()
                await read_task

    async def _read_some(
        self,
        *,
        tag: str = "",
        concurrent_cancel: bool = False,
    ) -> None:
        "Read into `pending` buffer until cancelled or EOF."
        stdout = self._runner.stdout
        assert stdout is not None
        assert self._chunk_size > 0

        while not self._at_eof:
            # Yield time to other tasks; read() doesn't yield as long as there
            # is data to read. We need to provide a cancel point when this
            # method is called during `drain`.
            if concurrent_cancel:
                await asyncio.sleep(0)

            # Read chunk and check for EOF.
            chunk = await stdout.read(self._chunk_size)
            if not chunk:
                self._at_eof = True

            if LOG_PROMPT:
                self._log_receive(chunk, tag)

            # Decode eligible bytes into our buffer.
            data = self._decoder.decode(chunk, final=self._at_eof)
            self._pending += data

    async def _wait_no_echo(self):
        "Wait for terminal echo mode to be disabled."
        if LOG_PROMPT:
            LOGGER.info("Prompt[pid=%s] wait: no_echo", self._runner.pid)

        for _ in range(4 * 30):
            if not self.echo:
                break
            await asyncio.sleep(0.25)
        else:
            raise RuntimeError("Timed out: Terminal echo mode remains enabled.")

    def _log_send(self, data: bytes, no_echo: bool):
        "Log data as it is being sent."
        pid = self._runner.pid

        if no_echo:
            LOGGER.info("Prompt[pid=%s] send: [[HIDDEN]]", pid)
        else:
            data_len = len(data)
            if data_len > _LOG_LIMIT:
                LOGGER.info(
                    "Prompt[pid=%s] send: [%d B] %r...%r",
                    pid,
                    data_len,
                    data[: _LOG_LIMIT - _LOG_LIMIT_END],
                    data[-_LOG_LIMIT_END:],
                )
            else:
                LOGGER.info(
                    "Prompt[pid=%s] send: [%d B] %r",
                    pid,
                    data_len,
                    data,
                )

    def _log_receive(self, data: bytes, tag: str = ""):
        "Log data as it is being received."
        pid = self._runner.pid
        data_len = len(data)

        if data_len > _LOG_LIMIT:
            LOGGER.info(
                "Prompt[pid=%s] receive%s: [%d B] %r...%r",
                pid,
                tag,
                data_len,
                data[: _LOG_LIMIT - _LOG_LIMIT_END],
                data[-_LOG_LIMIT_END:],
            )
        else:
            LOGGER.info(
                "Prompt[pid=%s] receive%s: [%d B] %r",
                pid,
                tag,
                data_len,
                data,
            )

Utility class to help with an interactive prompt session.

When you are controlling a co-process you will usually "send" some text to it, and then "expect" a response. The "expect" operation can use a regular expression to match different types of responses.
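A prompt given as a string or list of strings still has to be searched as a regular expression, so any metacharacters in it must be matched literally. The sketch below shows one way to do that with `re.escape` and alternation. `compile_exact` is a hypothetical helper written for this illustration; it is an assumption about the general technique, not the library's internal `_regex_compile_exact`.

```python
import re
from typing import Union


def compile_exact(prompt: Union[str, list[str]]) -> re.Pattern[str]:
    "Hypothetical helper: build a pattern matching one of the strings literally."
    choices = [prompt] if isinstance(prompt, str) else prompt
    # re.escape() neutralizes metacharacters such as "?" and ">".
    return re.compile("|".join(re.escape(choice) for choice in choices))


pattern = compile_exact(["Login: ", "Password: ", "ftp> "])
match = pattern.search("220 Service ready\r\nLogin: ")

# The matched text tells us which of the choices was found.
which = match[0]

# A prompt like "??? " is not a valid regular expression on its own, but it
# works once escaped.
question = compile_exact("??? ").search("sh-5.1$ ??? ")
```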

Create a new Prompt instance using the prompt() API.

# In this example, we are using a default prompt of "??? ".
# Setting the PS1 environment variable tells the shell to use this as
# the shell prompt.
cmd = sh("sh").env(PS1="??? ").set(pty=True)

async with cmd.prompt("??? ", timeout=3.0) as cli:
    # Turn off terminal echo.
    cli.echo = False

    # Wait for greeting and initial prompt. Calling expect() with no
    # argument will match the default prompt "??? ".
    greeting, _ = await cli.expect()

    # Send a command and wait for the response.
    await cli.send("echo hello")
    answer, _ = await cli.expect()
    assert answer == "hello\r\n"

Prompt( runner: Runner, *, default_prompt: Union[str, list[str], re.Pattern[str], NoneType] = None, default_timeout: Optional[float] = None, normalize_newlines: bool = False, _chunk_size: int = 16384)
    def __init__(
        self,
        runner: Runner,
        *,
        default_prompt: Union[str, list[str], re.Pattern[str], None] = None,
        default_timeout: Optional[float] = None,
        normalize_newlines: bool = False,
        _chunk_size: int = _DEFAULT_CHUNK_SIZE,
    ):
        assert runner.stdin is not None
        assert runner.stdout is not None

        if isinstance(default_prompt, (str, list)):
            default_prompt = _regex_compile_exact(default_prompt)

        self._runner = runner
        self._encoding = runner.options.encoding
        self._default_prompt = default_prompt
        self._default_timeout = default_timeout
        self._normalize_newlines = normalize_newlines
        self._chunk_size = _chunk_size
        self._decoder = _make_decoder(self._encoding, normalize_newlines)

        if LOG_PROMPT:
            LOGGER.info(
                "Prompt[pid=%s]: --- BEGIN --- name=%r",
                self._runner.pid,
                self._runner.name,
            )

at_eof: bool
    @property
    def at_eof(self) -> bool:
        "True if the prompt reader is at the end of file."
        return self._at_eof and not self._pending

True if the prompt reader is at the end of file.

pending: str
    @property
    def pending(self) -> str:
        """Characters that still remain in the `pending` buffer."""
        return self._pending

Characters that still remain in the pending buffer.
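The following sketch models how a pending buffer behaves when data arrives in chunks: text before a match is returned, the match is consumed, and whatever follows stays in the buffer for the next search. `PendingBuffer` is a hypothetical class written for this illustration; it mirrors the idea behind the `pending` property rather than reproducing the `Prompt` implementation.

```python
import re
from typing import Optional


class PendingBuffer:
    "Hypothetical model of a pending buffer searched as chunks arrive."

    def __init__(self) -> None:
        self.pending = ""

    def feed(self, chunk: str) -> None:
        self.pending += chunk

    def search(
        self, pattern: re.Pattern[str]
    ) -> Optional[tuple[str, re.Match[str]]]:
        found = pattern.search(self.pending)
        if found is None:
            return None
        before = self.pending[: found.start()]
        # Keep only the text after the match for later searches.
        self.pending = self.pending[found.end():]
        return before, found


buf = PendingBuffer()
prompt = re.compile(r"\$ ")

buf.feed("hello\n$")          # the prompt is split across two chunks
first = buf.search(prompt)    # no match yet
buf.feed(" next")
second = buf.search(prompt)   # now matches; "next" stays pending

# An open-ended pattern can match too early when a word is split across
# chunks; ending the pattern with a boundary makes it wait for more data.
partial = "ab"                # first chunk of the word "abc"
early = re.compile(r"[a-z]+").search(partial)
bounded = re.compile(r"[a-z]+[^a-z]").search(partial)
```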

echo: bool
    @property
    def echo(self) -> bool:
        """True if PTY is in echo mode.

        If the runner is not using a PTY, always return False.

        When the process is using a PTY, you can enable/disable terminal echo
        mode by setting the `echo` property to True/False.
        """
        if not isinstance(self._runner, Runner) or self._runner.pty_fd is None:
            return False
        return pty_util.get_term_echo(self._runner.pty_fd)

True if PTY is in echo mode.

If the runner is not using a PTY, always return False.

When the process is using a PTY, you can enable/disable terminal echo mode by setting the echo property to True/False.

result: Result
    @property
    def result(self) -> Result:
        """The `Result` of the co-process when it exited.

        You can only retrieve this property *after* the `async with` block
        exits where the co-process is running:

        ```
        async with cmd.prompt() as cli:
            ...
        # Access `cli.result` here.
        ```

        Inside the `async with` block, raise a RuntimeError because the process
        has not exited yet.
        """
        if self._result is None:
            raise RuntimeError("Prompt process is still running")
        return self._result

The Result of the co-process when it exited.

You can only retrieve this property after the async with block exits where the co-process is running:

async with cmd.prompt() as cli:
    ...
# Access `cli.result` here.

Inside the async with block, raise a RuntimeError because the process has not exited yet.
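The "result only after the block exits" rule can be sketched with `contextlib.asynccontextmanager` and a `finally` clause. `MiniSession` and `open_session` are hypothetical names used only for this illustration, and the exit code `0` is a placeholder for a real process result.

```python
import asyncio
import contextlib
from typing import Optional


class MiniSession:
    "Hypothetical session object; `result` is set only when the block exits."

    def __init__(self) -> None:
        self._result: Optional[int] = None

    @property
    def result(self) -> int:
        if self._result is None:
            raise RuntimeError("process is still running")
        return self._result

    def _finish(self, exit_code: int) -> None:
        self._result = exit_code


@contextlib.asynccontextmanager
async def open_session():
    session = MiniSession()
    try:
        yield session
    finally:
        # Runs even if the body raises, so the result is always cached.
        session._finish(0)


async def main():
    async with open_session() as session:
        try:
            session.result
            inside = "available"
        except RuntimeError:
            inside = "not yet"
    # The block has exited, so the result can be read now.
    return inside, session.result


outcome = asyncio.run(main())
```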

async def send( self, text: Union[bytes, str], *, end: Optional[str] = '\n', no_echo: bool = False, timeout: Optional[float] = None) -> None:
    async def send(
        self,
        text: Union[bytes, str],
        *,
        end: Optional[str] = _DEFAULT_LINE_END,
        no_echo: bool = False,
        timeout: Optional[float] = None,
    ) -> None:
        """Write some `text` to co-process standard input and append a newline.

        The `text` parameter is the string that you want to write. Use a `bytes`
        object to send some raw bytes (e.g. to the terminal driver).

        The default line ending is "\n". Use the `end` parameter to change the
        line ending. To omit the line ending entirely, specify `end=None`.

        Set `no_echo` to True when you are writing a password. When you set
        `no_echo` to True, the `send` method will wait for terminal
        echo mode to be disabled before writing the text. If shellous logging
        is enabled, the sensitive information will **not** be logged.

        Use the `timeout` parameter to override the default timeout. Normally,
        data is delivered immediately and this method returns making a trip
        through the event loop. However, there are situations where the
        co-process input pipeline fills and we have to wait for it to
        drain.

        When this method needs to wait for the co-process input pipe to drain,
        this method will concurrently read from the output pipe into the pending
        buffer. This is necessary to avoid a deadlock situation where everything
        stops because neither process can make progress.
        """
        if end is None:
            end = ""

        if no_echo:
            await self._wait_no_echo()

        if isinstance(text, bytes):
            data = text + encode_bytes(end, self._encoding)
        else:
            data = encode_bytes(text + end, self._encoding)

        stdin = self._runner.stdin
        assert stdin is not None
        stdin.write(data)

        if LOG_PROMPT:
            self._log_send(data, no_echo)

        # Drain our write to stdin.
        cancelled, ex = await harvest_results(
            self._drain(stdin),
            timeout=timeout or self._default_timeout,
        )
        if cancelled:
            raise asyncio.CancelledError()
        if isinstance(ex[0], Exception):
            raise ex[0]

Write some text to co-process standard input and append a newline.

    The `text` parameter is the string that you want to write. Use a `bytes`
    object to send some raw bytes (e.g. to the terminal driver).

    The default line ending is "\n". Use the `end` parameter to change the
    line ending. To omit the line ending entirely, specify `end=None`.

    Set `no_echo` to True when you are writing a password. When you set
    `no_echo` to True, the `send` method will wait for terminal
    echo mode to be disabled before writing the text. If shellous logging
    is enabled, the sensitive information will **not** be logged.

    Use the `timeout` parameter to override the default timeout. Normally,
    data is delivered immediately and this method returns making a trip
    through the event loop. However, there are situations where the
    co-process input pipeline fills and we have to wait for it to
    drain.

    When this method needs to wait for the co-process input pipe to drain,
    this method will concurrently read from the output pipe into the pending
    buffer. This is necessary to avoid a deadlock situation where everything
    stops because neither process can make progress.
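
The deadlock described above can be reproduced and avoided with plain asyncio. The following is a minimal sketch, not shellous's implementation: `round_trip`, `feed`, `collect` and the `CHILD` program are hypothetical names introduced for illustration. It writes a large payload to a cat-like child while reading the child's output at the same time, so neither pipe can fill up and stall both processes.

```python
import asyncio
import sys

# Hypothetical child program: copy stdin to stdout, like `cat`.
CHILD = "import shutil, sys; shutil.copyfileobj(sys.stdin.buffer, sys.stdout.buffer)"


async def round_trip(data: bytes) -> bytes:
    """Send `data` through a cat-like child and return what comes back."""
    proc = await asyncio.create_subprocess_exec(
        sys.executable,
        "-c",
        CHILD,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    assert proc.stdin is not None and proc.stdout is not None
    stdin, stdout = proc.stdin, proc.stdout

    async def feed() -> None:
        stdin.write(data)
        # drain() can wait here until the child has consumed enough input.
        await stdin.drain()
        stdin.close()

    async def collect() -> bytes:
        # Reading concurrently keeps the child's output pipe from filling.
        return await stdout.read()

    _, output = await asyncio.gather(feed(), collect())
    await proc.wait()
    return output


if __name__ == "__main__":
    payload = b"x" * (1 << 20)
    echoed = asyncio.run(round_trip(payload))
    print(len(echoed))
```

If `feed()` were awaited to completion before `collect()` started, a payload larger than the combined pipe buffers could leave the parent waiting to write while the child waits for its output to be read.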
async def expect(self, prompt: Union[str, list[str], re.Pattern[str], NoneType] = None, *, timeout: Optional[float] = None) -> tuple[str, re.Match[str]]:
    async def expect(
        self,
        prompt: Union[str, list[str], re.Pattern[str], None] = None,
        *,
        timeout: Optional[float] = None,
    ) -> tuple[str, re.Match[str]]:
        """Read from co-process standard output until `prompt` pattern matches.

        Returns a 2-tuple of (output, match) where `output` is the text *before*
        the prompt pattern and `match` is a `re.Match` object for the prompt
        text itself.

        If `expect` reaches EOF or a timeout occurs before the prompt pattern
        matches, it raises an `EOFError` or `asyncio.TimeoutError`. The unread
        characters will be available in the `pending` buffer.

        After this method returns, there may still be characters read from stdout
        that remain in the `pending` buffer. These are the characters *after* the
        prompt pattern. You can examine these using the `pending` property.
        Subsequent calls to expect will examine this buffer first before
        reading new data from the output pipe.

        By default, this method will use the default `timeout` for the `Prompt`
        object if one is set. You can use the `timeout` parameter to specify
        a custom timeout in seconds.

        Prompt Patterns
        ~~~~~~~~~~~~~~~

        The `expect()` method supports matching fixed strings and regular
        expressions. The type of the `prompt` parameter determines the type of
        search.

        No argument or `None`:
            Use the default prompt pattern. If there is no default prompt,
            raise a TypeError.
        `str`:
            Match this string exactly.
        `list[str]`:
            Match one of these strings exactly.
        `re.Pattern[str]`:
            Match the given regular expression.

        When matching a regular expression, only a single Pattern object is
        supported. To match multiple regular expressions, combine them into a
        single regular expression using *alternation* syntax (|).

        The `expect()` method returns a 2-tuple (output, match). The `match`
        is the result of the regular expression search (re.Match). If you
        specify your prompt as a string or list of strings, it is still compiled
        into a regular expression that produces an `re.Match` object. You can
        examine the `match` object to determine the prompt value found.

        This method conducts a regular expression search on streaming data. The
        `expect()` method reads a new chunk of data into the `pending` buffer
        and then searches it. You must be careful in writing a regular
        expression so that the search is agnostic to how the incoming chunks of
        data arrive. Consider including a boundary condition at the end of your pattern.
        For example, instead of searching for the open-ended pattern `[a-z]+`,
        search for the pattern `[a-z]+[^a-z]` which ends with a non-letter
        character.

        Examples
        ~~~~~~~~

        Expect an exact string:

        ```
        await cli.expect("ftp> ")
        await cli.send(command)
        response, _ = await cli.expect("ftp> ")
        ```

        Expect a choice of strings:

        ```
        _, m = await cli.expect(["Login: ", "Password: ", "ftp> "])
        match m[0]:
            case "Login: ":
                await cli.send(login)
            case "Password: ":
                await cli.send(password)
            case "ftp> ":
                await cli.send(command)
        ```

        Read until EOF:

        ```
        data = await cli.read_all()
        ```

        Read the contents of the `pending` buffer without filling the buffer
        with any new data from the co-process pipe:

        ```
        data = await cli.read_pending()
        ```
        """
        if prompt is None:
            prompt = self._default_prompt
            if prompt is None:
                raise TypeError("prompt is required when default prompt is not set")
        elif isinstance(prompt, (str, list)):
            prompt = _regex_compile_exact(prompt)

        if self._pending:
            result = self._search_pending(prompt)
            if result is not None:
                return result

        if self._at_eof:
            raise EOFError("Prompt has reached EOF")

        cancelled, (result,) = await harvest_results(
            self._read_to_pattern(prompt),
            timeout=timeout or self._default_timeout,
        )
        if cancelled:
            raise asyncio.CancelledError()
        if isinstance(result, Exception):
            raise result

        return result

Read from co-process standard output until prompt pattern matches.

Returns a 2-tuple of (output, match) where output is the text before the prompt pattern and match is a re.Match object for the prompt text itself.

If expect reaches EOF or a timeout occurs before the prompt pattern matches, it raises an EOFError or asyncio.TimeoutError. The unread characters will be available in the pending buffer.

After this method returns, there may still be characters read from stdout that remain in the pending buffer. These are the characters after the prompt pattern. You can examine these using the pending property. Subsequent calls to expect will examine this buffer first before reading new data from the output pipe.

By default, this method will use the default timeout for the Prompt object if one is set. You can use the timeout parameter to specify a custom timeout in seconds.

Prompt Patterns

The expect() method supports matching fixed strings and regular expressions. The type of the prompt parameter determines the type of search.

  • No argument or None: Use the default prompt pattern. If there is no default prompt, raise a TypeError.
  • str: Match this string exactly.
  • list[str]: Match one of these strings exactly.
  • re.Pattern[str]: Match the given regular expression.

When matching a regular expression, only a single Pattern object is supported. To match multiple regular expressions, combine them into a single regular expression using alternation syntax (|).

The expect() method returns a 2-tuple (output, match). The match is the result of the regular expression search (re.Match). If you specify your prompt as a string or list of strings, it is still compiled into a regular expression that produces an re.Match object. You can examine the match object to determine the prompt value found.
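
As a rough illustration of the two points above, the sketch below compiles a list of fixed strings into one regular expression using escaping plus alternation, then inspects the resulting `re.Match`. The helper `compile_exact` is a hypothetical stand-in written for this example; it is not shellous's internal `_regex_compile_exact`, whose details may differ.

```python
import re
from typing import Union


def compile_exact(prompt: Union[str, list[str]]) -> re.Pattern[str]:
    """Hypothetical helper: match any of the given strings literally."""
    if isinstance(prompt, str):
        prompt = [prompt]
    # re.escape() neutralizes regex metacharacters; "|" is alternation.
    return re.compile("|".join(re.escape(item) for item in prompt))


pattern = compile_exact(["Login: ", "Password: ", "ftp> "])
text = "220 ready\r\nLogin: "
match = pattern.search(text)
assert match is not None

output = text[: match.start()]  # text before the prompt
found = match[0]                # which prompt string matched
print(repr(output), repr(found))
```

The same alternation idea applies to real regular expressions: two patterns `A` and `B` can be combined as `(?:A)|(?:B)` and passed as a single `re.Pattern`.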

This method conducts a regular expression search on streaming data. The expect() method reads a new chunk of data into the pending buffer and then searches it. You must be careful in writing a regular expression so that the search is agnostic to how the incoming chunks of data arrive. Consider including a boundary condition at the end of your pattern. For example, instead of searching for the open-ended pattern [a-z]+, search for the pattern [a-z]+[^a-z] which ends with a non-letter character.
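
The effect of chunk boundaries can be shown with the standard `re` module alone. This is a simplified sketch of "append a chunk, then search the buffer"; `search_stream` is a hypothetical function for illustration and does not reproduce how shellous reads from the pipe.

```python
import re
from typing import Iterable, Optional


def search_stream(pattern: re.Pattern[str], chunks: Iterable[str]) -> Optional[str]:
    """Append each chunk to a pending buffer and search after every append."""
    pending = ""
    for chunk in chunks:
        pending += chunk
        match = pattern.search(pending)
        if match:
            return match[0]
    return None


# The word "hello" happens to arrive split across two chunks.
chunks = ["hel", "lo world"]

open_ended = re.compile(r"[a-z]+")
bounded = re.compile(r"[a-z]+[^a-z]")

print(search_stream(open_ended, chunks))  # matches too early, on the first chunk
print(search_stream(bounded, chunks))     # waits until the word is complete
```

The open-ended pattern reports a partial word because it is satisfied by whatever has arrived so far; the bounded pattern cannot match until a non-letter character confirms that the word has ended.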

Examples

Expect an exact string:

await cli.expect("ftp> ")
await cli.send(command)
response, _ = await cli.expect("ftp> ")

Expect a choice of strings:

_, m = await cli.expect(["Login: ", "Password: ", "ftp> "])
match m[0]:
    case "Login: ":
        await cli.send(login)
    case "Password: ":
        await cli.send(password)
    case "ftp> ":
        await cli.send(command)

Read until EOF:

data = await cli.read_all()

Read the contents of the pending buffer without filling the buffer with any new data from the co-process pipe:

data = await cli.read_pending()
async def read_all(self, *, timeout: Optional[float] = None) -> str:
    async def read_all(
        self,
        *,
        timeout: Optional[float] = None,
    ) -> str:
        """Read from co-process output until EOF.

        If we are already at EOF, return "".
        """
        if not self._at_eof:
            cancelled, (result,) = await harvest_results(
                self._read_some(tag="@read_all"),
                timeout=timeout or self._default_timeout,
            )
            if cancelled:
                raise asyncio.CancelledError()
            if isinstance(result, Exception):
                raise result

        return self.read_pending()

Read from co-process output until EOF.

If we are already at EOF, return "".

def read_pending(self) -> str:
    def read_pending(self) -> str:
        """Read the contents of the pending buffer and empty it.

        This method does not fill the pending buffer with any new data from the
        co-process output pipe. If the pending buffer is already empty, return
        "".
        """
        result = self._pending
        self._pending = ""
        return result

Read the contents of the pending buffer and empty it.

This method does not fill the pending buffer with any new data from the co-process output pipe. If the pending buffer is already empty, return "".

async def command(self, text: str, *, end: str = '\n', no_echo: bool = False, prompt: Union[str, re.Pattern[str], NoneType] = None, timeout: Optional[float] = None, allow_eof: bool = False) -> str:
    async def command(
        self,
        text: str,
        *,
        end: str = _DEFAULT_LINE_END,
        no_echo: bool = False,
        prompt: Union[str, re.Pattern[str], None] = None,
        timeout: Optional[float] = None,
        allow_eof: bool = False,
    ) -> str:
        """Send some text to the co-process and return the response.

        This method is equivalent to calling send() followed by expect().
        However, the return value is simpler; `command()` does not return the
        `re.Match` object.

        If you call this method *after* the co-process output pipe has already
        returned EOF, it raises `EOFError`.

        If `allow_eof` is True, this method will read data up to EOF instead of
        raising an EOFError.
        """
        if self._at_eof:
            raise EOFError("Prompt has reached EOF")

        await self.send(text, end=end, no_echo=no_echo, timeout=timeout)
        try:
            result, _ = await self.expect(prompt, timeout=timeout)
        except EOFError:
            if not allow_eof:
                raise
            result = self.read_pending()

        return result

Send some text to the co-process and return the response.

This method is equivalent to calling send() followed by expect(). However, the return value is simpler; command() does not return the re.Match object.

If you call this method after the co-process output pipe has already returned EOF, it raises EOFError.

If allow_eof is True, this method will read data up to EOF instead of raising an EOFError.

def close(self) -> None:
    def close(self) -> None:
        "Close stdin to end the prompt session."
        stdin = self._runner.stdin
        assert stdin is not None

        if isinstance(self._runner, Runner) and self._runner.pty_eof:
            # Write EOF twice; once to end the current line, and the second
            # time to signal the end.
            stdin.write(self._runner.pty_eof * 2)
            if LOG_PROMPT:
                LOGGER.info("Prompt[pid=%s] send: [[EOF]]", self._runner.pid)

        else:
            stdin.close()
            if LOG_PROMPT:
                LOGGER.info("Prompt[pid=%s] close", self._runner.pid)

Close stdin to end the prompt session.

def cbreak(rows: int = 0, cols: int = 0) -> Callable[[int], NoneType]:
def cbreak(rows: int = 0, cols: int = 0) -> PtyAdapter:
    "Return a function that sets PtyOptions.child_fd to cbreak mode."

    def _pty_set_cbreak(fdesc: int):
        tty.setcbreak(fdesc)
        if rows or cols:
            _set_term_size(fdesc, rows, cols)
        assert _get_eof(fdesc) == b""

    return _pty_set_cbreak

Return a function that sets PtyOptions.child_fd to cbreak mode.

def cooked(rows: int = 0, cols: int = 0, echo: bool = True) -> Callable[[int], NoneType]:
def cooked(rows: int = 0, cols: int = 0, echo: bool = True) -> PtyAdapter:
    "Return a function that leaves PtyOptions.child_fd in cooked mode."

    def _pty_set_canonical(fdesc: int):
        if rows or cols:
            _set_term_size(fdesc, rows, cols)
        if not echo:
            set_term_echo(fdesc, False)
        assert _get_eof(fdesc) == b"\x04"

    return _pty_set_canonical

Return a function that leaves PtyOptions.child_fd in cooked mode.

def raw(rows: int = 0, cols: int = 0) -> Callable[[int], NoneType]:
def raw(rows: int = 0, cols: int = 0) -> PtyAdapter:
    "Return a function that sets PtyOptions.child_fd to raw mode."

    def _pty_set_raw(fdesc: int):
        tty.setraw(fdesc)
        if rows or cols:
            _set_term_size(fdesc, rows, cols)
        assert _get_eof(fdesc) == b""

    return _pty_set_raw

Return a function that sets PtyOptions.child_fd to raw mode.
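
The assertions in `cbreak`, `cooked` and `raw` suggest that the EOF byte is only meaningful in canonical (cooked) mode. The sketch below shows one way to observe this on a Unix system with the standard `termios` and `tty` modules. Note that `get_eof` and `eof_before_and_after_raw` are hypothetical helpers written for this example; the real `_get_eof` in shellous is not shown in this section and may be implemented differently.

```python
import os
import termios
import tty


def get_eof(fd: int) -> bytes:
    """Hypothetical helper: EOF character for a terminal, or b"" if not canonical."""
    attrs = termios.tcgetattr(fd)
    lflag, cc = attrs[3], attrs[6]
    if lflag & termios.ICANON:
        # In canonical mode the terminal driver interprets VEOF (usually Ctrl-D).
        return cc[termios.VEOF]
    return b""


def eof_before_and_after_raw() -> tuple[bytes, bytes]:
    """Open a fresh pty and report its EOF byte before and after tty.setraw()."""
    parent_fd, child_fd = os.openpty()
    try:
        cooked_eof = get_eof(child_fd)
        tty.setraw(child_fd)  # clears ICANON, among other flags
        raw_eof = get_eof(child_fd)
    finally:
        os.close(parent_fd)
        os.close(child_fd)
    return cooked_eof, raw_eof


if __name__ == "__main__":
    print(eof_before_and_after_raw())
```

This is why closing a prompt session over a cooked pty can be done by writing the EOF byte, while a raw or cbreak pty has no such in-band signal.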

@dataclass(frozen=True, **_KW_ONLY)
class Result:
@dataclass(frozen=True, **_KW_ONLY)
class Result:
    "Concrete class for the result of a Command."

    exit_code: int
    "Command's exit status. If < 0, this is a negated signal number."

    output_bytes: bytes
    "Output of command as bytes. May be None if there is no output."

    error_bytes: bytes
    "Limited standard error from command if not redirected."

    cancelled: bool
    "Command was cancelled."

    encoding: str
    "Output encoding."

    @property
    def output(self) -> str:
        "Output of command as a string."
        return decode_bytes(self.output_bytes, self.encoding)

    @property
    def error(self) -> str:
        "Error from command as a string (if it is not redirected)."
        return decode_bytes(self.error_bytes, self.encoding)

    @property
    def exit_signal(self) -> Optional[signal.Signals]:
        "Signal that caused the command to exit, or None if no signal."
        if self.exit_code >= 0:
            return None
        return signal.Signals(-self.exit_code)

    def __bool__(self) -> bool:
        "Return true if exit_code is 0."
        return self.exit_code == 0

Concrete class for the result of a Command.

Result(*, exit_code: int, output_bytes: bytes, error_bytes: bytes, cancelled: bool, encoding: str)
exit_code: int

Command's exit status. If < 0, this is a negated signal number.

output_bytes: bytes

Output of command as bytes. May be None if there is no output.

error_bytes: bytes

Limited standard error from command if not redirected.

cancelled: bool

Command was cancelled.

encoding: str

Output encoding.

output: str
    @property
    def output(self) -> str:
        "Output of command as a string."
        return decode_bytes(self.output_bytes, self.encoding)

Output of command as a string.

error: str
    @property
    def error(self) -> str:
        "Error from command as a string (if it is not redirected)."
        return decode_bytes(self.error_bytes, self.encoding)

Error from command as a string (if it is not redirected).

exit_signal: Optional[signal.Signals]
    @property
    def exit_signal(self) -> Optional[signal.Signals]:
        "Signal that caused the command to exit, or None if no signal."
        if self.exit_code >= 0:
            return None
        return signal.Signals(-self.exit_code)

Signal that caused the command to exit, or None if no signal.
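
The mapping from a negative exit code to a signal can be tried out with the standard `signal` module. This is a small standalone sketch that mirrors the property's logic; the function name `exit_signal_for` is hypothetical and exists only for this example.

```python
import signal
from typing import Optional


def exit_signal_for(exit_code: int) -> Optional[signal.Signals]:
    """Return the signal encoded in a negative exit code, or None."""
    if exit_code >= 0:
        return None
    # A negative exit code is the negated signal number.
    return signal.Signals(-exit_code)


if __name__ == "__main__":
    print(exit_signal_for(0))
    print(exit_signal_for(1))
    print(exit_signal_for(-int(signal.SIGTERM)))
```

Because the result is a `signal.Signals` member, callers can compare it directly against constants such as `signal.SIGTERM` instead of raw integers.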

class ResultError(builtins.Exception):
class ResultError(Exception):
    "Represents a non-zero exit status."

    @property
    def result(self) -> "shellous.Result":
        "Result of the command."
        return self.args[0]

Represents a non-zero exit status.

result: Result
    @property
    def result(self) -> "shellous.Result":
        "Result of the command."
        return self.args[0]

Result of the command.

class Runner:
class Runner:
    """Runner is an asynchronous context manager that runs a command.

    ```
    async with Runner(cmd) as run:
        # process streams: run.stdin, run.stdout, run.stderr (if not None)
    result = run.result()
    ```
    """

    stdin: Optional[asyncio.StreamWriter] = None
    "Process standard input."

    stdout: Optional[asyncio.StreamReader] = None
    "Process standard output."

    stderr: Optional[asyncio.StreamReader] = None
    "Process standard error."

    _options: _RunOptions
    _tasks: list[asyncio.Task[Any]]
    _proc: Optional["asyncio.subprocess.Process"] = None
    _cancelled: bool = False
    _timer: Optional[asyncio.TimerHandle] = None
    _timed_out: bool = False
    _last_signal: Optional[int] = None

    def __init__(self, command: "shellous.Command[Any]"):
        self._options = _RunOptions(command)
        self._tasks = []

    @property
    def name(self) -> str:
        "Return name of process being run."
        return self.command.name

    @property
    def options(self) -> "shellous.Options":
        "Return options for process being run."
        return self.command.options

    @property
    def command(self) -> "shellous.Command[Any]":
        "Return the command being run."
        return self._options.command

    @property
    def pid(self) -> Optional[int]:
        "Return the command's process ID."
        if not self._proc:
            return None
        return self._proc.pid

    @property
    def returncode(self) -> Optional[int]:
        "Process's exit code."
        if not self._proc:
            if self._cancelled:
                # The process was cancelled before starting.
                return CANCELLED_EXIT_CODE
            return None
        code = self._proc.returncode
        if code == _UNKNOWN_EXIT_CODE and self._last_signal is not None:
            # After sending a signal, `waitpid` may fail to locate the child
            # process. In this case, map the status to the last signal we sent.
            # For more on this, see https://github.com/python/cpython/issues/87744
            return -self._last_signal  # pylint: disable=invalid-unary-operand-type
        return code

    @property
    def cancelled(self) -> bool:
        "Return True if the command was cancelled."
        return self._cancelled

    @property
    def pty_fd(self) -> Optional[int]:
        """The file descriptor used to communicate with the child PTY process.

        Returns None if the process is not using a PTY.
        """
        pty_fds = self._options.pty_fds
        if pty_fds is not None:
            return pty_fds.parent_fd
        return None

    @property
    def pty_eof(self) -> Optional[bytes]:
        """Byte sequence used to indicate EOF when written to the PTY child.

        Returns None if process is not using a PTY.
        """
        pty_fds = self._options.pty_fds
        if pty_fds is not None:
            return pty_fds.eof
        return None

    def result(self, *, check: bool = True) -> Result:
        "Check process exit code and raise a ResultError if necessary."
        code = self.returncode
        if code is None:
            raise TypeError("Runner.result(): Process has not exited")

        result = Result(
            exit_code=code,
            output_bytes=bytes(self._options.output_bytes or b""),
            error_bytes=bytes(self._options.error_bytes or b""),
            cancelled=self._cancelled,
            encoding=self._options.encoding,
        )

        if not check:
            return result

        return check_result(
            result,
            self.command.options,
            self._cancelled,
            self._timed_out,
        )

    def add_task(
        self,
        coro: Coroutine[Any, Any, _T],
        tag: str = "",
    ) -> asyncio.Task[_T]:
        "Add a background task."
        task_name = f"{self.name}#{tag}"
        task = asyncio.create_task(coro, name=task_name)
        self._tasks.append(task)
        return task

    def send_signal(self, sig: int) -> None:
        "Send an arbitrary signal to the process if it is running."
        if self.returncode is None:
            self._signal(sig)

    def cancel(self) -> None:
        "Cancel the running process if it is running."
        if self.returncode is None:
            self._signal(self.command.options.cancel_signal)

    def _is_bsd_pty(self) -> bool:
        "Return true if we're running a pty on BSD."
        return BSD_DERIVED and bool(self._options.pty_fds)

    @log_method(LOG_DETAIL)
    async def _wait(self) -> None:
        "Normal wait for background I/O tasks and process to finish."
        assert self._proc

        try:
            if self._tasks:
                await harvest(*self._tasks, trustee=self)
            if self._is_bsd_pty():
                await self._waiter()

        except asyncio.CancelledError:
            LOGGER.debug("Runner.wait cancelled %r", self)
            self._set_cancelled()
            self._tasks.clear()  # all tasks were cancelled
            await self._kill()

        except Exception as ex:
            LOGGER.debug("Runner.wait exited with error %r ex=%r", self, ex)
            self._tasks.clear()  # all tasks were cancelled
            await self._kill()
            raise  # re-raise exception

    @log_method(LOG_DETAIL)
    async def _wait_pid(self):
        "Manually poll `waitpid` until process finishes."
        assert self._is_bsd_pty()

        while True:
            assert self._proc is not None  # (pyright)

            if poll_wait_pid(self._proc):
                break
            await asyncio.sleep(0.025)

    @log_method(LOG_DETAIL)
    async def _kill(self):
        "Kill process and wait for it to finish."
        assert self._proc

        cancel_timeout = self.command.options.cancel_timeout
        cancel_signal = self.command.options.cancel_signal

        try:
            # If not already done, send cancel signal.
            if self._proc.returncode is None:
                self._signal(cancel_signal)

            if self._tasks:
                await harvest(*self._tasks, timeout=cancel_timeout, trustee=self)

            if self._proc.returncode is None:
                await harvest(self._waiter(), timeout=cancel_timeout, trustee=self)

        except (asyncio.CancelledError, asyncio.TimeoutError) as ex:
            LOGGER.warning("Runner.kill %r (ex)=%r", self, ex)
            if _is_cancelled(ex):
                self._set_cancelled()
            await self._kill_wait()

        except (Exception, GeneratorExit) as ex:
            LOGGER.warning("Runner.kill %r ex=%r", self, ex)
            await self._kill_wait()
            raise

    def _signal(self, sig: Optional[int]):
        "Send a signal to the process."
        assert self._proc is not None  # (pyright)

        if LOG_DETAIL:
            LOGGER.debug("Runner.signal %r signal=%r", self, sig)
        self._audit_callback("signal", signal=sig)

        if sig is None:
            self._proc.kill()
        else:
            self._proc.send_signal(sig)
            self._last_signal = sig

    @log_method(LOG_DETAIL)
    async def _kill_wait(self):
        "Wait for killed process to exit."
        assert self._proc

        # Check if process is already done.
        if self._proc.returncode is not None:
            return

        try:
            self._signal(None)
            await harvest(self._waiter(), timeout=_KILL_TIMEOUT, trustee=self)
        except asyncio.TimeoutError as ex:
            # Manually check if the process is still running.
            if poll_wait_pid(self._proc):
                LOGGER.warning("%r process reaped manually %r", self, self._proc)
            else:
                LOGGER.error("%r failed to kill process %r", self, self._proc)
                raise RuntimeError(f"Unable to kill process {self._proc!r}") from ex

    @log_method(LOG_DETAIL)
    async def __aenter__(self):
        "Set up redirections and launch subprocess."
        self._audit_callback("start")
        try:
            return await self._start()
        except BaseException as ex:
            self._stop_timer()  # failsafe just in case
            self._audit_callback("stop", failure=type(ex).__name__)
            raise
        finally:
            if self._cancelled and self.command.options._catch_cancelled_error:
                # Raises ResultError instead of CancelledError.
                self.result()

    @log_method(LOG_DETAIL)
    async def _start(self):
        "Set up redirections and launch subprocess."
        # assert self._proc is None
        assert not self._tasks

        try:
            # Set up subprocess arguments and launch subprocess.
            with self._options as opts:
                await self._subprocess_spawn(opts)

            assert self._proc is not None
            stdin = self._proc.stdin
            stdout = self._proc.stdout
            stderr = self._proc.stderr

            # Assign pty streams.
            if opts.pty_fds:
                assert (stdin, stdout) == (None, None)
                stdin, stdout = opts.pty_fds.writer, opts.pty_fds.reader

            if stderr is not None:
                limit = None
                if opts.error_bytes is not None:
                    error = opts.error_bytes
                    limit = opts.command.options.error_limit
                elif opts.is_stderr_only:
                    assert stdout is None
                    assert opts.output_bytes is not None
                    error = opts.output_bytes
                else:
                    error = opts.command.options.error
                stderr = self._setup_output_sink(
                    stderr, error, opts.encoding, "stderr", limit
                )

            if stdout is not None:
                limit = None
                if opts.output_bytes is not None:
                    output = opts.output_bytes
                else:
                    output = opts.command.options.output
                stdout = self._setup_output_sink(
                    stdout, output, opts.encoding, "stdout", limit
                )

            if stdin is not None:
                stdin = self._setup_input_source(stdin, opts)

        except (Exception, asyncio.CancelledError) as ex:
            LOGGER.debug("Runner._start %r ex=%r", self, ex)
            if _is_cancelled(ex):
                self._set_cancelled()
            if self._proc:
                await self._kill()
            raise

        # Make final streams available. These may be different from `self.proc`
        # versions.
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr

        # Add a task to monitor for when the process finishes.
        if not self._is_bsd_pty():
            self.add_task(self._waiter(), "waiter")

        # Set a timer to cancel the current task after a timeout.
        self._start_timer(self.command.options.timeout)

        return self

    @log_method(LOG_DETAIL)
    async def _subprocess_spawn(self, opts: _RunOptions):
        "Start the subprocess."
        assert self._proc is None

        # Second half of pty setup.
        if opts.pty_fds:
            opts.pty_fds = await opts.pty_fds.open_streams()

        # Check for task cancellation and yield right before exec'ing. If the
        # current task is already cancelled, this will raise a CancelledError,
        # and we save ourselves the work of launching and immediately killing
        # a process.
        await asyncio.sleep(0)

        # Launch the subprocess (always completes even if cancelled).
        await uninterrupted(self._subprocess_exec(opts))

        # Launch the process substitution commands (if any).
        for cmd in opts.subcmds:
            self.add_task(cmd.coro(), "procsub")

    @log_method(LOG_DETAIL)
    async def _subprocess_exec(self, opts: _RunOptions):
        "Start the subprocess and assign to `self.proc`."
        with log_timer("asyncio.create_subprocess_exec"):
            sys.audit(EVENT_SHELLOUS_EXEC, opts.pos_args[0])
            with pty_util.set_ignore_child_watcher(
                BSD_DERIVED and opts.pty_fds is not None
            ):
                self._proc = await asyncio.create_subprocess_exec(
                    *opts.pos_args,
                    **opts.kwd_args,
                )

    @log_method(LOG_DETAIL)
    async def _waiter(self):
        "Run task that waits for process to exit."
        assert self._proc is not None  # (pyright)

        try:
            if self._is_bsd_pty():
                await self._wait_pid()
            else:
                await self._proc.wait()
        finally:
            self._stop_timer()

    def _set_cancelled(self):
        "Set the cancelled flag, and cancel any inflight timers."
        self._cancelled = True
        self._stop_timer()

    def _start_timer(self, timeout: Optional[float]):
        "Start an optional timer to cancel the process if `timeout` desired."
        assert self._timer is None
        if timeout is not None:
            loop = asyncio.get_running_loop()
            task = asyncio.current_task()
            assert task is not None
            self._timer = loop.call_later(
                timeout,
                self._set_timer_expired,
                task,
            )

    def _set_timer_expired(self, main_task: asyncio.Task[Any]):
        "Set a flag when the timer expires and cancel the main task."
        self._timed_out = True
        self._timer = None
        main_task.cancel()

    def _stop_timer(self):
        if self._timer:
            self._timer.cancel()
            self._timer = None

    def _setup_input_source(
        self,
        stream: asyncio.StreamWriter,
        opts: _RunOptions,
    ):
        "Set up a task to read from custom input source."
        tag = "stdin"
        eof = opts.pty_fds.eof if opts.pty_fds else None

        if opts.input_bytes is not None:
            self.add_task(redir.write_stream(opts.input_bytes, stream, eof), tag)
            return None

        source = opts.command.options.input

        if isinstance(source, asyncio.StreamReader):
            self.add_task(redir.write_reader(source, stream, eof), tag)
            return None

        if isinstance(source, io.BytesIO):
            self.add_task(redir.write_stream(source.getvalue(), stream, eof), tag)
            return None

        if isinstance(source, io.StringIO):
            input_bytes = encode_bytes(source.getvalue(), opts.encoding)
            self.add_task(redir.write_stream(input_bytes, stream, eof), tag)
            return None

        return stream

    def _setup_output_sink(
        self,
822        stream: asyncio.StreamReader,
823        sink: Any,
824        encoding: str,
825        tag: str,
826        limit: Optional[int] = None,
827    ) -> Optional[asyncio.StreamReader]:
828        "Set up a task to write to custom output sink."
829        if isinstance(sink, io.StringIO):
830            self.add_task(redir.copy_stringio(stream, sink, encoding), tag)
831            return None
832
833        if isinstance(sink, io.BytesIO):
834            self.add_task(redir.copy_bytesio(stream, sink), tag)
835            return None
836
837        if isinstance(sink, bytearray):
838            # N.B. `limit` is only supported for bytearray.
839            if limit is not None:
840                self.add_task(redir.copy_bytearray_limit(stream, sink, limit), tag)
841            else:
842                self.add_task(redir.copy_bytearray(stream, sink), tag)
843            return None
844
845        if isinstance(sink, Logger):
846            self.add_task(redir.copy_logger(stream, sink, encoding), tag)
847            return None
848
849        if isinstance(sink, asyncio.StreamWriter):
850            self.add_task(redir.copy_streamwriter(stream, sink), tag)
851            return None
852
853        return stream
854
855    @log_method(LOG_DETAIL)
856    async def __aexit__(
857        self,
858        _exc_type: Union[type[BaseException], None],
859        exc_value: Union[BaseException, None],
860        _exc_tb: Optional[TracebackType],
861    ):
862        "Wait for process to exit and handle cancellation."
863        suppress = False
864        try:
865            suppress = await self._finish(exc_value)
866        except asyncio.CancelledError:
867            LOGGER.debug("Runner cancelled inside _finish %r", self)
868            self._set_cancelled()
869        finally:
870            self._stop_timer()  # failsafe just in case
871            self._audit_callback("stop")
872        # If `timeout` expired, raise TimeoutError rather than CancelledError.
873        if (
874            self._cancelled
875            and self._timed_out
876            and not self.command.options._catch_cancelled_error
877        ):
878            raise asyncio.TimeoutError()
879        return suppress
880
881    @log_method(LOG_DETAIL)
882    async def _finish(self, exc_value: Union[BaseException, None]):
883        "Finish the run. Return True only if `exc_value` should be suppressed."
884        assert self._proc
885
886        try:
887            if exc_value is not None:
888                if _is_cancelled(exc_value):
889                    self._set_cancelled()
890                await self._kill()
891                return self._cancelled
892
893            await self._wait()
894            return False
895
896        finally:
897            await self._close()
898
899    @log_method(LOG_DETAIL)
900    async def _close(self):
901        "Make sure that our resources are properly closed."
902        assert self._proc is not None
903
904        if self._options.pty_fds:
905            self._options.pty_fds.close()
906
907        # Make sure the transport is closed (for asyncio and uvloop).
908        self._proc._transport.close()  # pyright: ignore
909
910        # _close can be called when unwinding exceptions. We need to handle
911        # the case that the process has not exited yet.
912        if self._proc.returncode is None:
913            LOGGER.critical("Runner._close process still running %r", self._proc)
914            return
915
916        try:
917            # Make sure that original stdin is properly closed. `wait_closed`
918            # will raise a BrokenPipeError if not all input was properly written.
919            if self._proc.stdin is not None:
920                self._proc.stdin.close()
921                await harvest(
922                    self._proc.stdin.wait_closed(),
923                    timeout=_CLOSE_TIMEOUT,
924                    cancel_finish=True,  # finish `wait_closed` if cancelled
925                    trustee=self,
926                )
927
928        except asyncio.TimeoutError:
929            LOGGER.critical("Runner._close %r timeout stdin=%r", self, self._proc.stdin)
930
931    def _audit_callback(
932        self,
933        phase: str,
934        *,
935        failure: str = "",
936        signal: Optional[int] = None,
937    ):
938        "Call `audit_callback` if there is one."
939        callback = self.command.options.audit_callback
940        if callback:
941            sig = _signame(signal) if phase == "signal" else ""
942            info: shellous.AuditEventInfo = {
943                "runner": self,
944                "failure": failure,
945                "signal": sig,
946            }
947            callback(phase, info)
948
949    def __repr__(self) -> str:
950        "Return string representation of Runner."
951        cancelled = " cancelled" if self._cancelled else ""
952        if self._proc:
953            procinfo = f" pid={self._proc.pid} exit_code={self.returncode}"
954        else:
955            procinfo = " pid=None"
956        return f"<Runner {self.name!r}{cancelled}{procinfo}>"
957
958    async def _readlines(self):
959        "Iterate over lines in stdout/stderr"
960        stream = self.stdout or self.stderr
961        if stream:
962            async for line in redir.read_lines(stream, self._options.encoding):
963                yield line
964
965    def __aiter__(self) -> AsyncIterator[str]:
966        "Return asynchronous iterator over stdout/stderr."
967        return self._readlines()
968
969    @staticmethod
970    async def run_command(
971        command: "shellous.Command[Any]",
972        *,
973        _run_future: Optional[asyncio.Future["Runner"]] = None,
974    ) -> Union[str, Result]:
975        "Run a command. This is the main entry point for Runner."
976        if not _run_future and _is_multiple_capture(command):
977            LOGGER.warning("run_command: multiple capture requires 'async with'")
978            _cleanup(command)
979            raise ValueError("multiple capture requires 'async with'")
980
981        async with Runner(command) as run:
982            if _run_future is not None:
983                # Return streams to caller in another task.
984                _run_future.set_result(run)
985
986        result = run.result()
987        if command.options._return_result:
988            return result
989        return result.output

Runner is an asynchronous context manager that runs a command.

async with Runner(cmd) as run:
    # process streams: run.stdin, run.stdout, run.stderr (if not None)
result = run.result()

Runner(command: Command[typing.Any])
    def __init__(self, command: "shellous.Command[Any]"):
        self._options = _RunOptions(command)
        self._tasks = []
stdin: Optional[asyncio.streams.StreamWriter] = None

Process standard input.

stdout: Optional[asyncio.streams.StreamReader] = None

Process standard output.

stderr: Optional[asyncio.streams.StreamReader] = None

Process standard error.

name: str
    @property
    def name(self) -> str:
        "Return name of process being run."
        return self.command.name

Return name of process being run.

options: Options
    @property
    def options(self) -> "shellous.Options":
        "Return options for process being run."
        return self.command.options

Return options for process being run.

command: Command[typing.Any]
    @property
    def command(self) -> "shellous.Command[Any]":
        "Return the command being run."
        return self._options.command

Return the command being run.

pid: Optional[int]
    @property
    def pid(self) -> Optional[int]:
        "Return the command's process ID."
        if not self._proc:
            return None
        return self._proc.pid

Return the command's process ID.

returncode: Optional[int]
    @property
    def returncode(self) -> Optional[int]:
        "Process's exit code."
        if not self._proc:
            if self._cancelled:
                # The process was cancelled before starting.
                return CANCELLED_EXIT_CODE
            return None
        code = self._proc.returncode
        if code == _UNKNOWN_EXIT_CODE and self._last_signal is not None:
            # After sending a signal, `waitpid` may fail to locate the child
            # process. In this case, map the status to the last signal we sent.
            # For more on this, see https://github.com/python/cpython/issues/87744
            return -self._last_signal  # pylint: disable=invalid-unary-operand-type
        return code

Process's exit code.
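
The signal fallback in `returncode` can be illustrated without shellous. The sketch below is a standalone approximation, not the library's code: `effective_returncode` and `describe` are hypothetical helpers, and the sentinel value 255 is an assumption standing in for the private `_UNKNOWN_EXIT_CODE` (255 is the status asyncio reports when `waitpid` cannot find the child).

```python
import signal
from typing import Optional

# Assumption: 255 stands in for the library's private `_UNKNOWN_EXIT_CODE`.
UNKNOWN_EXIT_CODE = 255


def effective_returncode(raw: Optional[int], last_signal: Optional[int]) -> Optional[int]:
    """Map an unknown exit status to the negated last signal, if one was sent."""
    if raw == UNKNOWN_EXIT_CODE and last_signal is not None:
        return -int(last_signal)
    return raw


def describe(code: Optional[int]) -> str:
    """Render an exit code the way a caller might log it."""
    if code is None:
        return "still running"
    if code < 0:
        # Negative codes follow the subprocess convention: -N means signal N.
        return f"killed by {signal.Signals(-code).name}"
    return f"exited with status {code}"


print(describe(effective_returncode(255, signal.SIGTERM)))
print(describe(effective_returncode(0, None)))
```

A negative value follows the usual `subprocess` convention, so callers can recover the signal name with `signal.Signals(-code)`.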

cancelled: bool
    @property
    def cancelled(self) -> bool:
        "Return True if the command was cancelled."
        return self._cancelled

Return True if the command was cancelled.
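
Cancellation can come from the caller or from an expired `timeout`: in the Runner source, `_set_timer_expired` cancels the main task, and `__aexit__` raises `asyncio.TimeoutError` when both the cancelled and timed-out flags are set. A minimal sketch of that pattern using only asyncio follows; `TimeoutScope` is a hypothetical name and this is not shellous's implementation.

```python
import asyncio
from typing import Optional


class TimeoutScope:
    """Hypothetical helper: cancel the enclosing task after `timeout` seconds."""

    def __init__(self, timeout: Optional[float]):
        self._timeout = timeout
        self._timer: Optional[asyncio.TimerHandle] = None
        self.timed_out = False

    async def __aenter__(self) -> "TimeoutScope":
        if self._timeout is not None:
            loop = asyncio.get_running_loop()
            task = asyncio.current_task()
            assert task is not None
            self._timer = loop.call_later(self._timeout, self._expire, task)
        return self

    def _expire(self, task: "asyncio.Task") -> None:
        # Record why the task is being cancelled, then cancel it.
        self.timed_out = True
        self._timer = None
        task.cancel()

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        if self.timed_out and isinstance(exc, asyncio.CancelledError):
            # Report the timeout instead of the cancellation it caused.
            raise asyncio.TimeoutError() from None
        return False


async def demo() -> str:
    try:
        async with TimeoutScope(0.05):
            await asyncio.sleep(5)
    except asyncio.TimeoutError:
        return "timed out"
    return "finished"


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Keeping a separate `timed_out` flag is what lets the exit path tell a timeout apart from a cancellation requested by the caller.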

pty_fd: Optional[int]
    @property
    def pty_fd(self) -> Optional[int]:
        """The file descriptor used to communicate with the child PTY process.

        Returns None if the process is not using a PTY.
        """
        pty_fds = self._options.pty_fds
        if pty_fds is not None:
            return pty_fds.parent_fd
        return None

The file descriptor used to communicate with the child PTY process.

Returns None if the process is not using a PTY.

pty_eof: Optional[bytes]
    @property
    def pty_eof(self) -> Optional[bytes]:
        """Byte sequence used to indicate EOF when written to the PTY child.

        Returns None if process is not using a PTY.
        """
        pty_fds = self._options.pty_fds
        if pty_fds is not None:
            return pty_fds.eof
        return None

Byte sequence used to indicate EOF when written to the PTY child.

Returns None if process is not using a PTY.

def result(self, *, check: bool = True) -> Result:
    def result(self, *, check: bool = True) -> Result:
        "Check process exit code and raise a ResultError if necessary."
        code = self.returncode
        if code is None:
            raise TypeError("Runner.result(): Process has not exited")

        result = Result(
            exit_code=code,
            output_bytes=bytes(self._options.output_bytes or b""),
            error_bytes=bytes(self._options.error_bytes or b""),
            cancelled=self._cancelled,
            encoding=self._options.encoding,
        )

        if not check:
            return result

        return check_result(
            result,
            self.command.options,
            self._cancelled,
            self._timed_out,
        )

Check process exit code and raise a ResultError if necessary.
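
The shape of `result()` (build a record, then optionally validate it) can be sketched in isolation. Everything below is hypothetical: `ResultSketch`, `ResultErrorSketch` and `finish` are illustrative names, and the rule that only exit code 0 counts as success is an assumption. The real `check_result` also takes the command options, the cancelled flag and the timed-out flag into account.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ResultSketch:
    """Hypothetical stand-in for a result record."""

    exit_code: int
    output_bytes: bytes
    encoding: str = "utf-8"

    @property
    def output(self) -> str:
        return self.output_bytes.decode(self.encoding)


class ResultErrorSketch(Exception):
    """Hypothetical error that carries the failing result."""

    def __init__(self, result: ResultSketch):
        super().__init__(f"exit code {result.exit_code}")
        self.result = result


def finish(exit_code: Optional[int], output_bytes: bytes, *, check: bool = True) -> ResultSketch:
    """Build a result; raise if `check` is set and the exit code is non-zero."""
    if exit_code is None:
        raise TypeError("process has not exited")
    result = ResultSketch(exit_code, output_bytes)
    # Assumption: only exit code 0 counts as success in this sketch.
    if check and result.exit_code != 0:
        raise ResultErrorSketch(result)
    return result


print(finish(0, b"ok\n").output, end="")
print(finish(3, b"", check=False).exit_code)
```

Passing `check=False` returns the record as-is, which is useful when the caller wants to inspect a failing exit code instead of handling an exception.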

def add_task(self, coro: Coroutine[Any, Any, ~_T], tag: str = '') -> _asyncio.Task[~_T]:
    def add_task(
        self,
        coro: Coroutine[Any, Any, _T],
        tag: str = "",
    ) -> asyncio.Task[_T]:
        "Add a background task."
        task_name = f"{self.name}#{tag}"
        task = asyncio.create_task(coro, name=task_name)
        self._tasks.append(task)
        return task

Add a background task.
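
The naming scheme used by `add_task` (`<name>#<tag>`) relies only on `asyncio.create_task(..., name=...)`. A small standalone sketch, with `TaskOwner` as a hypothetical stand-in for the runner:

```python
import asyncio
from typing import Any, Coroutine, List


class TaskOwner:
    """Hypothetical owner of background tasks, named `<owner>#<tag>`."""

    def __init__(self, name: str):
        self.name = name
        self._tasks: List["asyncio.Task[Any]"] = []

    def add_task(self, coro: Coroutine[Any, Any, Any], tag: str = "") -> "asyncio.Task[Any]":
        task = asyncio.create_task(coro, name=f"{self.name}#{tag}")
        self._tasks.append(task)
        return task

    async def wait_all(self) -> list:
        # Collect results (or exceptions) from every registered task.
        return await asyncio.gather(*self._tasks, return_exceptions=True)


async def demo():
    owner = TaskOwner("echo")
    owner.add_task(asyncio.sleep(0, result="in"), "stdin")
    owner.add_task(asyncio.sleep(0, result="out"), "stdout")
    names = [task.get_name() for task in owner._tasks]
    return names, await owner.wait_all()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Named tasks make it easier to tell which redirection a task belongs to when it shows up in a debug log or in `asyncio.all_tasks()`.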

def send_signal(self, sig: int) -> None:
    def send_signal(self, sig: int) -> None:
        "Send an arbitrary signal to the process if it is running."
        if self.returncode is None:
            self._signal(sig)

Send an arbitrary signal to the process if it is running.

def cancel(self) -> None:
    def cancel(self) -> None:
        "Cancel the running process if it is running."
        if self.returncode is None:
            self._signal(self.command.options.cancel_signal)

Cancel the running process if it is running.
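
Both `send_signal` and `cancel` guard on `returncode is None`, so signalling a process that has already exited is a no-op. The same guard can be sketched with a plain asyncio subprocess; `send_signal_if_running` is a hypothetical helper, and the exit code check assumes POSIX signal semantics.

```python
import asyncio
import signal
import sys


async def start_sleeper() -> asyncio.subprocess.Process:
    # A child that would run for 30 seconds if left alone.
    return await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import time; time.sleep(30)"
    )


def send_signal_if_running(proc: asyncio.subprocess.Process, sig: int) -> bool:
    """Send `sig` only when the process has not exited yet."""
    if proc.returncode is None:
        proc.send_signal(sig)
        return True
    return False


async def demo() -> tuple:
    proc = await start_sleeper()
    first = send_signal_if_running(proc, signal.SIGTERM)
    await proc.wait()
    # The process has exited, so this second call does nothing.
    second = send_signal_if_running(proc, signal.SIGTERM)
    return first, second, proc.returncode


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

On POSIX systems the child's return code is the negated signal number once it has been terminated this way.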

@log_method(LOG_DETAIL)
async def __aenter__(self):
    @log_method(LOG_DETAIL)
    async def __aenter__(self):
        "Set up redirections and launch subprocess."
        self._audit_callback("start")
        try:
            return await self._start()
        except BaseException as ex:
            self._stop_timer()  # failsafe just in case
            self._audit_callback("stop", failure=type(ex).__name__)
            raise
        finally:
            if self._cancelled and self.command.options._catch_cancelled_error:
                # Raises ResultError instead of CancelledError.
                self.result()

Set up redirections and launch subprocess.

def __aiter__(self) -> AsyncIterator[str]:
    def __aiter__(self) -> AsyncIterator[str]:
        "Return asynchronous iterator over stdout/stderr."
        return self._readlines()

Return asynchronous iterator over stdout/stderr.
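
The iterator yields decoded lines from whichever of stdout or stderr is available. The helper `redir.read_lines` is internal and not shown here, so the sketch below only assumes that it decodes each line as it arrives; `read_lines` in this example is a hypothetical reimplementation on top of `asyncio.StreamReader`.

```python
import asyncio
import sys
from typing import AsyncIterator


async def read_lines(stream: asyncio.StreamReader, encoding: str = "utf-8") -> AsyncIterator[str]:
    """Yield decoded lines from `stream` until EOF."""
    async for raw in stream:  # StreamReader yields one line of bytes at a time
        yield raw.decode(encoding)


async def demo() -> list:
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "print('one'); print('two')",
        stdout=asyncio.subprocess.PIPE,
    )
    assert proc.stdout is not None
    lines = [line.rstrip() async for line in read_lines(proc.stdout)]
    await proc.wait()
    return lines


if __name__ == "__main__":
    print(asyncio.run(demo()))
```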

@staticmethod
async def run_command(command: Command[typing.Any], *, _run_future: Optional[_asyncio.Future[Runner]] = None) -> Union[str, Result]:
    @staticmethod
    async def run_command(
        command: "shellous.Command[Any]",
        *,
        _run_future: Optional[asyncio.Future["Runner"]] = None,
    ) -> Union[str, Result]:
        "Run a command. This is the main entry point for Runner."
        if not _run_future and _is_multiple_capture(command):
            LOGGER.warning("run_command: multiple capture requires 'async with'")
            _cleanup(command)
            raise ValueError("multiple capture requires 'async with'")

        async with Runner(command) as run:
            if _run_future is not None:
                # Return streams to caller in another task.
                _run_future.set_result(run)

        result = run.result()
        if command.options._return_result:
            return result
        return result.output

Run a command. This is the main entry point for Runner.
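
The `_run_future` parameter lets one task own the process lifetime while another task uses its streams: the owner publishes the running object through a future and then waits for exit. A rough standalone sketch of that handshake with a plain asyncio subprocess (the function names are hypothetical, and this is not the library's code):

```python
import asyncio
import sys

CHILD_CODE = "import sys; sys.stdout.write(sys.stdin.read().upper())"


async def run_child(ready: "asyncio.Future") -> int:
    """Own the child's lifetime; publish it through `ready` once started."""
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", CHILD_CODE,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    ready.set_result(proc)     # hand the streams to the other task
    return await proc.wait()   # stay alive until the child exits


async def demo() -> tuple:
    loop = asyncio.get_running_loop()
    ready = loop.create_future()
    owner = asyncio.create_task(run_child(ready))

    proc = await ready         # resumes as soon as the child is running
    proc.stdin.write(b"hello")
    await proc.stdin.drain()
    proc.stdin.close()         # EOF lets the child finish
    output = await proc.stdout.read()
    return output, await owner


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A production version would also need to fail the future if the child cannot be started; otherwise the waiting task would never resume.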

class PipeRunner:
class PipeRunner:
    """PipeRunner is an asynchronous context manager that runs a pipeline.

    ```
    async with pipe.run() as run:
        # process run.stdin, run.stdout, run.stderr (if not None)
    result = run.result()
    ```
    """

    stdin: Optional[asyncio.StreamWriter] = None
    "Pipeline standard input."

    stdout: Optional[asyncio.StreamReader] = None
    "Pipeline standard output."

    stderr: Optional[asyncio.StreamReader] = None
    "Pipeline standard error."

    _pipe: "shellous.Pipeline[Any]"
    _capturing: bool
    _tasks: list[asyncio.Task[Any]]
    _encoding: str
    _cancelled: bool = False
    _results: Optional[list[Union[BaseException, Result]]] = None
    _pid: int = -1

    def __init__(self, pipe: "shellous.Pipeline[Any]", *, capturing: bool):
        """`capturing=True` indicates we are within an `async with` block and
        the client needs to access the `stdin`, `stdout` and `stderr` streams.
        """
        assert len(pipe.commands) > 1

        self._pipe = pipe
        self._cancelled = False
        self._tasks = []
        self._capturing = capturing
        self._encoding = pipe.options.encoding

    @property
    def name(self) -> str:
        "Return name of the pipeline."
        return self._pipe.name

    @property
    def options(self) -> "shellous.Options":
        """Return options for pipeline being run.

        These are the options for the last command in the pipeline.
        """
        return self._pipe.options

    @property
    def pid(self) -> Optional[int]:
        """Return the process ID for the first command in the pipeline.

        The PID is only available when `capturing=True`.
        """
        if self._pid < 0:
            return None
        return self._pid

    def result(self, *, check: bool = True) -> Result:
        "Return `Result` object for PipeRunner."
        assert self._results is not None

        result = convert_result_list(self._results, self._cancelled)
        if not check:
            return result

        return check_result(result, self._pipe.options, self._cancelled)

    def add_task(
        self,
        coro: Coroutine[Any, Any, _T],
        tag: str = "",
    ) -> asyncio.Task[_T]:
        "Add a background task."
        task_name = f"{self.name}#{tag}"
        task = asyncio.create_task(coro, name=task_name)
        self._tasks.append(task)
        return task

    @log_method(LOG_DETAIL)
    async def _wait(self, *, kill: bool = False):
        "Wait for pipeline to finish."
        assert self._results is None

        if kill:
            LOGGER.debug("PipeRunner.wait killing pipe %r", self)
            for task in self._tasks:
                task.cancel()

        cancelled, self._results = await harvest_results(*self._tasks, trustee=self)
        if cancelled:
            self._cancelled = True
        self._tasks.clear()  # clear all tasks when done

    @log_method(LOG_DETAIL)
    async def __aenter__(self) -> "PipeRunner":
        "Set up redirections and launch pipeline."
        try:
            return await self._start()
        except (Exception, asyncio.CancelledError) as ex:
            LOGGER.warning("PipeRunner enter %r ex=%r", self, ex)
            if _is_cancelled(ex):
                self._cancelled = True
            await self._wait(kill=True)
            raise

    @log_method(LOG_DETAIL)
    async def __aexit__(
        self,
        _exc_type: Union[type[BaseException], None],
        exc_value: Union[BaseException, None],
        _exc_tb: Optional[TracebackType],
    ):
        "Wait for pipeline to exit and handle cancellation."
        suppress = False
        try:
            suppress = await self._finish(exc_value)
        except asyncio.CancelledError:
            LOGGER.warning("PipeRunner cancelled inside _finish %r", self)
            self._cancelled = True
        return suppress

    @log_method(LOG_DETAIL)
    async def _finish(self, exc_value: Optional[BaseException]) -> bool:
        "Wait for pipeline to exit and handle cancellation."
        if exc_value is not None:
            LOGGER.warning("PipeRunner._finish exc_value=%r", exc_value)
            if _is_cancelled(exc_value):
                self._cancelled = True
            await self._wait(kill=True)
            return self._cancelled

        await self._wait()
        return False

    @log_method(LOG_DETAIL)
    async def _start(self):
        "Set up redirection and launch pipeline."
        open_fds: list[int] = []

        try:
            stdin = None
            stdout = None
            stderr = None

            cmds = self._setup_pipeline(open_fds)

            if self._capturing:
                stdin, stdout, stderr = await self._setup_capturing(cmds)
            else:
                for cmd in cmds:
                    self.add_task(cmd.coro())

            self.stdin = stdin
            self.stdout = stdout
            self.stderr = stderr

            return self

        except BaseException:  # pylint: disable=broad-except
            # Clean up after any exception *including* CancelledError.
            close_fds(open_fds)
            raise

    def _setup_pipeline(self, open_fds: list[int]):
        """Return the pipeline stitched together with pipe fd's.

        Each created open file descriptor is added to `open_fds` so it can
        be closed if there's an exception later.
        """
        cmds = list(self._pipe.commands)

        cmd_count = len(cmds)
        for i in range(cmd_count - 1):
            (read_fd, write_fd) = os.pipe()
            open_fds.extend((read_fd, write_fd))

            cmds[i] = cmds[i].stdout(write_fd, close=True)
            cmds[i + 1] = cmds[i + 1].stdin(read_fd, close=True)

        for i in range(cmd_count):
            cmds[i] = cmds[i].set(_return_result=True, _catch_cancelled_error=True)

        return cmds

    @log_method(LOG_DETAIL)
    async def _setup_capturing(self, cmds: "list[shellous.Command[Any]]"):
        """Set up capturing and return (stdin, stdout, stderr) streams."""
        loop = asyncio.get_event_loop()
        first_fut = loop.create_future()
        last_fut = loop.create_future()

        first_coro = cmds[0].coro(_run_future=first_fut)
        last_coro = cmds[-1].coro(_run_future=last_fut)
        middle_coros = [cmd.coro() for cmd in cmds[1:-1]]

        # Tag each task name with the index of the command in the pipe.
        self.add_task(first_coro, "0")
        for i, coro in enumerate(middle_coros):
            self.add_task(coro, str(i + 1))
        self.add_task(last_coro, str(len(cmds) - 1))

        # When capturing, we need the first and last commands in the
        # pipe to signal when they are ready.
        first_ready, last_ready = await asyncio.gather(first_fut, last_fut)

        stdin, stdout, stderr = (
            first_ready.stdin,
            last_ready.stdout,
            last_ready.stderr,
        )
        self._pid = first_ready.pid

        return (stdin, stdout, stderr)

    def __repr__(self) -> str:
        "Return string representation of PipeRunner."
        cancelled_info = ""
        if self._cancelled:
            cancelled_info = " cancelled"
        result_info = ""
        if self._results:
            result_info = f" results={self._results!r}"
        return f"<PipeRunner {self.name!r}{cancelled_info}{result_info}>"

    async def _readlines(self) -> AsyncIterator[str]:
        "Iterate over lines in stdout/stderr."
        stream = self.stdout or self.stderr
        if stream:
            async for line in redir.read_lines(stream, self._encoding):
                yield line

    def __aiter__(self) -> AsyncIterator[str]:
        "Return asynchronous iterator over stdout/stderr."
        return self._readlines()

    @staticmethod
    async def run_pipeline(pipe: "shellous.Pipeline[Any]") -> Union[str, Result]:
        "Run a pipeline. This is the main entry point for PipeRunner."
        run = PipeRunner(pipe, capturing=False)
        async with run:
            pass

        result = run.result()
        if pipe.options._return_result:
            return result
        return result.output

PipeRunner is an asynchronous context manager that runs a pipeline.

async with pipe.run() as run:
    # process run.stdin, run.stdout, run.stderr (if not None)
result = run.result()

PipeRunner(pipe: Pipeline[typing.Any], *, capturing: bool)
    def __init__(self, pipe: "shellous.Pipeline[Any]", *, capturing: bool):
        """`capturing=True` indicates we are within an `async with` block and
        the client needs to access the `stdin`, `stdout` and `stderr` streams.
        """
        assert len(pipe.commands) > 1

        self._pipe = pipe
        self._cancelled = False
        self._tasks = []
        self._capturing = capturing
        self._encoding = pipe.options.encoding

capturing=True indicates we are within an async with block and the client needs to access the stdin, stdout and stderr streams.

stdin: Optional[asyncio.streams.StreamWriter] = None

Pipeline standard input.

stdout: Optional[asyncio.streams.StreamReader] = None

Pipeline standard output.

stderr: Optional[asyncio.streams.StreamReader] = None

Pipeline standard error.

name: str
    @property
    def name(self) -> str:
        "Return name of the pipeline."
        return self._pipe.name

Return name of the pipeline.

options: Options
    @property
    def options(self) -> "shellous.Options":
        """Return options for pipeline being run.

        These are the options for the last command in the pipeline.
        """
        return self._pipe.options

Return options for pipeline being run.

These are the options for the last command in the pipeline.

pid: Optional[int]
    @property
    def pid(self) -> Optional[int]:
        """Return the process ID for the first command in the pipeline.

        The PID is only available when `capturing=True`.
        """
        if self._pid < 0:
            return None
        return self._pid

Return the process ID for the first command in the pipeline.

The PID is only available when capturing=True.

def result(self, *, check: bool = True) -> Result:
    def result(self, *, check: bool = True) -> Result:
        "Return `Result` object for PipeRunner."
        assert self._results is not None

        result = convert_result_list(self._results, self._cancelled)
        if not check:
            return result

        return check_result(result, self._pipe.options, self._cancelled)

Return Result object for PipeRunner.

def add_task(self, coro: Coroutine[Any, Any, ~_T], tag: str = '') -> _asyncio.Task[~_T]:
    def add_task(
        self,
        coro: Coroutine[Any, Any, _T],
        tag: str = "",
    ) -> asyncio.Task[_T]:
        "Add a background task."
        task_name = f"{self.name}#{tag}"
        task = asyncio.create_task(coro, name=task_name)
        self._tasks.append(task)
        return task

Add a background task.

@log_method(LOG_DETAIL)
async def __aenter__(self) -> PipeRunner:
    @log_method(LOG_DETAIL)
    async def __aenter__(self) -> "PipeRunner":
        "Set up redirections and launch pipeline."
        try:
            return await self._start()
        except (Exception, asyncio.CancelledError) as ex:
            LOGGER.warning("PipeRunner enter %r ex=%r", self, ex)
            if _is_cancelled(ex):
                self._cancelled = True
            await self._wait(kill=True)
            raise

Set up redirections and launch pipeline.

def __aiter__(self) -> AsyncIterator[str]:
    def __aiter__(self) -> AsyncIterator[str]:
        "Return asynchronous iterator over stdout/stderr."
        return self._readlines()

Return asynchronous iterator over stdout/stderr.

@staticmethod
async def run_pipeline(pipe: Pipeline[typing.Any]) -> Union[str, Result]:
    @staticmethod
    async def run_pipeline(pipe: "shellous.Pipeline[Any]") -> Union[str, Result]:
        "Run a pipeline. This is the main entry point for PipeRunner."
        run = PipeRunner(pipe, capturing=False)
        async with run:
            pass

        result = run.result()
        if pipe.options._return_result:
            return result
        return result.output

Run a pipeline. This is the main entry point for PipeRunner.
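
The stitching done by `_setup_pipeline` (one `os.pipe()` per adjacent pair of commands, with the parent closing its copies) can be sketched with plain asyncio subprocesses. This is an illustration under assumptions, not the library's code: it assumes a Unix system, uses the Python interpreter for both stages, and `run_two_stage_pipeline` is a hypothetical name.

```python
import asyncio
import os
import sys

PRODUCER = "print('hello pipeline')"
CONSUMER = "import sys; sys.stdout.write(sys.stdin.read().upper())"


async def run_two_stage_pipeline() -> bytes:
    """Run `producer | consumer` with an os.pipe() between them (Unix assumed)."""
    read_fd, write_fd = os.pipe()
    open_fds = [read_fd, write_fd]  # closed on any failure, as in `_start`
    try:
        producer = await asyncio.create_subprocess_exec(
            sys.executable, "-c", PRODUCER, stdout=write_fd
        )
        # The parent must drop its copy, or the consumer never sees EOF.
        os.close(write_fd)
        open_fds.remove(write_fd)

        consumer = await asyncio.create_subprocess_exec(
            sys.executable, "-c", CONSUMER,
            stdin=read_fd,
            stdout=asyncio.subprocess.PIPE,
        )
        os.close(read_fd)
        open_fds.remove(read_fd)
    except BaseException:
        for fd in open_fds:
            os.close(fd)
        raise

    output, _ = await consumer.communicate()
    await producer.wait()
    return output


if __name__ == "__main__":
    print(asyncio.run(run_two_stage_pipeline()))
```

Tracking the descriptors in a list mirrors the `open_fds` bookkeeping in `_start`, so a failure while launching the second stage does not leak the pipe.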

class AuditEventInfo(typing.TypedDict):
class AuditEventInfo(TypedDict):
    """Info attached to each audit callback event.

    See `audit_callback` in `Command.set` for more information.
    """

    runner: Runner
    "Reference to the Runner object."

    failure: str
    "When phase is 'stop', the name of the exception from starting the process."

    signal: str
    "When phase is 'signal', the signal name/number sent to the process."

Info attached to each audit callback event.

See audit_callback in Command.set for more information.

runner: Runner

Reference to the Runner object.

failure: str

When phase is 'stop', the name of the exception from starting the process.

signal: str

When phase is 'signal', the signal name/number sent to the process.
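
An audit callback receives the phase name and an `AuditEventInfo` dictionary. The sketch below shows one possible callback; to keep it self-contained, the `info` dictionaries are hand-built stand-ins (a plain string replaces the real `Runner` object), and the `events` list and `audit` function are hypothetical names.

```python
from typing import Any, Dict, List

events: List[str] = []


def audit(phase: str, info: Dict[str, Any]) -> None:
    """Record one line per audit event; `info` follows the AuditEventInfo keys."""
    line = f"{phase}: {info['runner']!r}"
    if phase == "stop" and info["failure"]:
        line += f" failure={info['failure']}"
    if phase == "signal":
        line += f" signal={info['signal']}"
    events.append(line)


# Hand-built stand-ins for what a Runner would pass (assumption: a plain
# string replaces the real Runner object here).
audit("start", {"runner": "echo", "failure": "", "signal": ""})
audit("signal", {"runner": "echo", "failure": "", "signal": "SIGTERM"})
audit("stop", {"runner": "echo", "failure": "FileNotFoundError", "signal": ""})
print("\n".join(events))
```

With shellous, a function like this would be supplied through the `audit_callback` option described in `Command.set`.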