# shellous

**Async Processes and Pipelines**

shellous provides a concise API for running subprocesses using asyncio. It is similar to and inspired by sh.

```python
import asyncio
from shellous import sh

async def main():
    result = await sh("echo", "hello")
    print(result)

asyncio.run(main())
```
## Benefits

- Run programs asynchronously in a single line.
- Redirect stdin, stdout and stderr to files, memory buffers, async streams or loggers.
- Iterate asynchronously over subprocess output.
- Set timeouts and reliably cancel running processes.
- Run a program with a pseudo-terminal (pty).
- Use `send()` and `expect()` to manually control a subprocess.
- Construct pipelines and use process substitution directly from Python (no shell required).
- Runs on Linux, macOS, FreeBSD and Windows.
- Monitor processes being started and stopped with the `audit_callback` API.
## Requirements
- Requires Python 3.9 or later.
- Requires an asyncio event loop.
- Pseudo-terminals require a Unix system.
- Process substitution requires a Unix system with /dev/fd support.
## Running a Command

The tutorial in this README uses the asyncio REPL built into Python. In these examples, `>>>` is the REPL prompt.

Start the asyncio REPL by typing `python3 -m asyncio`, and import `sh` from the shellous module:

```pycon
>>> from shellous import sh
```

Here's a command that runs `echo "hello, world"`:

```pycon
>>> await sh("echo", "hello, world")
'hello, world\n'
```
The first argument to `sh` is the program name. It is followed by zero or more arguments. Each argument will be converted to a string. If an argument is a list or tuple, it is flattened recursively.

```pycon
>>> await sh("echo", 1, 2, [3, 4, (5, 6)])
'1 2 3 4 5 6\n'
```
A command does not run until you `await` it. When you run a command using `await`, it returns the value of the standard output interpreted as a UTF-8 string. It is safe to `await` the same command object more than once.

Here, we create our own echo command with "-n" to omit the newline. Note that `echo("abc")` will run the same command as `echo -n "abc"`.

```pycon
>>> echo = sh("echo", "-n")
>>> await echo("abc")
'abc'
```
Commands are immutable objects that represent a program invocation: program name, arguments, environment variables, redirection operators and other settings. When you use a method to modify a `Command`, you get back a new `Command` object. The original object is unchanged.

You can wrap your commands in a function to improve type safety:

```pycon
>>> from shellous import Command
>>> def exclaim(word: str) -> Command[str]:
...     return sh("echo", "-n", f"{word}!!")
...
>>> await exclaim("Oh")
'Oh!!'
```

The type hint `Command[str]` indicates that the command returns a `str`.
## Arguments

Commands use positional arguments only; keyword arguments are not supported.

In most cases, shellous automatically converts Python objects passed as command arguments to `str` or `bytes`. As described above, the `list` and `tuple` types are an exception; they are recursively flattened before their elements are converted to strings.

Dicts, sets and generator types are not supported as arguments; their default string representations do not make sense as command line arguments.
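If you do need to pass a dict-like argument, the `coerce_arg` option (described under Options below) lets you supply your own conversion function. The helper below is a hypothetical sketch; it assumes the callback returns a replacement value, which is then flattened and converted as usual.

```python
def dict_to_flags(arg):
    """Hypothetical coercion helper: expand a dict into "--key=value" strings.

    Any other argument is returned unchanged, so it falls through to the
    default str() conversion.
    """
    if isinstance(arg, dict):
        return [f"--{key}={value}" for key, value in arg.items()]
    return arg

# Hypothetical usage (assumes this is the coerce_arg contract):
#   xsh = sh.set(coerce_arg=dict_to_flags)
#   await xsh("my-tool", {"verbose": 1})
```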
## Results

When a command completes successfully, it returns the standard output (or "" if stdout is redirected). For a more detailed response, you can specify that the command should return a `Result` object by using the `.result` modifier:

```pycon
>>> await echo.result("abc")
Result(exit_code=0, output_bytes=b'abc', error_bytes=b'', cancelled=False, encoding='utf-8')
```

A `Result` object contains the command's `exit_code` in addition to its output. A `Result` is truthy if the command's exit_code is zero. You can access the string value of the output using the `.output` property:

```python
if result := await sh.result("cat", "some-file"):
    output = result.output
else:
    print(f"Command failed with exit_code={result.exit_code}")
```

You can retrieve the string value of the standard error using the `.error` property. (By default, only the first 1024 bytes of standard error are stored.)

If a command was terminated by a signal, the `exit_code` will be the negative signal number.

The return value of `sh.result("cmd", ...)` uses the type hint `Command[Result]`.
## ResultError

If you are not using the `.result` modifier and a command fails, it raises a `ResultError` exception:

```pycon
>>> await sh("cat", "does_not_exist")
Traceback (most recent call last):
  ...
shellous.result.ResultError: Result(exit_code=1, output_bytes=b'', error_bytes=b'cat: does_not_exist: No such file or directory\n', cancelled=False, encoding='utf-8')
```

The `ResultError` exception contains a `Result` object with the exit_code and the first 1024 bytes of standard error.

In some cases, you want to ignore certain exit code values; that is, you want to treat them as normal. To do this, set the `exit_codes` option:

```pycon
>>> await sh("cat", "does_not_exist").set(exit_codes={0, 1})
''
```

If there is a problem launching a process, shellous can also raise a separate `FileNotFoundError` or `PermissionError` exception.
## Async For

Using `await` to run a command collects the entire output of the command in memory before returning it. You can also iterate over the output lines as they arrive using `async for`:

```pycon
>>> [line async for line in echo("hi\n", "there")]
['hi\n', ' there']
```

Use an `async for` loop when you want to examine the stream of output from a command, line by line. For example, suppose you want to run tail on a log file:

```python
async for line in sh("tail", "-f", "/var/log/syslog"):
    if "ERROR" in line:
        print(line.rstrip())
```
## Async With

You can use a command as an asynchronous context manager. There are two ways to run a program using a context manager: a low-level API and a high-level API.

### Byte-by-Byte (Low Level)

Use `async with` directly when you need byte-by-byte control over the individual streams: stdin, stdout and stderr. To control a standard stream, you must tell shellous to "capture" it. (For more on this, see Redirection.)

```python
cmd = sh("cat").stdin(sh.CAPTURE).stdout(sh.CAPTURE)

async with cmd as run:
    run.stdin.write(b"abc")
    run.stdin.close()
    print(await run.stdout.readline())

result = run.result()
```
The streams `run.stdout` and `run.stderr` are `asyncio.StreamReader` objects. The stream `run.stdin` is an `asyncio.StreamWriter` object. If we didn't specify that stdin/stdout are `sh.CAPTURE`, the streams `run.stdin` and `run.stdout` would be `None`.

The return value of `run.result()` is a `Result` object. Depending on the command settings, this function may raise a `ResultError` on a non-zero exit code.

> :warning: When reading or writing individual streams, you are responsible for managing reads and writes so they don't deadlock. You may use `run.create_task` to schedule a concurrent task.
You can also use `async with` to run a server. When you do so, you must tell the server to stop using `run.cancel()`. Otherwise, the context manager will wait forever for the process to exit.

```python
async with sh("some-server") as run:
    # Send commands to the server here...

    # Manually signal the server to stop.
    run.cancel()
```
### Prompt with Send/Expect (High Level API)

Use the `prompt()` method to control a process using `send` and `expect`. The `prompt()` method returns an asynchronous context manager (the `Prompt` class) that facilitates reading and writing strings and matching regular expressions.

```python
cmd = sh("cat").set(pty=True)

async with cmd.prompt() as client:
    await client.send("abc")
    output, _ = await client.expect("\r\n")
    print(output)
```

The `Prompt` API automatically captures `stdin` and `stdout`.
Here is another example, controlling a bash co-process running in a docker container:

```python
import re

from shellous import sh

async def list_packages():
    "Run bash in an ubuntu docker container and list packages."
    bash_prompt = re.compile("root@[0-9a-f]+:/[^#]*# ")
    cmd = sh("docker", "run", "-it", "--rm", "-e", "TERM=dumb", "ubuntu")

    async with cmd.set(pty=True).prompt(bash_prompt, timeout=3) as cli:
        # Read up to first prompt.
        await cli.expect()
        # Disable echo. The `command()` method combines send *and* expect.
        await cli.command("stty -echo")
        # Retrieve the list of packages.
        result = await cli.command("apt-cache pkgnames")

    # You can check the result object's exit code. You can only
    # access `cli.result` outside the `async with` block.
    assert cli.result.exit_code == 0

    return result.strip().split("\r\n")
```
The `prompt()` API does not raise a `ResultError` when a command exits with an error status. Typically, you'll see an `EOFError` when you were expecting to read a response. You can check the exit status by retrieving the Prompt's `result` property outside of the `async with` block.
## Redirection

shellous supports the redirection operators `|` and `>>`. They work similarly to how they work in the unix shell. Shellous does not support the use of `<` or `>` for redirection; instead, replace these with `|`.

To redirect to or from a file, use a `pathlib.Path` object. Alternatively, you can redirect input/output to a StringIO object, an open file, a Logger, or use a special redirection constant like `sh.DEVNULL`.

> :warning: When combining the redirect operators with `await`, you must use parentheses; `await` has higher precedence than `|` and `>>`.
### Redirecting Standard Input

To redirect standard input, use the pipe operator `|` with the argument on the left side. Here is an example that passes the string "abc" as standard input:

```pycon
>>> cmd = "abc" | sh("wc", "-c")
>>> await cmd
' 3\n'
```

To read input from a file, use a `Path` object from `pathlib`:

```pycon
>>> from pathlib import Path
>>> cmd = Path("LICENSE") | sh("wc", "-l")
>>> await cmd
' 201\n'
```
Shellous supports different STDIN behavior for different Python types.

| Python Type | Behavior as STDIN |
| --- | --- |
| `str` | Read input from string object. |
| `bytes`, `bytearray` | Read input from bytes object. |
| `Path` | Read input from file specified by `Path`. |
| File, StringIO, ByteIO | Read input from open file object. |
| `int` | Read input from existing file descriptor. |
| `asyncio.StreamReader` | Read input from `StreamReader`. |
| `sh.DEVNULL` | Read input from `/dev/null`. |
| `sh.INHERIT` | Read input from existing `sys.stdin`. |
| `sh.CAPTURE` | You will write to stdin interactively. |
### Redirecting Standard Output

To redirect standard output, use the pipe operator `|` with the argument on the right side. Here is an example that writes to a temporary file:

```pycon
>>> output_file = Path("/tmp/output_file")
>>> cmd = sh("echo", "abc") | output_file
>>> await cmd
''
>>> output_file.read_bytes()
b'abc\n'
```

To redirect standard output with append, use the `>>` operator:

```pycon
>>> cmd = sh("echo", "def") >> output_file
>>> await cmd
''
>>> output_file.read_bytes()
b'abc\ndef\n'
```
Shellous supports different STDOUT behavior for different Python types.

| Python Type | Behavior as STDOUT/STDERR |
| --- | --- |
| `Path` | Write output to the file path specified by `Path`. |
| `bytearray` | Write output to a mutable byte array. |
| File, StringIO, ByteIO | Write output to an open file object. |
| `int` | Write output to existing file descriptor at its current position. ◆ |
| `logging.Logger` | Log each line of output. ◆ |
| `asyncio.StreamWriter` | Write output to `StreamWriter`. ◆ |
| `sh.CAPTURE` | Capture output for `async with`. ◆ |
| `sh.DEVNULL` | Write output to `/dev/null`. ◆ |
| `sh.INHERIT` | Write output to existing `sys.stdout` or `sys.stderr`. ◆ |

◆ For these types, there is no difference between using `|` and `>>`.

Shellous does not support redirecting standard output/error to a plain `str` or `bytes` object. If you intend to redirect output to a file, you must use a `pathlib.Path` object.
### Redirecting Standard Error

By default, the first 1024 bytes read from standard error are stored in the Result object. Any further bytes are discarded. You can change the 1024 byte limit using the `error_limit` option.

To redirect standard error, use the `stderr` method. Standard error supports the same Python types as standard output. To append, set `append=True` in the `stderr` method.

To redirect stderr to the same place as stdout, use the `sh.STDOUT` constant. If you also redirect stdout to `sh.DEVNULL`, you will only receive the standard error.

```pycon
>>> cmd = sh("cat", "does_not_exist").stderr(sh.STDOUT)
>>> await cmd.set(exit_codes={0, 1})
'cat: does_not_exist: No such file or directory\n'
```

To redirect standard error to the hosting program's `sys.stderr`, use the `sh.INHERIT` redirect option.

```pycon
>>> cmd = sh("cat", "does_not_exist").stderr(sh.INHERIT)
>>> await cmd
cat: does_not_exist: No such file or directory
Traceback (most recent call last):
  ...
shellous.result.ResultError: Result(exit_code=1, output_bytes=b'', error_bytes=b'', cancelled=False, encoding='utf-8')
```

If you redirect stderr, it will no longer be stored in the Result object, and the `error_limit` option will not apply.
### Default Redirections

For regular commands, the default redirections are:

- Standard input is read from the empty string ("").
- Standard output is buffered and stored in the Result object (BUFFER).
- The first 1024 bytes of standard error are buffered and stored in the Result object (BUFFER).

However, the default redirections are adjusted when using a pseudo-terminal (pty):

- Standard input is captured and ignored (CAPTURE).
- Standard output is buffered and stored in the Result object (BUFFER).
- Standard error is redirected to standard output (STDOUT).

When you use the `Prompt` API, standard input and standard output are automatically redirected to CAPTURE.
## Pipelines

You can create a pipeline by combining commands using the `|` operator. A pipeline feeds the standard output of one process into the next process as standard input. Here is the shellous equivalent of the bash command `ls | grep README`:

```pycon
>>> pipe = sh("ls") | sh("grep", "README")
>>> await pipe
'README.md\n'
```

A pipeline returns a `Result` if the last command in the pipeline has the `.result` modifier. To set other options like `encoding` for a Pipeline, set them on the last command.

```pycon
>>> pipe = sh("ls") | sh("grep", "README").result
>>> await pipe
Result(exit_code=0, output_bytes=b'README.md\n', error_bytes=b'', cancelled=False, encoding='utf-8')
```

Error reporting for a pipeline is implemented similarly to the `-o pipefail` shell option.

Pipelines support the same `await`/`async for`/`async with` operations that work on a single command, including the `Prompt` API.

```pycon
>>> [line.strip() async for line in pipe]
['README.md']
```
## Process Substitution (Unix Only)

You can pass a shell command as an argument to another. Here is the shellous equivalent of the bash command `grep README <(ls)`:

```pycon
>>> cmd = sh("grep", "README", sh("ls"))
>>> await cmd
'README.md\n'
```

Use `.writable` to write to a command instead.

```pycon
>>> buf = bytearray()
>>> cmd = sh("ls") | sh("tee", sh("grep", "README").writable | buf) | sh.DEVNULL
>>> await cmd
''
>>> buf
bytearray(b'README.md\n')
```

The above example is equivalent to `ls | tee >(grep README > buf) > /dev/null`.
## Timeouts

You can specify a timeout using the `timeout` option. If the timeout expires, shellous raises a `TimeoutError`.

```pycon
>>> await sh("sleep", 60).set(timeout=0.1)
Traceback (most recent call last):
  ...
TimeoutError
```

Timeouts are just a special case of cancellation. When a command is cancelled, shellous terminates the running process and raises a `CancelledError`.

```pycon
>>> t = asyncio.create_task(sh("sleep", 60).coro())
>>> t.cancel()
True
>>> await t
Traceback (most recent call last):
  ...
CancelledError
```

By default, shellous sends a SIGTERM signal to tell the process to exit. If the process does not exit within 3 seconds, shellous sends a SIGKILL signal. You can change these defaults with the `cancel_signal` and `cancel_timeout` settings. A command is not considered fully cancelled until the process exits.
## Pseudo-Terminal Support (Unix Only)

To run a command through a pseudo-terminal, set the `pty` option to True.

```pycon
>>> await sh("echo", "in a pty").set(pty=True)
'in a pty\r\n'
```

Alternatively, you can pass a `pty` function to configure the tty mode and size.

```pycon
>>> ls = sh("ls").set(pty=shellous.cooked(cols=40, rows=10, echo=False))
>>> await ls("README.md", "CHANGELOG.md")
'CHANGELOG.md\tREADME.md\r\n'
```

Shellous provides three built-in helper functions: `shellous.cooked()`, `shellous.raw()` and `shellous.cbreak()`.
## Context Objects

You can store shared command settings in an immutable context object (CmdContext). To create a new context object, specify your changes to the default context `sh`:

```pycon
>>> auditor = lambda phase, info: print(phase, info["runner"].name)
>>> sh_audit = sh.set(audit_callback=auditor)
```

Now all commands created with `sh_audit` will log their progress using the audit callback.

```pycon
>>> await sh_audit("echo", "goodbye")
start echo
stop echo
'goodbye\n'
```

You can also create a context object that specifies all return values are `Result` objects.

```pycon
>>> rsh = sh.result
>>> await rsh("echo", "whatever")
Result(exit_code=0, output_bytes=b'whatever\n', error_bytes=b'', cancelled=False, encoding='utf-8')
```
## Options

Both `Command` and `CmdContext` support options to control their runtime behavior. Some of these options (timeout, pty, audit_callback and exit_codes) have been described above. See the `shellous.Options` class for more information.

You can retrieve an option from `cmd` with `cmd.options.<option>`. For example, use `cmd.options.encoding` to obtain the encoding:

```pycon
>>> cmd = sh("echo").set(encoding="latin1")
>>> cmd.options.encoding
'latin1'
```

`Command` and `CmdContext` use the `.set()` method to specify most options:
| Option | Description |
| --- | --- |
| `path` | Search path to use instead of the `PATH` environment variable. (Default=None) |
| `env` | Additional environment variables to pass to the command. (Default={}) |
| `inherit_env` | True if command should inherit the environment variables from the current process. (Default=True) |
| `encoding` | Text encoding of input/output streams. You can specify an error handling scheme by including it after a space, e.g. "ascii backslashreplace". (Default="utf-8 strict") |
| `exit_codes` | Set of exit codes that do not raise a `ResultError`. (Default={0}) |
| `timeout` | Timeout in seconds to wait before cancelling the process. (Default=None) |
| `cancel_timeout` | Timeout in seconds to wait for a cancelled process to exit before forcefully terminating it. (Default=3s) |
| `cancel_signal` | The signal sent to a process when it is cancelled. (Default=SIGTERM) |
| `alt_name` | Alternate name for the process used for debug logging. (Default=None) |
| `pass_fds` | Additional file descriptors to pass to the process. (Default={}) |
| `pass_fds_close` | True if descriptors in `pass_fds` should be closed after the child process is launched. (Default=False) |
| `pty` | Used to allocate a pseudo-terminal (PTY). (Default=False) |
| `close_fds` | True if process should close all file descriptors when it starts. This setting defaults to False to align with posix_spawn requirements. (Default=False) |
| `audit_callback` | Provide a function to audit stages of process execution. (Default=None) |
| `coerce_arg` | Provide a function to coerce Command arguments to strings when `str()` is not sufficient. For example, you can provide your own function that converts a dictionary argument to a sequence of strings. (Default=None) |
| `error_limit` | Maximum number of initial bytes of STDERR to store in the Result object. (Default=1024) |
### env

Use the `env()` method to add to the list of environment variables. The `env()` method supports keyword parameters. You can call `env()` more than once and the effect is additive.

```pycon
>>> cmd = sh("echo").env(ENV1="a", ENV2="b").env(ENV2=3)
>>> cmd.options.env
{'ENV1': 'a', 'ENV2': '3'}
```

Use the `env` option with `set()` when you want to replace all the environment variables.
### input, output, error

When you apply a redirection operator to a `Command` or `CmdContext`, the redirection targets are also stored in the `Options` object. To change these, use the `.stdin()`, `.stdout()`, or `.stderr()` methods or the redirection operator `|`.

| Option | Description |
| --- | --- |
| `input` | The redirection target for standard input. |
| `input_close` | True if standard input should be closed after the process is launched. |
| `output` | The redirection target for standard output. |
| `output_append` | True if standard output should be open for append. |
| `output_close` | True if standard output should be closed after the process is launched. |
| `error` | The redirection target for standard error. |
| `error_append` | True if standard error should be open for append. |
| `error_close` | True if standard error should be closed after the process is launched. |
## Type Checking

Shellous fully supports PEP 484 type hints.

### Commands

Commands are generic on the return type, either `str` or `Result`. You will specify the type of a command object as `Command[str]` or `Command[Result]`. Use the `result` modifier to obtain a `Command[Result]` from a `Command[str]`.

```python
from shellous import sh, Command, Result

cmd1: Command[str] = sh("echo", "abc")
# When you `await cmd1`, the result is a `str` object.

cmd2: Command[Result] = sh.result("echo", "abc")
# When you `await cmd2`, the result is a `Result` object.
```

### CmdContext

The `CmdContext` class is also generic on either `str` or `Result`.

```python
from shellous import sh, CmdContext, Result

sh1: CmdContext[str] = sh.set(path="/bin:/usr/bin")
# When you use `sh1` to create commands, it produces `Command[str]` objects with the given path.

sh2: CmdContext[Result] = sh.result.set(path="/bin:/usr/bin")
# When you use `sh2` to create commands, it produces `Command[Result]` objects with the given path.
```
## Logging

For verbose logging, shellous supports a `SHELLOUS_TRACE` environment variable. Set the value of `SHELLOUS_TRACE` to a comma-delimited list of options:

- `detail`: Enables detailed logging used to trace the steps of running a command.
- `prompt`: Enables logging in the `Prompt` class when controlling a program using send/expect.
- `all`: Enables all logging options.

Shellous uses the built-in Python `logging` module. After enabling these options, the `shellous` logger will display log messages at the `INFO` level. Without these options enabled, shellous generates almost no log messages.
1""" 2.. include:: ../README.md 3""" 4 5# pylint: disable=cyclic-import 6# pyright: reportUnusedImport=false 7 8__version__ = "0.38.0" 9 10import sys 11import warnings 12 13from .command import AuditEventInfo, CmdContext, Command, Options 14from .pipeline import Pipeline 15from .prompt import Prompt 16from .pty_util import cbreak, cooked, raw 17from .result import Result, ResultError 18from .runner import PipeRunner, Runner 19 20if sys.version_info[:3] in [(3, 10, 9), (3, 11, 1)]: 21 # Warn about these specific Python releases: 3.10.9 and 3.11.1 22 # These releases have a known race condition. 23 warnings.warn( # pragma: no cover 24 "Python 3.10.9 and Python 3.11.1 are unreliable with respect to " 25 + "asyncio subprocesses. Consider a newer Python release: 3.10.10+ " 26 + "or 3.11.2+. (https://github.com/python/cpython/issues/100133)", 27 RuntimeWarning, 28 ) 29 30 31sh: CmdContext[str] = CmdContext() 32"""`sh` is the default command context (`CmdContext`). 33 34Use `sh` to create commands or new command contexts. 35 36```python 37from shellous import sh 38result = await sh("echo", "hello") 39``` 40""" 41 42__all__ = [ 43 "sh", 44 "CmdContext", 45 "Command", 46 "Options", 47 "Pipeline", 48 "Prompt", 49 "cbreak", 50 "cooked", 51 "raw", 52 "Result", 53 "ResultError", 54 "Runner", 55 "PipeRunner", 56 "AuditEventInfo", 57]
sh
is the default command context (CmdContext
).
Use sh
to create commands or new command contexts.
from shellous import sh
result = await sh("echo", "hello")
278@dataclass(frozen=True) 279class CmdContext(Generic[_RT]): 280 """Concrete class for an immutable execution context.""" 281 282 CAPTURE: ClassVar[Redirect] = Redirect.CAPTURE 283 "Capture and read/write stream manually." 284 285 DEVNULL: ClassVar[Redirect] = Redirect.DEVNULL 286 "Redirect to /dev/null." 287 288 INHERIT: ClassVar[Redirect] = Redirect.INHERIT 289 "Redirect to same place as existing stdin/stderr/stderr." 290 291 STDOUT: ClassVar[Redirect] = Redirect.STDOUT 292 "Redirect stderr to same place as stdout." 293 294 BUFFER: ClassVar[Redirect] = Redirect.BUFFER 295 "Redirect output to a buffer in the Result object. This is the default for stdout/stderr." 296 297 options: Options = field(default_factory=Options) 298 "Default command options." 299 300 def stdin( 301 self, 302 input_: Any, 303 *, 304 close: bool = False, 305 ) -> "CmdContext[_RT]": 306 "Return new context with updated `input` settings." 307 new_options = self.options.set_stdin(input_, close) 308 return CmdContext(new_options) 309 310 def stdout( 311 self, 312 output: Any, 313 *, 314 append: bool = False, 315 close: bool = False, 316 ) -> "CmdContext[_RT]": 317 "Return new context with updated `output` settings." 318 new_options = self.options.set_stdout(output, append, close) 319 return CmdContext(new_options) 320 321 def stderr( 322 self, 323 error: Any, 324 *, 325 append: bool = False, 326 close: bool = False, 327 ) -> "CmdContext[_RT]": 328 "Return new context with updated `error` settings." 
329 new_options = self.options.set_stderr(error, append, close) 330 return CmdContext(new_options) 331 332 def env(self, **kwds: Any) -> "CmdContext[_RT]": 333 """Return new context with augmented environment.""" 334 new_options = self.options.add_env(kwds) 335 return CmdContext(new_options) 336 337 def set( # pylint: disable=unused-argument, too-many-locals, too-many-arguments 338 self, 339 *, 340 path: Unset[Optional[str]] = _UNSET, 341 env: Unset[dict[str, Any]] = _UNSET, 342 inherit_env: Unset[bool] = _UNSET, 343 encoding: Unset[str] = _UNSET, 344 _return_result: Unset[bool] = _UNSET, 345 _catch_cancelled_error: Unset[bool] = _UNSET, 346 exit_codes: Unset[Optional[Container[int]]] = _UNSET, 347 timeout: Unset[Optional[float]] = _UNSET, 348 cancel_timeout: Unset[float] = _UNSET, 349 cancel_signal: Unset[Optional[signal.Signals]] = _UNSET, 350 alt_name: Unset[Optional[str]] = _UNSET, 351 pass_fds: Unset[Iterable[int]] = _UNSET, 352 pass_fds_close: Unset[bool] = _UNSET, 353 _writable: Unset[bool] = _UNSET, 354 _start_new_session: Unset[bool] = _UNSET, 355 _preexec_fn: Unset[_PreexecFnT] = _UNSET, 356 pty: Unset[PtyAdapterOrBool] = _UNSET, 357 close_fds: Unset[bool] = _UNSET, 358 audit_callback: Unset[_AuditFnT] = _UNSET, 359 coerce_arg: Unset[_CoerceArgFnT] = _UNSET, 360 error_limit: Unset[Optional[int]] = _UNSET, 361 ) -> "CmdContext[_RT]": 362 """Return new context with custom options set. 363 364 See `Command.set` for option reference. 365 """ 366 kwargs = locals() 367 del kwargs["self"] 368 if not encoding: 369 raise TypeError("invalid encoding") 370 return CmdContext(self.options.set(kwargs)) 371 372 def __call__(self, *args: Any) -> "Command[_RT]": 373 "Construct a new command." 374 return Command(coerce(args, self.options.coerce_arg), self.options) 375 376 @property 377 def result(self) -> "CmdContext[shellous.Result]": 378 "Set `_return_result` and `exit_codes`." 
379 return cast( 380 CmdContext[shellous.Result], 381 self.set( 382 _return_result=True, 383 exit_codes=range(-255, 256), 384 ), 385 ) 386 387 def find_command(self, name: str) -> Optional[Path]: 388 """Find the command with the given name and return its filesystem path. 389 390 Return None if the command name is not found in the search path. 391 392 Use the `path` variable specified by the context if set. Otherwise, the 393 default behavior is to use the `PATH` environment variable with a 394 fallback to the value of `os.defpath`. 395 """ 396 result = self.options.which(name) 397 if not result: 398 return None 399 return Path(result)
Concrete class for an immutable execution context.
Capture and read/write stream manually.
Redirect to same place as existing stdin/stderr/stderr.
Redirect stderr to same place as stdout.
Redirect output to a buffer in the Result object. This is the default for stdout/stderr.
300 def stdin( 301 self, 302 input_: Any, 303 *, 304 close: bool = False, 305 ) -> "CmdContext[_RT]": 306 "Return new context with updated `input` settings." 307 new_options = self.options.set_stdin(input_, close) 308 return CmdContext(new_options)
Return new context with updated input
settings.
310 def stdout( 311 self, 312 output: Any, 313 *, 314 append: bool = False, 315 close: bool = False, 316 ) -> "CmdContext[_RT]": 317 "Return new context with updated `output` settings." 318 new_options = self.options.set_stdout(output, append, close) 319 return CmdContext(new_options)
Return new context with updated output
settings.
321 def stderr( 322 self, 323 error: Any, 324 *, 325 append: bool = False, 326 close: bool = False, 327 ) -> "CmdContext[_RT]": 328 "Return new context with updated `error` settings." 329 new_options = self.options.set_stderr(error, append, close) 330 return CmdContext(new_options)
Return new context with updated error
settings.
332 def env(self, **kwds: Any) -> "CmdContext[_RT]": 333 """Return new context with augmented environment.""" 334 new_options = self.options.add_env(kwds) 335 return CmdContext(new_options)
Return new context with augmented environment.
337 def set( # pylint: disable=unused-argument, too-many-locals, too-many-arguments 338 self, 339 *, 340 path: Unset[Optional[str]] = _UNSET, 341 env: Unset[dict[str, Any]] = _UNSET, 342 inherit_env: Unset[bool] = _UNSET, 343 encoding: Unset[str] = _UNSET, 344 _return_result: Unset[bool] = _UNSET, 345 _catch_cancelled_error: Unset[bool] = _UNSET, 346 exit_codes: Unset[Optional[Container[int]]] = _UNSET, 347 timeout: Unset[Optional[float]] = _UNSET, 348 cancel_timeout: Unset[float] = _UNSET, 349 cancel_signal: Unset[Optional[signal.Signals]] = _UNSET, 350 alt_name: Unset[Optional[str]] = _UNSET, 351 pass_fds: Unset[Iterable[int]] = _UNSET, 352 pass_fds_close: Unset[bool] = _UNSET, 353 _writable: Unset[bool] = _UNSET, 354 _start_new_session: Unset[bool] = _UNSET, 355 _preexec_fn: Unset[_PreexecFnT] = _UNSET, 356 pty: Unset[PtyAdapterOrBool] = _UNSET, 357 close_fds: Unset[bool] = _UNSET, 358 audit_callback: Unset[_AuditFnT] = _UNSET, 359 coerce_arg: Unset[_CoerceArgFnT] = _UNSET, 360 error_limit: Unset[Optional[int]] = _UNSET, 361 ) -> "CmdContext[_RT]": 362 """Return new context with custom options set. 363 364 See `Command.set` for option reference. 365 """ 366 kwargs = locals() 367 del kwargs["self"] 368 if not encoding: 369 raise TypeError("invalid encoding") 370 return CmdContext(self.options.set(kwargs))
Return new context with custom options set.
See Command.set
for option reference.
376 @property 377 def result(self) -> "CmdContext[shellous.Result]": 378 "Set `_return_result` and `exit_codes`." 379 return cast( 380 CmdContext[shellous.Result], 381 self.set( 382 _return_result=True, 383 exit_codes=range(-255, 256), 384 ), 385 )
Set _return_result
and exit_codes
.
387 def find_command(self, name: str) -> Optional[Path]: 388 """Find the command with the given name and return its filesystem path. 389 390 Return None if the command name is not found in the search path. 391 392 Use the `path` variable specified by the context if set. Otherwise, the 393 default behavior is to use the `PATH` environment variable with a 394 fallback to the value of `os.defpath`. 395 """ 396 result = self.options.which(name) 397 if not result: 398 return None 399 return Path(result)
Find the command with the given name and return its filesystem path.
Return None if the command name is not found in the search path.
Use the path
variable specified by the context if set. Otherwise, the
default behavior is to use the PATH
environment variable with a
fallback to the value of os.defpath
.
````python
@dataclass(frozen=True)
class Command(Generic[_RT]):
    """A Command instance is a lightweight and immutable object that specifies the
    arguments and options used to run a program. Commands do not do anything
    until they are awaited.

    Commands are always created by a CmdContext.

    ```
    from shellous import sh

    # Create a new command from the context.
    echo = sh("echo", "hello, world")

    # Run the command.
    result = await echo
    ```
    """

    args: "tuple[Union[str, bytes, os.PathLike[Any], Command[Any], shellous.Pipeline[Any]], ...]"
    "Command arguments including the program name as first argument."

    options: Options
    "Command options."

    def __post_init__(self) -> None:
        "Validate the command."
        if len(self.args) == 0:
            raise ValueError("Command must include program name")

    @property
    def name(self) -> str:
        """Returns the name of the program being run.

        Names longer than 31 characters are truncated. If the `alt_name` option
        is set, return that instead.
        """
        if self.options.alt_name:
            return self.options.alt_name
        name = str(self.args[0])
        if len(name) > 31:
            return f"...{name[-31:]}"
        return name

    def stdin(self, input_: Any, *, close: bool = False) -> "Command[_RT]":
        "Pass `input_` to command's standard input."
        new_options = self.options.set_stdin(input_, close)
        return Command(self.args, new_options)

    def stdout(
        self,
        output: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "Command[_RT]":
        "Redirect standard output to `output`."
        new_options = self.options.set_stdout(output, append, close)
        return Command(self.args, new_options)

    def stderr(
        self,
        error: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "Command[_RT]":
        "Redirect standard error to `error`."
        new_options = self.options.set_stderr(error, append, close)
        return Command(self.args, new_options)

    def env(self, **kwds: Any) -> "Command[_RT]":
        """Return new command with augmented environment.

        The changes to the environment variables made by this method are
        additive. For example, calling `cmd.env(A=1).env(B=2)` produces a
        command with the environment set to `{"A": "1", "B": "2"}`.

        To clear the environment, use the `cmd.set(env={})` method.
        """
        new_options = self.options.add_env(kwds)
        return Command(self.args, new_options)

    def set(  # pylint: disable=unused-argument, too-many-locals, too-many-arguments
        self,
        *,
        path: Unset[Optional[str]] = _UNSET,
        env: Unset[dict[str, Any]] = _UNSET,
        inherit_env: Unset[bool] = _UNSET,
        encoding: Unset[str] = _UNSET,
        _return_result: Unset[bool] = _UNSET,
        _catch_cancelled_error: Unset[bool] = _UNSET,
        exit_codes: Unset[Optional[Container[int]]] = _UNSET,
        timeout: Unset[Optional[float]] = _UNSET,
        cancel_timeout: Unset[float] = _UNSET,
        cancel_signal: Unset[Optional[signal.Signals]] = _UNSET,
        alt_name: Unset[Optional[str]] = _UNSET,
        pass_fds: Unset[Iterable[int]] = _UNSET,
        pass_fds_close: Unset[bool] = _UNSET,
        _writable: Unset[bool] = _UNSET,
        _start_new_session: Unset[bool] = _UNSET,
        _preexec_fn: Unset[_PreexecFnT] = _UNSET,
        pty: Unset[PtyAdapterOrBool] = _UNSET,
        close_fds: Unset[bool] = _UNSET,
        audit_callback: Unset[_AuditFnT] = _UNSET,
        coerce_arg: Unset[_CoerceArgFnT] = _UNSET,
        error_limit: Unset[Optional[int]] = _UNSET,
    ) -> "Command[_RT]":
        """Return new command with custom options set."""
        kwargs = locals()
        del kwargs["self"]
        if not encoding:
            raise TypeError("invalid encoding")
        return Command(self.args, self.options.set(kwargs))

    def _replace_args(self, new_args: Sequence[Any]) -> "Command[_RT]":
        """Return new command with arguments replaced by `new_args`.

        Arguments are NOT type-checked by the context. Program name must be the
        exact same object.
        """
        assert new_args
        assert new_args[0] is self.args[0]
        return Command(tuple(new_args), self.options)

    def coro(
        self,
        *,
        _run_future: Optional[asyncio.Future[Runner]] = None,
    ) -> Coroutine[Any, Any, _RT]:
        "Return coroutine object to run awaitable."
        return cast(
            Coroutine[Any, Any, _RT],
            Runner.run_command(self, _run_future=_run_future),
        )

    @contextlib.asynccontextmanager
    async def prompt(
        self,
        prompt: Union[str, list[str], re.Pattern[str], None] = None,
        *,
        timeout: Optional[float] = None,
        normalize_newlines: bool = False,
    ) -> AsyncIterator[Prompt]:
        """Run command using the send/expect API.

        This method should be called using `async with`. It returns a `Prompt`
        object with send() and expect() methods.

        You can optionally set a default `prompt`. This is used by `expect()`
        when you don't provide another value.

        Use the `timeout` parameter to set the default timeout for operations.

        Set `normalize_newlines` to True to convert incoming CR and CR-LF to LF.
        This conversion is done before matching with `expect()`. This option
        does not affect strings sent with `send()`.
        """
        cmd = self.stdin(Redirect.CAPTURE).stdout(Redirect.CAPTURE)

        cli = None
        try:
            async with Runner(cmd) as run:
                cli = Prompt(
                    run,
                    default_prompt=prompt,
                    default_timeout=timeout,
                    normalize_newlines=normalize_newlines,
                )
                yield cli
                cli.close()
        finally:
            if cli is not None:
                cli._finish_()  # pyright: ignore[reportPrivateUsage]

    def __await__(self) -> "Generator[Any, None, _RT]":
        "Run process and return the standard output."
        return self.coro().__await__()

    async def __aenter__(self) -> Runner:
        "Enter the async context manager."
        return await context_aenter(self, Runner(self))

    async def __aexit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_value: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Optional[bool]:
        "Exit the async context manager."
        return await context_aexit(self, exc_type, exc_value, exc_tb)

    def __aiter__(self) -> AsyncIterator[str]:
        "Return async iterator to iterate over output lines."
        return aiter_preflight(self)._readlines()

    async def _readlines(self) -> AsyncIterator[str]:
        "Async generator to iterate over lines."
        async with Runner(self) as run:
            async for line in run:
                yield line

    def __call__(self, *args: Any) -> "Command[_RT]":
        "Apply more arguments to the end of the command."
        if not args:
            return self
        new_args = self.args + coerce(args, self.options.coerce_arg)
        return Command(new_args, self.options)

    def __str__(self) -> str:
        """Return string representation for command.

        Display the full name of the command only. Don't include arguments or
        environment variables.
        """
        return str(self.args[0])

    @overload
    def __or__(self, rhs: StdoutType) -> "Command[_RT]": ...  # pragma: no cover

    @overload
    def __or__(self, rhs: "Command[str]") -> "shellous.Pipeline[str]": ...  # pragma: no cover

    @overload
    def __or__(
        self,
        rhs: "Command[shellous.Result]",
    ) -> "shellous.Pipeline[shellous.Result]": ...  # pragma: no cover

    def __or__(self, rhs: Any) -> Any:
        "Bitwise or operator is used to build pipelines."
        if isinstance(rhs, STDOUT_TYPES):
            return self.stdout(rhs)
        return shellous.Pipeline.create(self) | rhs

    def __ror__(self, lhs: StdinType) -> "Command[_RT]":
        "Bitwise or operator is used to build pipelines."
        if isinstance(lhs, STDIN_TYPES):  # pyright: ignore[reportUnnecessaryIsInstance]
            return self.stdin(lhs)
        return NotImplemented

    def __rshift__(self, rhs: StdoutType) -> "Command[_RT]":
        "Right shift operator is used to build pipelines."
        if isinstance(rhs, STDOUT_TYPES):  # pyright: ignore[reportUnnecessaryIsInstance]
            return self.stdout(rhs, append=True)
        return NotImplemented

    @property
    def writable(self) -> "Command[_RT]":
        "Set `writable` to True."
        return self.set(_writable=True)

    @property
    def result(self) -> "Command[shellous.Result]":
        "Set `_return_result` and `exit_codes`."
        return cast(
            Command[shellous.Result],
            self.set(_return_result=True, exit_codes=range(-255, 256)),
        )
````
A `Command` instance is a lightweight and immutable object that specifies the arguments and options used to run a program. Commands do not do anything until they are awaited.

Commands are always created by a `CmdContext`.

```python
from shellous import sh

# Create a new command from the context.
echo = sh("echo", "hello, world")

# Run the command.
result = await echo
```

`args`: Command arguments, including the program name as the first argument.

`options`: Command options.
```python
@property
def name(self) -> str
```
Returns the name of the program being run.

Names longer than 31 characters are truncated. If the `alt_name` option is set, that value is returned instead.
```python
def stdin(self, input_: Any, *, close: bool = False) -> "Command[_RT]"
```
Pass `input_` to the command's standard input.
```python
def stdout(self, output: Any, *, append: bool = False, close: bool = False) -> "Command[_RT]"
```
Redirect standard output to `output`.
```python
def stderr(self, error: Any, *, append: bool = False, close: bool = False) -> "Command[_RT]"
```
Redirect standard error to `error`.
```python
def env(self, **kwds: Any) -> "Command[_RT]"
```
Return a new command with an augmented environment.

The changes to the environment variables made by this method are additive. For example, calling `cmd.env(A=1).env(B=2)` produces a command with the environment set to `{"A": "1", "B": "2"}`.

To clear the environment, use the `cmd.set(env={})` method.
```python
def set(
    self,
    *,
    path: Unset[Optional[str]] = _UNSET,
    env: Unset[dict[str, Any]] = _UNSET,
    inherit_env: Unset[bool] = _UNSET,
    encoding: Unset[str] = _UNSET,
    _return_result: Unset[bool] = _UNSET,
    _catch_cancelled_error: Unset[bool] = _UNSET,
    exit_codes: Unset[Optional[Container[int]]] = _UNSET,
    timeout: Unset[Optional[float]] = _UNSET,
    cancel_timeout: Unset[float] = _UNSET,
    cancel_signal: Unset[Optional[signal.Signals]] = _UNSET,
    alt_name: Unset[Optional[str]] = _UNSET,
    pass_fds: Unset[Iterable[int]] = _UNSET,
    pass_fds_close: Unset[bool] = _UNSET,
    _writable: Unset[bool] = _UNSET,
    _start_new_session: Unset[bool] = _UNSET,
    _preexec_fn: Unset[_PreexecFnT] = _UNSET,
    pty: Unset[PtyAdapterOrBool] = _UNSET,
    close_fds: Unset[bool] = _UNSET,
    audit_callback: Unset[_AuditFnT] = _UNSET,
    coerce_arg: Unset[_CoerceArgFnT] = _UNSET,
    error_limit: Unset[Optional[int]] = _UNSET,
) -> "Command[_RT]"
```
Return a new command with custom options set.

**path** (str | None) default=None
Search path for locating the command executable. By default, `path` is None, which causes shellous to rely on the `PATH` environment variable.
**env** (dict[str, str]) default={}
Set the environment variables for the subprocess. If `inherit_env` is True, the subprocess will also inherit the environment variables of the parent process.

Using `set(env=...)` will replace all environment variables with the given dictionary. You can also use the `env(...)` method to modify the existing environment incrementally.
**inherit_env** (bool) default=True
Subprocess should inherit the parent process environment. If this is False, the subprocess will only have the environment variables specified by `Command.env`. If `inherit_env` is True, the parent process environment is augmented/overridden by any variables specified in `Command.env`.
**encoding** (str) default="utf-8"
String encoding to use for subprocess input/output. To specify `errors`, append it after a space. For example, use "utf-8 replace" to specify "utf-8" with errors "replace".
**_return_result** (bool) default=False
When True, return a `Result` object instead of the standard output. Private API -- use the `result` modifier instead.

**_catch_cancelled_error** (bool) default=False
When True, raise a `ResultError` when the command is cancelled. Private API -- used internally by PipeRunner.
**exit_codes** (set[int] | None) default=None
Set of allowed exit codes that will not raise a `ResultError`. By default, `exit_codes` is `None`, which indicates that 0 is the only valid exit status; any other exit status will raise a `ResultError`. In addition to sets of integers, you can use a `range` object, e.g. `range(256)` to allow any exit status from 0 to 255.
**timeout** (float | None) default=None
Timeout in seconds to wait before we cancel the process. The timer begins immediately after the process is launched. This differs from using `asyncio.wait_for`, which also counts the time needed to launch the process. If `timeout` is None (the default), there is no timeout.

**cancel_timeout** (float) default=3.0 seconds
Timeout in seconds to wait for a process to exit after sending it a `cancel_signal`. If the process does not exit after waiting for `cancel_timeout` seconds, we send a kill signal to the process.

**cancel_signal** (signal.Signals | None) default=signal.SIGTERM
Signal sent to a process when it is cancelled. If `cancel_signal` is None, send `SIGKILL` on Unix and `SIGTERM` (TerminateProcess) on Windows.
**alt_name** (str | None) default=None
Alternative name of the command displayed in logs. Used to resolve ambiguity when the actual command name is a scripting language.

**pass_fds** (Iterable[int]) default=()
Specify open file descriptors to pass to the subprocess.

**pass_fds_close** (bool) default=False
Close the file descriptors in `pass_fds` in the current process immediately after launching the subprocess.
**_writable** (bool) default=False
Used to indicate process substitution is writing. Private API -- use the `writable` modifier instead.

**_start_new_session** (bool) default=False
Private API -- provided for testing purposes only.

**_preexec_fn** (Callable() | None) default=None
Private API -- provided for testing purposes only.
**pty** (bool | Callable(int)) default=False
If True, use a pseudo-terminal (pty) to control the child process. If `pty` is set to a callable, the function must take one int argument for the child side of the pty. The function is called to set the child pty's termios settings before spawning the subprocess.

shellous provides three utility functions, `shellous.cooked`, `shellous.raw` and `shellous.cbreak`, that can be used as arguments to the `pty` option.
**close_fds** (bool) default=True
Close all unnecessary file descriptors in the child process. This defaults to True to align with the default behavior of the subprocess module.
**audit_callback** (Callable(phase, info) | None) default=None
Specify a function to call as the command execution goes through its lifecycle. `audit_callback` is a function called with two arguments, *phase* and *info*.

*phase* can be one of three values:

- "start": The process is about to start.
- "stop": The process finally stopped.
- "signal": The process is being sent a signal.

*info* is a dictionary providing more information for the callback. The following keys are currently defined:

- "runner" (Runner): Reference to the Runner object.
- "failure" (str): When phase is "stop", optional string with the name of the exception from launching the process.
- "signal" (str): When phase is "signal", the signal name/number sent to the process, e.g. "SIGTERM".

The primary use case for `audit_callback` is measuring how long each command takes to run and exporting this information to a metrics framework like Prometheus.
**coerce_arg** (Callable(arg) | None) default=None
Specify a function to call on each command line argument. This function can specify how to coerce unsupported argument types (e.g. dict) to a sequence of strings. It should return the original value unchanged if no conversion is needed.
**error_limit** (int | None) default=1024
Specify the number of bytes to store when redirecting STDERR to BUFFER. After reading up to `error_limit` bytes, shellous will continue to read from stderr but will not store any additional bytes. Setting `error_limit` only affects the internal BUFFER; it has no effect when using other redirection types.
```python
def coro(self, *, _run_future: Optional[asyncio.Future[Runner]] = None) -> Coroutine[Any, Any, _RT]
```
Return a coroutine object to run the awaitable.
```python
@contextlib.asynccontextmanager
async def prompt(
    self,
    prompt: Union[str, list[str], re.Pattern[str], None] = None,
    *,
    timeout: Optional[float] = None,
    normalize_newlines: bool = False,
) -> AsyncIterator[Prompt]
```
Run the command using the send/expect API.

This method should be called using `async with`. It returns a `Prompt` object with `send()` and `expect()` methods.

You can optionally set a default `prompt`. This is used by `expect()` when you don't provide another value.

Use the `timeout` parameter to set the default timeout for operations.

Set `normalize_newlines` to True to convert incoming CR and CR-LF to LF. This conversion is done before matching with `expect()`. This option does not affect strings sent with `send()`.
```python
def __await__(self) -> "Generator[Any, None, _RT]"
```
Run the process and return the standard output.
```python
async def __aenter__(self) -> Runner
```
Enter the async context manager.
```python
def __aiter__(self) -> AsyncIterator[str]
```
Return an async iterator to iterate over output lines.
```python
@dataclass(frozen=True)
class Options:  # pylint: disable=too-many-instance-attributes
    "Concrete class for per-command options."

    path: Optional[str] = None
    "Optional search path to use instead of PATH environment variable."

    env: Optional[EnvironmentDict] = field(default=None, repr=False)
    "Additional environment variables for command."

    inherit_env: bool = True
    "True if subprocess should inherit the current environment variables."

    input: _RedirectT = Redirect.DEFAULT
    "Input object to bind to stdin."

    input_close: bool = False
    "True if input object should be closed after subprocess launch."

    output: _RedirectT = Redirect.DEFAULT
    "Output object to bind to stdout."

    output_append: bool = False
    "True if output object should be opened in append mode."

    output_close: bool = False
    "True if output object should be closed after subprocess launch."

    error: _RedirectT = Redirect.DEFAULT
    "Error object to bind to stderr."

    error_append: bool = False
    "True if error object should be opened in append mode."

    error_close: bool = False
    "True if error object should be closed after subprocess launch."

    error_limit: Optional[int] = DEFAULT_ERROR_LIMIT
    "Bytes of stderr to buffer in memory (`sh.BUFFER`). None means unlimited."

    encoding: str = "utf-8"
    "Specifies encoding of input/output."

    _return_result: bool = False
    "True if we should return `Result` object instead of the output text/bytes."

    _catch_cancelled_error: bool = False
    "True if we should raise `ResultError` after clean up from cancelled task."

    exit_codes: Optional[Container[int]] = None
    "Set of exit codes that do not raise a `ResultError`. None means {0}."

    timeout: Optional[float] = None
    "Timeout in seconds that we wait before cancelling the process."

    cancel_timeout: float = 3.0
    "Timeout in seconds that we wait for a cancelled process to terminate."

    cancel_signal: Optional[signal.Signals] = signal.SIGTERM
    "The signal sent to terminate a cancelled process."

    alt_name: Optional[str] = None
    "Alternate name for the command to use when logging."

    pass_fds: Iterable[int] = ()
    "File descriptors to pass to the command."

    pass_fds_close: bool = False
    "True if pass_fds should be closed after subprocess launch."

    _writable: bool = False
    "True if using process substitution in write mode."

    _start_new_session: bool = False
    "True if child process should start a new session with `setsid` call."

    _preexec_fn: _PreexecFnT = None
    "Function to call in child process after fork from parent."

    pty: PtyAdapterOrBool = False
    "True if child process should be controlled using a pseudo-terminal (pty)."

    close_fds: bool = True
    "True if child process should close all file descriptors."

    audit_callback: _AuditFnT = None
    "Function called to audit stages of process execution."

    coerce_arg: _CoerceArgFnT = None
    "Function called to coerce top level arguments."

    def runtime_env(self) -> Optional[dict[str, str]]:
        "@private Return our `env` merged with the global environment."
        if self.inherit_env:
            if not self.env:
                return None
            return os.environ | self.env

        if self.env:
            return dict(self.env)  # make copy of dict
        return {}

    def set_stdin(self, input_: Any, close: bool) -> "Options":
        "@private Return new options with `input` configured."
        if input_ is None:
            raise TypeError("invalid stdin")

        if input_ == Redirect.STDOUT:
            raise ValueError("STDOUT is only supported by stderr")

        return dataclasses.replace(
            self,
            input=input_,
            input_close=close,
        )

    def set_stdout(self, output: Any, append: bool, close: bool) -> "Options":
        "@private Return new options with `output` configured."
        if output is None:
            raise TypeError("invalid stdout")

        if output == Redirect.STDOUT:
            raise ValueError("STDOUT is only supported by stderr")

        return dataclasses.replace(
            self,
            output=output,
            output_append=append,
            output_close=close,
        )

    def set_stderr(self, error: Any, append: bool, close: bool) -> "Options":
        "@private Return new options with `error` configured."
        if error is None:
            raise TypeError("invalid stderr")

        return dataclasses.replace(
            self,
            error=error,
            error_append=append,
            error_close=close,
        )

    def add_env(self, updates: dict[str, Any]) -> "Options":
        "@private Return new options with augmented environment."
        new_env = EnvironmentDict(self.env, updates)
        return dataclasses.replace(self, env=new_env)

    def set(self, kwds: dict[str, Any]) -> "Options":
        """@private Return new options with given properties updated.

        See `Command.set` for option reference.
        """
        kwds = {key: value for key, value in kwds.items() if value is not _UNSET}
        if "env" in kwds:
            # The "env" property is stored as an `EnvironmentDict`.
            new_env = kwds["env"]
            if new_env:
                kwds["env"] = EnvironmentDict(None, new_env)
            else:
                kwds["env"] = None
        return dataclasses.replace(self, **kwds)

    @overload
    def which(self, name: bytes) -> Optional[bytes]:
        "@private Find the command with the given name and return its path."

    @overload
    def which(
        self, name: Union[str, os.PathLike[Any]]
    ) -> Optional[Union[str, os.PathLike[Any]]]:
        "@private Find the command with the given name and return its path."

    def which(
        self, name: Union[str, bytes, os.PathLike[Any]]
    ) -> Optional[Union[str, bytes, os.PathLike[Any]]]:
        "@private Find the command with the given name and return its path."
        return shutil.which(name, path=self.path)
```
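`Options` is a frozen dataclass: every "setter" returns a new instance via `dataclasses.replace`, and `runtime_env()` merges per-command variables into the inherited environment. A minimal stdlib-only sketch of both patterns (`MiniOptions` and its fields are illustrative stand-ins, not the real shellous class):

```python
import dataclasses
import os
from typing import Optional


@dataclasses.dataclass(frozen=True)
class MiniOptions:
    """Illustrative subset of shellous Options (not the real class)."""

    env: Optional[dict] = None
    inherit_env: bool = True
    timeout: Optional[float] = None

    def runtime_env(self) -> Optional[dict]:
        # Same merge rules as Options.runtime_env: None means "inherit the
        # parent environment unchanged"; otherwise merge in our overrides.
        if self.inherit_env:
            if not self.env:
                return None
            return dict(os.environ) | self.env
        if self.env:
            return dict(self.env)
        return {}

    def set_timeout(self, timeout: float) -> "MiniOptions":
        # Frozen dataclass: "setters" return a new instance via replace().
        return dataclasses.replace(self, timeout=timeout)


opts = MiniOptions()
assert opts.runtime_env() is None  # inherit parent env untouched

opts2 = opts.set_timeout(5.0)
assert opts.timeout is None and opts2.timeout == 5.0  # original unchanged

assert MiniOptions(env={"X": "1"}, inherit_env=False).runtime_env() == {"X": "1"}
```

The immutability is what makes reusing and re-awaiting the same command object safe: modifying an option never mutates shared state.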
```python
@dataclass(frozen=True)
class Pipeline(Generic[_RT]):
    "A Pipeline is a sequence of commands."

    commands: tuple[shellous.Command[_RT], ...] = ()

    @staticmethod
    def create(*commands: shellous.Command[_T]) -> "Pipeline[_T]":
        "Create a new Pipeline."
        return Pipeline(commands)

    def __post_init__(self) -> None:
        "Validate the pipeline."
        if len(self.commands) == 0:
            raise ValueError("Pipeline must include at least one command")

    @property
    def name(self) -> str:
        "Return the name of the pipeline."
        return "|".join(cmd.name for cmd in self.commands)

    @property
    def options(self) -> shellous.Options:
        "Return the last command's options."
        return self.commands[-1].options

    def stdin(self, input_: Any, *, close: bool = False) -> "Pipeline[_RT]":
        "Set stdin on the first command of the pipeline."
        new_first = self.commands[0].stdin(input_, close=close)
        new_commands = (new_first,) + self.commands[1:]
        return dataclasses.replace(self, commands=new_commands)

    def stdout(
        self,
        output: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "Pipeline[_RT]":
        "Set stdout on the last command of the pipeline."
        new_last = self.commands[-1].stdout(output, append=append, close=close)
        new_commands = self.commands[0:-1] + (new_last,)
        return dataclasses.replace(self, commands=new_commands)

    def stderr(
        self,
        error: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "Pipeline[_RT]":
        "Set stderr on the last command of the pipeline."
        new_last = self.commands[-1].stderr(error, append=append, close=close)
        new_commands = self.commands[0:-1] + (new_last,)
        return dataclasses.replace(self, commands=new_commands)

    def _set(self, **kwds: Any):
        "Set options on last command of the pipeline."
        new_last = self.commands[-1].set(**kwds)
        new_commands = self.commands[0:-1] + (new_last,)
        return dataclasses.replace(self, commands=new_commands)

    def coro(self) -> Coroutine[Any, Any, _RT]:
        "Return coroutine object for pipeline."
        return cast(Coroutine[Any, Any, _RT], PipeRunner.run_pipeline(self))

    @contextlib.asynccontextmanager
    async def prompt(
        self,
        prompt: Union[str, list[str], re.Pattern[str], None] = None,
        *,
        timeout: Optional[float] = None,
        normalize_newlines: bool = False,
    ) -> AsyncIterator[Prompt]:
        """Run pipeline using the send/expect API.

        This method should be called using `async with`. It returns a `Prompt`
        object with send() and expect() methods.

        You can optionally set a default `prompt`. This is used by `expect()`
        when you don't provide another value.

        Use the `timeout` parameter to set the default timeout for operations.

        Set `normalize_newlines` to True to convert incoming CR and CR-LF to LF.
        This conversion is done before matching with `expect()`. This option
        does not affect strings sent with `send()`.
        """
        cmd = self.stdin(Redirect.CAPTURE).stdout(Redirect.CAPTURE)

        cli = None
        try:
            async with PipeRunner(cmd, capturing=True) as run:
                cli = Prompt(
                    run,
                    default_prompt=prompt,
                    default_timeout=timeout,
                    normalize_newlines=normalize_newlines,
                )
                yield cli
                cli.close()
        finally:
            if cli is not None:
                cli._finish_()  # pyright: ignore[reportPrivateUsage]

    def _add(self, item: Union["shellous.Command[Any]", "Pipeline[Any]"]):
        if isinstance(item, shellous.Command):
            return dataclasses.replace(self, commands=(*self.commands, item))
        return dataclasses.replace(
            self,
            commands=self.commands + item.commands,
        )

    def __len__(self) -> int:
        "Return number of commands in pipe."
        return len(self.commands)

    def __getitem__(self, key: int) -> shellous.Command[Any]:
        "Return specified command by index."
        return self.commands[key]

    def __call__(self, *args: Any) -> "Pipeline[_RT]":
        if args:
            raise TypeError("Calling pipeline with 1 or more arguments.")
        return self

    @overload
    def __or__(
        self, rhs: "Union[shellous.Command[shellous.Result], Pipeline[shellous.Result]]"
    ) -> "Pipeline[shellous.Result]": ...  # pragma: no cover

    @overload
    def __or__(
        self, rhs: "Union[shellous.Command[str], Pipeline[str]]"
    ) -> "Pipeline[str]": ...  # pragma: no cover

    @overload
    def __or__(self, rhs: StdoutType) -> "Pipeline[_RT]": ...  # pragma: no cover

    def __or__(self, rhs: Any) -> "Pipeline[Any]":
        if isinstance(rhs, (shellous.Command, Pipeline)):
            return self._add(rhs)  # pyright: ignore[reportUnknownArgumentType]
        if isinstance(rhs, STDOUT_TYPES):
            return self.stdout(rhs)
        if isinstance(rhs, (str, bytes)):
            raise TypeError(
                f"{type(rhs)!r} unsupported for | output (Use 'pathlib.Path')"
            )
        return NotImplemented

    def __ror__(self, lhs: StdinType) -> "Pipeline[_RT]":
        if isinstance(lhs, STDIN_TYPES):  # pyright: ignore[reportUnnecessaryIsInstance]
            return self.stdin(lhs)
        return NotImplemented

    def __rshift__(self, rhs: StdoutType) -> "Pipeline[_RT]":
        if isinstance(
            rhs, STDOUT_TYPES
        ):  # pyright: ignore[reportUnnecessaryIsInstance]
            return self.stdout(rhs, append=True)
        if isinstance(rhs, (str, bytes)):
            raise TypeError(
                f"{type(rhs)!r} unsupported for >> output (Use 'pathlib.Path')"
            )
        return NotImplemented

    @property
    def writable(self) -> "Pipeline[_RT]":
        "Set writable=True option on last command of pipeline."
        return self._set(_writable=True)

    @property
    def result(self) -> "Pipeline[shellous.Result]":
        "Set `_return_result` and `exit_codes`."
        return cast(
            Pipeline[shellous.Result],
            self._set(_return_result=True, exit_codes=range(-255, 256)),
        )

    def __await__(self) -> "Generator[Any, None, _RT]":
        return self.coro().__await__()  # FP pylint: disable=no-member

    async def __aenter__(self) -> PipeRunner:
        "Enter the async context manager."
        return await context_aenter(self, PipeRunner(self, capturing=True))

    async def __aexit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_value: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Optional[bool]:
        "Exit the async context manager."
        return await context_aexit(self, exc_type, exc_value, exc_tb)

    def __aiter__(self) -> AsyncIterator[str]:
        "Return async iterator to iterate over output lines."
        return aiter_preflight(self)._readlines()

    async def _readlines(self):
        "Async generator to iterate over lines."
        async with PipeRunner(self, capturing=True) as run:
            async for line in run:
                yield line
```
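The `__or__`/`__ror__` overloads are what let you build pipelines with shell-like syntax (`cmd1 | cmd2`): each `|` returns a new frozen `Pipeline` with the command appended. A stdlib-only sketch of the accumulation pattern (`MiniCmd` and `MiniPipeline` are illustrative stand-ins, not shellous classes):

```python
import dataclasses


@dataclasses.dataclass(frozen=True)
class MiniCmd:
    """Illustrative stand-in for a shellous Command (not the real class)."""

    name: str


@dataclasses.dataclass(frozen=True)
class MiniPipeline:
    """Sketch of how Pipeline accumulates commands via the | operator."""

    commands: tuple = ()

    def __or__(self, rhs):
        if isinstance(rhs, MiniCmd):
            # Append a single command, returning a new immutable pipeline.
            return dataclasses.replace(self, commands=(*self.commands, rhs))
        if isinstance(rhs, MiniPipeline):
            # Concatenate two pipelines.
            return dataclasses.replace(self, commands=self.commands + rhs.commands)
        return NotImplemented

    @property
    def name(self) -> str:
        # Mirrors Pipeline.name: command names joined with "|".
        return "|".join(cmd.name for cmd in self.commands)


pipe = MiniPipeline((MiniCmd("echo"),)) | MiniCmd("grep") | MiniCmd("wc")
assert pipe.name == "echo|grep|wc"
assert len(pipe.commands) == 3
```

Returning `NotImplemented` for unsupported right-hand types lets Python fall back to the other operand's `__ror__`, which is how `"input" | pipeline` can redirect stdin.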
````python
class Prompt:
    """Utility class to help with an interactive prompt session.

    When you are controlling a co-process you will usually "send" some
    text to it, and then "expect" a response. The "expect" operation can use
    a regular expression to match different types of responses.

    Create a new `Prompt` instance using the `prompt()` API.

    ```
    # In this example, we are using a default prompt of "??? ".
    # Setting the PS1 environment variable tells the shell to use this as
    # the shell prompt.
    cmd = sh("sh").env(PS1="??? ").set(pty=True)

    async with cmd.prompt("??? ", timeout=3.0) as cli:
        # Turn off terminal echo.
        cli.echo = False

        # Wait for greeting and initial prompt. Calling expect() with no
        # argument will match the default prompt "??? ".
        greeting, _ = await cli.expect()

        # Send a command and wait for the response.
        await cli.send("echo hello")
        answer, _ = await cli.expect()
        assert answer == "hello\\r\\n"
    ```
    """

    _runner: Union[Runner, PipeRunner]
    _encoding: str
    _default_prompt: Optional[re.Pattern[str]]
    _default_timeout: Optional[float]
    _normalize_newlines: bool
    _chunk_size: int
    _decoder: codecs.IncrementalDecoder
    _pending: str = ""
    _at_eof: bool = False
    _result: Optional[Result] = None

    def __init__(
        self,
        runner: Runner,
        *,
        default_prompt: Union[str, list[str], re.Pattern[str], None] = None,
        default_timeout: Optional[float] = None,
        normalize_newlines: bool = False,
        _chunk_size: int = _DEFAULT_CHUNK_SIZE,
    ):
        assert runner.stdin is not None
        assert runner.stdout is not None

        if isinstance(default_prompt, (str, list)):
            default_prompt = _regex_compile_exact(default_prompt)

        self._runner = runner
        self._encoding = runner.options.encoding
        self._default_prompt = default_prompt
        self._default_timeout = default_timeout
        self._normalize_newlines = normalize_newlines
        self._chunk_size = _chunk_size
        self._decoder = _make_decoder(self._encoding, normalize_newlines)

        if LOG_PROMPT:
            LOGGER.info(
                "Prompt[pid=%s]: --- BEGIN --- name=%r",
                self._runner.pid,
                self._runner.name,
            )

    @property
    def at_eof(self) -> bool:
        "True if the prompt reader is at the end of file."
        return self._at_eof and not self._pending

    @property
    def pending(self) -> str:
        """Characters that still remain in the `pending` buffer."""
        return self._pending

    @property
    def echo(self) -> bool:
        """True if PTY is in echo mode.

        If the runner is not using a PTY, always return False.

        When the process is using a PTY, you can enable/disable terminal echo
        mode by setting the `echo` property to True/False.
        """
        if not isinstance(self._runner, Runner) or self._runner.pty_fd is None:
            return False
        return pty_util.get_term_echo(self._runner.pty_fd)

    @echo.setter
    def echo(self, value: bool) -> None:
        """Set echo mode for the PTY.

        Raise an error if the runner is not using a PTY.
        """
        if not isinstance(self._runner, Runner) or self._runner.pty_fd is None:
            raise RuntimeError("Cannot set echo mode. Not running in a PTY.")
        pty_util.set_term_echo(self._runner.pty_fd, value)

    @property
    def result(self) -> Result:
        """The `Result` of the co-process when it exited.

        You can only retrieve this property *after* the `async with` block
        exits where the co-process is running:

        ```
        async with cmd.prompt() as cli:
            ...
        # Access `cli.result` here.
        ```

        Inside the `async with` block, raise a RuntimeError because the process
        has not exited yet.
        """
        if self._result is None:
            raise RuntimeError("Prompt process is still running")
        return self._result

    async def send(
        self,
        text: Union[bytes, str],
        *,
        end: Optional[str] = _DEFAULT_LINE_END,
        no_echo: bool = False,
        timeout: Optional[float] = None,
    ) -> None:
        """Write some `text` to co-process standard input and append a newline.

        The `text` parameter is the string that you want to write. Use a `bytes`
        object to send some raw bytes (e.g. to the terminal driver).

        The default line ending is "\n". Use the `end` parameter to change the
        line ending. To omit the line ending entirely, specify `end=None`.

        Set `no_echo` to True when you are writing a password. When you set
        `no_echo` to True, the `send` method will wait for terminal
        echo mode to be disabled before writing the text. If shellous logging
        is enabled, the sensitive information will **not** be logged.

        Use the `timeout` parameter to override the default timeout. Normally,
        data is delivered immediately and this method returns without making a
        trip through the event loop. However, there are situations where the
        co-process input pipeline fills and we have to wait for it to
        drain.

        When this method needs to wait for the co-process input pipe to drain,
        this method will concurrently read from the output pipe into the pending
        buffer. This is necessary to avoid a deadlock situation where everything
        stops because neither process can make progress.
        """
        if end is None:
            end = ""

        if no_echo:
            await self._wait_no_echo()

        if isinstance(text, bytes):
            data = text + encode_bytes(end, self._encoding)
        else:
            data = encode_bytes(text + end, self._encoding)

        stdin = self._runner.stdin
        assert stdin is not None
        stdin.write(data)

        if LOG_PROMPT:
            self._log_send(data, no_echo)

        # Drain our write to stdin.
        cancelled, ex = await harvest_results(
            self._drain(stdin),
            timeout=timeout or self._default_timeout,
        )
        if cancelled:
            raise asyncio.CancelledError()
        if isinstance(ex[0], Exception):
            raise ex[0]

    async def expect(
        self,
        prompt: Union[str, list[str], re.Pattern[str], None] = None,
        *,
        timeout: Optional[float] = None,
    ) -> tuple[str, re.Match[str]]:
        """Read from co-process standard output until `prompt` pattern matches.

        Returns a 2-tuple of (output, match) where `output` is the text *before*
        the prompt pattern and `match` is a `re.Match` object for the prompt
        text itself.

        If `expect` reaches EOF or a timeout occurs before the prompt pattern
        matches, it raises an `EOFError` or `asyncio.TimeoutError`. The unread
        characters will be available in the `pending` buffer.

        After this method returns, there may still be characters read from stdout
        that remain in the `pending` buffer. These are the characters *after* the
        prompt pattern. You can examine these using the `pending` property.
        Subsequent calls to expect will examine this buffer first before
        reading new data from the output pipe.

        By default, this method will use the default `timeout` for the `Prompt`
        object if one is set. You can use the `timeout` parameter to specify
        a custom timeout in seconds.

        Prompt Patterns
        ~~~~~~~~~~~~~~~

        The `expect()` method supports matching fixed strings and regular
        expressions. The type of the `prompt` parameter determines the type of
        search.

        No argument or `None`:
            Use the default prompt pattern. If there is no default prompt,
            raise a TypeError.
        `str`:
            Match this string exactly.
        `list[str]`:
            Match one of these strings exactly.
        `re.Pattern[str]`:
            Match the given regular expression.

        When matching a regular expression, only a single Pattern object is
        supported. To match multiple regular expressions, combine them into a
        single regular expression using *alternation* syntax (|).

        The `expect()` method returns a 2-tuple (output, match). The `match`
        is the result of the regular expression search (re.Match). If you
        specify your prompt as a string or list of strings, it is still compiled
        into a regular expression that produces an `re.Match` object. You can
        examine the `match` object to determine the prompt value found.

        This method conducts a regular expression search on streaming data. The
        `expect()` method reads a new chunk of data into the `pending` buffer
        and then searches it. You must be careful in writing a regular
        expression so that the search is agnostic to how the incoming chunks of
        data arrive. Consider including a boundary condition at the end of your
        pattern. For example, instead of searching for the open-ended pattern
        `[a-z]+`, search for the pattern `[a-z]+[^a-z]` which ends with a
        non-letter character.

        Examples
        ~~~~~~~~

        Expect an exact string:

        ```
        await cli.expect("ftp> ")
        await cli.send(command)
        response, _ = await cli.expect("ftp> ")
        ```

        Expect a choice of strings:

        ```
        _, m = await cli.expect(["Login: ", "Password: ", "ftp> "])
        match m[0]:
            case "Login: ":
                await cli.send(login)
            case "Password: ":
                await cli.send(password)
            case "ftp> ":
                await cli.send(command)
        ```

        Read until EOF:

        ```
        data = await cli.read_all()
        ```

        Read the contents of the `pending` buffer without filling the buffer
        with any new data from the co-process pipe:

        ```
        data = cli.read_pending()
        ```
        """
        if prompt is None:
            prompt = self._default_prompt
            if prompt is None:
                raise TypeError("prompt is required when default prompt is not set")
        elif isinstance(prompt, (str, list)):
            prompt = _regex_compile_exact(prompt)

        if self._pending:
            result = self._search_pending(prompt)
            if result is not None:
                return result

        if self._at_eof:
            raise EOFError("Prompt has reached EOF")

        cancelled, (result,) = await harvest_results(
            self._read_to_pattern(prompt),
            timeout=timeout or self._default_timeout,
        )
        if cancelled:
            raise asyncio.CancelledError()
        if isinstance(result, Exception):
            raise result

        return result

    async def read_all(
        self,
        *,
        timeout: Optional[float] = None,
    ) -> str:
        """Read from co-process output until EOF.

        If we are already at EOF, return "".
        """
        if not self._at_eof:
            cancelled, (result,) = await harvest_results(
                self._read_some(tag="@read_all"),
                timeout=timeout or self._default_timeout,
            )
            if cancelled:
                raise asyncio.CancelledError()
            if isinstance(result, Exception):
                raise result

        return self.read_pending()

    def read_pending(self) -> str:
        """Read the contents of the pending buffer and empty it.

        This method does not fill the pending buffer with any new data from the
        co-process output pipe. If the pending buffer is already empty, return
        "".
        """
        result = self._pending
        self._pending = ""
        return result

    async def command(
        self,
        text: str,
        *,
        end: str = _DEFAULT_LINE_END,
        no_echo: bool = False,
        prompt: Union[str, re.Pattern[str], None] = None,
        timeout: Optional[float] = None,
        allow_eof: bool = False,
    ) -> str:
        """Send some text to the co-process and return the response.

        This method is equivalent to calling send() followed by expect().
        However, the return value is simpler; `command()` does not return the
        `re.Match` object.

        If you call this method *after* the co-process output pipe has already
        returned EOF, raise `EOFError`.

        If `allow_eof` is True, this method will read data up to EOF instead of
        raising an EOFError.
        """
        if self._at_eof:
            raise EOFError("Prompt has reached EOF")

        await self.send(text, end=end, no_echo=no_echo, timeout=timeout)
        try:
            result, _ = await self.expect(prompt, timeout=timeout)
        except EOFError:
            if not allow_eof:
                raise
            result = self.read_pending()

        return result

    def close(self) -> None:
        "Close stdin to end the prompt session."
        stdin = self._runner.stdin
        assert stdin is not None

        if isinstance(self._runner, Runner) and self._runner.pty_eof:
            # Write EOF twice; once to end the current line, and the second
            # time to signal the end.
            stdin.write(self._runner.pty_eof * 2)
            if LOG_PROMPT:
                LOGGER.info("Prompt[pid=%s] send: [[EOF]]", self._runner.pid)

        else:
            stdin.close()
            if LOG_PROMPT:
                LOGGER.info("Prompt[pid=%s] close", self._runner.pid)

    def _finish_(self) -> None:
        "Internal method called when process exits to fetch the `Result` and cache it."
        self._result = self._runner.result(check=False)
        if LOG_PROMPT:
            LOGGER.info(
                "Prompt[pid=%s]: --- END --- result=%r",
                self._runner.pid,
                self._result,
            )

    async def _read_to_pattern(
        self,
        pattern: re.Pattern[str],
    ) -> tuple[str, re.Match[str]]:
        """Read text up to part that matches the pattern.

        Returns 2-tuple with (text, match).
        """
        stdout = self._runner.stdout
        assert stdout is not None
        assert self._chunk_size > 0

        while not self._at_eof:
            _prev_len = len(self._pending)  # debug check

            try:
                # Read chunk and check for EOF.
                chunk = await stdout.read(self._chunk_size)
                if not chunk:
                    self._at_eof = True
            except asyncio.CancelledError:
                if LOG_PROMPT:
                    LOGGER.info(
                        "Prompt[pid=%s] receive cancelled: pending=%r",
                        self._runner.pid,
                        self._pending,
                    )
                raise

            if LOG_PROMPT:
                self._log_receive(chunk)

            # Decode eligible bytes into our buffer.
            data = self._decoder.decode(chunk, final=self._at_eof)
            if not data and not self._at_eof:
                continue
            self._pending += data

            result = self._search_pending(pattern)
            if result is not None:
                return result

            assert self._at_eof or len(self._pending) > _prev_len  # debug check

        raise EOFError("Prompt has reached EOF")

    def _search_pending(
        self,
        pattern: re.Pattern[str],
    ) -> Optional[tuple[str, re.Match[str]]]:
        """Search our `pending` buffer for the pattern.

        If we find a match, we return the data up to the portion that matched
        and leave the trailing data in the `pending` buffer. This method returns
        (result, match) or None if there is no match.
        """
        found = pattern.search(self._pending)
        if found:
            result = self._pending[0 : found.start(0)]
            self._pending = self._pending[found.end(0) :]
            if LOG_PROMPT:
                LOGGER.info(
                    "Prompt[pid=%s] found: %r [%s CHARS PENDING]",
                    self._runner.pid,
                    found,
                    len(self._pending),
                )
            return (result, found)

        return None

    async def _drain(self, stream: asyncio.StreamWriter) -> None:
        "Drain stream while reading into buffer concurrently."
        read_task = asyncio.create_task(
            self._read_some(tag="@drain", concurrent_cancel=True)
        )
        try:
            await stream.drain()

        finally:
            if not read_task.done():
                read_task.cancel()
                await read_task

    async def _read_some(
        self,
        *,
        tag: str = "",
        concurrent_cancel: bool = False,
    ) -> None:
        "Read into `pending` buffer until cancelled or EOF."
        stdout = self._runner.stdout
        assert stdout is not None
        assert self._chunk_size > 0

        while not self._at_eof:
            # Yield time to other tasks; read() doesn't yield as long as there
            # is data to read. We need to provide a cancel point when this
            # method is called during `drain`.
            if concurrent_cancel:
                await asyncio.sleep(0)

            # Read chunk and check for EOF.
            chunk = await stdout.read(self._chunk_size)
            if not chunk:
                self._at_eof = True

            if LOG_PROMPT:
                self._log_receive(chunk, tag)

            # Decode eligible bytes into our buffer.
            data = self._decoder.decode(chunk, final=self._at_eof)
            self._pending += data

    async def _wait_no_echo(self):
        "Wait for terminal echo mode to be disabled."
        if LOG_PROMPT:
            LOGGER.info("Prompt[pid=%s] wait: no_echo", self._runner.pid)

        for _ in range(4 * 30):
            if not self.echo:
                break
            await asyncio.sleep(0.25)
        else:
            raise RuntimeError("Timed out: Terminal echo mode remains enabled.")

    def _log_send(self, data: bytes, no_echo: bool):
        "Log data as it is being sent."
        pid = self._runner.pid

        if no_echo:
            LOGGER.info("Prompt[pid=%s] send: [[HIDDEN]]", pid)
        else:
            data_len = len(data)
            if data_len > _LOG_LIMIT:
                LOGGER.info(
                    "Prompt[pid=%s] send: [%d B] %r...%r",
                    pid,
                    data_len,
                    data[: _LOG_LIMIT - _LOG_LIMIT_END],
                    data[-_LOG_LIMIT_END:],
                )
            else:
                LOGGER.info(
                    "Prompt[pid=%s] send: [%d B] %r",
                    pid,
                    data_len,
                    data,
                )

    def _log_receive(self, data: bytes, tag: str = ""):
        "Log data as it is being received."
        pid = self._runner.pid
        data_len = len(data)

        if data_len > _LOG_LIMIT:
            LOGGER.info(
                "Prompt[pid=%s] receive%s: [%d B] %r...%r",
                pid,
                tag,
                data_len,
                data[: _LOG_LIMIT - _LOG_LIMIT_END],
                data[-_LOG_LIMIT_END:],
            )
        else:
            LOGGER.info(
                "Prompt[pid=%s] receive%s: [%d B] %r",
                pid,
                tag,
                data_len,
                data,
            )
````
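The `expect()` docstring's warning about searching streaming data can be demonstrated with plain `re` (a stdlib-only sketch, not shellous code): an open-ended pattern can match too early on a partial chunk, while a pattern anchored with a trailing boundary character is agnostic to how chunks arrive.

```python
import re

# expect() searches a growing buffer as chunks arrive from the co-process.
open_ended = re.compile(r"[a-z]+")
bounded = re.compile(r"[a-z]+[^a-z]")

partial = "hel"        # only the first chunk has arrived
complete = "hello> "   # buffer after the second chunk arrives

# The open-ended pattern matches a partial word prematurely.
assert open_ended.search(partial).group() == "hel"

# The bounded pattern correctly keeps waiting for more data...
assert bounded.search(partial) is None

# ...and matches the whole word once the boundary character is present.
assert bounded.search(complete).group() == "hello>"
```

This is why a default prompt like `"??? "` (ending in a space) works well: the trailing space acts as a natural boundary for the match.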
Utility class to help with an interactive prompt session.

When you are controlling a co-process, you will usually "send" some text to it, and then "expect" a response. The "expect" operation can use a regular expression to match different types of responses.

Create a new `Prompt` instance using the `prompt()` API.
```python
# In this example, we are using a default prompt of "??? ".
# Setting the PS1 environment variable tells the shell to use this as
# the shell prompt.
cmd = sh("sh").env(PS1="??? ").set(pty=True)

async with cmd.prompt("??? ", timeout=3.0) as cli:
    # Turn off terminal echo.
    cli.echo = False

    # Wait for greeting and initial prompt. Calling expect() with no
    # argument will match the default prompt "??? ".
    greeting, _ = await cli.expect()

    # Send a command and wait for the response.
    await cli.send("echo hello")
    answer, _ = await cli.expect()
    assert answer == "hello\r\n"
```
```python
    def __init__(
        self,
        runner: Runner,
        *,
        default_prompt: Union[str, list[str], re.Pattern[str], None] = None,
        default_timeout: Optional[float] = None,
        normalize_newlines: bool = False,
        _chunk_size: int = _DEFAULT_CHUNK_SIZE,
    ):
        assert runner.stdin is not None
        assert runner.stdout is not None

        if isinstance(default_prompt, (str, list)):
            default_prompt = _regex_compile_exact(default_prompt)

        self._runner = runner
        self._encoding = runner.options.encoding
        self._default_prompt = default_prompt
        self._default_timeout = default_timeout
        self._normalize_newlines = normalize_newlines
        self._chunk_size = _chunk_size
        self._decoder = _make_decoder(self._encoding, normalize_newlines)

        if LOG_PROMPT:
            LOGGER.info(
                "Prompt[pid=%s]: --- BEGIN --- name=%r",
                self._runner.pid,
                self._runner.name,
            )
```
```python
    @property
    def at_eof(self) -> bool:
        "True if the prompt reader is at the end of file."
        return self._at_eof and not self._pending
```
True if the prompt reader is at the end of file.
```python
    @property
    def pending(self) -> str:
        """Characters that still remain in the `pending` buffer."""
        return self._pending
```
Characters that still remain in the `pending` buffer.
```python
    @property
    def echo(self) -> bool:
        """True if PTY is in echo mode.

        If the runner is not using a PTY, always return False.

        When the process is using a PTY, you can enable/disable terminal echo
        mode by setting the `echo` property to True/False.
        """
        if not isinstance(self._runner, Runner) or self._runner.pty_fd is None:
            return False
        return pty_util.get_term_echo(self._runner.pty_fd)
```
True if PTY is in echo mode.

If the runner is not using a PTY, always return False.

When the process is using a PTY, you can enable/disable terminal echo mode by setting the `echo` property to True/False.
````python
    @property
    def result(self) -> Result:
        """The `Result` of the co-process when it exited.

        You can only retrieve this property *after* the `async with` block
        exits where the co-process is running:

        ```
        async with cmd.prompt() as cli:
            ...
        # Access `cli.result` here.
        ```

        Inside the `async with` block, raise a RuntimeError because the process
        has not exited yet.
        """
        if self._result is None:
            raise RuntimeError("Prompt process is still running")
        return self._result
````
The `Result` of the co-process when it exited.

You can only retrieve this property *after* the `async with` block exits where the co-process is running:

```
async with cmd.prompt() as cli:
    ...
# Access `cli.result` here.
```

Inside the `async with` block, this property raises a RuntimeError because the process has not exited yet.
```python
    async def send(
        self,
        text: Union[bytes, str],
        *,
        end: Optional[str] = _DEFAULT_LINE_END,
        no_echo: bool = False,
        timeout: Optional[float] = None,
    ) -> None:
        """Write some `text` to co-process standard input and append a newline.

        The `text` parameter is the string that you want to write. Use a `bytes`
        object to send some raw bytes (e.g. to the terminal driver).

        The default line ending is "\n". Use the `end` parameter to change the
        line ending. To omit the line ending entirely, specify `end=None`.

        Set `no_echo` to True when you are writing a password. When you set
        `no_echo` to True, the `send` method will wait for terminal
        echo mode to be disabled before writing the text. If shellous logging
        is enabled, the sensitive information will **not** be logged.

        Use the `timeout` parameter to override the default timeout. Normally,
        data is delivered immediately and this method returns after making a
        trip through the event loop. However, there are situations where the
        co-process input pipeline fills and we have to wait for it to
        drain.

        When this method needs to wait for the co-process input pipe to drain,
        this method will concurrently read from the output pipe into the pending
        buffer. This is necessary to avoid a deadlock situation where everything
        stops because neither process can make progress.
        """
        if end is None:
            end = ""

        if no_echo:
            await self._wait_no_echo()

        if isinstance(text, bytes):
            data = text + encode_bytes(end, self._encoding)
        else:
            data = encode_bytes(text + end, self._encoding)

        stdin = self._runner.stdin
        assert stdin is not None
        stdin.write(data)

        if LOG_PROMPT:
            self._log_send(data, no_echo)

        # Drain our write to stdin.
        cancelled, ex = await harvest_results(
            self._drain(stdin),
            timeout=timeout or self._default_timeout,
        )
        if cancelled:
            raise asyncio.CancelledError()
        if isinstance(ex[0], Exception):
            raise ex[0]
```
Write some `text` to co-process standard input and append a newline.

The `text` parameter is the string that you want to write. Use a `bytes` object to send some raw bytes (e.g. to the terminal driver).

The default line ending is `"\n"`. Use the `end` parameter to change the line ending. To omit the line ending entirely, specify `end=None`.
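The text and line-ending handling described above reduces to a small encoding step. A standalone sketch (the `encode_line` helper is illustrative, not a shellous API):

```python
def encode_line(text, end="\n", encoding="utf-8"):
    # Mirror send(): append `end` (or nothing when end=None), and
    # accept either str or raw bytes input.
    if end is None:
        end = ""
    if isinstance(text, bytes):
        return text + end.encode(encoding)
    return (text + end).encode(encoding)

print(encode_line("echo hello"))       # → b'echo hello\n'
print(encode_line("abc", end=None))    # → b'abc'
print(encode_line(b"\x04", end=None))  # → b'\x04' (raw control byte)
```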
Set `no_echo` to True when you are writing a password. When you set `no_echo` to True, the `send` method will wait for terminal echo mode to be disabled before writing the text. If shellous logging is enabled, the sensitive information will **not** be logged.

Use the `timeout` parameter to override the default timeout. Normally, data is delivered immediately and this method returns after making a trip through the event loop. However, there are situations where the co-process input pipeline fills and we have to wait for it to drain.

When this method needs to wait for the co-process input pipe to drain, it will concurrently read from the output pipe into the pending buffer. This is necessary to avoid a deadlock where everything stops because neither process can make progress.
````python
    async def expect(
        self,
        prompt: Union[str, list[str], re.Pattern[str], None] = None,
        *,
        timeout: Optional[float] = None,
    ) -> tuple[str, re.Match[str]]:
        """Read from co-process standard output until `prompt` pattern matches.

        Returns a 2-tuple of (output, match) where `output` is the text *before*
        the prompt pattern and `match` is a `re.Match` object for the prompt
        text itself.

        If `expect` reaches EOF or a timeout occurs before the prompt pattern
        matches, it raises an `EOFError` or `asyncio.TimeoutError`. The unread
        characters will be available in the `pending` buffer.

        After this method returns, there may still be characters read from stdout
        that remain in the `pending` buffer. These are the characters *after* the
        prompt pattern. You can examine these using the `pending` property.
        Subsequent calls to expect will examine this buffer first before
        reading new data from the output pipe.

        By default, this method will use the default `timeout` for the `Prompt`
        object if one is set. You can use the `timeout` parameter to specify
        a custom timeout in seconds.

        Prompt Patterns
        ~~~~~~~~~~~~~~~

        The `expect()` method supports matching fixed strings and regular
        expressions. The type of the `prompt` parameter determines the type of
        search.

        No argument or `None`:
            Use the default prompt pattern. If there is no default prompt,
            raise a TypeError.
        `str`:
            Match this string exactly.
        `list[str]`:
            Match one of these strings exactly.
        `re.Pattern[str]`:
            Match the given regular expression.

        When matching a regular expression, only a single Pattern object is
        supported. To match multiple regular expressions, combine them into a
        single regular expression using *alternation* syntax (|).

        The `expect()` method returns a 2-tuple (output, match). The `match`
        is the result of the regular expression search (re.Match). If you
        specify your prompt as a string or list of strings, it is still compiled
        into a regular expression that produces an `re.Match` object. You can
        examine the `match` object to determine the prompt value found.

        This method conducts a regular expression search on streaming data. The
        `expect()` method reads a new chunk of data into the `pending` buffer
        and then searches it. You must be careful in writing a regular
        expression so that the search is agnostic to how the incoming chunks of
        data arrive. Consider including a boundary condition at the end of your
        pattern. For example, instead of searching for the open-ended pattern
        `[a-z]+`, search for the pattern `[a-z]+[^a-z]` which ends with a
        non-letter character.

        Examples
        ~~~~~~~~

        Expect an exact string:

        ```
        await cli.expect("ftp> ")
        await cli.send(command)
        response, _ = await cli.expect("ftp> ")
        ```

        Expect a choice of strings:

        ```
        _, m = await cli.expect(["Login: ", "Password: ", "ftp> "])
        match m[0]:
            case "Login: ":
                await cli.send(login)
            case "Password: ":
                await cli.send(password)
            case "ftp> ":
                await cli.send(command)
        ```

        Read until EOF:

        ```
        data = await cli.read_all()
        ```

        Read the contents of the `pending` buffer without filling the buffer
        with any new data from the co-process pipe:

        ```
        data = await cli.read_pending()
        ```
        """
        if prompt is None:
            prompt = self._default_prompt
            if prompt is None:
                raise TypeError("prompt is required when default prompt is not set")
        elif isinstance(prompt, (str, list)):
            prompt = _regex_compile_exact(prompt)

        if self._pending:
            result = self._search_pending(prompt)
            if result is not None:
                return result

        if self._at_eof:
            raise EOFError("Prompt has reached EOF")

        cancelled, (result,) = await harvest_results(
            self._read_to_pattern(prompt),
            timeout=timeout or self._default_timeout,
        )
        if cancelled:
            raise asyncio.CancelledError()
        if isinstance(result, Exception):
            raise result

        return result
````
Read from co-process standard output until `prompt` pattern matches.

Returns a 2-tuple of (output, match) where `output` is the text *before* the prompt pattern and `match` is a `re.Match` object for the prompt text itself.

If `expect` reaches EOF or a timeout occurs before the prompt pattern matches, it raises an `EOFError` or `asyncio.TimeoutError`. The unread characters will be available in the `pending` buffer.

After this method returns, there may still be characters read from stdout that remain in the `pending` buffer. These are the characters *after* the prompt pattern. You can examine them using the `pending` property. Subsequent calls to expect will search this buffer first before reading new data from the output pipe.

By default, this method uses the default `timeout` for the `Prompt` object if one is set. You can use the `timeout` parameter to specify a custom timeout in seconds.

Prompt Patterns

The `expect()` method supports matching fixed strings and regular expressions. The type of the `prompt` parameter determines the type of search.

- No argument or `None`: use the default prompt pattern. If there is no default prompt, raise a TypeError.
- `str`: match this string exactly.
- `list[str]`: match one of these strings exactly.
- `re.Pattern[str]`: match the given regular expression.

When matching a regular expression, only a single Pattern object is supported. To match multiple regular expressions, combine them into a single regular expression using *alternation* syntax (|).

The `expect()` method returns a 2-tuple (output, match). The `match` is the result of the regular expression search (re.Match). If you specify your prompt as a string or list of strings, it is still compiled into a regular expression that produces an `re.Match` object. You can examine the `match` object to determine which prompt value was found.

This method conducts a regular expression search on streaming data. `expect()` reads a new chunk of data into the `pending` buffer and then searches it. Write your regular expression so that the search is agnostic to how the incoming chunks of data arrive. Consider including a boundary condition at the end of your pattern. For example, instead of searching for the open-ended pattern `[a-z]+`, search for the pattern `[a-z]+[^a-z]`, which ends with a non-letter character.
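The chunk-boundary pitfall described above is easy to reproduce with plain `re` on a buffer that fills incrementally, without involving a subprocess:

```python
import re

# Simulate streaming: the co-process output "hello> " arrives in two chunks.
chunks = ["hel", "lo> "]

open_ended = re.compile(r"[a-z]+")  # no boundary: can match too early
bounded = re.compile(r"[a-z]+> ")   # anchored by the trailing "> "

buffer = ""
early = None
for chunk in chunks:
    buffer += chunk
    if early is None:
        found = open_ended.search(buffer)
        if found:
            early = found[0]

# The open-ended pattern matched a partial word after the first chunk.
assert early == "hel"

# The bounded pattern only matches once the full prompt has arrived.
assert bounded.search("hel") is None
assert bounded.search(buffer)[0] == "hello> "
```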
Examples

Expect an exact string:

```
await cli.expect("ftp> ")
await cli.send(command)
response, _ = await cli.expect("ftp> ")
```

Expect a choice of strings:

```
_, m = await cli.expect(["Login: ", "Password: ", "ftp> "])
match m[0]:
    case "Login: ":
        await cli.send(login)
    case "Password: ":
        await cli.send(password)
    case "ftp> ":
        await cli.send(command)
```

Read until EOF:

```
data = await cli.read_all()
```

Read the contents of the `pending` buffer without filling the buffer with any new data from the co-process pipe:

```
data = await cli.read_pending()
```
```python
    async def read_all(
        self,
        *,
        timeout: Optional[float] = None,
    ) -> str:
        """Read from co-process output until EOF.

        If we are already at EOF, return "".
        """
        if not self._at_eof:
            cancelled, (result,) = await harvest_results(
                self._read_some(tag="@read_all"),
                timeout=timeout or self._default_timeout,
            )
            if cancelled:
                raise asyncio.CancelledError()
            if isinstance(result, Exception):
                raise result

        return self.read_pending()
```
Read from co-process output until EOF.
If we are already at EOF, return "".
```python
    def read_pending(self) -> str:
        """Read the contents of the pending buffer and empty it.

        This method does not fill the pending buffer with any new data from the
        co-process output pipe. If the pending buffer is already empty, return
        "".
        """
        result = self._pending
        self._pending = ""
        return result
```
Read the contents of the pending buffer and empty it.
This method does not fill the pending buffer with any new data from the co-process output pipe. If the pending buffer is already empty, return "".
```python
    async def command(
        self,
        text: str,
        *,
        end: str = _DEFAULT_LINE_END,
        no_echo: bool = False,
        prompt: Union[str, re.Pattern[str], None] = None,
        timeout: Optional[float] = None,
        allow_eof: bool = False,
    ) -> str:
        """Send some text to the co-process and return the response.

        This method is equivalent to calling send() followed by expect().
        However, the return value is simpler; `command()` does not return the
        `re.Match` object.

        If you call this method *after* the co-process output pipe has already
        returned EOF, raise `EOFError`.

        If `allow_eof` is True, this method will read data up to EOF instead of
        raising an EOFError.
        """
        if self._at_eof:
            raise EOFError("Prompt has reached EOF")

        await self.send(text, end=end, no_echo=no_echo, timeout=timeout)
        try:
            result, _ = await self.expect(prompt, timeout=timeout)
        except EOFError:
            if not allow_eof:
                raise
            result = self.read_pending()

        return result
```
Send some text to the co-process and return the response.

This method is equivalent to calling send() followed by expect(). However, the return value is simpler; `command()` does not return the `re.Match` object.

If you call this method *after* the co-process output pipe has already returned EOF, it raises `EOFError`.

If `allow_eof` is True, this method will read data up to EOF instead of raising an EOFError.
```python
    def close(self) -> None:
        "Close stdin to end the prompt session."
        stdin = self._runner.stdin
        assert stdin is not None

        if isinstance(self._runner, Runner) and self._runner.pty_eof:
            # Write EOF twice; once to end the current line, and the second
            # time to signal the end.
            stdin.write(self._runner.pty_eof * 2)
            if LOG_PROMPT:
                LOGGER.info("Prompt[pid=%s] send: [[EOF]]", self._runner.pid)

        else:
            stdin.close()
            if LOG_PROMPT:
                LOGGER.info("Prompt[pid=%s] close", self._runner.pid)
```
Close stdin to end the prompt session.
```python
def cbreak(rows: int = 0, cols: int = 0) -> PtyAdapter:
    "Return a function that sets PtyOptions.child_fd to cbreak mode."

    def _pty_set_cbreak(fdesc: int):
        tty.setcbreak(fdesc)
        if rows or cols:
            _set_term_size(fdesc, rows, cols)
        assert _get_eof(fdesc) == b""

    return _pty_set_cbreak
```
Return a function that sets PtyOptions.child_fd to cbreak mode.
```python
def cooked(rows: int = 0, cols: int = 0, echo: bool = True) -> PtyAdapter:
    "Return a function that leaves PtyOptions.child_fd in cooked mode."

    def _pty_set_canonical(fdesc: int):
        if rows or cols:
            _set_term_size(fdesc, rows, cols)
        if not echo:
            set_term_echo(fdesc, False)
        assert _get_eof(fdesc) == b"\x04"

    return _pty_set_canonical
```
Return a function that leaves PtyOptions.child_fd in cooked mode.
```python
def raw(rows: int = 0, cols: int = 0) -> PtyAdapter:
    "Return a function that sets PtyOptions.child_fd to raw mode."

    def _pty_set_raw(fdesc: int):
        tty.setraw(fdesc)
        if rows or cols:
            _set_term_size(fdesc, rows, cols)
        assert _get_eof(fdesc) == b""

    return _pty_set_raw
```
Return a function that sets PtyOptions.child_fd to raw mode.
```python
@dataclass(frozen=True, **_KW_ONLY)
class Result:
    "Concrete class for the result of a Command."

    exit_code: int
    "Command's exit status. If < 0, this is a negated signal number."

    output_bytes: bytes
    "Output of command as bytes. May be None if there is no output."

    error_bytes: bytes
    "Limited standard error from command if not redirected."

    cancelled: bool
    "Command was cancelled."

    encoding: str
    "Output encoding."

    @property
    def output(self) -> str:
        "Output of command as a string."
        return decode_bytes(self.output_bytes, self.encoding)

    @property
    def error(self) -> str:
        "Error from command as a string (if it is not redirected)."
        return decode_bytes(self.error_bytes, self.encoding)

    @property
    def exit_signal(self) -> Optional[signal.Signals]:
        "Signal that caused the command to exit, or None if no signal."
        if self.exit_code >= 0:
            return None
        return signal.Signals(-self.exit_code)

    def __bool__(self) -> bool:
        "Return true if exit_code is 0."
        return self.exit_code == 0
```
Concrete class for the result of a Command.
```python
    @property
    def output(self) -> str:
        "Output of command as a string."
        return decode_bytes(self.output_bytes, self.encoding)
```
Output of command as a string.
```python
    @property
    def error(self) -> str:
        "Error from command as a string (if it is not redirected)."
        return decode_bytes(self.error_bytes, self.encoding)
```
Error from command as a string (if it is not redirected).
```python
    @property
    def exit_signal(self) -> Optional[signal.Signals]:
        "Signal that caused the command to exit, or None if no signal."
        if self.exit_code >= 0:
            return None
        return signal.Signals(-self.exit_code)
```
Signal that caused the command to exit, or None if no signal.
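The sign convention above (a negative `exit_code` is a negated signal number) can be checked directly with the standard `signal` module. This standalone function mirrors the property; it is not itself a shellous API:

```python
import signal
from typing import Optional

def exit_signal(exit_code: int) -> Optional[signal.Signals]:
    # Negative exit codes encode "terminated by signal -exit_code".
    if exit_code >= 0:
        return None
    return signal.Signals(-exit_code)

assert exit_signal(0) is None              # normal exit
assert exit_signal(1) is None              # error exit, but no signal
assert exit_signal(-15) is signal.SIGTERM  # killed by SIGTERM (15)
```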
```python
class ResultError(Exception):
    "Represents a non-zero exit status."

    @property
    def result(self) -> "shellous.Result":
        "Result of the command."
        return self.args[0]
```
Represents a non-zero exit status.
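The `result` property simply exposes the first exception argument. A minimal standalone sketch of the same pattern, using a placeholder result value rather than shellous itself:

```python
class ResultError(Exception):
    # Convention: the failed result travels as the first exception arg.
    @property
    def result(self):
        return self.args[0]

# Usage with a stand-in result value:
try:
    raise ResultError({"exit_code": 1, "output": ""})
except ResultError as ex:
    failed = ex.result

print(failed["exit_code"])  # → 1
```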
````python
class Runner:
    """Runner is an asynchronous context manager that runs a command.

    ```
    async with Runner(cmd) as run:
        # process streams: run.stdin, run.stdout, run.stderr (if not None)
        result = run.result()
    ```
    """

    stdin: Optional[asyncio.StreamWriter] = None
    "Process standard input."

    stdout: Optional[asyncio.StreamReader] = None
    "Process standard output."

    stderr: Optional[asyncio.StreamReader] = None
    "Process standard error."

    _options: _RunOptions
    _tasks: list[asyncio.Task[Any]]
    _proc: Optional["asyncio.subprocess.Process"] = None
    _cancelled: bool = False
    _timer: Optional[asyncio.TimerHandle] = None
    _timed_out: bool = False
    _last_signal: Optional[int] = None

    def __init__(self, command: "shellous.Command[Any]"):
        self._options = _RunOptions(command)
        self._tasks = []

    @property
    def name(self) -> str:
        "Return name of process being run."
        return self.command.name

    @property
    def options(self) -> "shellous.Options":
        "Return options for process being run."
        return self.command.options

    @property
    def command(self) -> "shellous.Command[Any]":
        "Return the command being run."
        return self._options.command

    @property
    def pid(self) -> Optional[int]:
        "Return the command's process ID."
        if not self._proc:
            return None
        return self._proc.pid

    @property
    def returncode(self) -> Optional[int]:
        "Process's exit code."
        if not self._proc:
            if self._cancelled:
                # The process was cancelled before starting.
                return CANCELLED_EXIT_CODE
            return None
        code = self._proc.returncode
        if code == _UNKNOWN_EXIT_CODE and self._last_signal is not None:
            # After sending a signal, `waitpid` may fail to locate the child
            # process. In this case, map the status to the last signal we sent.
            # For more on this, see https://github.com/python/cpython/issues/87744
            return -self._last_signal  # pylint: disable=invalid-unary-operand-type
        return code

    @property
    def cancelled(self) -> bool:
        "Return True if the command was cancelled."
        return self._cancelled

    @property
    def pty_fd(self) -> Optional[int]:
        """The file descriptor used to communicate with the child PTY process.

        Returns None if the process is not using a PTY.
        """
        pty_fds = self._options.pty_fds
        if pty_fds is not None:
            return pty_fds.parent_fd
        return None

    @property
    def pty_eof(self) -> Optional[bytes]:
        """Byte sequence used to indicate EOF when written to the PTY child.

        Returns None if process is not using a PTY.
        """
        pty_fds = self._options.pty_fds
        if pty_fds is not None:
            return pty_fds.eof
        return None

    def result(self, *, check: bool = True) -> Result:
        "Check process exit code and raise a ResultError if necessary."
        code = self.returncode
        if code is None:
            raise TypeError("Runner.result(): Process has not exited")

        result = Result(
            exit_code=code,
            output_bytes=bytes(self._options.output_bytes or b""),
            error_bytes=bytes(self._options.error_bytes or b""),
            cancelled=self._cancelled,
            encoding=self._options.encoding,
        )

        if not check:
            return result

        return check_result(
            result,
            self.command.options,
            self._cancelled,
            self._timed_out,
        )

    def add_task(
        self,
        coro: Coroutine[Any, Any, _T],
        tag: str = "",
    ) -> asyncio.Task[_T]:
        "Add a background task."
        task_name = f"{self.name}#{tag}"
        task = asyncio.create_task(coro, name=task_name)
        self._tasks.append(task)
        return task

    def send_signal(self, sig: int) -> None:
        "Send an arbitrary signal to the process if it is running."
        if self.returncode is None:
            self._signal(sig)

    def cancel(self) -> None:
        "Cancel the running process if it is running."
        if self.returncode is None:
            self._signal(self.command.options.cancel_signal)

    def _is_bsd_pty(self) -> bool:
        "Return true if we're running a pty on BSD."
        return BSD_DERIVED and bool(self._options.pty_fds)

    @log_method(LOG_DETAIL)
    async def _wait(self) -> None:
        "Normal wait for background I/O tasks and process to finish."
        assert self._proc

        try:
            if self._tasks:
                await harvest(*self._tasks, trustee=self)
            if self._is_bsd_pty():
                await self._waiter()

        except asyncio.CancelledError:
            LOGGER.debug("Runner.wait cancelled %r", self)
            self._set_cancelled()
            self._tasks.clear()  # all tasks were cancelled
            await self._kill()

        except Exception as ex:
            LOGGER.debug("Runner.wait exited with error %r ex=%r", self, ex)
            self._tasks.clear()  # all tasks were cancelled
            await self._kill()
            raise  # re-raise exception

    @log_method(LOG_DETAIL)
    async def _wait_pid(self):
        "Manually poll `waitpid` until process finishes."
        assert self._is_bsd_pty()

        while True:
            assert self._proc is not None  # (pyright)

            if poll_wait_pid(self._proc):
                break
            await asyncio.sleep(0.025)

    @log_method(LOG_DETAIL)
    async def _kill(self):
        "Kill process and wait for it to finish."
        assert self._proc

        cancel_timeout = self.command.options.cancel_timeout
        cancel_signal = self.command.options.cancel_signal

        try:
            # If not already done, send cancel signal.
            if self._proc.returncode is None:
                self._signal(cancel_signal)

            if self._tasks:
                await harvest(*self._tasks, timeout=cancel_timeout, trustee=self)

            if self._proc.returncode is None:
                await harvest(self._waiter(), timeout=cancel_timeout, trustee=self)

        except (asyncio.CancelledError, asyncio.TimeoutError) as ex:
            LOGGER.warning("Runner.kill %r (ex)=%r", self, ex)
            if _is_cancelled(ex):
                self._set_cancelled()
            await self._kill_wait()

        except (Exception, GeneratorExit) as ex:
            LOGGER.warning("Runner.kill %r ex=%r", self, ex)
            await self._kill_wait()
            raise

    def _signal(self, sig: Optional[int]):
        "Send a signal to the process."
        assert self._proc is not None  # (pyright)

        if LOG_DETAIL:
            LOGGER.debug("Runner.signal %r signal=%r", self, sig)
        self._audit_callback("signal", signal=sig)

        if sig is None:
            self._proc.kill()
        else:
            self._proc.send_signal(sig)
            self._last_signal = sig

    @log_method(LOG_DETAIL)
    async def _kill_wait(self):
        "Wait for killed process to exit."
        assert self._proc

        # Check if process is already done.
        if self._proc.returncode is not None:
            return

        try:
            self._signal(None)
            await harvest(self._waiter(), timeout=_KILL_TIMEOUT, trustee=self)
        except asyncio.TimeoutError as ex:
            # Manually check if the process is still running.
            if poll_wait_pid(self._proc):
                LOGGER.warning("%r process reaped manually %r", self, self._proc)
            else:
                LOGGER.error("%r failed to kill process %r", self, self._proc)
                raise RuntimeError(f"Unable to kill process {self._proc!r}") from ex

    @log_method(LOG_DETAIL)
    async def __aenter__(self):
        "Set up redirections and launch subprocess."
        self._audit_callback("start")
        try:
            return await self._start()
        except BaseException as ex:
            self._stop_timer()  # failsafe just in case
            self._audit_callback("stop", failure=type(ex).__name__)
            raise
        finally:
            if self._cancelled and self.command.options._catch_cancelled_error:
                # Raises ResultError instead of CancelledError.
                self.result()

    @log_method(LOG_DETAIL)
    async def _start(self):
        "Set up redirections and launch subprocess."
        # assert self._proc is None
        assert not self._tasks

        try:
            # Set up subprocess arguments and launch subprocess.
            with self._options as opts:
                await self._subprocess_spawn(opts)

            assert self._proc is not None
            stdin = self._proc.stdin
            stdout = self._proc.stdout
            stderr = self._proc.stderr

            # Assign pty streams.
            if opts.pty_fds:
                assert (stdin, stdout) == (None, None)
                stdin, stdout = opts.pty_fds.writer, opts.pty_fds.reader

            if stderr is not None:
                limit = None
                if opts.error_bytes is not None:
                    error = opts.error_bytes
                    limit = opts.command.options.error_limit
                elif opts.is_stderr_only:
                    assert stdout is None
                    assert opts.output_bytes is not None
                    error = opts.output_bytes
                else:
                    error = opts.command.options.error
                stderr = self._setup_output_sink(
                    stderr, error, opts.encoding, "stderr", limit
                )

            if stdout is not None:
                limit = None
                if opts.output_bytes is not None:
                    output = opts.output_bytes
                else:
                    output = opts.command.options.output
                stdout = self._setup_output_sink(
                    stdout, output, opts.encoding, "stdout", limit
                )

            if stdin is not None:
                stdin = self._setup_input_source(stdin, opts)

        except (Exception, asyncio.CancelledError) as ex:
            LOGGER.debug("Runner._start %r ex=%r", self, ex)
            if _is_cancelled(ex):
                self._set_cancelled()
            if self._proc:
                await self._kill()
            raise

        # Make final streams available. These may be different from `self.proc`
        # versions.
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr

        # Add a task to monitor for when the process finishes.
        if not self._is_bsd_pty():
            self.add_task(self._waiter(), "waiter")

        # Set a timer to cancel the current task after a timeout.
        self._start_timer(self.command.options.timeout)

        return self

    @log_method(LOG_DETAIL)
    async def _subprocess_spawn(self, opts: _RunOptions):
        "Start the subprocess."
        assert self._proc is None

        # Second half of pty setup.
        if opts.pty_fds:
            opts.pty_fds = await opts.pty_fds.open_streams()

        # Check for task cancellation and yield right before exec'ing. If the
        # current task is already cancelled, this will raise a CancelledError,
        # and we save ourselves the work of launching and immediately killing
        # a process.
        await asyncio.sleep(0)

        # Launch the subprocess (always completes even if cancelled).
        await uninterrupted(self._subprocess_exec(opts))

        # Launch the process substitution commands (if any).
        for cmd in opts.subcmds:
            self.add_task(cmd.coro(), "procsub")

    @log_method(LOG_DETAIL)
    async def _subprocess_exec(self, opts: _RunOptions):
        "Start the subprocess and assign to `self.proc`."
        with log_timer("asyncio.create_subprocess_exec"):
            sys.audit(EVENT_SHELLOUS_EXEC, opts.pos_args[0])
            with pty_util.set_ignore_child_watcher(
                BSD_DERIVED and opts.pty_fds is not None
            ):
                self._proc = await asyncio.create_subprocess_exec(
                    *opts.pos_args,
                    **opts.kwd_args,
                )

    @log_method(LOG_DETAIL)
    async def _waiter(self):
        "Run task that waits for process to exit."
        assert self._proc is not None  # (pyright)

        try:
            if self._is_bsd_pty():
                await self._wait_pid()
            else:
                await self._proc.wait()
        finally:
            self._stop_timer()

    def _set_cancelled(self):
        "Set the cancelled flag, and cancel any inflight timers."
        self._cancelled = True
        self._stop_timer()

    def _start_timer(self, timeout: Optional[float]):
        "Start an optional timer to cancel the process if `timeout` desired."
        assert self._timer is None
        if timeout is not None:
            loop = asyncio.get_running_loop()
            task = asyncio.current_task()
            assert task is not None
            self._timer = loop.call_later(
                timeout,
                self._set_timer_expired,
                task,
            )

    def _set_timer_expired(self, main_task: asyncio.Task[Any]):
        "Set a flag when the timer expires and cancel the main task."
        self._timed_out = True
        self._timer = None
        main_task.cancel()

    def _stop_timer(self):
        if self._timer:
            self._timer.cancel()
            self._timer = None

    def _setup_input_source(
        self,
        stream: asyncio.StreamWriter,
        opts: _RunOptions,
    ):
        "Set up a task to read from custom input source."
        tag = "stdin"
        eof = opts.pty_fds.eof if opts.pty_fds else None

        if opts.input_bytes is not None:
            self.add_task(redir.write_stream(opts.input_bytes, stream, eof), tag)
            return None

        source = opts.command.options.input

        if isinstance(source, asyncio.StreamReader):
            self.add_task(redir.write_reader(source, stream, eof), tag)
            return None

        if isinstance(source, io.BytesIO):
            self.add_task(redir.write_stream(source.getvalue(), stream, eof), tag)
            return None

        if isinstance(source, io.StringIO):
            input_bytes = encode_bytes(source.getvalue(), opts.encoding)
            self.add_task(redir.write_stream(input_bytes, stream, eof), tag)
            return None

        return stream

    def _setup_output_sink(
        self,
        stream: asyncio.StreamReader,
        sink: Any,
        encoding: str,
        tag: str,
        limit: Optional[int] = None,
    ) -> Optional[asyncio.StreamReader]:
        "Set up a task to write to custom output sink."
        if isinstance(sink, io.StringIO):
            self.add_task(redir.copy_stringio(stream, sink, encoding), tag)
            return None

        if isinstance(sink, io.BytesIO):
            self.add_task(redir.copy_bytesio(stream, sink), tag)
            return None

        if isinstance(sink, bytearray):
            # N.B. `limit` is only supported for bytearray.
            if limit is not None:
                self.add_task(redir.copy_bytearray_limit(stream, sink, limit), tag)
            else:
                self.add_task(redir.copy_bytearray(stream, sink), tag)
            return None

        if isinstance(sink, Logger):
            self.add_task(redir.copy_logger(stream, sink, encoding), tag)
            return None

        if isinstance(sink, asyncio.StreamWriter):
            self.add_task(redir.copy_streamwriter(stream, sink), tag)
            return None

        return stream

    @log_method(LOG_DETAIL)
    async def __aexit__(
        self,
        _exc_type: Union[type[BaseException], None],
        exc_value: Union[BaseException, None],
        _exc_tb: Optional[TracebackType],
    ):
        "Wait for process to exit and handle cancellation."
        suppress = False
        try:
            suppress = await self._finish(exc_value)
        except asyncio.CancelledError:
            LOGGER.debug("Runner cancelled inside _finish %r", self)
            self._set_cancelled()
        finally:
            self._stop_timer()  # failsafe just in case
            self._audit_callback("stop")

        # If `timeout` expired, raise TimeoutError rather than CancelledError.
        if (
            self._cancelled
            and self._timed_out
            and not self.command.options._catch_cancelled_error
        ):
            raise asyncio.TimeoutError()

        return suppress

    @log_method(LOG_DETAIL)
    async def _finish(self, exc_value: Union[BaseException, None]):
        "Finish the run. Return True only if `exc_value` should be suppressed."
        assert self._proc

        try:
            if exc_value is not None:
                if _is_cancelled(exc_value):
                    self._set_cancelled()
                await self._kill()
                return self._cancelled

            await self._wait()
            return False

        finally:
            await self._close()

    @log_method(LOG_DETAIL)
    async def _close(self):
        "Make sure that our resources are properly closed."
        assert self._proc is not None

        if self._options.pty_fds:
            self._options.pty_fds.close()

        # Make sure the transport is closed (for asyncio and uvloop).
````
908 self._proc._transport.close() # pyright: ignore 909 910 # _close can be called when unwinding exceptions. We need to handle 911 # the case that the process has not exited yet. 912 if self._proc.returncode is None: 913 LOGGER.critical("Runner._close process still running %r", self._proc) 914 return 915 916 try: 917 # Make sure that original stdin is properly closed. `wait_closed` 918 # will raise a BrokenPipeError if not all input was properly written. 919 if self._proc.stdin is not None: 920 self._proc.stdin.close() 921 await harvest( 922 self._proc.stdin.wait_closed(), 923 timeout=_CLOSE_TIMEOUT, 924 cancel_finish=True, # finish `wait_closed` if cancelled 925 trustee=self, 926 ) 927 928 except asyncio.TimeoutError: 929 LOGGER.critical("Runner._close %r timeout stdin=%r", self, self._proc.stdin) 930 931 def _audit_callback( 932 self, 933 phase: str, 934 *, 935 failure: str = "", 936 signal: Optional[int] = None, 937 ): 938 "Call `audit_callback` if there is one." 939 callback = self.command.options.audit_callback 940 if callback: 941 sig = _signame(signal) if phase == "signal" else "" 942 info: shellous.AuditEventInfo = { 943 "runner": self, 944 "failure": failure, 945 "signal": sig, 946 } 947 callback(phase, info) 948 949 def __repr__(self) -> str: 950 "Return string representation of Runner." 951 cancelled = " cancelled" if self._cancelled else "" 952 if self._proc: 953 procinfo = f" pid={self._proc.pid} exit_code={self.returncode}" 954 else: 955 procinfo = " pid=None" 956 return f"<Runner {self.name!r}{cancelled}{procinfo}>" 957 958 async def _readlines(self): 959 "Iterate over lines in stdout/stderr" 960 stream = self.stdout or self.stderr 961 if stream: 962 async for line in redir.read_lines(stream, self._options.encoding): 963 yield line 964 965 def __aiter__(self) -> AsyncIterator[str]: 966 "Return asynchronous iterator over stdout/stderr." 
967 return self._readlines() 968 969 @staticmethod 970 async def run_command( 971 command: "shellous.Command[Any]", 972 *, 973 _run_future: Optional[asyncio.Future["Runner"]] = None, 974 ) -> Union[str, Result]: 975 "Run a command. This is the main entry point for Runner." 976 if not _run_future and _is_multiple_capture(command): 977 LOGGER.warning("run_command: multiple capture requires 'async with'") 978 _cleanup(command) 979 raise ValueError("multiple capture requires 'async with'") 980 981 async with Runner(command) as run: 982 if _run_future is not None: 983 # Return streams to caller in another task. 984 _run_future.set_result(run) 985 986 result = run.result() 987 if command.options._return_result: 988 return result 989 return result.output
Runner is an asynchronous context manager that runs a command.

```python
async with Runner(cmd) as run:
    # process streams: run.stdin, run.stdout, run.stderr (if not None)
    result = run.result()
```
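The `timeout` option is enforced with an event-loop timer that cancels the running task and then reports the cancellation as a timeout (see `_start_timer` and `_set_timer_expired` in the source above). The same pattern can be sketched with plain asyncio; `run_with_deadline` is a hypothetical helper, not part of shellous:

```python
import asyncio

async def run_with_deadline(coro, timeout):
    """Cancel the current task when a timer expires, then report a timeout.

    A sketch of the pattern Runner uses internally: a `loop.call_later`
    timer cancels the running task, and the resulting CancelledError is
    translated into TimeoutError.
    """
    loop = asyncio.get_running_loop()
    task = asyncio.current_task()
    timed_out = False

    def _expired():
        nonlocal timed_out
        timed_out = True
        task.cancel()

    timer = loop.call_later(timeout, _expired)
    try:
        return await coro
    except asyncio.CancelledError:
        if timed_out:
            raise TimeoutError from None
        raise
    finally:
        timer.cancel()

async def main():
    # A sleep that outlives its deadline is reported as a timeout.
    try:
        await run_with_deadline(asyncio.sleep(10), timeout=0.05)
    except TimeoutError:
        return "timed out"

print(asyncio.run(main()))
```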
```python
    @property
    def name(self) -> str:
        "Return name of process being run."
        return self.command.name
```
Return name of process being run.
```python
    @property
    def options(self) -> "shellous.Options":
        "Return options for process being run."
        return self.command.options
```
Return options for process being run.
```python
    @property
    def command(self) -> "shellous.Command[Any]":
        "Return the command being run."
        return self._options.command
```
Return the command being run.
```python
    @property
    def pid(self) -> Optional[int]:
        "Return the command's process ID."
        if not self._proc:
            return None
        return self._proc.pid
```
Return the command's process ID.
```python
    @property
    def returncode(self) -> Optional[int]:
        "Process's exit code."
        if not self._proc:
            if self._cancelled:
                # The process was cancelled before starting.
                return CANCELLED_EXIT_CODE
            return None
        code = self._proc.returncode
        if code == _UNKNOWN_EXIT_CODE and self._last_signal is not None:
            # After sending a signal, `waitpid` may fail to locate the child
            # process. In this case, map the status to the last signal we sent.
            # For more on this, see https://github.com/python/cpython/issues/87744
            return -self._last_signal  # pylint: disable=invalid-unary-operand-type
        return code
```
Process's exit code.
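The exit-code convention follows asyncio: a child terminated by a signal reports a negative exit code equal to `-signum`. A minimal stdlib illustration (this assumes a Unix system with `sleep` on the PATH):

```python
import asyncio
import signal

async def main():
    # Start a long-running child, then terminate it with SIGTERM.
    proc = await asyncio.create_subprocess_exec("sleep", "30")
    proc.terminate()
    await proc.wait()
    return proc.returncode

# A child killed by a signal reports a negative exit code: -signum.
code = asyncio.run(main())
print(code == -signal.SIGTERM)
```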
```python
    @property
    def cancelled(self) -> bool:
        "Return True if the command was cancelled."
        return self._cancelled
```
Return True if the command was cancelled.
```python
    @property
    def pty_fd(self) -> Optional[int]:
        """The file descriptor used to communicate with the child PTY process.

        Returns None if the process is not using a PTY.
        """
        pty_fds = self._options.pty_fds
        if pty_fds is not None:
            return pty_fds.parent_fd
        return None
```
The file descriptor used to communicate with the child PTY process.
Returns None if the process is not using a PTY.
```python
    @property
    def pty_eof(self) -> Optional[bytes]:
        """Byte sequence used to indicate EOF when written to the PTY child.

        Returns None if process is not using a PTY.
        """
        pty_fds = self._options.pty_fds
        if pty_fds is not None:
            return pty_fds.eof
        return None
```
Byte sequence used to indicate EOF when written to the PTY child.
Returns None if process is not using a PTY.
```python
    def result(self, *, check: bool = True) -> Result:
        "Check process exit code and raise a ResultError if necessary."
        code = self.returncode
        if code is None:
            raise TypeError("Runner.result(): Process has not exited")

        result = Result(
            exit_code=code,
            output_bytes=bytes(self._options.output_bytes or b""),
            error_bytes=bytes(self._options.error_bytes or b""),
            cancelled=self._cancelled,
            encoding=self._options.encoding,
        )

        if not check:
            return result

        return check_result(
            result,
            self.command.options,
            self._cancelled,
            self._timed_out,
        )
```
Check process exit code and raise a ResultError if necessary.
```python
    def add_task(
        self,
        coro: Coroutine[Any, Any, _T],
        tag: str = "",
    ) -> asyncio.Task[_T]:
        "Add a background task."
        task_name = f"{self.name}#{tag}"
        task = asyncio.create_task(coro, name=task_name)
        self._tasks.append(task)
        return task
```
Add a background task.
```python
    def send_signal(self, sig: int) -> None:
        "Send an arbitrary signal to the process if it is running."
        if self.returncode is None:
            self._signal(sig)
```
Send an arbitrary signal to the process if it is running.
```python
    def cancel(self) -> None:
        "Cancel the running process if it is running."
        if self.returncode is None:
            self._signal(self.command.options.cancel_signal)
```
Cancel the running process if it is running.
```python
    @log_method(LOG_DETAIL)
    async def __aenter__(self):
        "Set up redirections and launch subprocess."
        self._audit_callback("start")
        try:
            return await self._start()
        except BaseException as ex:
            self._stop_timer()  # failsafe just in case
            self._audit_callback("stop", failure=type(ex).__name__)
            raise
        finally:
            if self._cancelled and self.command.options._catch_cancelled_error:
                # Raises ResultError instead of CancelledError.
                self.result()
```
Set up redirections and launch subprocess.
```python
    def __aiter__(self) -> AsyncIterator[str]:
        "Return asynchronous iterator over stdout/stderr."
        return self._readlines()
```
Return asynchronous iterator over stdout/stderr.
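Line iteration can be sketched with plain asyncio as well. This hypothetical `read_lines` helper mirrors what `__aiter__`/`_readlines` provide, minus shellous's redirection machinery (assumes a Unix system with `printf` available):

```python
import asyncio

async def read_lines(program, *args):
    """Async-iterate over a subprocess's stdout lines, decoded as UTF-8.

    A stdlib sketch of what `Runner.__aiter__` provides: it yields one
    decoded line at a time as the child produces output.
    """
    proc = await asyncio.create_subprocess_exec(
        program, *args, stdout=asyncio.subprocess.PIPE
    )
    assert proc.stdout is not None
    async for line in proc.stdout:
        yield line.decode("utf-8")
    await proc.wait()

async def main():
    # printf interprets "\n", producing two lines: "a" and "b".
    return [line.rstrip("\n") async for line in read_lines("printf", "a\\nb\\n")]

print(asyncio.run(main()))
```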
```python
    @staticmethod
    async def run_command(
        command: "shellous.Command[Any]",
        *,
        _run_future: Optional[asyncio.Future["Runner"]] = None,
    ) -> Union[str, Result]:
        "Run a command. This is the main entry point for Runner."
        if not _run_future and _is_multiple_capture(command):
            LOGGER.warning("run_command: multiple capture requires 'async with'")
            _cleanup(command)
            raise ValueError("multiple capture requires 'async with'")

        async with Runner(command) as run:
            if _run_future is not None:
                # Return streams to caller in another task.
                _run_future.set_result(run)

            result = run.result()
            if command.options._return_result:
                return result
            return result.output
```
Run a command. This is the main entry point for Runner.
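Output sinks such as `io.BytesIO` are serviced by background copy tasks scheduled in `_setup_output_sink`. A minimal stdlib sketch of that arrangement, with a hypothetical `copy_to_bytesio` standing in for the `redir` helper:

```python
import asyncio
import io

async def copy_to_bytesio(stream: asyncio.StreamReader, sink: io.BytesIO) -> None:
    """Copy a subprocess output stream into a BytesIO buffer, chunk by chunk.

    A minimal stand-in for the copy helper that `_setup_output_sink`
    schedules as a background task.
    """
    while True:
        chunk = await stream.read(16384)
        if not chunk:
            break
        sink.write(chunk)

async def main() -> bytes:
    proc = await asyncio.create_subprocess_exec(
        "echo", "captured", stdout=asyncio.subprocess.PIPE
    )
    buf = io.BytesIO()
    # Run the copier as a background task, the way Runner.add_task does.
    task = asyncio.create_task(copy_to_bytesio(proc.stdout, buf))
    await proc.wait()
    await task
    return buf.getvalue()

print(asyncio.run(main()))
```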
````python
class PipeRunner:
    """PipeRunner is an asynchronous context manager that runs a pipeline.

    ```
    async with pipe.run() as run:
        # process run.stdin, run.stdout, run.stderr (if not None)
        result = run.result()
    ```
    """

    stdin: Optional[asyncio.StreamWriter] = None
    "Pipeline standard input."

    stdout: Optional[asyncio.StreamReader] = None
    "Pipeline standard output."

    stderr: Optional[asyncio.StreamReader] = None
    "Pipeline standard error."

    _pipe: "shellous.Pipeline[Any]"
    _capturing: bool
    _tasks: list[asyncio.Task[Any]]
    _encoding: str
    _cancelled: bool = False
    _results: Optional[list[Union[BaseException, Result]]] = None
    _pid: int = -1

    def __init__(self, pipe: "shellous.Pipeline[Any]", *, capturing: bool):
        """`capturing=True` indicates we are within an `async with` block and
        client needs to access `stdin` and `stderr` streams.
        """
        assert len(pipe.commands) > 1

        self._pipe = pipe
        self._cancelled = False
        self._tasks = []
        self._capturing = capturing
        self._encoding = pipe.options.encoding

    @property
    def name(self) -> str:
        "Return name of the pipeline."
        return self._pipe.name

    @property
    def options(self) -> "shellous.Options":
        """Return options for pipeline being run.

        These are the options for the last command in the pipeline.
        """
        return self._pipe.options

    @property
    def pid(self) -> Optional[int]:
        """Return the process ID for the first command in the pipeline.

        The PID is only available when `capturing=True`.
        """
        if self._pid < 0:
            return None
        return self._pid

    def result(self, *, check: bool = True) -> Result:
        "Return `Result` object for PipeRunner."
        assert self._results is not None

        result = convert_result_list(self._results, self._cancelled)
        if not check:
            return result

        return check_result(result, self._pipe.options, self._cancelled)

    def add_task(
        self,
        coro: Coroutine[Any, Any, _T],
        tag: str = "",
    ) -> asyncio.Task[_T]:
        "Add a background task."
        task_name = f"{self.name}#{tag}"
        task = asyncio.create_task(coro, name=task_name)
        self._tasks.append(task)
        return task

    @log_method(LOG_DETAIL)
    async def _wait(self, *, kill: bool = False):
        "Wait for pipeline to finish."
        assert self._results is None

        if kill:
            LOGGER.debug("PipeRunner.wait killing pipe %r", self)
            for task in self._tasks:
                task.cancel()

        cancelled, self._results = await harvest_results(*self._tasks, trustee=self)
        if cancelled:
            self._cancelled = True
        self._tasks.clear()  # clear all tasks when done

    @log_method(LOG_DETAIL)
    async def __aenter__(self) -> "PipeRunner":
        "Set up redirections and launch pipeline."
        try:
            return await self._start()
        except (Exception, asyncio.CancelledError) as ex:
            LOGGER.warning("PipeRunner enter %r ex=%r", self, ex)
            if _is_cancelled(ex):
                self._cancelled = True
            await self._wait(kill=True)
            raise

    @log_method(LOG_DETAIL)
    async def __aexit__(
        self,
        _exc_type: Union[type[BaseException], None],
        exc_value: Union[BaseException, None],
        _exc_tb: Optional[TracebackType],
    ):
        "Wait for pipeline to exit and handle cancellation."
        suppress = False
        try:
            suppress = await self._finish(exc_value)
        except asyncio.CancelledError:
            LOGGER.warning("PipeRunner cancelled inside _finish %r", self)
            self._cancelled = True
        return suppress

    @log_method(LOG_DETAIL)
    async def _finish(self, exc_value: Optional[BaseException]) -> bool:
        "Wait for pipeline to exit and handle cancellation."
        if exc_value is not None:
            LOGGER.warning("PipeRunner._finish exc_value=%r", exc_value)
            if _is_cancelled(exc_value):
                self._cancelled = True
            await self._wait(kill=True)
            return self._cancelled

        await self._wait()
        return False

    @log_method(LOG_DETAIL)
    async def _start(self):
        "Set up redirection and launch pipeline."
        open_fds: list[int] = []

        try:
            stdin = None
            stdout = None
            stderr = None

            cmds = self._setup_pipeline(open_fds)

            if self._capturing:
                stdin, stdout, stderr = await self._setup_capturing(cmds)
            else:
                for cmd in cmds:
                    self.add_task(cmd.coro())

            self.stdin = stdin
            self.stdout = stdout
            self.stderr = stderr

            return self

        except BaseException:  # pylint: disable=broad-except
            # Clean up after any exception *including* CancelledError.
            close_fds(open_fds)
            raise

    def _setup_pipeline(self, open_fds: list[int]):
        """Return the pipeline stitched together with pipe fd's.

        Each created open file descriptor is added to `open_fds` so it can
        be closed if there's an exception later.
        """
        cmds = list(self._pipe.commands)

        cmd_count = len(cmds)
        for i in range(cmd_count - 1):
            (read_fd, write_fd) = os.pipe()
            open_fds.extend((read_fd, write_fd))

            cmds[i] = cmds[i].stdout(write_fd, close=True)
            cmds[i + 1] = cmds[i + 1].stdin(read_fd, close=True)

        for i in range(cmd_count):
            cmds[i] = cmds[i].set(_return_result=True, _catch_cancelled_error=True)

        return cmds

    @log_method(LOG_DETAIL)
    async def _setup_capturing(self, cmds: "list[shellous.Command[Any]]"):
        """Set up capturing and return (stdin, stdout, stderr) streams."""
        loop = asyncio.get_event_loop()
        first_fut = loop.create_future()
        last_fut = loop.create_future()

        first_coro = cmds[0].coro(_run_future=first_fut)
        last_coro = cmds[-1].coro(_run_future=last_fut)
        middle_coros = [cmd.coro() for cmd in cmds[1:-1]]

        # Tag each task name with the index of the command in the pipe.
        self.add_task(first_coro, "0")
        for i, coro in enumerate(middle_coros):
            self.add_task(coro, str(i + 1))
        self.add_task(last_coro, str(len(cmds) - 1))

        # When capturing, we need the first and last commands in the
        # pipe to signal when they are ready.
        first_ready, last_ready = await asyncio.gather(first_fut, last_fut)

        stdin, stdout, stderr = (
            first_ready.stdin,
            last_ready.stdout,
            last_ready.stderr,
        )
        self._pid = first_ready.pid

        return (stdin, stdout, stderr)

    def __repr__(self) -> str:
        "Return string representation of PipeRunner."
        cancelled_info = ""
        if self._cancelled:
            cancelled_info = " cancelled"
        result_info = ""
        if self._results:
            result_info = f" results={self._results!r}"
        return f"<PipeRunner {self.name!r}{cancelled_info}{result_info}>"

    async def _readlines(self) -> AsyncIterator[str]:
        "Iterate over lines in stdout/stderr"
        stream = self.stdout or self.stderr
        if stream:
            async for line in redir.read_lines(stream, self._encoding):
                yield line

    def __aiter__(self) -> AsyncIterator[str]:
        "Return asynchronous iterator over stdout/stderr."
        return self._readlines()

    @staticmethod
    async def run_pipeline(pipe: "shellous.Pipeline[Any]") -> Union[str, Result]:
        "Run a pipeline. This is the main entry point for PipeRunner."
        run = PipeRunner(pipe, capturing=False)
        async with run:
            pass

        result = run.result()
        if pipe.options._return_result:
            return result
        return result.output
````
PipeRunner is an asynchronous context manager that runs a pipeline.

```python
async with pipe.run() as run:
    # process run.stdin, run.stdout, run.stderr (if not None)
    result = run.result()
```
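`_setup_pipeline` stitches adjacent commands together with `os.pipe()` file descriptors: each write end becomes one command's stdout and the matching read end becomes the next command's stdin. The same wiring can be sketched with plain asyncio subprocesses (assumes a Unix system with `printf` and `tr` available):

```python
import asyncio
import os

async def run_two_stage_pipeline():
    """Stitch two subprocesses together with an os.pipe(), the way
    PipeRunner._setup_pipeline does, then capture the final stdout.
    """
    read_fd, write_fd = os.pipe()
    try:
        first = await asyncio.create_subprocess_exec(
            "printf", "hello\\n", stdout=write_fd
        )
        second = await asyncio.create_subprocess_exec(
            "tr", "a-z", "A-Z", stdin=read_fd, stdout=asyncio.subprocess.PIPE
        )
    finally:
        # The parent must close its copies so `tr` sees EOF.
        os.close(read_fd)
        os.close(write_fd)

    out, _ = await second.communicate()
    await first.wait()
    return out.decode()

print(asyncio.run(run_two_stage_pipeline()))
```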
```python
    def __init__(self, pipe: "shellous.Pipeline[Any]", *, capturing: bool):
        """`capturing=True` indicates we are within an `async with` block and
        client needs to access `stdin` and `stderr` streams.
        """
        assert len(pipe.commands) > 1

        self._pipe = pipe
        self._cancelled = False
        self._tasks = []
        self._capturing = capturing
        self._encoding = pipe.options.encoding
```
```python
    @property
    def name(self) -> str:
        "Return name of the pipeline."
        return self._pipe.name
```
Return name of the pipeline.
```python
    @property
    def options(self) -> "shellous.Options":
        """Return options for pipeline being run.

        These are the options for the last command in the pipeline.
        """
        return self._pipe.options
```
Return options for pipeline being run.
These are the options for the last command in the pipeline.
```python
    @property
    def pid(self) -> Optional[int]:
        """Return the process ID for the first command in the pipeline.

        The PID is only available when `capturing=True`.
        """
        if self._pid < 0:
            return None
        return self._pid
```
Return the process ID for the first command in the pipeline.
The PID is only available when `capturing=True`.
```python
    def result(self, *, check: bool = True) -> Result:
        "Return `Result` object for PipeRunner."
        assert self._results is not None

        result = convert_result_list(self._results, self._cancelled)
        if not check:
            return result

        return check_result(result, self._pipe.options, self._cancelled)
```
Return the `Result` object for the PipeRunner.
```python
    def add_task(
        self,
        coro: Coroutine[Any, Any, _T],
        tag: str = "",
    ) -> asyncio.Task[_T]:
        "Add a background task."
        task_name = f"{self.name}#{tag}"
        task = asyncio.create_task(coro, name=task_name)
        self._tasks.append(task)
        return task
```
Add a background task.
```python
    @log_method(LOG_DETAIL)
    async def __aenter__(self) -> "PipeRunner":
        "Set up redirections and launch pipeline."
        try:
            return await self._start()
        except (Exception, asyncio.CancelledError) as ex:
            LOGGER.warning("PipeRunner enter %r ex=%r", self, ex)
            if _is_cancelled(ex):
                self._cancelled = True
            await self._wait(kill=True)
            raise
```
Set up redirections and launch pipeline.
```python
    def __aiter__(self) -> AsyncIterator[str]:
        "Return asynchronous iterator over stdout/stderr."
        return self._readlines()
```
Return asynchronous iterator over stdout/stderr.
```python
    @staticmethod
    async def run_pipeline(pipe: "shellous.Pipeline[Any]") -> Union[str, Result]:
        "Run a pipeline. This is the main entry point for PipeRunner."
        run = PipeRunner(pipe, capturing=False)
        async with run:
            pass

        result = run.result()
        if pipe.options._return_result:
            return result
        return result.output
```
Run a pipeline. This is the main entry point for PipeRunner.
```python
class AuditEventInfo(TypedDict):
    """Info attached to each audit callback event.

    See `audit_callback` in `Command.set` for more information.
    """

    runner: Runner
    "Reference to the Runner object."

    failure: str
    "When phase is 'stop', the name of the exception from starting the process."

    signal: str
    "When phase is 'signal', the signal name/number sent to the process."
```
Info attached to each audit callback event.
See `audit_callback` in `Command.set` for more information.
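A callback passed as `audit_callback` receives a phase string ("start", "stop", or "signal") and an `AuditEventInfo` mapping. A standalone sketch of such a callback, using a plain dict in place of a live `Runner`, and invoking it by hand the way a Runner would around a process sent SIGTERM:

```python
from typing import Any

# Record process lifecycle events as (phase, detail) pairs.
events: list[tuple[str, str]] = []

def audit_callback(phase: str, info: dict[str, Any]) -> None:
    if phase == "stop" and info["failure"]:
        events.append((phase, f"failed: {info['failure']}"))
    elif phase == "signal":
        events.append((phase, info["signal"]))
    else:
        events.append((phase, ""))

# Simulate the calls a Runner would make (the `runner` field is None in
# this standalone sketch).
audit_callback("start", {"runner": None, "failure": "", "signal": ""})
audit_callback("signal", {"runner": None, "failure": "", "signal": "SIGTERM"})
audit_callback("stop", {"runner": None, "failure": "", "signal": ""})
print(events)
```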