shellous
Async Processes and Pipelines
shellous provides a concise API for running subprocesses using asyncio. It is similar to and inspired by sh.
import asyncio
from shellous import sh

async def main():
    result = await sh("echo", "hello")
    print(result)

asyncio.run(main())
Benefits
- Run programs asynchronously in a single line.
- Redirect stdin, stdout and stderr to files, memory buffers, async streams or loggers.
- Iterate asynchronously over subprocess output.
- Set timeouts and reliably cancel running processes.
- Run a program with a pseudo-terminal (pty).
- Use send() and expect() to manually control a subprocess.
- Construct pipelines and use process substitution directly from Python (no shell required).
- Runs on Linux, macOS, FreeBSD and Windows.
- Monitor processes being started and stopped with the audit_callback API.
Requirements
- Requires Python 3.9 or later.
- Requires an asyncio event loop.
- Pseudo-terminals require a Unix system.
- Process substitution requires a Unix system with /dev/fd support.
Running a Command
The tutorial in this README uses the asyncio REPL built into Python. In these examples, >>>
is the REPL prompt.
Start the asyncio REPL by typing python3 -m asyncio, and import sh from the shellous module:
>>> from shellous import sh
Here's a command that runs echo "hello, world".
>>> await sh("echo", "hello, world")
'hello, world\n'
The first argument to sh is the program name. It is followed by zero or more arguments. Each argument will be
converted to a string. If an argument is a list or tuple, it is flattened recursively.
>>> await sh("echo", 1, 2, [3, 4, (5, 6)])
'1 2 3 4 5 6\n'
A command does not run until you await it. When you run a command using await, it returns the value of the standard output interpreted as a UTF-8 string.
It is safe to await the same command object more than once.
Here, we create our own echo command with "-n" to omit the newline. Note, echo("abc") will run the same command as echo -n "abc".
>>> echo = sh("echo", "-n")
>>> await echo("abc")
'abc'
Commands are immutable objects that represent a program invocation: program name, arguments, environment
variables, redirection operators and other settings. When you use a method to modify a Command, the method
returns a new Command object. The original object is unchanged.
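The copy-on-modify behaviour described above is the usual frozen-dataclass pattern. The following is a minimal sketch using a hypothetical `MiniCommand` class; it is not the real `Command`, and the `timeout` field is only there to show how a setting could be changed without touching the original object.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class MiniCommand:
    """Hypothetical stand-in for Command; not part of shellous."""

    args: tuple
    timeout: Optional[float] = None

    def __call__(self, *more):
        # Appending arguments builds a new object instead of mutating self.
        return replace(self, args=self.args + more)

    def set(self, **options):
        # Changing an option also returns a modified copy.
        return replace(self, **options)


echo = MiniCommand(("echo", "-n"))
echo_abc = echo("abc")
slow_echo = echo.set(timeout=5.0)

print(echo.args)      # the original is untouched
print(echo_abc.args)
```

Because every modification yields a new object, a command such as `echo` can be shared and reused freely as a template.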
You can wrap your commands in a function to improve type safety:
>>> from shellous import Command
>>> def exclaim(word: str) -> Command[str]:
...     return sh("echo", "-n", f"{word}!!")
...
>>> await exclaim("Oh")
'Oh!!'
The type hint Command[str] indicates that the command returns a str.
Arguments
Commands use positional arguments only; keyword arguments are not supported.
In most cases, shellous automatically converts Python objects passed as command arguments to str or bytes. As described above, the list and tuple types are an exception; they are recursively flattened before their elements are converted to strings.
Dicts, sets, and generator types are not supported as arguments. Their string format doesn't make sense as a command line argument.
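To make the flattening rule concrete, here is a rough sketch in plain Python. The helper name `flatten_args` is hypothetical, and raising `TypeError` for unsupported types is an assumption; the library may signal the problem differently.

```python
from types import GeneratorType


def flatten_args(args):
    """Hypothetical helper: flatten lists/tuples and convert items to str."""
    result = []
    for arg in args:
        if isinstance(arg, (list, tuple)):
            # Lists and tuples are flattened recursively.
            result.extend(flatten_args(arg))
        elif isinstance(arg, (dict, set, GeneratorType)):
            # Assumption: the real library may raise a different exception.
            raise TypeError(f"unsupported argument type: {type(arg).__name__}")
        elif isinstance(arg, (str, bytes)):
            result.append(arg)
        else:
            result.append(str(arg))
    return result


print(flatten_args(["echo", 1, 2, [3, 4, (5, 6)]]))
# ['echo', '1', '2', '3', '4', '5', '6']
```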
Results
When a command completes successfully, it returns the standard output (or "" if stdout is redirected). For a more detailed response, you can specify that the command should return a Result object by using the .result modifier:
>>> await echo.result("abc")
Result(exit_code=0, output_bytes=b'abc', error_bytes=b'', cancelled=False, encoding='utf-8')
A Result object contains the command's exit_code in addition to its output. A Result is True if
the command's exit_code is zero. You can access the string value of the output using the .output property:
if result := await sh.result("cat", "some-file"):
    output = result.output
else:
    print(f"Command failed with exit_code={result.exit_code}")
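The truthiness rule can be pictured with a small stand-in class. `MiniResult` below is hypothetical; it only mirrors the field names visible in the `Result` repr shown earlier and is not the library's implementation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MiniResult:
    """Hypothetical stand-in that mirrors the fields shown in the Result repr."""

    exit_code: int
    output_bytes: bytes
    error_bytes: bytes = b""
    cancelled: bool = False
    encoding: str = "utf-8"

    def __bool__(self) -> bool:
        # Truthy only when the command exited with status zero.
        return self.exit_code == 0

    @property
    def output(self) -> str:
        # Decode the captured bytes on demand.
        return self.output_bytes.decode(self.encoding)


ok = MiniResult(exit_code=0, output_bytes=b"abc")
failed = MiniResult(exit_code=1, output_bytes=b"")

print(bool(ok), bool(failed), ok.output)
```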
You can retrieve the string value of the standard error using the .error property. (By default, only the
first 1024 bytes of standard error are stored.)
If a command was terminated by a signal, the exit_code will be the negative signal number.
The return value of sh.result("cmd", ...) uses the type hint Command[Result].
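The negative exit code for signals matches the convention of Python's own subprocess machinery on Unix. The sketch below uses only the standard library (not shellous) and assumes a Unix system; the function name is made up for illustration.

```python
import asyncio
import signal
import sys


async def exit_code_after_sigterm() -> int:
    # Start a child that would otherwise sleep for a minute.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import time; time.sleep(60)"
    )
    # Terminate it with SIGTERM and collect the exit status.
    proc.send_signal(signal.SIGTERM)
    return await proc.wait()


exit_code = asyncio.run(exit_code_after_sigterm())
print(exit_code == -signal.SIGTERM)
```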
ResultError
If you are not using the .result modifier and a command fails, it raises a ResultError exception:
>>> await sh("cat", "does_not_exist")
Traceback (most recent call last):
...
shellous.result.ResultError: Result(exit_code=1, output_bytes=b'', error_bytes=b'cat: does_not_exist: No such file or directory\n', cancelled=False, encoding='utf-8')
The ResultError exception contains a Result object with the exit_code and the first 1024 bytes of standard error.
In some cases, you want to ignore certain exit code values. That is, you want to treat them as if they are
normal. To do this, you can set the exit_codes option:
>>> await sh("cat", "does_not_exist").set(exit_codes={0,1})
''
If there is a problem launching a process, shellous can also raise a separate FileNotFoundError or PermissionError exception.
Async For
Using await to run a command collects the entire output of the command in memory before returning it. You
can also iterate over the output lines as they arrive using async for.
>>> [line async for line in echo("hi\n", "there")]
['hi\n', ' there']
Use an async for loop when you want to examine the stream of output from a command, line by line. For example, suppose you want to run tail on a log file.
async for line in sh("tail", "-f", "/var/log/syslog"):
    if "ERROR" in line:
        print(line.rstrip())
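For comparison, line-by-line iteration with the standard library alone looks roughly like the sketch below. It is not how shellous is implemented; it only shows the underlying `asyncio.StreamReader` behaviour, using a small Python child process in place of `tail`.

```python
import asyncio
import sys

CHILD = "print('hi'); print('there')"


async def read_lines() -> list:
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", CHILD, stdout=asyncio.subprocess.PIPE
    )
    lines = []
    # A StreamReader yields one line at a time as the child produces output.
    async for raw in proc.stdout:
        lines.append(raw.decode("utf-8"))
    await proc.wait()
    return lines


lines = asyncio.run(read_lines())
print([line.strip() for line in lines])
```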
Async With
You can use a command as an asynchronous context manager. There are two ways to run a program using a context manager: a low-level API and a high-level API.
Byte-by-Byte (Low Level)
Use async with directly when you need byte-by-byte
control over the individual streams: stdin, stdout and stderr. To control a standard stream, you
must tell shellous to "capture" it. (For more on this, see Redirection.)
cmd = sh("cat").stdin(sh.CAPTURE).stdout(sh.CAPTURE)
async with cmd as run:
    run.stdin.write(b"abc")
    run.stdin.close()
    print(await run.stdout.readline())
result = run.result()
The streams run.stdout and run.stderr are asyncio.StreamReader objects. The stream run.stdin
is an asyncio.StreamWriter object. If we didn't specify that stdin/stdout are sh.CAPTURE, the
streams run.stdin and run.stdout would be None.
The return value of run.result() is a Result object. Depending on the command settings, this
function may raise a ResultError on a non-zero exit code.
:warning: When reading or writing individual streams, you are responsible for managing reads and writes so they don't deadlock. You may use
run.create_task to schedule a concurrent task.
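The deadlock risk is easiest to see with a payload larger than a pipe buffer: if the parent writes everything before reading anything, both sides can end up waiting on each other. The sketch below uses only the standard library (not shellous's `run` object) and runs the write and the read concurrently; `echo_through_child` is a hypothetical helper name.

```python
import asyncio
import sys

# Child that copies stdin to stdout in chunks, like `cat`.
CHILD = "import shutil, sys; shutil.copyfileobj(sys.stdin.buffer, sys.stdout.buffer)"


async def echo_through_child(payload: bytes) -> bytes:
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", CHILD,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )

    async def feed():
        proc.stdin.write(payload)
        await proc.stdin.drain()  # may block until the child consumes data
        proc.stdin.close()

    async def collect():
        return await proc.stdout.read()  # read until EOF

    # Run the writer and the reader concurrently so neither side stalls.
    _, output = await asyncio.gather(feed(), collect())
    await proc.wait()
    return output


payload = b"x" * 1_000_000
echoed = asyncio.run(echo_through_child(payload))
print(len(echoed))
```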
You can also use async with to run a server. When you do so, you must tell the server
to stop using run.cancel(). Otherwise, the context manager will wait forever for the process to exit.
async with sh("some-server") as run:
    # Send commands to the server here...
    # Manually signal the server to stop.
    run.cancel()
Prompt with Send/Expect (High Level API)
Use the prompt() method to control a process using send and expect. The prompt()
method returns an asynchronous context manager (the Prompt class) that facilitates reading and
writing strings and matching regular expressions.
cmd = sh("cat").set(pty=True)
async with cmd.prompt() as client:
    await client.send("abc")
    output, _ = await client.expect("\r\n")
    print(output)
The Prompt API automatically captures stdin and stdout.
Here is another example of controlling a bash co-process running in a docker container.
import re

async def list_packages():
    "Run bash in an ubuntu docker container and list packages."
    bash_prompt = re.compile("root@[0-9a-f]+:/[^#]*# ")
    cmd = sh("docker", "run", "-it", "--rm", "-e", "TERM=dumb", "ubuntu")

    async with cmd.set(pty=True).prompt(bash_prompt, timeout=3) as cli:
        # Read up to first prompt.
        await cli.expect()

        # Disable echo. The `command()` method combines send *and* expect methods.
        await cli.command("stty -echo")

        # Fetch the list of packages.
        result = await cli.command("apt-cache pkgnames")
        packages = result.strip().split("\r\n")

    # You can check the result object's exit code. You can only
    # access `cli.result` outside the `async with` block.
    assert cli.result.exit_code == 0
    return packages
The prompt() API does not raise a ResultError when a command exits with an error status.
Typically, you'll see an EOFError when you were expecting to read a response. You can check the
exit status by retrieving the Prompt's result property outside of the async with block.
Redirection
shellous supports the redirection operators | and >>. They work similarly to how they work in
the Unix shell. Shellous does not support use of < or > for redirection. Instead, replace these
with |.
To redirect to or from a file, use a pathlib.Path object. Alternatively, you can redirect input/output
to a StringIO object, an open file, a Logger, or use a special redirection constant like sh.DEVNULL.
:warning: When combining the redirect operators with await, you must use parentheses; await has higher precedence than | and >>.
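The precedence rule can be checked with Python's own parser. This sketch inspects how `await cmd | target` is parsed; the names `cmd` and `target` are placeholders and nothing is executed.

```python
import ast

SOURCE = """
async def f():
    return await cmd | target
"""

tree = ast.parse(SOURCE)
expr = tree.body[0].body[0].value  # the expression after `return`

# The top-level node is the `|` operation; `await` applies only to `cmd`.
top_is_pipe = isinstance(expr, ast.BinOp) and isinstance(expr.op, ast.BitOr)
left_is_await = isinstance(expr.left, ast.Await)
print(top_is_pipe, left_is_await)  # True True
```

In other words, `await cmd | target` means `(await cmd) | target`, so a redirected command should be written as `await (cmd | target)`.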
Redirecting Standard Input
To redirect standard input, use the pipe operator | with the argument on the left side.
Here is an example that passes the string "abc" as standard input.
>>> cmd = "abc" | sh("wc", "-c")
>>> await cmd
' 3\n'
To read input from a file, use a Path object from pathlib.
>>> from pathlib import Path
>>> cmd = Path("LICENSE") | sh("wc", "-l")
>>> await cmd
' 201\n'
Shellous supports different STDIN behavior when using different Python types.
| Python Type | Behavior as STDIN |
|---|---|
| str | Read input from string object. |
| bytes, bytearray | Read input from bytes object. |
| Path | Read input from file specified by Path. |
| File, StringIO, BytesIO | Read input from open file object. |
| int | Read input from existing file descriptor. |
| asyncio.StreamReader | Read input from StreamReader. |
| sh.DEVNULL | Read input from /dev/null. |
| sh.INHERIT | Read input from existing sys.stdin. |
| sh.CAPTURE | You will write to stdin interactively. |
Redirecting Standard Output
To redirect standard output, use the pipe operator | with the argument on the right side. Here is an
example that writes to a temporary file.
>>> output_file = Path("/tmp/output_file")
>>> cmd = sh("echo", "abc") | output_file
>>> await cmd
''
>>> output_file.read_bytes()
b'abc\n'
To redirect standard output with append, use the >> operator.
>>> cmd = sh("echo", "def") >> output_file
>>> await cmd
''
>>> output_file.read_bytes()
b'abc\ndef\n'
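The difference between | and >> for a file target corresponds to opening the file for writing (truncate) versus appending. Below is a rough standard-library sketch of that distinction; `run_to_file` is a hypothetical helper, not a shellous function.

```python
import asyncio
import sys
import tempfile
from pathlib import Path


async def run_to_file(text: str, path: Path, append: bool = False) -> None:
    """Hypothetical helper: send a child's stdout to `path`."""
    mode = "ab" if append else "wb"  # "wb" truncates, "ab" appends
    with open(path, mode) as fp:
        proc = await asyncio.create_subprocess_exec(
            sys.executable, "-c", f"print({text!r})", stdout=fp
        )
        await proc.wait()


async def main(output_file: Path) -> bytes:
    await run_to_file("abc", output_file)               # like `cmd | output_file`
    await run_to_file("def", output_file, append=True)  # like `cmd >> output_file`
    return output_file.read_bytes()


with tempfile.TemporaryDirectory() as tmp:
    contents = asyncio.run(main(Path(tmp) / "output_file"))

print(contents.split())
```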
Shellous supports different STDOUT behavior when using different Python types.
| Python Type | Behavior as STDOUT/STDERR |
|---|---|
| Path | Write output to the file path specified by Path. |
| bytearray | Write output to a mutable byte array. |
| File, StringIO, BytesIO | Write output to an open file object. |
| int | Write output to existing file descriptor at its current position. ◆ |
| logging.Logger | Log each line of output. ◆ |
| asyncio.StreamWriter | Write output to StreamWriter. ◆ |
| sh.CAPTURE | Capture output for async with. ◆ |
| sh.DEVNULL | Write output to /dev/null. ◆ |
| sh.INHERIT | Write output to existing sys.stdout or sys.stderr. ◆ |
◆ For these types, there is no difference between using | and >>.
Shellous does not support redirecting standard output/error to a plain str or bytes object.
If you intend to redirect output to a file, you must use a pathlib.Path object.
Redirecting Standard Error
By default, the first 1024 bytes read from standard error are stored in the Result object.
Any further bytes are discarded. You can change the 1024 byte limit using the error_limit
option.
To redirect standard error, use the stderr method. Standard error supports the
same Python types as standard output. To append, set append=True in the stderr method.
To redirect stderr to the same place as stdout, use the sh.STDOUT constant. If you also
redirect stdout to sh.DEVNULL, you will only receive the standard error.
>>> cmd = sh("cat", "does_not_exist").stderr(sh.STDOUT)
>>> await cmd.set(exit_codes={0,1})
'cat: does_not_exist: No such file or directory\n'
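The same "send stderr wherever stdout goes" idea exists in the standard library as `asyncio.subprocess.STDOUT`. The sketch below shows that lower-level equivalent, with a small Python child standing in for the failing `cat`; it is an illustration and not shellous code.

```python
import asyncio
import sys

# Child that writes only to standard error and exits with status 1.
CHILD = "import sys; sys.stderr.write('oops\\n'); sys.exit(1)"


async def merged_output():
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", CHILD,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,  # merge stderr into stdout
    )
    out, err = await proc.communicate()
    return proc.returncode, out, err


exit_code, out, err = asyncio.run(merged_output())
print(exit_code, out.strip(), err)
```

The error text arrives on the captured stdout stream, and the separate stderr value is `None` because nothing was captured there.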
To redirect standard error to the hosting program's sys.stderr, use the sh.INHERIT redirect
option.
>>> cmd = sh("cat", "does_not_exist").stderr(sh.INHERIT)
>>> await cmd
cat: does_not_exist: No such file or directory
Traceback (most recent call last):
...
shellous.result.ResultError: Result(exit_code=1, output_bytes=b'', error_bytes=b'', cancelled=False, encoding='utf-8')
If you redirect stderr, it will no longer be stored in the Result object, and the error_limit option
will not apply.
Default Redirections
For regular commands, the default redirections are:
- Standard input is read from the empty string ("").
- Standard out is buffered and stored in the Result object (BUFFER).
- The first 1024 bytes of standard error are buffered and stored in the Result object (BUFFER).
However, the default redirections are adjusted when using a pseudo-terminal (pty):
- Standard input is captured and ignored (CAPTURE).
- Standard out is buffered and stored in the Result object (BUFFER).
- Standard error is redirected to standard output (STDOUT).
When you use the Prompt API, the standard input and standard output are automatically redirected to CAPTURE.
Pipelines
You can create a pipeline by combining commands using the | operator. A pipeline feeds the standard out of one process into the next process as standard input. Here is the shellous
equivalent to the bash command: ls | grep README
>>> pipe = sh("ls") | sh("grep", "README")
>>> await pipe
'README.md\n'
A pipeline returns a Result if the last command in the pipeline has the .result modifier. To set other
options like encoding for a Pipeline, set them on the last command.
>>> pipe = sh("ls") | sh("grep", "README").result
>>> await pipe
Result(exit_code=0, output_bytes=b'README.md\n', error_bytes=b'', cancelled=False, encoding='utf-8')
Error reporting for a pipeline works much like the shell's -o pipefail option.
Pipelines support the same await/async for/async with operations that work on a single command, including
the Prompt API.
>>> [line.strip() async for line in pipe]
['README.md']
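For readers curious what a two-stage pipeline involves at the operating-system level, here is a sketch with the standard library on a Unix system. It connects two child processes with `os.pipe()` and computes a pipefail-style status using bash's rule (the rightmost non-zero exit code, otherwise zero). The child programs are small Python stand-ins for `ls` and `grep`, and none of this is shellous's actual implementation.

```python
import asyncio
import os
import sys

PRODUCER = "print('README.md'); print('setup.py')"
CONSUMER = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    if 'README' in line:\n"
    "        sys.stdout.write(line)\n"
)


async def run_pipeline():
    read_fd, write_fd = os.pipe()
    producer = await asyncio.create_subprocess_exec(
        sys.executable, "-c", PRODUCER, stdout=write_fd
    )
    os.close(write_fd)  # the parent must not keep the write end open
    consumer = await asyncio.create_subprocess_exec(
        sys.executable, "-c", CONSUMER,
        stdin=read_fd, stdout=asyncio.subprocess.PIPE,
    )
    os.close(read_fd)
    output, _ = await consumer.communicate()
    await producer.wait()
    # pipefail-style status: the rightmost non-zero exit code, or zero.
    codes = [producer.returncode, consumer.returncode]
    status = next((code for code in reversed(codes) if code != 0), 0)
    return status, output.decode("utf-8")


status, output = asyncio.run(run_pipeline())
print(status, output.strip())
```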
Process Substitution (Unix Only)
You can pass a shell command as an argument to another. Here is the shellous equivalent to the bash
command: grep README <(ls).
>>> cmd = sh("grep", "README", sh("ls"))
>>> await cmd
'README.md\n'
Use .writable to write to a command instead.
>>> buf = bytearray()
>>> cmd = sh("ls") | sh("tee", sh("grep", "README").writable | buf) | sh.DEVNULL
>>> await cmd
''
>>> buf
bytearray(b'README.md\n')
The above example is equivalent to ls | tee >(grep README > buf) > /dev/null.
Timeouts
You can specify a timeout using the timeout option. If the timeout expires, shellous will raise
a TimeoutError.
>>> await sh("sleep", 60).set(timeout=0.1)
Traceback (most recent call last):
...
TimeoutError
Timeouts are just a special case of cancellation. When a command is cancelled, shellous terminates
the running process and raises a CancelledError.
>>> t = asyncio.create_task(sh("sleep", 60).coro())
>>> t.cancel()
True
>>> await t
Traceback (most recent call last):
...
CancelledError
By default, shellous will send a SIGTERM signal to the process to tell it to exit. If the process does not
exit within 3 seconds, shellous will send a SIGKILL signal. You can change these defaults with the
cancel_signal and cancel_timeout settings. A command is not considered fully cancelled until the
process exits.
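The terminate-then-kill escalation described above can be sketched with the standard library on a Unix system. `cancel_process` is a hypothetical helper, and the child deliberately ignores SIGTERM so that the fallback to SIGKILL is exercised; the short `cancel_timeout` is only to keep the demonstration fast.

```python
import asyncio
import signal
import sys

# Child that ignores SIGTERM, so only SIGKILL can stop it.
STUBBORN = (
    "import signal, time\n"
    "signal.signal(signal.SIGTERM, signal.SIG_IGN)\n"
    "print('ready', flush=True)\n"
    "time.sleep(60)\n"
)


async def cancel_process(proc, cancel_timeout=3.0):
    """Hypothetical helper: SIGTERM first, SIGKILL if the process lingers."""
    proc.terminate()
    try:
        await asyncio.wait_for(proc.wait(), cancel_timeout)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
    return proc.returncode


async def demo():
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", STUBBORN, stdout=asyncio.subprocess.PIPE
    )
    await proc.stdout.readline()  # wait until SIGTERM is being ignored
    return await cancel_process(proc, cancel_timeout=0.2)


exit_code = asyncio.run(demo())
print(exit_code == -signal.SIGKILL)
```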
Pseudo-Terminal Support (Unix Only)
To run a command through a pseudo-terminal, set the pty option to True.
>>> await sh("echo", "in a pty").set(pty=True)
'in a pty\r\n'
Alternatively, you can pass a pty function to configure the tty mode and size.
>>> import shellous
>>> ls = sh("ls").set(pty=shellous.cooked(cols=40, rows=10, echo=False))
>>> await ls("README.md", "CHANGELOG.md")
'CHANGELOG.md\tREADME.md\r\n'
Shellous provides three built-in helper functions: shellous.cooked(), shellous.raw() and shellous.cbreak().
Context Objects
You can store shared command settings in an immutable context object (CmdContext). To create a new context object, specify your changes to the default context sh:
>>> auditor = lambda phase, info: print(phase, info["runner"].name)
>>> sh_audit = sh.set(audit_callback=auditor)
Now all commands created with sh_audit will log their progress using the audit callback.
>>> await sh_audit("echo", "goodbye")
start echo
stop echo
'goodbye\n'
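An audit callback does not have to print. The sketch below records events in a list instead, relying only on the callback shape used above: a `phase` string and an `info` mapping whose `"runner"` entry has a `name`. The `make_auditor` helper and the stand-in runner object are hypothetical and exist purely so the example runs without starting a process.

```python
from types import SimpleNamespace


def make_auditor(events: list):
    """Return a callback that records (phase, program name) pairs."""

    def auditor(phase, info):
        events.append((phase, info["runner"].name))

    return auditor


events = []
auditor = make_auditor(events)

# Simulate the two calls shown above with a stand-in runner object.
fake_runner = SimpleNamespace(name="echo")
auditor("start", {"runner": fake_runner})
auditor("stop", {"runner": fake_runner})

print(events)
```

A callback built this way would be passed in the same manner as the lambda above, via `sh.set(audit_callback=auditor)`.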
You can also create a context object that specifies all return values are Result objects.
>>> rsh = sh.result
>>> await rsh("echo", "whatever")
Result(exit_code=0, output_bytes=b'whatever\n', error_bytes=b'', cancelled=False, encoding='utf-8')
Options
Both Command and CmdContext support options to control their runtime behavior. Some of these options (timeout,
pty, audit_callback, and exit_codes) have been described above. See the shellous.Options class for
more information.
You can retrieve an option from cmd with cmd.options.<option>. For example, use cmd.options.encoding to obtain the encoding:
>>> cmd = sh("echo").set(encoding="latin1")
>>> cmd.options.encoding
'latin1'
Command and CmdContext use the .set() method to specify most options:
| Option | Description |
|---|---|
| path | Search path to use instead of the PATH environment variable. (Default=None) |
| env | Additional environment variables to pass to the command. (Default={}) |
| inherit_env | True if command should inherit the environment variables from the current process. (Default=True) |
| encoding | Text encoding of input/output streams. You can specify an error handling scheme by including it after a space, e.g. "ascii backslashreplace". (Default="utf-8 strict") |
| exit_codes | Set of exit codes that do not raise a ResultError. (Default={0}) |
| timeout | Timeout in seconds to wait before cancelling the process. (Default=None) |
| cancel_timeout | Timeout in seconds to wait for a cancelled process to exit before forcefully terminating it. (Default=3s) |
| cancel_signal | The signal sent to a process when it is cancelled. (Default=SIGTERM) |
| alt_name | Alternate name for the process used for debug logging. (Default=None) |
| pass_fds | Additional file descriptors to pass to the process. (Default={}) |
| pass_fds_close | True if descriptors in pass_fds should be closed after the child process is launched. (Default=False) |
| pty | Used to allocate a pseudo-terminal (PTY). (Default=False) |
| close_fds | True if process should close all file descriptors when it starts. This setting defaults to False to align with posix_spawn requirements. (Default=False) |
| audit_callback | Provide function to audit stages of process execution. (Default=None) |
| coerce_arg | Provide function to coerce Command arguments to strings when str() is not sufficient. For example, you can provide your own function that converts a dictionary argument to a sequence of strings. (Default=None) |
| error_limit | Maximum number of initial bytes of STDERR to store in Result object. (Default=1024) |
| read_buffer_limit | Maximum number of bytes to read when looking for a separator. (Default=65536) |
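The encoding option combines a codec name and an error handling scheme in one string. A plausible way to interpret such a value with plain Python is sketched below; `decode_with_spec` is a hypothetical helper, and the library may parse the option differently.

```python
def decode_with_spec(data: bytes, spec: str) -> str:
    """Hypothetical helper: decode using an 'encoding [errors]' string."""
    encoding, _, errors = spec.partition(" ")
    # Fall back to "strict" when no error scheme is given.
    return data.decode(encoding, errors or "strict")


raw = "café".encode("utf-8")

print(decode_with_spec(raw, "utf-8"))
print(decode_with_spec(raw, "ascii backslashreplace"))
```

With "ascii backslashreplace", bytes that are not valid ASCII are kept as escape sequences instead of raising an error.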
env
Use the env() method to add to the list of environment variables. The env() method supports keyword parameters.
You can call env() more than once and the effect is additive.
>>> cmd = sh("echo").env(ENV1="a", ENV2="b").env(ENV2=3)
>>> cmd.options.env
{'ENV1': 'a', 'ENV2': '3'}
Use the env option with set() when you want to replace all the environment variables.
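The additive behaviour of env() versus the replacing behaviour of set(env=...) can be modelled with ordinary dictionaries. The helpers below are hypothetical and only illustrate the merge rules, including the interaction with inherit_env; they are not shellous functions.

```python
import os
from typing import Optional


def add_env(current: dict, **updates) -> dict:
    # Additive update, like chaining env() calls; values become strings.
    return {**current, **{key: str(value) for key, value in updates.items()}}


def build_env(extra: dict, inherit_env: bool = True, parent: Optional[dict] = None) -> dict:
    """Hypothetical helper: combine the parent environment with `extra`."""
    if parent is None:
        parent = dict(os.environ)
    base = dict(parent) if inherit_env else {}
    base.update(extra)  # command-specific variables override inherited ones
    return base


env = add_env(add_env({}, ENV1="a", ENV2="b"), ENV2=3)
print(env)  # {'ENV1': 'a', 'ENV2': '3'}

parent = {"HOME": "/root", "ENV1": "old"}
print(build_env(env, inherit_env=True, parent=parent))
print(build_env(env, inherit_env=False, parent=parent))
```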
input, output, error
When you apply a redirection operator to a Command or CmdContext, the redirection targets
are also stored in the Options object. To change these, use the .stdin(), .stdout(), or .stderr() methods or the redirection operator |.
| Option | Description |
|---|---|
| input | The redirection target for standard input. |
| input_close | True if standard input should be closed after the process is launched. |
| output | The redirection target for standard output. |
| output_append | True if standard output should be open for append. |
| output_close | True if standard output should be closed after the process is launched. |
| error | The redirection target for standard error. |
| error_append | True if standard error should be open for append. |
| error_close | True if standard error should be closed after the process is launched. |
Type Checking
Shellous fully supports PEP 484 type hints.
Commands
Commands are generic on the return type, either str or Result. You will specify the
type of a command object as Command[str] or Command[Result].
Use the result modifier to obtain a Command[Result] from a Command[str].
from shellous import sh, Command, Result
cmd1: Command[str] = sh("echo", "abc")
# When you `await cmd1`, the result is a `str` object.
cmd2: Command[Result] = sh.result("echo", "abc")
# When you `await cmd2`, the result is a `Result` object.
CmdContext
The CmdContext class is also generic on either str or Result.
from shellous import sh, CmdContext, Result
sh1: CmdContext[str] = sh.set(path="/bin:/usr/bin")
# When you use `sh1` to create commands, it produces `Command[str]` objects with the given path.
sh2: CmdContext[Result] = sh.result.set(path="/bin:/usr/bin")
# When you use `sh2` to create commands, it produces `Command[Result]` objects with the given path.
Logging
For verbose logging, shellous supports a SHELLOUS_TRACE environment variable. Set the
value of SHELLOUS_TRACE to a comma-delimited list of options:
- detail: Enables detailed logging used to trace the steps of running a command.
- prompt: Enables logging in the Prompt class when controlling a program using send/expect.
- all: Enables all logging options.
Shellous uses the built-in Python logging module. After enabling these options,
the shellous logger will display log messages at the INFO level.
Without these options enabled, Shellous generates almost no log messages.
"""
.. include:: ../README.md
"""

# pylint: disable=cyclic-import
# pyright: reportUnusedImport=false

__version__ = "0.40.0"

import sys
import warnings

from .command import AuditEventInfo, CmdContext, Command, Options
from .pipeline import Pipeline
from .prompt import Prompt
from .pty_util import cbreak, cooked, raw
from .result import Result, ResultError
from .runner import PipeRunner, Runner

if sys.version_info[:3] in [(3, 10, 9), (3, 11, 1)]:
    # Warn about these specific Python releases: 3.10.9 and 3.11.1
    # These releases have a known race condition.
    warnings.warn(  # pragma: no cover
        "Python 3.10.9 and Python 3.11.1 are unreliable with respect to "
        + "asyncio subprocesses. Consider a newer Python release: 3.10.10+ "
        + "or 3.11.2+. (https://github.com/python/cpython/issues/100133)",
        RuntimeWarning,
    )


sh: CmdContext[str] = CmdContext()
"""`sh` is the default command context (`CmdContext`).

Use `sh` to create commands or new command contexts.

```python
from shellous import sh
result = await sh("echo", "hello")
```
"""

__all__ = [
    "sh",
    "CmdContext",
    "Command",
    "Options",
    "Pipeline",
    "Prompt",
    "cbreak",
    "cooked",
    "raw",
    "Result",
    "ResultError",
    "Runner",
    "PipeRunner",
    "AuditEventInfo",
]
sh is the default command context (CmdContext).
Use sh to create commands or new command contexts.
from shellous import sh
result = await sh("echo", "hello")
@dataclass(frozen=True)
class CmdContext(Generic[_RT]):
    """Concrete class for an immutable execution context."""

    CAPTURE: ClassVar[Redirect] = Redirect.CAPTURE
    "Capture and read/write stream manually."

    DEVNULL: ClassVar[Redirect] = Redirect.DEVNULL
    "Redirect to /dev/null."

    INHERIT: ClassVar[Redirect] = Redirect.INHERIT
    "Redirect to same place as existing stdin/stdout/stderr."

    STDOUT: ClassVar[Redirect] = Redirect.STDOUT
    "Redirect stderr to same place as stdout."

    BUFFER: ClassVar[Redirect] = Redirect.BUFFER
    "Redirect output to a buffer in the Result object. This is the default for stdout/stderr."

    options: Options = field(default_factory=Options)
    "Default command options."

    def stdin(
        self,
        input_: Any,
        *,
        close: bool = False,
    ) -> "CmdContext[_RT]":
        "Return new context with updated `input` settings."
        new_options = self.options.set_stdin(input_, close)
        return CmdContext(new_options)

    def stdout(
        self,
        output: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "CmdContext[_RT]":
        "Return new context with updated `output` settings."
        new_options = self.options.set_stdout(output, append, close)
        return CmdContext(new_options)

    def stderr(
        self,
        error: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "CmdContext[_RT]":
        "Return new context with updated `error` settings."
        new_options = self.options.set_stderr(error, append, close)
        return CmdContext(new_options)

    def env(self, **kwds: Any) -> "CmdContext[_RT]":
        """Return new context with augmented environment."""
        new_options = self.options.add_env(kwds)
        return CmdContext(new_options)

    def set(  # pylint: disable=unused-argument, too-many-locals, too-many-arguments
        self,
        *,
        path: Unset[Optional[str]] = _UNSET,
        env: Unset[dict[str, Any]] = _UNSET,
        inherit_env: Unset[bool] = _UNSET,
        encoding: Unset[str] = _UNSET,
        _return_result: Unset[bool] = _UNSET,
        _catch_cancelled_error: Unset[bool] = _UNSET,
        exit_codes: Unset[Optional[Container[int]]] = _UNSET,
        timeout: Unset[Optional[float]] = _UNSET,
        cancel_timeout: Unset[float] = _UNSET,
        cancel_signal: Unset[Optional[signal.Signals]] = _UNSET,
        alt_name: Unset[Optional[str]] = _UNSET,
        pass_fds: Unset[Iterable[int]] = _UNSET,
        pass_fds_close: Unset[bool] = _UNSET,
        _writable: Unset[bool] = _UNSET,
        _start_new_session: Unset[bool] = _UNSET,
        _preexec_fn: Unset[_PreexecFnT] = _UNSET,
        pty: Unset[PtyAdapterOrBool] = _UNSET,
        close_fds: Unset[bool] = _UNSET,
        audit_callback: Unset[_AuditFnT] = _UNSET,
        coerce_arg: Unset[_CoerceArgFnT] = _UNSET,
        error_limit: Unset[Optional[int]] = _UNSET,
    ) -> "CmdContext[_RT]":
        """Return new context with custom options set.

        See `Command.set` for option reference.
        """
        kwargs = locals()
        del kwargs["self"]
        if not encoding:
            raise TypeError("invalid encoding")
        return CmdContext(self.options.set(kwargs))

    def __call__(self, *args: Any) -> "Command[_RT]":
        "Construct a new command."
        return Command(coerce(args, self.options.coerce_arg), self.options)

    @property
    def result(self) -> "CmdContext[shellous.Result]":
        "Set `_return_result` and `exit_codes`."
        return cast(
            CmdContext[shellous.Result],
            self.set(
                _return_result=True,
                exit_codes=range(-255, 256),
            ),
        )

    def find_command(self, name: str) -> Optional[Path]:
        """Find the command with the given name and return its filesystem path.

        Return None if the command name is not found in the search path.

        Use the `path` variable specified by the context if set. Otherwise, the
        default behavior is to use the `PATH` environment variable with a
        fallback to the value of `os.defpath`.
        """
        result = self.options.which(name)
        if not result:
            return None
        return Path(result)
Concrete class for an immutable execution context.
CAPTURE: Capture and read/write stream manually.
DEVNULL: Redirect to /dev/null.
INHERIT: Redirect to same place as existing stdin/stdout/stderr.
STDOUT: Redirect stderr to same place as stdout.
BUFFER: Redirect output to a buffer in the Result object. This is the default for stdout/stderr.
    def stdin(
        self,
        input_: Any,
        *,
        close: bool = False,
    ) -> "CmdContext[_RT]":
        "Return new context with updated `input` settings."
        new_options = self.options.set_stdin(input_, close)
        return CmdContext(new_options)
Return new context with updated input settings.
    def stdout(
        self,
        output: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "CmdContext[_RT]":
        "Return new context with updated `output` settings."
        new_options = self.options.set_stdout(output, append, close)
        return CmdContext(new_options)
Return new context with updated output settings.
    def stderr(
        self,
        error: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "CmdContext[_RT]":
        "Return new context with updated `error` settings."
        new_options = self.options.set_stderr(error, append, close)
        return CmdContext(new_options)
Return new context with updated error settings.
    def env(self, **kwds: Any) -> "CmdContext[_RT]":
        """Return new context with augmented environment."""
        new_options = self.options.add_env(kwds)
        return CmdContext(new_options)
Return new context with augmented environment.
    def set(  # pylint: disable=unused-argument, too-many-locals, too-many-arguments
        self,
        *,
        path: Unset[Optional[str]] = _UNSET,
        env: Unset[dict[str, Any]] = _UNSET,
        inherit_env: Unset[bool] = _UNSET,
        encoding: Unset[str] = _UNSET,
        _return_result: Unset[bool] = _UNSET,
        _catch_cancelled_error: Unset[bool] = _UNSET,
        exit_codes: Unset[Optional[Container[int]]] = _UNSET,
        timeout: Unset[Optional[float]] = _UNSET,
        cancel_timeout: Unset[float] = _UNSET,
        cancel_signal: Unset[Optional[signal.Signals]] = _UNSET,
        alt_name: Unset[Optional[str]] = _UNSET,
        pass_fds: Unset[Iterable[int]] = _UNSET,
        pass_fds_close: Unset[bool] = _UNSET,
        _writable: Unset[bool] = _UNSET,
        _start_new_session: Unset[bool] = _UNSET,
        _preexec_fn: Unset[_PreexecFnT] = _UNSET,
        pty: Unset[PtyAdapterOrBool] = _UNSET,
        close_fds: Unset[bool] = _UNSET,
        audit_callback: Unset[_AuditFnT] = _UNSET,
        coerce_arg: Unset[_CoerceArgFnT] = _UNSET,
        error_limit: Unset[Optional[int]] = _UNSET,
    ) -> "CmdContext[_RT]":
        """Return new context with custom options set.

        See `Command.set` for option reference.
        """
        kwargs = locals()
        del kwargs["self"]
        if not encoding:
            raise TypeError("invalid encoding")
        return CmdContext(self.options.set(kwargs))
Return new context with custom options set.
See Command.set for option reference.
    @property
    def result(self) -> "CmdContext[shellous.Result]":
        "Set `_return_result` and `exit_codes`."
        return cast(
            CmdContext[shellous.Result],
            self.set(
                _return_result=True,
                exit_codes=range(-255, 256),
            ),
        )
Set _return_result and exit_codes.
    def find_command(self, name: str) -> Optional[Path]:
        """Find the command with the given name and return its filesystem path.

        Return None if the command name is not found in the search path.

        Use the `path` variable specified by the context if set. Otherwise, the
        default behavior is to use the `PATH` environment variable with a
        fallback to the value of `os.defpath`.
        """
        result = self.options.which(name)
        if not result:
            return None
        return Path(result)
Find the command with the given name and return its filesystem path.
Return None if the command name is not found in the search path.
Use the path variable specified by the context if set. Otherwise, the
default behavior is to use the PATH environment variable with a
fallback to the value of os.defpath.
@dataclass(frozen=True)
class Command(Generic[_RT]):
    """A Command instance is lightweight and immutable object that specifies the
    arguments and options used to run a program. Commands do not do anything
    until they are awaited.

    Commands are always created by a CmdContext.

    ```
    from shellous import sh

    # Create a new command from the context.
    echo = sh("echo", "hello, world")

    # Run the command.
    result = await echo
    ```
    """

    args: "tuple[Union[str, bytes, os.PathLike[Any], Command[Any], shellous.Pipeline[Any]], ...]"
    "Command arguments including the program name as first argument."

    options: Options
    "Command options."

    def __post_init__(self) -> None:
        "Validate the command."
        if len(self.args) == 0:
            raise ValueError("Command must include program name")

    @property
    def name(self) -> str:
        """Returns the name of the program being run.

        Names longer than 31 characters are truncated. If `alt_name` option
        is set, return that instead.
        """
        if self.options.alt_name:
            return self.options.alt_name
        name = str(self.args[0])
        if len(name) > 31:
            return f"...{name[-31:]}"
        return name

    def stdin(self, input_: Any, *, close: bool = False) -> "Command[_RT]":
        "Pass `input` to command's standard input."
        new_options = self.options.set_stdin(input_, close)
        return Command(self.args, new_options)

    def stdout(
        self,
        output: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "Command[_RT]":
        "Redirect standard output to `output`."
        new_options = self.options.set_stdout(output, append, close)
        return Command(self.args, new_options)

    def stderr(
        self,
        error: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "Command[_RT]":
        "Redirect standard error to `error`."
        new_options = self.options.set_stderr(error, append, close)
        return Command(self.args, new_options)

    def env(self, **kwds: Any) -> "Command[_RT]":
        """Return new command with augmented environment.

        The changes to the environment variables made by this method are
        additive. For example, calling `cmd.env(A=1).env(B=2)` produces a
        command with the environment set to `{"A": "1", "B": "2"}`.

        To clear the environment, use the `cmd.set(env={})` method.
        """
        new_options = self.options.add_env(kwds)
        return Command(self.args, new_options)

    def set(  # pylint: disable=unused-argument, too-many-locals, too-many-arguments
        self,
        *,
        path: Unset[Optional[str]] = _UNSET,
        env: Unset[dict[str, Any]] = _UNSET,
        inherit_env: Unset[bool] = _UNSET,
        encoding: Unset[str] = _UNSET,
        _return_result: Unset[bool] = _UNSET,
        _catch_cancelled_error: Unset[bool] = _UNSET,
        exit_codes: Unset[Optional[Container[int]]] = _UNSET,
        timeout: Unset[Optional[float]] = _UNSET,
        cancel_timeout: Unset[float] = _UNSET,
        cancel_signal: Unset[Optional[signal.Signals]] = _UNSET,
        alt_name: Unset[Optional[str]] = _UNSET,
        pass_fds: Unset[Iterable[int]] = _UNSET,
        pass_fds_close: Unset[bool] = _UNSET,
        _writable: Unset[bool] = _UNSET,
        _start_new_session: Unset[bool] = _UNSET,
        _preexec_fn: Unset[_PreexecFnT] = _UNSET,
        pty: Unset[PtyAdapterOrBool] = _UNSET,
        close_fds: Unset[bool] = _UNSET,
        audit_callback: Unset[_AuditFnT] = _UNSET,
        coerce_arg: Unset[_CoerceArgFnT] = _UNSET,
        error_limit: Unset[Optional[int]] = _UNSET,
        read_buffer_limit: Unset[Optional[int]] = _UNSET,
    ) -> "Command[_RT]":
        """Return new command with custom options set.

        **path** (str | None) default=None<br>
        Search path for locating command executable. By default, `path` is None
        which causes shellous to rely on the `PATH` environment variable.

        **env** (dict[str, str]) default={}<br>
        Set the environment variables for the subprocess. If `inherit_env` is
        True, the subprocess will also inherit the environment variables
        specified by the parent process.

        Using `set(env=...)` will replace all environment variables using the
        dictionary argument. You can also use the `env(...)` method to modify
        the existing environment incrementally.

        **inherit_env** (bool) default=True<br>
        Subprocess should inherit the parent process environment. If this is
        False, the subprocess will only have environment variables specified
        by `Command.env`. If `inherit_env` is True, the parent process
        environment is augmented/overridden by any variables specified in
        `Command.env`.

        **encoding** (str) default="utf-8"<br>
        String encoding to use for subprocess input/output. To specify `errors`,
        append it after a space. For example, use "utf-8 replace" to specify
        "utf-8" with errors "replace".

        **_return_result** (bool) default=False<br>
        When True, return a `Result` object instead of the standard output.
        Private API -- use the `result` modifier instead.

        **_catch_cancelled_error** (bool) default=False<br>
        When True, raise a `ResultError` when the command is cancelled.
        Private API -- used internally by PipeRunner.

        **exit_codes** (set[int] | None) default=None<br>
        Set of allowed exit codes that will not raise a `ResultError`. By default,
        `exit_codes` is `None` which indicates that 0 is the only valid exit
        status. Any other exit status will raise a `ResultError`. In addition to
        sets of integers, you can use a `range` object, e.g. `range(256)` for
        any positive exit status.

        **timeout** (float | None) default=None<br>
        Timeout in seconds to wait before we cancel the process. The timer
        begins immediately after the process is launched. This differs from
        using `asyncio.wait_for` which includes the process launch time also.
        If timeout is None (the default), there is no timeout.

        **cancel_timeout** (float) default=3.0 seconds<br>
        Timeout in seconds to wait for a process to exit after sending it a
        `cancel_signal`. If the process does not exit after waiting for
        `cancel_timeout` seconds, we send a kill signal to the process.

        **cancel_signal** (signals.Signal | None) default=signal.SIGTERM<br>
        Signal sent to a process when it is cancelled. If `cancel_signal` is
        None, send a `SIGKILL` on Unix and `SIGTERM` (TerminateProcess) on
        Windows.

        **alt_name** (str| None) default=None<br>
        Alternative name of the command displayed in logs. Used to resolve
        ambiguity when the actual command name is a scripting language.

        **pass_fds** (Iterable[int]) default=()<br>
        Specify open file descriptors to pass to the subprocess.

        **pass_fds_close** (bool) default=False<br>
        Close the file descriptors in `pass_fds` immediately in the current
        process immediately after launching the subprocess.

        **_writable** (bool) default=False<br>
        Used to indicate process substitution is writing.
        Private API -- use the `writable` modifier instead.

        **_start_new_session** (bool) default=False<br>
        Private API -- provided for testing purposes only.

        **_preexec_fn** (Callable() | None) default=None<br>
        Private API -- provided for testing purposes only.

        **pty** (bool | Callable(int)) default=False<br>
        If True, use a pseudo-terminal (pty) to control the child process.
        If `pty` is set to a callable, the function must take one int argument
        for the child side of the pty. The function is called to set the child
        pty's termios settings before spawning the subprocess.

        shellous provides three utility functions: `shellous.cooked`,
        `shellous.raw` and `shellous.cbreak` that can be used as arguments to
        the `pty` option.

        **close_fds** (bool) default=True<br>
        Close all unnecessary file descriptors in the child process. This
        defaults to True to align with the default behavior of the subprocess
        module.

        **audit_callback** (Callable(phase, info) | None) default=None<br>
        Specify a function to call as the command execution goes through its
        lifecycle. `audit_callback` is a function called with two arguments,
        *phase* and *info*.

        *phase* can be one of three values:

            "start": The process is about to start.

            "stop": The process finally stopped.

            "signal": The process is being sent a signal.

        *info* is a dictionary providing more information for the callback. The
        following keys are currently defined:

            "runner" (Runner): Reference to the Runner object.

            "failure" (str): When phase is "stop", optional string with the
            name of the exception from launching the process.

            "signal" (str): When phase is "signal", the signal name/number
            sent to the process, e.g. "SIGTERM".

        The primary use case for `audit_callback` is measuring how long each
        command takes to run and exporting this information to a metrics
        framework like Prometheus.

        **coerce_arg** (Callable(arg) | None) default=None<br>
        Specify a function to call on each command line argument. This function
        can specify how to coerce unsupported argument types (e.g. dict) to
        a sequence of strings. This function should return the original value
        unchanged if there is no conversion needed.

        **error_limit** (int | None) default=1024<br>
        Specify the number of bytes to store when redirecting STDERR to BUFFER.
        After reading up to `error_limit` bytes, shellous will continue to
        read from stderr, but will not store any additional bytes. Setting
        `error_limit` only affects the internal BUFFER; it has no effect when
        using other redirection types.

        **read_buffer_limit** (int | None) default=65536<br>
        Specify the maximum number of bytes to read from a stdout/stderr stream
        when looking for a separator. Increase this value if you expect the
        subprocess to emit extremely long lines. If this value is too small,
        you may get the error: "Separator is not found".

        """
        kwargs = locals()
        del kwargs["self"]
        if not encoding:
            raise TypeError("invalid encoding")
        return Command(self.args, self.options.set(kwargs))

    def _replace_args(self, new_args: Sequence[Any]) -> "Command[_RT]":
        """Return new command with arguments replaced by `new_args`.

        Arguments are NOT type-checked by the context. Program name must be the
        exact same object.
        """
        assert new_args
        assert new_args[0] is self.args[0]
        return Command(tuple(new_args), self.options)

    def coro(
        self,
        *,
        _run_future: Optional[asyncio.Future[Runner]] = None,
    ) -> Coroutine[Any, Any, _RT]:
        "Return coroutine object to run awaitable."
        return cast(
            Coroutine[Any, Any, _RT],
            Runner.run_command(self, _run_future=_run_future),
        )

    @contextlib.asynccontextmanager
    async def prompt(
        self,
        prompt: Union[str, list[str], re.Pattern[str], None] = None,
        *,
        timeout: Optional[float] = None,
        normalize_newlines: bool = False,
    ) -> AsyncGenerator[Prompt, None]:
        """Run command using the send/expect API.

        This method should be called using `async with`. It returns a `Prompt`
        object with send() and expect() methods.

        You can optionally set a default `prompt`. This is used by `expect()`
        when you don't provide another value.

        Use the `timeout` parameter to set the default timeout for operations.

        Set `normalize_newlines` to True to convert incoming CR and CR-LF to LF.
        This conversion is done before matching with `expect()`. This option
        does not affect strings sent with `send()`.
        """
        cmd = self.stdin(Redirect.CAPTURE).stdout(Redirect.CAPTURE)

        cli = None
        try:
            async with Runner(cmd) as run:
                cli = Prompt(
                    run,
                    default_prompt=prompt,
                    default_timeout=timeout,
                    normalize_newlines=normalize_newlines,
                )
                yield cli
                cli.close()
        finally:
            if cli is not None:
                cli._finish_()  # pyright: ignore[reportPrivateUsage]

    def __await__(self) -> "Generator[Any, None, _RT]":
        "Run process and return the standard output."
        return self.coro().__await__()

    async def __aenter__(self) -> Runner:
        "Enter the async context manager."
        return await context_aenter(self, Runner(self))

    async def __aexit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_value: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Optional[bool]:
        "Exit the async context manager."
        return await context_aexit(self, exc_type, exc_value, exc_tb)

    def __aiter__(self) -> AsyncIterator[str]:
        "Return async iterator to iterate over output lines."
        return aiter_preflight(self)._readlines()

    async def _readlines(self) -> AsyncIterator[str]:
        "Async generator to iterate over lines."
        async with Runner(self) as run:
            async for line in run:
                yield line

    def __call__(self, *args: Any) -> "Command[_RT]":
        "Apply more arguments to the end of the command."
        if not args:
            return self
        new_args = self.args + coerce(args, self.options.coerce_arg)
        return Command(new_args, self.options)

    def __str__(self) -> str:
        """Return string representation for command.

        Display the full name of the command only. Don't include arguments or
        environment variables.
        """
        return str(self.args[0])

    @overload
    def __or__(self, rhs: StdoutType) -> "Command[_RT]": ...  # pragma: no cover

    @overload
    def __or__(
        self, rhs: "Command[str]"
    ) -> "shellous.Pipeline[str]": ...  # pragma: no cover

    @overload
    def __or__(
        self,
        rhs: "Command[shellous.Result]",
    ) -> "shellous.Pipeline[shellous.Result]": ...  # pragma: no cover

    def __or__(self, rhs: Any) -> Any:
        "Bitwise or operator is used to build pipelines."
        if isinstance(rhs, STDOUT_TYPES):
            return self.stdout(rhs)
        return shellous.Pipeline.create(self) | rhs

    def __ror__(self, lhs: StdinType) -> "Command[_RT]":
        "Bitwise or operator is used to build pipelines."
        if isinstance(lhs, STDIN_TYPES):  # pyright: ignore[reportUnnecessaryIsInstance]
            return self.stdin(lhs)
        return NotImplemented

    def __rshift__(self, rhs: StdoutType) -> "Command[_RT]":
        "Right shift operator is used to build pipelines."
        if isinstance(
            rhs, STDOUT_TYPES
        ):  # pyright: ignore[reportUnnecessaryIsInstance]
            return self.stdout(rhs, append=True)
        return NotImplemented

    @property
    def writable(self) -> "Command[_RT]":
        "Set `writable` to True."
        return self.set(_writable=True)

    @property
    def result(self) -> "Command[shellous.Result]":
        "Set `_return_result` and `exit_codes`."
        return cast(
            Command[shellous.Result],
            self.set(_return_result=True, exit_codes=range(-255, 256)),
        )
A Command instance is a lightweight, immutable object that specifies the arguments and options used to run a program. Commands do not do anything until they are awaited.
Commands are always created by a CmdContext.
from shellous import sh
# Create a new command from the context.
echo = sh("echo", "hello, world")
# Run the command.
result = await echo
Command arguments including the program name as first argument.
    @property
    def name(self) -> str:
        """Returns the name of the program being run.

        Names longer than 31 characters are truncated. If `alt_name` option
        is set, return that instead.
        """
        if self.options.alt_name:
            return self.options.alt_name
        name = str(self.args[0])
        if len(name) > 31:
            return f"...{name[-31:]}"
        return name
Returns the name of the program being run.
Names longer than 31 characters are truncated to their last 31 characters
and prefixed with "...". If the alt_name option is set, return that instead.
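The truncation rule can be tried in isolation. The following is a standalone sketch that mirrors the logic of the `name` property in the source listing; `display_name` and `MAX_NAME_LEN` are hypothetical names used only for illustration and are not part of shellous.

```python
from typing import Optional

MAX_NAME_LEN = 31  # same limit as in the `name` property's source


def display_name(program: str, alt_name: Optional[str] = None) -> str:
    """Return the short name used for display (illustrative sketch)."""
    if alt_name:
        # An explicit alternative name always wins.
        return alt_name
    if len(program) > MAX_NAME_LEN:
        # Keep the tail of the string, where a path's file name lives.
        return f"...{program[-MAX_NAME_LEN:]}"
    return program


print(display_name("/usr/bin/env"))
print(display_name("/opt/" + "x" * 40 + "/bin/tool"))
print(display_name("python3", alt_name="my-script"))
```

Note that a truncated name is 34 characters long: the three-dot prefix plus the last 31 characters of the original.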
    def stdin(self, input_: Any, *, close: bool = False) -> "Command[_RT]":
        "Pass `input` to command's standard input."
        new_options = self.options.set_stdin(input_, close)
        return Command(self.args, new_options)
Pass input to command's standard input.
    def stdout(
        self,
        output: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "Command[_RT]":
        "Redirect standard output to `output`."
        new_options = self.options.set_stdout(output, append, close)
        return Command(self.args, new_options)
Redirect standard output to output.
    def stderr(
        self,
        error: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "Command[_RT]":
        "Redirect standard error to `error`."
        new_options = self.options.set_stderr(error, append, close)
        return Command(self.args, new_options)
Redirect standard error to error.
    def env(self, **kwds: Any) -> "Command[_RT]":
        """Return new command with augmented environment.

        The changes to the environment variables made by this method are
        additive. For example, calling `cmd.env(A=1).env(B=2)` produces a
        command with the environment set to `{"A": "1", "B": "2"}`.

        To clear the environment, use the `cmd.set(env={})` method.
        """
        new_options = self.options.add_env(kwds)
        return Command(self.args, new_options)
Return new command with augmented environment.
The changes to the environment variables made by this method are
additive. For example, calling cmd.env(A=1).env(B=2) produces a
command with the environment set to {"A": "1", "B": "2"}.
To clear the environment, use the cmd.set(env={}) method.
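The difference between the additive `env(...)` method and the replacing `set(env=...)` call can be illustrated without running any process. The sketch below uses a hypothetical `MiniCommand` dataclass as a stand-in for `Command`; it only models the environment handling described above and is not how shellous stores its options.

```python
from dataclasses import dataclass, field, replace
from typing import Any, Mapping


@dataclass(frozen=True)
class MiniCommand:
    """Hypothetical stand-in for Command; tracks environment variables only."""

    env_vars: Mapping[str, str] = field(default_factory=dict)

    def env(self, **kwds: Any) -> "MiniCommand":
        # Additive: merge new variables into a copy, converting values to str.
        merged = {**self.env_vars, **{k: str(v) for k, v in kwds.items()}}
        return replace(self, env_vars=merged)

    def set(self, *, env: Mapping[str, Any]) -> "MiniCommand":
        # Replacement: whatever was configured before is discarded.
        return replace(self, env_vars={k: str(v) for k, v in env.items()})


base = MiniCommand()
cmd = base.env(A=1).env(B=2)
print(cmd.env_vars)               # {'A': '1', 'B': '2'}
print(base.env_vars)              # {} (the original object is unchanged)
print(cmd.set(env={}).env_vars)   # {}
```

Because every call returns a new object, a partially configured command can be shared and specialized without affecting other users of it.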
    def set(  # pylint: disable=unused-argument, too-many-locals, too-many-arguments
        self,
        *,
        path: Unset[Optional[str]] = _UNSET,
        env: Unset[dict[str, Any]] = _UNSET,
        inherit_env: Unset[bool] = _UNSET,
        encoding: Unset[str] = _UNSET,
        _return_result: Unset[bool] = _UNSET,
        _catch_cancelled_error: Unset[bool] = _UNSET,
        exit_codes: Unset[Optional[Container[int]]] = _UNSET,
        timeout: Unset[Optional[float]] = _UNSET,
        cancel_timeout: Unset[float] = _UNSET,
        cancel_signal: Unset[Optional[signal.Signals]] = _UNSET,
        alt_name: Unset[Optional[str]] = _UNSET,
        pass_fds: Unset[Iterable[int]] = _UNSET,
        pass_fds_close: Unset[bool] = _UNSET,
        _writable: Unset[bool] = _UNSET,
        _start_new_session: Unset[bool] = _UNSET,
        _preexec_fn: Unset[_PreexecFnT] = _UNSET,
        pty: Unset[PtyAdapterOrBool] = _UNSET,
        close_fds: Unset[bool] = _UNSET,
        audit_callback: Unset[_AuditFnT] = _UNSET,
        coerce_arg: Unset[_CoerceArgFnT] = _UNSET,
        error_limit: Unset[Optional[int]] = _UNSET,
        read_buffer_limit: Unset[Optional[int]] = _UNSET,
    ) -> "Command[_RT]":
        """Return new command with custom options set.

        **path** (str | None) default=None<br>
        Search path for locating command executable. By default, `path` is None
        which causes shellous to rely on the `PATH` environment variable.

        **env** (dict[str, str]) default={}<br>
        Set the environment variables for the subprocess. If `inherit_env` is
        True, the subprocess will also inherit the environment variables
        specified by the parent process.

        Using `set(env=...)` will replace all environment variables using the
        dictionary argument. You can also use the `env(...)` method to modify
        the existing environment incrementally.

        **inherit_env** (bool) default=True<br>
        Subprocess should inherit the parent process environment. If this is
        False, the subprocess will only have environment variables specified
        by `Command.env`. If `inherit_env` is True, the parent process
        environment is augmented/overridden by any variables specified in
        `Command.env`.

        **encoding** (str) default="utf-8"<br>
        String encoding to use for subprocess input/output. To specify `errors`,
        append it after a space. For example, use "utf-8 replace" to specify
        "utf-8" with errors "replace".

        **_return_result** (bool) default=False<br>
        When True, return a `Result` object instead of the standard output.
        Private API -- use the `result` modifier instead.

        **_catch_cancelled_error** (bool) default=False<br>
        When True, raise a `ResultError` when the command is cancelled.
        Private API -- used internally by PipeRunner.

        **exit_codes** (set[int] | None) default=None<br>
        Set of allowed exit codes that will not raise a `ResultError`. By default,
        `exit_codes` is `None` which indicates that 0 is the only valid exit
        status. Any other exit status will raise a `ResultError`. In addition to
        sets of integers, you can use a `range` object, e.g. `range(256)` for
        any positive exit status.

        **timeout** (float | None) default=None<br>
        Timeout in seconds to wait before we cancel the process. The timer
        begins immediately after the process is launched. This differs from
        using `asyncio.wait_for` which includes the process launch time also.
        If timeout is None (the default), there is no timeout.

        **cancel_timeout** (float) default=3.0 seconds<br>
        Timeout in seconds to wait for a process to exit after sending it a
        `cancel_signal`. If the process does not exit after waiting for
        `cancel_timeout` seconds, we send a kill signal to the process.

        **cancel_signal** (signals.Signal | None) default=signal.SIGTERM<br>
        Signal sent to a process when it is cancelled. If `cancel_signal` is
        None, send a `SIGKILL` on Unix and `SIGTERM` (TerminateProcess) on
        Windows.

        **alt_name** (str| None) default=None<br>
        Alternative name of the command displayed in logs. Used to resolve
        ambiguity when the actual command name is a scripting language.

        **pass_fds** (Iterable[int]) default=()<br>
        Specify open file descriptors to pass to the subprocess.

        **pass_fds_close** (bool) default=False<br>
        Close the file descriptors in `pass_fds` immediately in the current
        process immediately after launching the subprocess.

        **_writable** (bool) default=False<br>
        Used to indicate process substitution is writing.
        Private API -- use the `writable` modifier instead.

        **_start_new_session** (bool) default=False<br>
        Private API -- provided for testing purposes only.

        **_preexec_fn** (Callable() | None) default=None<br>
        Private API -- provided for testing purposes only.

        **pty** (bool | Callable(int)) default=False<br>
        If True, use a pseudo-terminal (pty) to control the child process.
        If `pty` is set to a callable, the function must take one int argument
        for the child side of the pty. The function is called to set the child
        pty's termios settings before spawning the subprocess.

        shellous provides three utility functions: `shellous.cooked`,
        `shellous.raw` and `shellous.cbreak` that can be used as arguments to
        the `pty` option.

        **close_fds** (bool) default=True<br>
        Close all unnecessary file descriptors in the child process. This
        defaults to True to align with the default behavior of the subprocess
        module.

        **audit_callback** (Callable(phase, info) | None) default=None<br>
        Specify a function to call as the command execution goes through its
        lifecycle. `audit_callback` is a function called with two arguments,
        *phase* and *info*.

        *phase* can be one of three values:

            "start": The process is about to start.

            "stop": The process finally stopped.

            "signal": The process is being sent a signal.

        *info* is a dictionary providing more information for the callback. The
        following keys are currently defined:

            "runner" (Runner): Reference to the Runner object.

            "failure" (str): When phase is "stop", optional string with the
            name of the exception from launching the process.

            "signal" (str): When phase is "signal", the signal name/number
            sent to the process, e.g. "SIGTERM".

        The primary use case for `audit_callback` is measuring how long each
        command takes to run and exporting this information to a metrics
        framework like Prometheus.

        **coerce_arg** (Callable(arg) | None) default=None<br>
        Specify a function to call on each command line argument. This function
        can specify how to coerce unsupported argument types (e.g. dict) to
        a sequence of strings. This function should return the original value
        unchanged if there is no conversion needed.

        **error_limit** (int | None) default=1024<br>
        Specify the number of bytes to store when redirecting STDERR to BUFFER.
        After reading up to `error_limit` bytes, shellous will continue to
        read from stderr, but will not store any additional bytes. Setting
        `error_limit` only affects the internal BUFFER; it has no effect when
        using other redirection types.

        **read_buffer_limit** (int | None) default=65536<br>
        Specify the maximum number of bytes to read from a stdout/stderr stream
        when looking for a separator. Increase this value if you expect the
        subprocess to emit extremely long lines. If this value is too small,
        you may get the error: "Separator is not found".

        """
        kwargs = locals()
        del kwargs["self"]
        if not encoding:
            raise TypeError("invalid encoding")
        return Command(self.args, self.options.set(kwargs))
Return new command with custom options set.
path (str | None) default=None
Search path for locating command executable. By default, path is None
which causes shellous to rely on the PATH environment variable.
env (dict[str, str]) default={}
Set the environment variables for the subprocess. If inherit_env is
True, the subprocess will also inherit the environment variables
specified by the parent process.
Using set(env=...) will replace all environment variables using the
dictionary argument. You can also use the env(...) method to modify
the existing environment incrementally.
inherit_env (bool) default=True
Subprocess should inherit the parent process environment. If this is
False, the subprocess will only have environment variables specified
by Command.env. If inherit_env is True, the parent process
environment is augmented/overridden by any variables specified in
Command.env.
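How `env` and `inherit_env` combine can be sketched with plain dictionaries. The function below is a simplified, standalone approximation of the `Options.runtime_env` logic in the source listing (the real method returns None when there is nothing to override); `effective_env` and its `parent` parameter are hypothetical names introduced for this example.

```python
import os
from typing import Mapping, Optional


def effective_env(
    env: Optional[Mapping[str, str]],
    inherit_env: bool,
    parent: Optional[Mapping[str, str]] = None,
) -> dict[str, str]:
    """Compute the environment a subprocess would see (illustrative sketch)."""
    parent = os.environ if parent is None else parent
    if inherit_env:
        # Parent variables first; command-specific variables override them.
        return {**parent, **(env or {})}
    # Without inheritance, only the command's own variables are visible.
    return dict(env or {})


parent_env = {"PATH": "/usr/bin", "LANG": "C"}
print(effective_env({"LANG": "en_US.UTF-8"}, True, parent_env))
print(effective_env({"LANG": "en_US.UTF-8"}, False, parent_env))
print(effective_env(None, False, parent_env))
```

The last call shows that disabling inheritance without setting any variables yields an empty environment.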
encoding (str) default="utf-8"
String encoding to use for subprocess input/output. To specify errors,
append it after a space. For example, use "utf-8 replace" to specify
"utf-8" with errors "replace".
_return_result (bool) default=False
When True, return a Result object instead of the standard output.
Private API -- use the result modifier instead.
_catch_cancelled_error (bool) default=False
When True, raise a ResultError when the command is cancelled.
Private API -- used internally by PipeRunner.
exit_codes (set[int] | None) default=None
Set of allowed exit codes that will not raise a ResultError. By default,
exit_codes is None which indicates that 0 is the only valid exit
status. Any other exit status will raise a ResultError. In addition to
sets of integers, you can use a range object, e.g. range(256) for
any exit status from 0 to 255.
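The `exit_codes` check amounts to a container membership test. This is a minimal sketch of that rule, assuming only what the description above states; `exit_code_allowed` is a hypothetical helper, not a shellous function.

```python
from typing import Container, Optional


def exit_code_allowed(code: int, exit_codes: Optional[Container[int]] = None) -> bool:
    """Return True if `code` should not raise an error (illustrative sketch)."""
    if exit_codes is None:
        # Default: only a zero exit status counts as success.
        return code == 0
    # Any container works: a set, a range, a tuple, ...
    return code in exit_codes


print(exit_code_allowed(0))                     # True
print(exit_code_allowed(1))                     # False
print(exit_code_allowed(1, {0, 1}))             # True
print(exit_code_allowed(255, range(256)))       # True
print(exit_code_allowed(-15, range(256)))       # False
print(exit_code_allowed(-15, range(-255, 256))) # True
```

Negative values appear on Unix when a process is terminated by a signal, which is why the `result` modifier in the source listing uses `range(-255, 256)`.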
timeout (float | None) default=None
Timeout in seconds to wait before we cancel the process. The timer
begins immediately after the process is launched. This differs from
using asyncio.wait_for which includes the process launch time also.
If timeout is None (the default), there is no timeout.
cancel_timeout (float) default=3.0 seconds
Timeout in seconds to wait for a process to exit after sending it a
cancel_signal. If the process does not exit after waiting for
cancel_timeout seconds, we send a kill signal to the process.
cancel_signal (signal.Signals | None) default=signal.SIGTERM
Signal sent to a process when it is cancelled. If cancel_signal is
None, send a SIGKILL on Unix and SIGTERM (TerminateProcess) on
Windows.
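The interplay of `timeout`, `cancel_timeout` and the cancel signal can be approximated with the standard library alone. The sketch below is not the shellous implementation; it uses `asyncio.create_subprocess_exec` directly, and `run_with_timeout` is a hypothetical helper. It starts the timer only after the process has been launched, asks the process to terminate when the timer expires, and escalates to a kill if the process ignores the request.

```python
import asyncio
import sys


async def run_with_timeout(args, timeout, cancel_timeout=3.0):
    """Run a program; cancel it if it runs longer than `timeout` seconds."""
    proc = await asyncio.create_subprocess_exec(*args)
    # The launch has completed, so launch time is not counted against `timeout`.
    try:
        return await asyncio.wait_for(proc.wait(), timeout)
    except asyncio.TimeoutError:
        pass
    # Polite request first: SIGTERM on Unix, TerminateProcess on Windows.
    proc.terminate()
    try:
        return await asyncio.wait_for(proc.wait(), cancel_timeout)
    except asyncio.TimeoutError:
        # The process ignored the request; force it to stop.
        proc.kill()
        return await proc.wait()


async def main():
    sleeper = [sys.executable, "-c", "import time; time.sleep(30)"]
    return await run_with_timeout(sleeper, timeout=0.5)


exit_status = asyncio.run(main())
print("exit status:", exit_status)
```

With shellous itself, the same behavior is requested declaratively through `cmd.set(timeout=..., cancel_timeout=..., cancel_signal=...)`.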
alt_name (str | None) default=None
Alternative name of the command displayed in logs. Used to resolve
ambiguity when the actual command name is a scripting language.
pass_fds (Iterable[int]) default=()
Specify open file descriptors to pass to the subprocess.
pass_fds_close (bool) default=False
Close the file descriptors in pass_fds in the current
process immediately after launching the subprocess.
_writable (bool) default=False
Used to indicate process substitution is writing.
Private API -- use the writable modifier instead.
_start_new_session (bool) default=False
Private API -- provided for testing purposes only.
_preexec_fn (Callable() | None) default=None
Private API -- provided for testing purposes only.
pty (bool | Callable(int)) default=False
If True, use a pseudo-terminal (pty) to control the child process.
If pty is set to a callable, the function must take one int argument
for the child side of the pty. The function is called to set the child
pty's termios settings before spawning the subprocess.
shellous provides three utility functions: shellous.cooked,
shellous.raw and shellous.cbreak that can be used as arguments to
the pty option.
close_fds (bool) default=True
Close all unnecessary file descriptors in the child process. This
defaults to True to align with the default behavior of the subprocess
module.
audit_callback (Callable(phase, info) | None) default=None
Specify a function to call as the command execution goes through its
lifecycle. audit_callback is a function called with two arguments,
phase and info.
phase can be one of three values:
"start": The process is about to start.
"stop": The process finally stopped.
"signal": The process is being sent a signal.
info is a dictionary providing more information for the callback. The following keys are currently defined:
"runner" (Runner): Reference to the Runner object.
"failure" (str): When phase is "stop", optional string with the
name of the exception from launching the process.
"signal" (str): When phase is "signal", the signal name/number
sent to the process, e.g. "SIGTERM".
The primary use case for audit_callback is measuring how long each
command takes to run and exporting this information to a metrics
framework like Prometheus.
coerce_arg (Callable(arg) | None) default=None
Specify a function to call on each command line argument. This function
can specify how to coerce unsupported argument types (e.g. dict) to
a sequence of strings. This function should return the original value
unchanged if there is no conversion needed.
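As an illustration of the contract described above, the following hypothetical `coerce_arg` function turns a dict into a list of `--key=value` strings and returns every other value unchanged. The `--key=value` convention is an arbitrary choice for this example.

```python
from typing import Any


def coerce_arg(arg: Any) -> Any:
    """Convert dict arguments to option strings; pass others through."""
    if isinstance(arg, dict):
        return [f"--{key}={value}" for key, value in arg.items()]
    # No conversion needed: return the original value unchanged.
    return arg


print(coerce_arg({"depth": 1, "color": "auto"}))  # ['--depth=1', '--color=auto']
print(coerce_arg("README.md"))                    # README.md
```

Such a function would be installed with `cmd.set(coerce_arg=coerce_arg)`.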
error_limit (int | None) default=1024
Specify the number of bytes to store when redirecting STDERR to BUFFER.
After reading up to error_limit bytes, shellous will continue to
read from stderr, but will not store any additional bytes. Setting
error_limit only affects the internal BUFFER; it has no effect when
using other redirection types.
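The "keep reading, stop storing" behavior can be sketched with an in-memory stream. This standalone example is not shellous code; `drain_with_limit` is a hypothetical helper that reads a stream to the end while retaining at most `limit` bytes.

```python
import io
from typing import BinaryIO, Optional


def drain_with_limit(
    stream: BinaryIO, limit: Optional[int], chunk_size: int = 4096
) -> tuple[bytes, int]:
    """Read `stream` to EOF, storing at most `limit` bytes (None = unlimited)."""
    stored = bytearray()
    total = 0
    while chunk := stream.read(chunk_size):
        total += len(chunk)
        if limit is None:
            stored += chunk
        elif len(stored) < limit:
            # Store only as much as still fits under the limit.
            stored += chunk[: limit - len(stored)]
        # Past the limit, data is still consumed so the writer never blocks.
    return bytes(stored), total


kept, seen = drain_with_limit(io.BytesIO(b"e" * 5000), limit=1024)
print(len(kept), seen)  # 1024 5000
```

Continuing to read matters: if the reader stopped consuming stderr, a chatty subprocess could block once the pipe filled up.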
read_buffer_limit (int | None) default=65536
Specify the maximum number of bytes to read from a stdout/stderr stream
when looking for a separator. Increase this value if you expect the
subprocess to emit extremely long lines. If this value is too small,
you may get the error: "Separator is not found".
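The error message quoted above matches the one raised by asyncio's `StreamReader` when it cannot find a separator within its buffer limit. Assuming that `read_buffer_limit` maps onto that stream limit (an assumption, not something this page confirms), the effect can be reproduced with the standard library; `read_line` is a hypothetical helper.

```python
import asyncio


async def read_line(data: bytes, limit: int) -> str:
    """Feed `data` to a StreamReader with the given limit and read one line."""
    reader = asyncio.StreamReader(limit=limit)
    reader.feed_data(data)
    reader.feed_eof()
    try:
        line = await reader.readuntil(b"\n")
    except asyncio.LimitOverrunError as ex:
        return f"error: {ex}"
    return f"read {len(line)} bytes"


# 100 bytes with no newline yet, and a limit of only 16 bytes.
print(asyncio.run(read_line(b"x" * 100, limit=16)))
# The same line, newline-terminated, with a generous limit.
print(asyncio.run(read_line(b"x" * 100 + b"\n", limit=1024)))
```

The first call reports the separator error; the second reads the full 101-byte line.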
    def coro(
        self,
        *,
        _run_future: Optional[asyncio.Future[Runner]] = None,
    ) -> Coroutine[Any, Any, _RT]:
        "Return coroutine object to run awaitable."
        return cast(
            Coroutine[Any, Any, _RT],
            Runner.run_command(self, _run_future=_run_future),
        )
Return coroutine object to run awaitable.
    @contextlib.asynccontextmanager
    async def prompt(
        self,
        prompt: Union[str, list[str], re.Pattern[str], None] = None,
        *,
        timeout: Optional[float] = None,
        normalize_newlines: bool = False,
    ) -> AsyncGenerator[Prompt, None]:
        """Run command using the send/expect API.

        This method should be called using `async with`. It returns a `Prompt`
        object with send() and expect() methods.

        You can optionally set a default `prompt`. This is used by `expect()`
        when you don't provide another value.

        Use the `timeout` parameter to set the default timeout for operations.

        Set `normalize_newlines` to True to convert incoming CR and CR-LF to LF.
        This conversion is done before matching with `expect()`. This option
        does not affect strings sent with `send()`.
        """
        cmd = self.stdin(Redirect.CAPTURE).stdout(Redirect.CAPTURE)

        cli = None
        try:
            async with Runner(cmd) as run:
                cli = Prompt(
                    run,
                    default_prompt=prompt,
                    default_timeout=timeout,
                    normalize_newlines=normalize_newlines,
                )
                yield cli
                cli.close()
        finally:
            if cli is not None:
                cli._finish_()  # pyright: ignore[reportPrivateUsage]
Run command using the send/expect API.
This method should be called using async with. It returns a Prompt
object with send() and expect() methods.
You can optionally set a default prompt. This is used by expect()
when you don't provide another value.
Use the timeout parameter to set the default timeout for operations.
Set normalize_newlines to True to convert incoming CR and CR-LF to LF.
This conversion is done before matching with expect(). This option
does not affect strings sent with send().
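The newline conversion described for `normalize_newlines` can be expressed as a single regular expression substitution. This is a sketch of the stated rule (CR-LF and lone CR both become LF), not the code shellous uses internally.

```python
import re

# Match CR optionally followed by LF, so "\r\n" is replaced as one unit.
_NEWLINES = re.compile(r"\r\n?")


def normalize_newlines(text: str) -> str:
    """Convert CR-LF and bare CR line endings to LF."""
    return _NEWLINES.sub("\n", text)


print(repr(normalize_newlines("login:\r\nname\rdone\n")))  # 'login:\nname\ndone\n'
```

Normalizing before matching lets a pattern written with "\n" match output from a pty, where line endings commonly arrive as CR-LF.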
    def __await__(self) -> "Generator[Any, None, _RT]":
        "Run process and return the standard output."
        return self.coro().__await__()
Run process and return the standard output.
    async def __aenter__(self) -> Runner:
        "Enter the async context manager."
        return await context_aenter(self, Runner(self))
Enter the async context manager.
    def __aiter__(self) -> AsyncIterator[str]:
        "Return async iterator to iterate over output lines."
        return aiter_preflight(self)._readlines()
Return async iterator to iterate over output lines.
@dataclass(frozen=True)
class Options:  # pylint: disable=too-many-instance-attributes
    "Concrete class for per-command options."

    path: Optional[str] = None
    "Optional search path to use instead of PATH environment variable."

    env: Optional[EnvironmentDict] = field(default=None, repr=False)
    "Additional environment variables for command."

    inherit_env: bool = True
    "True if subprocess should inherit the current environment variables."

    input: _RedirectT = Redirect.DEFAULT
    "Input object to bind to stdin."

    input_close: bool = False
    "True if input object should be closed after subprocess launch."

    output: _RedirectT = Redirect.DEFAULT
    "Output object to bind to stdout."

    output_append: bool = False
    "True if output object should be opened in append mode."

    output_close: bool = False
    "True if output object should be closed after subprocess launch."

    error: _RedirectT = Redirect.DEFAULT
    "Error object to bind to stderr."

    error_append: bool = False
    "True if error object should be opened in append mode."

    error_close: bool = False
    "True if error object should be closed after subprocess launch."

    error_limit: Optional[int] = DEFAULT_ERROR_LIMIT
    "Bytes of stderr to buffer in memory (`sh.BUFFER`). None means unlimited."

    encoding: str = "utf-8"
    "Specifies encoding of input/output."

    _return_result: bool = False
    "True if we should return `Result` object instead of the output text/bytes."

    _catch_cancelled_error: bool = False
    "True if we should raise `ResultError` after clean up from cancelled task."

    exit_codes: Optional[Container[int]] = None
    "Set of exit codes that do not raise a `ResultError`. None means {0}."

    timeout: Optional[float] = None
    "Timeout in seconds that we wait before cancelling the process."

    cancel_timeout: float = 3.0
    "Timeout in seconds that we wait for a cancelled process to terminate."

    cancel_signal: Optional[signal.Signals] = signal.SIGTERM
    "The signal sent to terminate a cancelled process."

    alt_name: Optional[str] = None
    "Alternate name for the command to use when logging."

    pass_fds: Iterable[int] = ()
    "File descriptors to pass to the command."

    pass_fds_close: bool = False
    "True if pass_fds should be closed after subprocess launch."

    _writable: bool = False
    "True if using process substitution in write mode."

    _start_new_session: bool = False
    "True if child process should start a new session with `setsid` call."

    _preexec_fn: _PreexecFnT = None
    "Function to call in child process after fork from parent."

    pty: PtyAdapterOrBool = False
    "True if child process should be controlled using a pseudo-terminal (pty)."

    close_fds: bool = True
    "True if child process should close all file descriptors."

    audit_callback: _AuditFnT = None
    "Function called to audit stages of process execution."

    coerce_arg: _CoerceArgFnT = None
    "Function called to coerce top level arguments."

    read_buffer_limit: Optional[int] = None
    "Maximum number of bytes to read when looking for a separator."

    def runtime_env(self) -> Optional[dict[str, str]]:
        "@private Return our `env` merged with the global environment."
        if self.inherit_env:
            if not self.env:
                return None
            return os.environ | self.env

        if self.env:
            return dict(self.env)  # make copy of dict
        return {}

    def set_stdin(self, input_: Any, close: bool) -> "Options":
        "@private Return new options with `input` configured."
        if input_ is None:
            raise TypeError("invalid stdin")

        if input_ == Redirect.STDOUT:
            raise ValueError("STDOUT is only supported by stderr")

        return dataclasses.replace(
            self,
            input=input_,
            input_close=close,
        )

    def set_stdout(self, output: Any, append: bool, close: bool) -> "Options":
        "@private Return new options with `output` configured."
        if output is None:
            raise TypeError("invalid stdout")

        if output == Redirect.STDOUT:
            raise ValueError("STDOUT is only supported by stderr")

        return dataclasses.replace(
            self,
            output=output,
            output_append=append,
            output_close=close,
        )

    def set_stderr(self, error: Any, append: bool, close: bool) -> "Options":
        "@private Return new options with `error` configured."
        if error is None:
            raise TypeError("invalid stderr")

        return dataclasses.replace(
            self,
            error=error,
            error_append=append,
            error_close=close,
        )

    def add_env(self, updates: dict[str, Any]) -> "Options":
        "@private Return new options with augmented environment."
        new_env = EnvironmentDict(self.env, updates)
        return dataclasses.replace(self, env=new_env)

    def set(self, kwds: dict[str, Any]) -> "Options":
        """@private Return new options with given properties updated.

        See `Command.set` for option reference.
        """
        kwds = {key: value for key, value in kwds.items() if value is not _UNSET}
        if "env" in kwds:
            # The "env" property is stored as an `EnvironmentDict`.
            new_env = kwds["env"]
            if new_env:
                kwds["env"] = EnvironmentDict(None, new_env)
            else:
                kwds["env"] = None
        return dataclasses.replace(self, **kwds)

    @overload
    def which(self, name: bytes) -> Optional[bytes]:
        "@private Find the command with the given name and return its path."

    @overload
    def which(
        self, name: Union[str, os.PathLike[Any]]
    ) -> Optional[Union[str, os.PathLike[Any]]]:
        "@private Find the command with the given name and return its path."

    def which(
        self, name: Union[str, bytes, os.PathLike[Any]]
    ) -> Optional[Union[str, bytes, os.PathLike[Any]]]:
        "@private Find the command with the given name and return its path."
        return shutil.which(name, path=self.path)
Concrete class for per-command options.
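error_limit: Optional[int] = DEFAULT_ERROR_LIMIT
Bytes of stderr to buffer in memory (sh.BUFFER). None means unlimited.
exit_codes: Optional[Container[int]] = None
Set of exit codes that do not raise a ResultError. None means {0}.
cancel_signal: Optional[signal.Signals] = signal.SIGTERM
The signal sent to terminate a cancelled process.
pty: PtyAdapterOrBool = False
True if child process should be controlled using a pseudo-terminal (pty).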
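The `env` and `inherit_env` options interact through the `runtime_env` method in the Options source above. The sketch below restates that branching as a standalone function so the four cases can be compared side by side. The `base` parameter is hypothetical, added only so the example does not depend on the real `os.environ`.

```python
import os
from typing import Mapping, Optional


def runtime_env(
    env: Optional[Mapping[str, str]],
    inherit_env: bool,
    base: Optional[Mapping[str, str]] = None,
) -> Optional[dict]:
    """Mirror the branching of Options.runtime_env for illustration."""
    if base is None:
        base = os.environ  # hypothetical hook; the real method reads os.environ
    if inherit_env:
        if not env:
            return None  # no explicit environment is built
        return dict(base) | dict(env)  # overrides win over inherited values
    if env:
        return dict(env)  # copy, so callers cannot mutate the options
    return {}  # explicit empty environment


parent = {"PATH": "/usr/bin", "LANG": "C"}
merged = runtime_env({"LANG": "en_US.UTF-8"}, True, base=parent)
isolated = runtime_env({"LANG": "en_US.UTF-8"}, False, base=parent)
untouched = runtime_env(None, True, base=parent)
empty = runtime_env(None, False, base=parent)
```

Note the difference between the last two cases: `None` means nothing was overridden, while `{}` means the environment is deliberately empty.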
@dataclass(frozen=True)
class Pipeline(Generic[_RT]):
    "A Pipeline is a sequence of commands."

    commands: tuple[shellous.Command[_RT], ...] = ()

    @staticmethod
    def create(*commands: shellous.Command[_T]) -> "Pipeline[_T]":
        "Create a new Pipeline."
        return Pipeline(commands)

    def __post_init__(self) -> None:
        "Validate the pipeline."
        if len(self.commands) == 0:
            raise ValueError("Pipeline must include at least one command")

    @property
    def name(self) -> str:
        "Return the name of the pipeline."
        return "|".join(cmd.name for cmd in self.commands)

    @property
    def options(self) -> shellous.Options:
        "Return the last command's options."
        return self.commands[-1].options

    def stdin(self, input_: Any, *, close: bool = False) -> "Pipeline[_RT]":
        "Set stdin on the first command of the pipeline."
        new_first = self.commands[0].stdin(input_, close=close)
        new_commands = (new_first, *self.commands[1:])
        return dataclasses.replace(self, commands=new_commands)

    def stdout(
        self,
        output: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "Pipeline[_RT]":
        "Set stdout on the last command of the pipeline."
        new_last = self.commands[-1].stdout(output, append=append, close=close)
        new_commands = (*self.commands[0:-1], new_last)
        return dataclasses.replace(self, commands=new_commands)

    def stderr(
        self,
        error: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "Pipeline[_RT]":
        "Set stderr on the last command of the pipeline."
        new_last = self.commands[-1].stderr(error, append=append, close=close)
        new_commands = (*self.commands[0:-1], new_last)
        return dataclasses.replace(self, commands=new_commands)

    def _set(self, **kwds: Any):
        "Set options on last command of the pipeline."
        new_last = self.commands[-1].set(**kwds)
        new_commands = (*self.commands[0:-1], new_last)
        return dataclasses.replace(self, commands=new_commands)

    def coro(self) -> Coroutine[Any, Any, _RT]:
        "Return coroutine object for pipeline."
        return cast(Coroutine[Any, Any, _RT], PipeRunner.run_pipeline(self))

    @contextlib.asynccontextmanager
    async def prompt(
        self,
        prompt: Union[str, list[str], re.Pattern[str], None] = None,
        *,
        timeout: Optional[float] = None,
        normalize_newlines: bool = False,
    ) -> AsyncGenerator[Prompt, None]:
        """Run pipeline using the send/expect API.

        This method should be called using `async with`. It returns a `Prompt`
        object with send() and expect() methods.

        You can optionally set a default `prompt`. This is used by `expect()`
        when you don't provide another value.

        Use the `timeout` parameter to set the default timeout for operations.

        Set `normalize_newlines` to True to convert incoming CR and CR-LF to LF.
        This conversion is done before matching with `expect()`. This option
        does not affect strings sent with `send()`.
        """
        cmd = self.stdin(Redirect.CAPTURE).stdout(Redirect.CAPTURE)

        cli = None
        try:
            async with PipeRunner(cmd, capturing=True) as run:
                cli = Prompt(
                    run,
                    default_prompt=prompt,
                    default_timeout=timeout,
                    normalize_newlines=normalize_newlines,
                )
                yield cli
                cli.close()
        finally:
            if cli is not None:
                cli._finish_()  # pyright: ignore[reportPrivateUsage]

    def _add(self, item: Union["shellous.Command[Any]", "Pipeline[Any]"]):
        if isinstance(item, shellous.Command):
            return dataclasses.replace(self, commands=(*self.commands, item))
        return dataclasses.replace(
            self,
            commands=self.commands + item.commands,
        )

    def __len__(self) -> int:
        "Return number of commands in pipe."
        return len(self.commands)

    def __getitem__(self, key: int) -> shellous.Command[Any]:
        "Return specified command by index."
        return self.commands[key]

    def __call__(self, *args: Any) -> "Pipeline[_RT]":
        if args:
            raise TypeError("Calling pipeline with 1 or more arguments.")
        return self

    @overload
    def __or__(
        self, rhs: "Union[shellous.Command[shellous.Result], Pipeline[shellous.Result]]"
    ) -> "Pipeline[shellous.Result]": ...  # pragma: no cover

    @overload
    def __or__(
        self, rhs: "Union[shellous.Command[str], Pipeline[str]]"
    ) -> "Pipeline[str]": ...  # pragma: no cover

    @overload
    def __or__(self, rhs: StdoutType) -> "Pipeline[_RT]": ...  # pragma: no cover

    def __or__(self, rhs: Any) -> "Pipeline[Any]":
        if isinstance(rhs, (shellous.Command, Pipeline)):
            return self._add(rhs)  # pyright: ignore[reportUnknownArgumentType]
        if isinstance(rhs, STDOUT_TYPES):
            return self.stdout(rhs)
        if isinstance(rhs, (str, bytes)):
            raise TypeError(
                f"{type(rhs)!r} unsupported for | output (Use 'pathlib.Path')"
            )
        return NotImplemented

    def __ror__(self, lhs: StdinType) -> "Pipeline[_RT]":
        if isinstance(lhs, STDIN_TYPES):  # pyright: ignore[reportUnnecessaryIsInstance]
            return self.stdin(lhs)
        return NotImplemented

    def __rshift__(self, rhs: StdoutType) -> "Pipeline[_RT]":
        if isinstance(
            rhs, STDOUT_TYPES
        ):  # pyright: ignore[reportUnnecessaryIsInstance]
            return self.stdout(rhs, append=True)
        if isinstance(rhs, (str, bytes)):
            raise TypeError(
                f"{type(rhs)!r} unsupported for >> output (Use 'pathlib.Path')"
            )
        return NotImplemented

    @property
    def writable(self) -> "Pipeline[_RT]":
        "Set writable=True option on last command of pipeline."
        return self._set(_writable=True)

    @property
    def result(self) -> "Pipeline[shellous.Result]":
        "Set `_return_result` and `exit_codes`."
        return cast(
            Pipeline[shellous.Result],
            self._set(_return_result=True, exit_codes=range(-255, 256)),
        )

    def __await__(self) -> "Generator[Any, None, _RT]":
        return self.coro().__await__()  # FP pylint: disable=no-member

    async def __aenter__(self) -> PipeRunner:
        "Enter the async context manager."
        return await context_aenter(self, PipeRunner(self, capturing=True))

    async def __aexit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_value: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Optional[bool]:
        "Exit the async context manager."
        return await context_aexit(self, exc_type, exc_value, exc_tb)

    def __aiter__(self) -> AsyncIterator[str]:
        "Return async iterator to iterate over output lines."
        return aiter_preflight(self)._readlines()

    async def _readlines(self):
        "Async generator to iterate over lines."
        async with PipeRunner(self, capturing=True) as run:
            async for line in run:
                yield line
A Pipeline is a sequence of commands.
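The way a pipeline grows through the `|` operator can be sketched with two toy classes. `Cmd` and `Pipe` are hypothetical stand-ins for `Command` and `Pipeline`; they model only the name, the append behavior, and the rejection of plain strings, and make no claim about the rest of the real classes.

```python
from dataclasses import dataclass, replace
from typing import Tuple


@dataclass(frozen=True)
class Cmd:
    name: str

    def __or__(self, rhs):
        # Command | Command starts a new two-element pipeline.
        if isinstance(rhs, Cmd):
            return Pipe((self, rhs))
        return NotImplemented


@dataclass(frozen=True)
class Pipe:
    commands: Tuple[Cmd, ...] = ()

    def __post_init__(self):
        if not self.commands:
            raise ValueError("Pipeline must include at least one command")

    @property
    def name(self) -> str:
        return "|".join(cmd.name for cmd in self.commands)

    def __or__(self, rhs):
        if isinstance(rhs, Cmd):
            return replace(self, commands=(*self.commands, rhs))
        if isinstance(rhs, Pipe):
            return replace(self, commands=self.commands + rhs.commands)
        if isinstance(rhs, (str, bytes)):
            # A bare string is ambiguous, so it is rejected outright.
            raise TypeError(f"{type(rhs)!r} unsupported for | output")
        return NotImplemented


pipe = Cmd("ls") | Cmd("grep") | Cmd("wc")

try:
    pipe | "out.txt"
    rejected = False
except TypeError:
    rejected = True
```

Because both classes are frozen, every `|` returns a new object and the operands stay unchanged.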
    @staticmethod
    def create(*commands: shellous.Command[_T]) -> "Pipeline[_T]":
        "Create a new Pipeline."
        return Pipeline(commands)
Create a new Pipeline.
    @property
    def name(self) -> str:
        "Return the name of the pipeline."
        return "|".join(cmd.name for cmd in self.commands)
Return the name of the pipeline.
    @property
    def options(self) -> shellous.Options:
        "Return the last command's options."
        return self.commands[-1].options
Return the last command's options.
    def stdin(self, input_: Any, *, close: bool = False) -> "Pipeline[_RT]":
        "Set stdin on the first command of the pipeline."
        new_first = self.commands[0].stdin(input_, close=close)
        new_commands = (new_first, *self.commands[1:])
        return dataclasses.replace(self, commands=new_commands)
Set stdin on the first command of the pipeline.
    def stdout(
        self,
        output: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "Pipeline[_RT]":
        "Set stdout on the last command of the pipeline."
        new_last = self.commands[-1].stdout(output, append=append, close=close)
        new_commands = (*self.commands[0:-1], new_last)
        return dataclasses.replace(self, commands=new_commands)
Set stdout on the last command of the pipeline.
    def stderr(
        self,
        error: Any,
        *,
        append: bool = False,
        close: bool = False,
    ) -> "Pipeline[_RT]":
        "Set stderr on the last command of the pipeline."
        new_last = self.commands[-1].stderr(error, append=append, close=close)
        new_commands = (*self.commands[0:-1], new_last)
        return dataclasses.replace(self, commands=new_commands)
Set stderr on the last command of the pipeline.
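The redirection methods share one pattern: input goes to the first command, output and error go to the last, and the pipeline itself is never mutated. A minimal sketch of that pattern follows, using hypothetical `Cmd` and `Pipe` dataclasses and string placeholders instead of real redirect targets.

```python
from dataclasses import dataclass, replace
from typing import Optional, Tuple


@dataclass(frozen=True)
class Cmd:
    name: str
    stdin: Optional[str] = None
    stdout: Optional[str] = None


@dataclass(frozen=True)
class Pipe:
    commands: Tuple[Cmd, ...]

    def with_stdin(self, source: str) -> "Pipe":
        # Only the first command reads from the outside world.
        first = replace(self.commands[0], stdin=source)
        return replace(self, commands=(first, *self.commands[1:]))

    def with_stdout(self, target: str) -> "Pipe":
        # Only the last command writes to the outside world.
        last = replace(self.commands[-1], stdout=target)
        return replace(self, commands=(*self.commands[:-1], last))


original = Pipe((Cmd("cat"), Cmd("sort"), Cmd("uniq")))
redirected = original.with_stdin("in.txt").with_stdout("out.txt")
```

Since `dataclasses.replace` builds new objects, `original` can still be reused with different redirections afterwards.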
    def coro(self) -> Coroutine[Any, Any, _RT]:
        "Return coroutine object for pipeline."
        return cast(Coroutine[Any, Any, _RT], PipeRunner.run_pipeline(self))
Return coroutine object for pipeline.
    @contextlib.asynccontextmanager
    async def prompt(
        self,
        prompt: Union[str, list[str], re.Pattern[str], None] = None,
        *,
        timeout: Optional[float] = None,
        normalize_newlines: bool = False,
    ) -> AsyncGenerator[Prompt, None]:
        """Run pipeline using the send/expect API.

        This method should be called using `async with`. It returns a `Prompt`
        object with send() and expect() methods.

        You can optionally set a default `prompt`. This is used by `expect()`
        when you don't provide another value.

        Use the `timeout` parameter to set the default timeout for operations.

        Set `normalize_newlines` to True to convert incoming CR and CR-LF to LF.
        This conversion is done before matching with `expect()`. This option
        does not affect strings sent with `send()`.
        """
        cmd = self.stdin(Redirect.CAPTURE).stdout(Redirect.CAPTURE)

        cli = None
        try:
            async with PipeRunner(cmd, capturing=True) as run:
                cli = Prompt(
                    run,
                    default_prompt=prompt,
                    default_timeout=timeout,
                    normalize_newlines=normalize_newlines,
                )
                yield cli
                cli.close()
        finally:
            if cli is not None:
                cli._finish_()  # pyright: ignore[reportPrivateUsage]
Run pipeline using the send/expect API.
This method should be called using async with. It returns a Prompt
object with send() and expect() methods.
You can optionally set a default prompt. This is used by expect()
when you don't provide another value.
Use the timeout parameter to set the default timeout for operations.
Set normalize_newlines to True to convert incoming CR and CR-LF to LF.
This conversion is done before matching with expect(). This option
does not affect strings sent with send().
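Newline normalization on a stream has one subtlety: a CR-LF pair can be split across two chunks. The sketch below shows one way to handle this with the standard library's `io.IncrementalNewlineDecoder`. The helper `make_decoder` is hypothetical; shellous uses a private `_make_decoder` whose internals are not shown here and may differ.

```python
import codecs
import io


def make_decoder(encoding: str = "utf-8", normalize_newlines: bool = False):
    """Build an incremental decoder; optionally fold CR and CR-LF into LF."""
    decoder = codecs.getincrementaldecoder(encoding)()
    if normalize_newlines:
        # The second argument enables translation of "\r\n" and "\r" to "\n".
        return io.IncrementalNewlineDecoder(decoder, True)
    return decoder


def decode_all(decoder, chunks):
    last = len(chunks) - 1
    return "".join(
        decoder.decode(chunk, final=(i == last)) for i, chunk in enumerate(chunks)
    )


# The first CR-LF pair is deliberately split between two chunks.
chunks = [b"hello\r", b"\nworld\r", b"done\r\n"]

normalized = decode_all(make_decoder(normalize_newlines=True), chunks)
raw = decode_all(make_decoder(normalize_newlines=False), chunks)
```

The normalizing decoder holds back a trailing CR until it sees the next chunk, so a split CR-LF still becomes a single LF.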
    def __len__(self) -> int:
        "Return number of commands in pipe."
        return len(self.commands)
Return number of commands in pipe.
    def __getitem__(self, key: int) -> shellous.Command[Any]:
        "Return specified command by index."
        return self.commands[key]
Return specified command by index.
    @property
    def writable(self) -> "Pipeline[_RT]":
        "Set writable=True option on last command of pipeline."
        return self._set(_writable=True)
Set writable=True option on last command of pipeline.
    @property
    def result(self) -> "Pipeline[shellous.Result]":
        "Set `_return_result` and `exit_codes`."
        return cast(
            Pipeline[shellous.Result],
            self._set(_return_result=True, exit_codes=range(-255, 256)),
        )
Set _return_result and exit_codes.
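The `result` property sets `exit_codes` to `range(-255, 256)`, which is a container that includes every status in that span. The sketch below shows why this effectively disables the exit-code check. The function `is_success` is hypothetical and only models the documented rule that `None` means `{0}`; how shellous applies the container internally is an assumption.

```python
from typing import Container, Optional


def is_success(exit_code: int, exit_codes: Optional[Container[int]] = None) -> bool:
    """Return True if `exit_code` would not be treated as an error."""
    if exit_codes is None:
        exit_codes = {0}  # documented default: only 0 is accepted
    return exit_code in exit_codes


# A range supports `in`, so it works wherever a Container[int] is expected.
ACCEPT_ALL = range(-255, 256)

default_ok = is_success(0)
default_fail = is_success(1)
accepted_nonzero = is_success(1, ACCEPT_ALL)
accepted_negative = is_success(-15, ACCEPT_ALL)
```

Negative values are included because asyncio reports a process killed by signal N with return code -N on Unix.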
class Prompt:
    """Utility class to help with an interactive prompt session.

    When you are controlling a co-process you will usually "send" some
    text to it, and then "expect" a response. The "expect" operation can use
    a regular expression to match different types of responses.

    Create a new `Prompt` instance using the `prompt()` API.

    ```
    # In this example, we are using a default prompt of "??? ".
    # Setting the PS1 environment variable tells the shell to use this as
    # the shell prompt.
    cmd = sh("sh").env(PS1="??? ").set(pty=True)

    async with cmd.prompt("??? ", timeout=3.0) as cli:
        # Turn off terminal echo.
        cli.echo = False

        # Wait for greeting and initial prompt. Calling expect() with no
        # argument will match the default prompt "??? ".
        greeting, _ = await cli.expect()

        # Send a command and wait for the response.
        await cli.send("echo hello")
        answer, _ = await cli.expect()
        assert answer == "hello\\r\\n"
    ```
    """

    _runner: Union[Runner, PipeRunner]
    _encoding: str
    _default_prompt: Optional[re.Pattern[str]]
    _default_timeout: Optional[float]
    _normalize_newlines: bool
    _chunk_size: int
    _decoder: codecs.IncrementalDecoder
    _pending: str = ""
    _at_eof: bool = False
    _result: Optional[Result] = None

    def __init__(
        self,
        runner: Runner,
        *,
        default_prompt: Union[str, list[str], re.Pattern[str], None] = None,
        default_timeout: Optional[float] = None,
        normalize_newlines: bool = False,
        _chunk_size: int = _DEFAULT_CHUNK_SIZE,
    ):
        assert runner.stdin is not None
        assert runner.stdout is not None

        if isinstance(default_prompt, (str, list)):
            default_prompt = _regex_compile_exact(default_prompt)

        self._runner = runner
        self._encoding = runner.options.encoding
        self._default_prompt = default_prompt
        self._default_timeout = default_timeout
        self._normalize_newlines = normalize_newlines
        self._chunk_size = _chunk_size
        self._decoder = _make_decoder(self._encoding, normalize_newlines)

        if LOG_PROMPT:
            LOGGER.info(
                "Prompt[pid=%s]: --- BEGIN --- name=%r",
                self._runner.pid,
                self._runner.name,
            )

    @property
    def at_eof(self) -> bool:
        "True if the prompt reader is at the end of file."
        return self._at_eof and not self._pending

    @property
    def pending(self) -> str:
        """Characters that still remain in the `pending` buffer."""
        return self._pending

    @property
    def echo(self) -> bool:
        """True if PTY is in echo mode.

        If the runner is not using a PTY, always return False.

        When the process is using a PTY, you can enable/disable terminal echo
        mode by setting the `echo` property to True/False.
        """
        if not isinstance(self._runner, Runner) or self._runner.pty_fd is None:
            return False
        return pty_util.get_term_echo(self._runner.pty_fd)

    @echo.setter
    def echo(self, value: bool) -> None:
        """Set echo mode for the PTY.

        Raise an error if the runner is not using a PTY.
        """
        if not isinstance(self._runner, Runner) or self._runner.pty_fd is None:
            raise RuntimeError("Cannot set echo mode. Not running in a PTY.")
        pty_util.set_term_echo(self._runner.pty_fd, value)

    @property
    def result(self) -> Result:
        """The `Result` of the co-process when it exited.

        You can only retrieve this property *after* the `async with` block
        exits where the co-process is running:

        ```
        async with cmd.prompt() as cli:
            ...
        # Access `cli.result` here.
        ```

        Inside the `async with` block, raise a RuntimeError because the process
        has not exited yet.
        """
        if self._result is None:
            raise RuntimeError("Prompt process is still running")
        return self._result

    async def send(
        self,
        text: Union[bytes, str],
        *,
        end: Optional[str] = _DEFAULT_LINE_END,
        no_echo: bool = False,
        timeout: Optional[float] = None,
    ) -> None:
        """Write some `text` to co-process standard input and append a newline.

        The `text` parameter is the string that you want to write. Use a `bytes`
        object to send some raw bytes (e.g. to the terminal driver).

        The default line ending is "\n". Use the `end` parameter to change the
        line ending. To omit the line ending entirely, specify `end=None`.

        Set `no_echo` to True when you are writing a password. When you set
        `no_echo` to True, the `send` method will wait for terminal
        echo mode to be disabled before writing the text. If shellous logging
        is enabled, the sensitive information will **not** be logged.

        Use the `timeout` parameter to override the default timeout. Normally,
        data is delivered immediately and this method returns making a trip
        through the event loop. However, there are situations where the
        co-process input pipeline fills and we have to wait for it to
        drain.

        When this method needs to wait for the co-process input pipe to drain,
        this method will concurrently read from the output pipe into the pending
        buffer. This is necessary to avoid a deadlock situation where everything
        stops because neither process can make progress.
        """
        if end is None:
            end = ""

        if no_echo:
            await self._wait_no_echo()

        if isinstance(text, bytes):
            data = text + encode_bytes(end, self._encoding)
        else:
            data = encode_bytes(text + end, self._encoding)

        stdin = self._runner.stdin
        assert stdin is not None
        stdin.write(data)

        if LOG_PROMPT:
            self._log_send(data, no_echo)

        # Drain our write to stdin.
        cancelled, ex = await harvest_results(
            self._drain(stdin),
            timeout=timeout or self._default_timeout,
        )
        if cancelled:
            raise asyncio.CancelledError()
        if isinstance(ex[0], Exception):
            raise ex[0]

    async def expect(
        self,
        prompt: Union[str, list[str], re.Pattern[str], None] = None,
        *,
        timeout: Optional[float] = None,
    ) -> tuple[str, re.Match[str]]:
        """Read from co-process standard output until `prompt` pattern matches.

        Returns a 2-tuple of (output, match) where `output` is the text *before*
        the prompt pattern and `match` is a `re.Match` object for the prompt
        text itself.

        If `expect` reaches EOF or a timeout occurs before the prompt pattern
        matches, it raises an `EOFError` or `asyncio.TimeoutError`. The unread
        characters will be available in the `pending` buffer.

        After this method returns, there may still be characters read from stdout
        that remain in the `pending` buffer. These are the characters *after* the
        prompt pattern. You can examine these using the `pending` property.
        Subsequent calls to expect will examine this buffer first before
        reading new data from the output pipe.

        By default, this method will use the default `timeout` for the `Prompt`
        object if one is set. You can use the `timeout` parameter to specify
        a custom timeout in seconds.

        Prompt Patterns
        ~~~~~~~~~~~~~~~

        The `expect()` method supports matching fixed strings and regular
        expressions. The type of the `prompt` parameter determines the type of
        search.

        No argument or `None`:
            Use the default prompt pattern. If there is no default prompt,
            raise a TypeError.
        `str`:
            Match this string exactly.
        `list[str]`:
            Match one of these strings exactly.
        `re.Pattern[str]`:
            Match the given regular expression.

        When matching a regular expression, only a single Pattern object is
        supported. To match multiple regular expressions, combine them into a
        single regular expression using *alternation* syntax (|).

        The `expect()` method returns a 2-tuple (output, match). The `match`
        is the result of the regular expression search (re.Match). If you
        specify your prompt as a string or list of strings, it is still compiled
        into a regular expression that produces an `re.Match` object. You can
        examine the `match` object to determine the prompt value found.

        This method conducts a regular expression search on streaming data. The
        `expect()` method reads a new chunk of data into the `pending` buffer
        and then searches it. You must be careful in writing a regular
        expression so that the search is agnostic to how the incoming chunks of
        data arrive. Consider including a boundary condition at the end of your pattern.
        For example, instead of searching for the open-ended pattern `[a-z]+`,
        search for the pattern `[a-z]+[^a-z]` which ends with a non-letter
        character.

        Examples
        ~~~~~~~~

        Expect an exact string:

        ```
        await cli.expect("ftp> ")
        await cli.send(command)
        response, _ = await cli.expect("ftp> ")
        ```

        Expect a choice of strings:

        ```
        _, m = await cli.expect(["Login: ", "Password: ", "ftp> "])
        match m[0]:
            case "Login: ":
                await cli.send(login)
            case "Password: ":
                await cli.send(password)
            case "ftp> ":
                await cli.send(command)
        ```

        Read until EOF:

        ```
        data = await cli.read_all()
        ```

        Read the contents of the `pending` buffer without filling the buffer
        with any new data from the co-process pipe:

        ```
        data = await cli.read_pending()
        ```
        """
        if prompt is None:
            prompt = self._default_prompt
            if prompt is None:
                raise TypeError("prompt is required when default prompt is not set")
        elif isinstance(prompt, (str, list)):
            prompt = _regex_compile_exact(prompt)

        if self._pending:
            result = self._search_pending(prompt)
            if result is not None:
                return result

        if self._at_eof:
            raise EOFError("Prompt has reached EOF")

        cancelled, (result,) = await harvest_results(
            self._read_to_pattern(prompt),
            timeout=timeout or self._default_timeout,
        )
        if cancelled:
            raise asyncio.CancelledError()
        if isinstance(result, Exception):
            raise result

        return result

    async def read_all(
        self,
        *,
        timeout: Optional[float] = None,
    ) -> str:
        """Read from co-process output until EOF.

        If we are already at EOF, return "".
        """
        if not self._at_eof:
            cancelled, (result,) = await harvest_results(
                self._read_some(tag="@read_all"),
                timeout=timeout or self._default_timeout,
            )
            if cancelled:
                raise asyncio.CancelledError()
            if isinstance(result, Exception):
                raise result

        return self.read_pending()

    def read_pending(self) -> str:
        """Read the contents of the pending buffer and empty it.

        This method does not fill the pending buffer with any new data from the
        co-process output pipe. If the pending buffer is already empty, return
        "".
        """
        result = self._pending
        self._pending = ""
        return result

    async def command(
        self,
        text: str,
        *,
        end: str = _DEFAULT_LINE_END,
        no_echo: bool = False,
        prompt: Union[str, re.Pattern[str], None] = None,
        timeout: Optional[float] = None,
        allow_eof: bool = False,
    ) -> str:
        """Send some text to the co-process and return the response.

        This method is equivalent to calling send() followed by expect().
        However, the return value is simpler; `command()` does not return the
        `re.Match` object.

        If you call this method *after* the co-process output pipe has already
        returned EOF, raise `EOFError`.

        If `allow_eof` is True, this method will read data up to EOF instead of
        raising an EOFError.
        """
        if self._at_eof:
            raise EOFError("Prompt has reached EOF")

        await self.send(text, end=end, no_echo=no_echo, timeout=timeout)
        try:
            result, _ = await self.expect(prompt, timeout=timeout)
        except EOFError:
            if not allow_eof:
                raise
            result = self.read_pending()

        return result

    def close(self) -> None:
        "Close stdin to end the prompt session."
        stdin = self._runner.stdin
        assert stdin is not None

        if isinstance(self._runner, Runner) and self._runner.pty_eof:
            # Write EOF twice; once to end the current line, and the second
            # time to signal the end.
            stdin.write(self._runner.pty_eof * 2)
            if LOG_PROMPT:
                LOGGER.info("Prompt[pid=%s] send: [[EOF]]", self._runner.pid)

        else:
            stdin.close()
            if LOG_PROMPT:
                LOGGER.info("Prompt[pid=%s] close", self._runner.pid)

    def _finish_(self) -> None:
        "Internal method called when process exits to fetch the `Result` and cache it."
        self._result = self._runner.result(check=False)
        if LOG_PROMPT:
            LOGGER.info(
                "Prompt[pid=%s]: --- END --- result=%r",
                self._runner.pid,
                self._result,
            )

    async def _read_to_pattern(
        self,
        pattern: re.Pattern[str],
    ) -> tuple[str, re.Match[str]]:
        """Read text up to part that matches the pattern.

        Returns 2-tuple with (text, match).
        """
        stdout = self._runner.stdout
        assert stdout is not None
        assert self._chunk_size > 0

        while not self._at_eof:
            _prev_len = len(self._pending)  # debug check

            try:
                # Read chunk and check for EOF.
                chunk = await stdout.read(self._chunk_size)
                if not chunk:
                    self._at_eof = True
            except asyncio.CancelledError:
                if LOG_PROMPT:
                    LOGGER.info(
                        "Prompt[pid=%s] receive cancelled: pending=%r",
                        self._runner.pid,
                        self._pending,
                    )
                raise

            if LOG_PROMPT:
                self._log_receive(chunk)

            # Decode eligible bytes into our buffer.
            data = self._decoder.decode(chunk, final=self._at_eof)
            if not data and not self._at_eof:
                continue
            self._pending += data

            result = self._search_pending(pattern)
            if result is not None:
                return result

            assert self._at_eof or len(self._pending) > _prev_len  # debug check

        raise EOFError("Prompt has reached EOF")

    def _search_pending(
        self,
        pattern: re.Pattern[str],
    ) -> Optional[tuple[str, re.Match[str]]]:
        """Search our `pending` buffer for the pattern.

        If we find a match, we return the data up to the portion that matched
        and leave the trailing data in the `pending` buffer. This method returns
        (result, match) or None if there is no match.
        """
        found = pattern.search(self._pending)
        if found:
            result = self._pending[0 : found.start(0)]
            self._pending = self._pending[found.end(0) :]
            if LOG_PROMPT:
                LOGGER.info(
                    "Prompt[pid=%s] found: %r [%s CHARS PENDING]",
                    self._runner.pid,
                    found,
                    len(self._pending),
                )
            return (result, found)

        return None

    async def _drain(self, stream: asyncio.StreamWriter) -> None:
        "Drain stream while reading into buffer concurrently."
        read_task = asyncio.create_task(
            self._read_some(tag="@drain", concurrent_cancel=True)
        )
        try:
            await stream.drain()

        finally:
            if not read_task.done():
                read_task.cancel()
                await read_task

    async def _read_some(
        self,
        *,
        tag: str = "",
        concurrent_cancel: bool = False,
    ) -> None:
        "Read into `pending` buffer until cancelled or EOF."
        stdout = self._runner.stdout
        assert stdout is not None
        assert self._chunk_size > 0

        while not self._at_eof:
            # Yield time to other tasks; read() doesn't yield as long as there
            # is data to read. We need to provide a cancel point when this
            # method is called during `drain`.
            if concurrent_cancel:
                await asyncio.sleep(0)

            # Read chunk and check for EOF.
            chunk = await stdout.read(self._chunk_size)
            if not chunk:
                self._at_eof = True

            if LOG_PROMPT:
                self._log_receive(chunk, tag)

            # Decode eligible bytes into our buffer.
            data = self._decoder.decode(chunk, final=self._at_eof)
            self._pending += data

    async def _wait_no_echo(self):
        "Wait for terminal echo mode to be disabled."
        if LOG_PROMPT:
            LOGGER.info("Prompt[pid=%s] wait: no_echo", self._runner.pid)

        for _ in range(4 * 30):
            if not self.echo:
                break
            await asyncio.sleep(0.25)
        else:
            raise RuntimeError("Timed out: Terminal echo mode remains enabled.")

    def _log_send(self, data: bytes, no_echo: bool):
        "Log data as it is being sent."
        pid = self._runner.pid

        if no_echo:
            LOGGER.info("Prompt[pid=%s] send: [[HIDDEN]]", pid)
        else:
            data_len = len(data)
            if data_len > _LOG_LIMIT:
                LOGGER.info(
                    "Prompt[pid=%s] send: [%d B] %r...%r",
                    pid,
                    data_len,
                    data[: _LOG_LIMIT - _LOG_LIMIT_END],
                    data[-_LOG_LIMIT_END:],
                )
            else:
                LOGGER.info(
                    "Prompt[pid=%s] send: [%d B] %r",
                    pid,
                    data_len,
                    data,
                )

    def _log_receive(self, data: bytes, tag: str = ""):
        "Log data as it is being received."
        pid = self._runner.pid
        data_len = len(data)

        if data_len > _LOG_LIMIT:
            LOGGER.info(
                "Prompt[pid=%s] receive%s: [%d B] %r...%r",
                pid,
                tag,
                data_len,
                data[: _LOG_LIMIT - _LOG_LIMIT_END],
                data[-_LOG_LIMIT_END:],
            )
        else:
            LOGGER.info(
                "Prompt[pid=%s] receive%s: [%d B] %r",
                pid,
                tag,
                data_len,
                data,
            )
Utility class to help with an interactive prompt session.
When you are controlling a co-process you will usually "send" some text to it, and then "expect" a response. The "expect" operation can use a regular expression to match different types of responses.
Create a new Prompt instance using the prompt() API.
# In this example, we are using a default prompt of "??? ".
# Setting the PS1 environment variable tells the shell to use this as
# the shell prompt.
cmd = sh("sh").env(PS1="??? ").set(pty=True)
async with cmd.prompt("??? ", timeout=3.0) as cli:
    # Turn off terminal echo.
    cli.echo = False
    # Wait for greeting and initial prompt. Calling expect() with no
    # argument will match the default prompt "??? ".
    greeting, _ = await cli.expect()
    # Send a command and wait for the response.
    await cli.send("echo hello")
    answer, _ = await cli.expect()
    assert answer == "hello\r\n"
def __init__(
    self,
    runner: Runner,
    *,
    default_prompt: Union[str, list[str], re.Pattern[str], None] = None,
    default_timeout: Optional[float] = None,
    normalize_newlines: bool = False,
    _chunk_size: int = _DEFAULT_CHUNK_SIZE,
):
    assert runner.stdin is not None
    assert runner.stdout is not None

    if isinstance(default_prompt, (str, list)):
        default_prompt = _regex_compile_exact(default_prompt)

    self._runner = runner
    self._encoding = runner.options.encoding
    self._default_prompt = default_prompt
    self._default_timeout = default_timeout
    self._normalize_newlines = normalize_newlines
    self._chunk_size = _chunk_size
    self._decoder = _make_decoder(self._encoding, normalize_newlines)

    if LOG_PROMPT:
        LOGGER.info(
            "Prompt[pid=%s]: --- BEGIN --- name=%r",
            self._runner.pid,
            self._runner.name,
        )
@property
def at_eof(self) -> bool:
    "True if the prompt reader is at the end of file."
    return self._at_eof and not self._pending
True if the prompt reader is at the end of file.
@property
def pending(self) -> str:
    """Characters that still remain in the `pending` buffer."""
    return self._pending
Characters that still remain in the pending buffer.
@property
def echo(self) -> bool:
    """True if PTY is in echo mode.

    If the runner is not using a PTY, always return False.

    When the process is using a PTY, you can enable/disable terminal echo
    mode by setting the `echo` property to True/False.
    """
    if not isinstance(self._runner, Runner) or self._runner.pty_fd is None:
        return False
    return pty_util.get_term_echo(self._runner.pty_fd)
True if PTY is in echo mode.
If the runner is not using a PTY, always return False.
When the process is using a PTY, you can enable/disable terminal echo
mode by setting the echo property to True/False.
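The echo flag described above lives in the terminal's local-mode settings. The following is a minimal, Unix-only sketch of how such a flag can be read and toggled with the standard `termios` module. The helpers `get_echo`, `set_echo` and `demo_echo_toggle` are hypothetical names written for this illustration; they are not shellous functions, and the library's own `pty_util` helpers may work differently.

```python
import os
import termios


def get_echo(fd: int) -> bool:
    """Return True if the terminal behind `fd` has the ECHO flag set."""
    lflag = termios.tcgetattr(fd)[3]  # index 3 holds the local-mode flags
    return bool(lflag & termios.ECHO)


def set_echo(fd: int, enabled: bool) -> None:
    """Set or clear the ECHO flag on the terminal behind `fd`."""
    attrs = termios.tcgetattr(fd)
    if enabled:
        attrs[3] |= termios.ECHO
    else:
        attrs[3] &= ~termios.ECHO
    termios.tcsetattr(fd, termios.TCSANOW, attrs)


def demo_echo_toggle() -> tuple[bool, bool]:
    """Open a throwaway PTY pair, turn echo off and back on, report both states."""
    parent_fd, child_fd = os.openpty()
    try:
        set_echo(child_fd, False)
        off_state = get_echo(child_fd)
        set_echo(child_fd, True)
        on_state = get_echo(child_fd)
        return off_state, on_state
    finally:
        os.close(parent_fd)
        os.close(child_fd)
```

A program that prompts for a password typically clears this flag itself, which is why waiting for echo to be disabled is a reasonable signal that it is safe to send sensitive text.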
@property
def result(self) -> Result:
    """The `Result` of the co-process when it exited.

    You can only retrieve this property *after* the `async with` block
    exits where the co-process is running:

    ```
    async with cmd.prompt() as cli:
        ...
    # Access `cli.result` here.
    ```

    Inside the `async with` block, raise a RuntimeError because the process
    has not exited yet.
    """
    if self._result is None:
        raise RuntimeError("Prompt process is still running")
    return self._result
The Result of the co-process when it exited.
You can only retrieve this property after the async with block
exits where the co-process is running:
async with cmd.prompt() as cli:
    ...
# Access `cli.result` here.
Accessing this property inside the async with block raises a RuntimeError
because the process has not exited yet.
async def send(
    self,
    text: Union[bytes, str],
    *,
    end: Optional[str] = _DEFAULT_LINE_END,
    no_echo: bool = False,
    timeout: Optional[float] = None,
) -> None:
    """Write some `text` to co-process standard input and append a newline.

    The `text` parameter is the string that you want to write. Use a `bytes`
    object to send some raw bytes (e.g. to the terminal driver).

    The default line ending is "\n". Use the `end` parameter to change the
    line ending. To omit the line ending entirely, specify `end=None`.

    Set `no_echo` to True when you are writing a password. When you set
    `no_echo` to True, the `send` method will wait for terminal
    echo mode to be disabled before writing the text. If shellous logging
    is enabled, the sensitive information will **not** be logged.

    Use the `timeout` parameter to override the default timeout. Normally,
    data is delivered immediately and this method returns making a trip
    through the event loop. However, there are situations where the
    co-process input pipeline fills and we have to wait for it to
    drain.

    When this method needs to wait for the co-process input pipe to drain,
    this method will concurrently read from the output pipe into the pending
    buffer. This is necessary to avoid a deadlock situation where everything
    stops because neither process can make progress.
    """
    if end is None:
        end = ""

    if no_echo:
        await self._wait_no_echo()

    if isinstance(text, bytes):
        data = text + encode_bytes(end, self._encoding)
    else:
        data = encode_bytes(text + end, self._encoding)

    stdin = self._runner.stdin
    assert stdin is not None
    stdin.write(data)

    if LOG_PROMPT:
        self._log_send(data, no_echo)

    # Drain our write to stdin.
    cancelled, ex = await harvest_results(
        self._drain(stdin),
        timeout=timeout or self._default_timeout,
    )
    if cancelled:
        raise asyncio.CancelledError()
    if isinstance(ex[0], Exception):
        raise ex[0]
Write some text to co-process standard input and append a newline.
The `text` parameter is the string that you want to write. Use a `bytes`
object to send some raw bytes (e.g. to the terminal driver).
The default line ending is "\n". Use the end parameter to change the
line ending. To omit the line ending entirely, specify end=None.
Set `no_echo` to True when you are writing a password. When you set
`no_echo` to True, the `send` method will wait for terminal
echo mode to be disabled before writing the text. If shellous logging
is enabled, the sensitive information will **not** be logged.
Use the `timeout` parameter to override the default timeout. Normally,
data is delivered immediately and this method returns making a trip
through the event loop. However, there are situations where the
co-process input pipeline fills and we have to wait for it to
drain.
When this method needs to wait for the co-process input pipe to drain,
this method will concurrently read from the output pipe into the pending
buffer. This is necessary to avoid a deadlock situation where everything
stops because neither process can make progress.
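The deadlock described above can be reproduced with plain asyncio: a child that echoes its input stops reading stdin once its own stdout pipe is full, so a parent that only waits on `drain()` may never finish. The sketch below uses only the standard library and is not the shellous implementation. The names `drain_while_reading`, `ECHO_CHILD` and `main` are made up for this illustration, and the child is a small Python script assumed to behave like `cat`.

```python
import asyncio
import contextlib
import sys

# Hypothetical child program: copy stdin to stdout, like `cat`.
ECHO_CHILD = "import shutil, sys; shutil.copyfileobj(sys.stdin.buffer, sys.stdout.buffer)"


async def drain_while_reading(proc, pending: bytearray) -> None:
    """Wait for stdin to drain while concurrently collecting stdout."""

    async def read_some() -> None:
        while True:
            chunk = await proc.stdout.read(4096)
            if not chunk:
                break
            pending.extend(chunk)

    read_task = asyncio.create_task(read_some())
    try:
        await proc.stdin.drain()
    finally:
        # Stop the background reader once the write has drained.
        if not read_task.done():
            read_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await read_task


async def main() -> int:
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", ECHO_CHILD,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    payload = b"x" * 1_000_000  # much larger than a typical pipe buffer
    pending = bytearray()

    proc.stdin.write(payload)
    await drain_while_reading(proc, pending)

    proc.stdin.close()                        # signal EOF to the child
    pending.extend(await proc.stdout.read())  # collect the rest up to EOF
    await proc.wait()
    return len(pending)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Everything the reader task collects goes into the same buffer that is read afterwards, so no output is lost when the task is cancelled.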
async def expect(
    self,
    prompt: Union[str, list[str], re.Pattern[str], None] = None,
    *,
    timeout: Optional[float] = None,
) -> tuple[str, re.Match[str]]:
    """Read from co-process standard output until `prompt` pattern matches.

    Returns a 2-tuple of (output, match) where `output` is the text *before*
    the prompt pattern and `match` is a `re.Match` object for the prompt
    text itself.

    If `expect` reaches EOF or a timeout occurs before the prompt pattern
    matches, it raises an `EOFError` or `asyncio.TimeoutError`. The unread
    characters will be available in the `pending` buffer.

    After this method returns, there may still be characters read from stdout
    that remain in the `pending` buffer. These are the characters *after* the
    prompt pattern. You can examine these using the `pending` property.
    Subsequent calls to expect will examine this buffer first before
    reading new data from the output pipe.

    By default, this method will use the default `timeout` for the `Prompt`
    object if one is set. You can use the `timeout` parameter to specify
    a custom timeout in seconds.

    Prompt Patterns
    ~~~~~~~~~~~~~~~

    The `expect()` method supports matching fixed strings and regular
    expressions. The type of the `prompt` parameter determines the type of
    search.

    No argument or `None`:
        Use the default prompt pattern. If there is no default prompt,
        raise a TypeError.
    `str`:
        Match this string exactly.
    `list[str]`:
        Match one of these strings exactly.
    `re.Pattern[str]`:
        Match the given regular expression.

    When matching a regular expression, only a single Pattern object is
    supported. To match multiple regular expressions, combine them into a
    single regular expression using *alternation* syntax (|).

    The `expect()` method returns a 2-tuple (output, match). The `match`
    is the result of the regular expression search (re.Match). If you
    specify your prompt as a string or list of strings, it is still compiled
    into a regular expression that produces an `re.Match` object. You can
    examine the `match` object to determine the prompt value found.

    This method conducts a regular expression search on streaming data. The
    `expect()` method reads a new chunk of data into the `pending` buffer
    and then searches it. You must be careful in writing a regular
    expression so that the search is agnostic to how the incoming chunks of
    data arrive. Consider including a boundary condition at the end of your pattern.
    For example, instead of searching for the open-ended pattern`[a-z]+`,
    search for the pattern `[a-z]+[^a-z]` which ends with a non-letter
    character.

    Examples
    ~~~~~~~~

    Expect an exact string:

    ```
    await cli.expect("ftp> ")
    await cli.send(command)
    response, _ = await cli.expect("ftp> ")
    ```

    Expect a choice of strings:

    ```
    _, m = await cli.expect(["Login: ", "Password: ", "ftp> "])
    match m[0]:
        case "Login: ":
            await cli.send(login)
        case "Password: ":
            await cli.send(password)
        case "ftp> ":
            await cli.send(command)
    ```

    Read until EOF:

    ```
    data = await cli.read_all()
    ```

    Read the contents of the `pending` buffer without filling the buffer
    with any new data from the co-process pipe:

    ```
    data = await cli.read_pending()
    ```
    """
    if prompt is None:
        prompt = self._default_prompt
        if prompt is None:
            raise TypeError("prompt is required when default prompt is not set")
    elif isinstance(prompt, (str, list)):
        prompt = _regex_compile_exact(prompt)

    if self._pending:
        result = self._search_pending(prompt)
        if result is not None:
            return result

    if self._at_eof:
        raise EOFError("Prompt has reached EOF")

    cancelled, (result,) = await harvest_results(
        self._read_to_pattern(prompt),
        timeout=timeout or self._default_timeout,
    )
    if cancelled:
        raise asyncio.CancelledError()
    if isinstance(result, Exception):
        raise result

    return result
Read from co-process standard output until prompt pattern matches.
Returns a 2-tuple of (output, match) where output is the text before
the prompt pattern and match is a re.Match object for the prompt
text itself.
If expect reaches EOF or a timeout occurs before the prompt pattern
matches, it raises an EOFError or asyncio.TimeoutError. The unread
characters will be available in the pending buffer.
After this method returns, there may still be characters read from stdout
that remain in the pending buffer. These are the characters after the
prompt pattern. You can examine these using the pending property.
Subsequent calls to expect will examine this buffer first before
reading new data from the output pipe.
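To make the role of the pending buffer concrete, here is a minimal sketch of the splitting step: the text before the match is returned, and whatever follows the match stays in the buffer for the next search. `PendingBuffer` is a hypothetical stand-in written for this illustration, not a shellous class.

```python
from __future__ import annotations

import re
from typing import Optional


class PendingBuffer:
    """Toy model of a buffer that is searched for a prompt pattern."""

    def __init__(self, text: str = "") -> None:
        self.pending = text

    def search(self, pattern: re.Pattern[str]) -> Optional[tuple[str, re.Match[str]]]:
        found = pattern.search(self.pending)
        if found is None:
            return None
        # Everything before the match is the output...
        result = self.pending[: found.start()]
        # ...and everything after the match stays pending.
        self.pending = self.pending[found.end():]
        return result, found


buf = PendingBuffer("hello\r\n??? leftover")
output, match = buf.search(re.compile(re.escape("??? ")))
print(repr(output), repr(match[0]), repr(buf.pending))
```

A second search on the same buffer would start from the leftover text, which mirrors how a later expect call looks at buffered data before reading more.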
By default, this method will use the default timeout for the Prompt
object if one is set. You can use the timeout parameter to specify
a custom timeout in seconds.
Prompt Patterns
The expect() method supports matching fixed strings and regular
expressions. The type of the prompt parameter determines the type of
search.
No argument or None:
    Use the default prompt pattern. If there is no default prompt,
    raise a TypeError.
str:
    Match this string exactly.
list[str]:
    Match one of these strings exactly.
re.Pattern[str]:
    Match the given regular expression.
When matching a regular expression, only a single Pattern object is supported. To match multiple regular expressions, combine them into a single regular expression using alternation syntax (|).
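One way to combine several regular expressions into a single Pattern is to wrap each alternative in a named group and join them with `|`. The group names (`login`, `password`, `ready`) and the `classify` helper below are illustrative choices for this sketch, not part of the library.

```python
import re
from typing import Optional

# Each alternative gets its own named group so the caller can tell them apart.
LOGIN = r"(?P<login>[Ll]ogin: )"
PASSWORD = r"(?P<password>[Pp]assword: )"
READY = r"(?P<ready>ftp> )"

PROMPTS = re.compile("|".join([LOGIN, PASSWORD, READY]))


def classify(text: str) -> Optional[str]:
    """Return the name of the alternative that matched, or None."""
    found = PROMPTS.search(text)
    return found.lastgroup if found else None


print(classify("Connected.\r\nPassword: "))
```

The compiled `PROMPTS` object can then be passed wherever a single `re.Pattern[str]` is expected, and `match.lastgroup` on the returned match identifies which alternative fired.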
The expect() method returns a 2-tuple (output, match). The match
is the result of the regular expression search (re.Match). If you
specify your prompt as a string or list of strings, it is still compiled
into a regular expression that produces an re.Match object. You can
examine the match object to determine the prompt value found.
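A plausible way to turn exact strings into a regular expression is to escape each string and join the alternatives. The sketch below assumes that approach; `compile_exact` is a hypothetical helper and may differ from what the library does internally.

```python
import re
from typing import Union


def compile_exact(prompt: Union[str, list[str]]) -> "re.Pattern[str]":
    """Compile one or more literal strings into a single pattern."""
    if isinstance(prompt, str):
        prompt = [prompt]
    # re.escape() makes characters such as "$", "(" and "?" match literally.
    return re.compile("|".join(re.escape(item) for item in prompt))


text = "220 ready\r\nftp> "
m = compile_exact(["Login: ", "Password: ", "ftp> "]).search(text)
print(repr(text[: m.start()]), repr(m[0]))
```

Because the result is an ordinary `re.Match`, `m[0]` gives the exact prompt string that was found, which is what the choice-of-strings example relies on.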
This method conducts a regular expression search on streaming data. The
expect() method reads a new chunk of data into the pending buffer
and then searches it. You must be careful in writing a regular
expression so that the search is agnostic to how the incoming chunks of
data arrive. Consider including a boundary condition at the end of your pattern.
For example, instead of searching for the open-ended pattern [a-z]+,
search for the pattern [a-z]+[^a-z] which ends with a non-letter
character.
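The effect of chunk boundaries can be simulated without any subprocess. The sketch below appends chunks to a buffer and searches after each one, which is the read-then-search loop described above. `first_match` is a hypothetical helper written for this illustration.

```python
import re
from typing import Iterable, Optional


def first_match(pattern: "re.Pattern[str]", chunks: Iterable[str]) -> Optional[str]:
    """Append chunks to a buffer and return the first match found."""
    pending = ""
    for chunk in chunks:
        pending += chunk
        found = pattern.search(pending)
        if found:
            return found[0]
    return None


open_ended = re.compile(r"[a-z]+")
bounded = re.compile(r"[a-z]+[^a-z]")

split_chunks = ["hel", "lo wor", "ld\n"]
whole_chunk = ["hello world\n"]

# The open-ended pattern depends on where the chunk boundary falls.
print(first_match(open_ended, split_chunks))  # hel
print(first_match(open_ended, whole_chunk))   # hello

# The bounded pattern waits for the terminating non-letter.
print(repr(first_match(bounded, split_chunks)))  # 'hello '
print(repr(first_match(bounded, whole_chunk)))   # 'hello '
```

With the bounded pattern the matched text includes the terminating character, so strip it from the match if only the word is wanted.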
Examples
Expect an exact string:
await cli.expect("ftp> ")
await cli.send(command)
response, _ = await cli.expect("ftp> ")
Expect a choice of strings:
_, m = await cli.expect(["Login: ", "Password: ", "ftp> "])
match m[0]:
    case "Login: ":
        await cli.send(login)
    case "Password: ":
        await cli.send(password)
    case "ftp> ":
        await cli.send(command)
Read until EOF:
data = await cli.read_all()
Read the contents of the pending buffer without filling the buffer
with any new data from the co-process pipe:
data = await cli.read_pending()
async def read_all(
    self,
    *,
    timeout: Optional[float] = None,
) -> str:
    """Read from co-process output until EOF.

    If we are already at EOF, return "".
    """
    if not self._at_eof:
        cancelled, (result,) = await harvest_results(
            self._read_some(tag="@read_all"),
            timeout=timeout or self._default_timeout,
        )
        if cancelled:
            raise asyncio.CancelledError()
        if isinstance(result, Exception):
            raise result

    return self.read_pending()
Read from co-process output until EOF.
If we are already at EOF, return "".
def read_pending(self) -> str:
    """Read the contents of the pending buffer and empty it.

    This method does not fill the pending buffer with any new data from the
    co-process output pipe. If the pending buffer is already empty, return
    "".
    """
    result = self._pending
    self._pending = ""
    return result
Read the contents of the pending buffer and empty it.
This method does not fill the pending buffer with any new data from the co-process output pipe. If the pending buffer is already empty, return "".
async def command(
    self,
    text: str,
    *,
    end: str = _DEFAULT_LINE_END,
    no_echo: bool = False,
    prompt: Union[str, re.Pattern[str], None] = None,
    timeout: Optional[float] = None,
    allow_eof: bool = False,
) -> str:
    """Send some text to the co-process and return the response.

    This method is equivalent to calling send() following by expect().
    However, the return value is simpler; `command()` does not return the
    `re.Match` object.

    If you call this method *after* the co-process output pipe has already
    returned EOF, raise `EOFError`.

    If `allow_eof` is True, this method will read data up to EOF instead of
    raising an EOFError.
    """
    if self._at_eof:
        raise EOFError("Prompt has reached EOF")

    await self.send(text, end=end, no_echo=no_echo, timeout=timeout)
    try:
        result, _ = await self.expect(prompt, timeout=timeout)
    except EOFError:
        if not allow_eof:
            raise
        result = self.read_pending()

    return result
Send some text to the co-process and return the response.
This method is equivalent to calling send() followed by expect().
However, the return value is simpler; command() does not return the
re.Match object.
If you call this method after the co-process output pipe has already
returned EOF, it raises EOFError.
If allow_eof is True, this method will read data up to EOF instead of
raising an EOFError.
def close(self) -> None:
    "Close stdin to end the prompt session."
    stdin = self._runner.stdin
    assert stdin is not None

    if isinstance(self._runner, Runner) and self._runner.pty_eof:
        # Write EOF twice; once to end the current line, and the second
        # time to signal the end.
        stdin.write(self._runner.pty_eof * 2)
        if LOG_PROMPT:
            LOGGER.info("Prompt[pid=%s] send: [[EOF]]", self._runner.pid)

    else:
        stdin.close()
        if LOG_PROMPT:
            LOGGER.info("Prompt[pid=%s] close", self._runner.pid)
Close stdin to end the prompt session.
def cbreak(rows: int = 0, cols: int = 0) -> PtyAdapter:
    "Return a function that sets PtyOptions.child_fd to cbreak mode."

    def _pty_set_cbreak(fdesc: int):
        tty.setcbreak(fdesc)
        if rows or cols:
            _set_term_size(fdesc, rows, cols)
        assert _get_eof(fdesc) == b""

    return _pty_set_cbreak
Return a function that sets PtyOptions.child_fd to cbreak mode.
def cooked(rows: int = 0, cols: int = 0, echo: bool = True) -> PtyAdapter:
    "Return a function that leaves PtyOptions.child_fd in cooked mode."

    def _pty_set_canonical(fdesc: int):
        if rows or cols:
            _set_term_size(fdesc, rows, cols)
        if not echo:
            set_term_echo(fdesc, False)
        assert _get_eof(fdesc) == b"\x04"

    return _pty_set_canonical
Return a function that leaves PtyOptions.child_fd in cooked mode.
def raw(rows: int = 0, cols: int = 0) -> PtyAdapter:
    "Return a function that sets PtyOptions.child_fd to raw mode."

    def _pty_set_raw(fdesc: int):
        tty.setraw(fdesc)
        if rows or cols:
            _set_term_size(fdesc, rows, cols)
        assert _get_eof(fdesc) == b""

    return _pty_set_raw
Return a function that sets PtyOptions.child_fd to raw mode.
@dataclass(frozen=True, **_KW_ONLY)
class Result:
    "Concrete class for the result of a Command."

    exit_code: int
    "Command's exit status. If < 0, this is a negated signal number."

    output_bytes: bytes
    "Output of command as bytes. May be None if there is no output."

    error_bytes: bytes
    "Limited standard error from command if not redirected."

    cancelled: bool
    "Command was cancelled."

    encoding: str
    "Output encoding."

    @property
    def output(self) -> str:
        "Output of command as a string."
        return decode_bytes(self.output_bytes, self.encoding)

    @property
    def error(self) -> str:
        "Error from command as a string (if it is not redirected)."
        return decode_bytes(self.error_bytes, self.encoding)

    @property
    def exit_signal(self) -> Optional[signal.Signals]:
        "Signal that caused the command to exit, or None if no signal."
        if self.exit_code >= 0:
            return None
        return signal.Signals(-self.exit_code)

    def __bool__(self) -> bool:
        "Return true if exit_code is 0."
        return self.exit_code == 0
Concrete class for the result of a Command.
@property
def output(self) -> str:
    "Output of command as a string."
    return decode_bytes(self.output_bytes, self.encoding)
Output of command as a string.
@property
def error(self) -> str:
    "Error from command as a string (if it is not redirected)."
    return decode_bytes(self.error_bytes, self.encoding)
Error from command as a string (if it is not redirected).
@property
def exit_signal(self) -> Optional[signal.Signals]:
    "Signal that caused the command to exit, or None if no signal."
    if self.exit_code >= 0:
        return None
    return signal.Signals(-self.exit_code)
Signal that caused the command to exit, or None if no signal.
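The negated-signal convention is the same one the standard library uses for child processes on POSIX systems. The sketch below illustrates it with `subprocess` alone. The free function `exit_signal` mirrors the property above, and `terminated_child_code` is a hypothetical demo helper. On Windows, `terminate()` does not produce a negative exit code, so that part is POSIX-only.

```python
import signal
import subprocess
import sys
from typing import Optional


def exit_signal(exit_code: int) -> Optional[signal.Signals]:
    """Map a negative exit code to the signal it encodes, else None."""
    if exit_code >= 0:
        return None
    return signal.Signals(-exit_code)


def terminated_child_code() -> int:
    """Start a sleeping child, send it SIGTERM, and return its exit code."""
    child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])
    child.terminate()
    return child.wait()


if __name__ == "__main__":
    code = terminated_child_code()
    print(code, exit_signal(code))
```

A non-negative exit code means the process exited on its own, so the function returns None in that case.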
class ResultError(Exception):
    "Represents a non-zero exit status."

    @property
    def result(self) -> "shellous.Result":
        "Result of the command."
        return self.args[0]
Represents a non-zero exit status.
386class Runner: 387 """Runner is an asynchronous context manager that runs a command. 388 389 ``` 390 async with Runner(cmd) as run: 391 # process streams: run.stdin, run.stdout, run.stderr (if not None) 392 result = run.result() 393 ``` 394 """ 395 396 stdin: Optional[asyncio.StreamWriter] = None 397 "Process standard input." 398 399 stdout: Optional[asyncio.StreamReader] = None 400 "Process standard output." 401 402 stderr: Optional[asyncio.StreamReader] = None 403 "Process standard error." 404 405 _options: _RunOptions 406 _tasks: list[asyncio.Task[Any]] 407 _proc: Optional["asyncio.subprocess.Process"] = None 408 _cancelled: bool = False 409 _timer: Optional[asyncio.TimerHandle] = None 410 _timed_out: bool = False 411 _last_signal: Optional[int] = None 412 413 def __init__(self, command: "shellous.Command[Any]"): 414 self._options = _RunOptions(command) 415 self._tasks = [] 416 417 @property 418 def name(self) -> str: 419 "Return name of process being run." 420 return self.command.name 421 422 @property 423 def options(self) -> "shellous.Options": 424 "Return options for process being run." 425 return self.command.options 426 427 @property 428 def command(self) -> "shellous.Command[Any]": 429 "Return the command being run." 430 return self._options.command 431 432 @property 433 def pid(self) -> Optional[int]: 434 "Return the command's process ID." 435 if not self._proc: 436 return None 437 return self._proc.pid 438 439 @property 440 def returncode(self) -> Optional[int]: 441 "Process's exit code." 442 if not self._proc: 443 if self._cancelled: 444 # The process was cancelled before starting. 445 return CANCELLED_EXIT_CODE 446 return None 447 code = self._proc.returncode 448 if code == _UNKNOWN_EXIT_CODE and self._last_signal is not None: 449 # After sending a signal, `waitpid` may fail to locate the child 450 # process. In this case, map the status to the last signal we sent. 
451 # For more on this, see https://github.com/python/cpython/issues/87744 452 return -self._last_signal # pylint: disable=invalid-unary-operand-type 453 return code 454 455 @property 456 def cancelled(self) -> bool: 457 "Return True if the command was cancelled." 458 return self._cancelled 459 460 @property 461 def pty_fd(self) -> Optional[int]: 462 """The file descriptor used to communicate with the child PTY process. 463 464 Returns None if the process is not using a PTY. 465 """ 466 pty_fds = self._options.pty_fds 467 if pty_fds is not None: 468 return pty_fds.parent_fd 469 return None 470 471 @property 472 def pty_eof(self) -> Optional[bytes]: 473 """Byte sequence used to indicate EOF when written to the PTY child. 474 475 Returns None if process is not using a PTY. 476 """ 477 pty_fds = self._options.pty_fds 478 if pty_fds is not None: 479 return pty_fds.eof 480 return None 481 482 def result(self, *, check: bool = True) -> Result: 483 "Check process exit code and raise a ResultError if necessary." 484 code = self.returncode 485 if code is None: 486 raise TypeError("Runner.result(): Process has not exited") 487 488 result = Result( 489 exit_code=code, 490 output_bytes=bytes(self._options.output_bytes or b""), 491 error_bytes=bytes(self._options.error_bytes or b""), 492 cancelled=self._cancelled, 493 encoding=self._options.encoding, 494 ) 495 496 if not check: 497 return result 498 499 return check_result( 500 result, 501 self.command.options, 502 self._cancelled, 503 self._timed_out, 504 ) 505 506 def add_task( 507 self, 508 coro: Coroutine[Any, Any, _T], 509 tag: str = "", 510 ) -> asyncio.Task[_T]: 511 "Add a background task." 512 task_name = f"{self.name}#{tag}" 513 task = asyncio.create_task(coro, name=task_name) 514 self._tasks.append(task) 515 return task 516 517 def send_signal(self, sig: int) -> None: 518 "Send an arbitrary signal to the process if it is running." 
519 if self.returncode is None: 520 self._signal(sig) 521 522 def cancel(self) -> None: 523 "Cancel the running process if it is running." 524 if self.returncode is None: 525 self._signal(self.command.options.cancel_signal) 526 527 def _is_bsd_pty(self) -> bool: 528 "Return true if we're running a pty on BSD." 529 return BSD_DERIVED and bool(self._options.pty_fds) 530 531 @log_method(LOG_DETAIL) 532 async def _wait(self) -> None: 533 "Normal wait for background I/O tasks and process to finish." 534 assert self._proc 535 536 try: 537 if self._tasks: 538 await harvest(*self._tasks, trustee=self) 539 if self._is_bsd_pty(): 540 await self._waiter() 541 542 except asyncio.CancelledError: 543 LOGGER.debug("Runner.wait cancelled %r", self) 544 self._set_cancelled() 545 self._tasks.clear() # all tasks were cancelled 546 await self._kill() 547 548 except Exception as ex: 549 LOGGER.debug("Runner.wait exited with error %r ex=%r", self, ex) 550 self._tasks.clear() # all tasks were cancelled 551 await self._kill() 552 raise # re-raise exception 553 554 @log_method(LOG_DETAIL) 555 async def _wait_pid(self): 556 "Manually poll `waitpid` until process finishes." 557 assert self._is_bsd_pty() 558 559 while True: 560 assert self._proc is not None # (pyright) 561 562 if poll_wait_pid(self._proc): 563 break 564 await asyncio.sleep(0.025) 565 566 @log_method(LOG_DETAIL) 567 async def _kill(self): 568 "Kill process and wait for it to finish." 569 assert self._proc 570 571 cancel_timeout = self.command.options.cancel_timeout 572 cancel_signal = self.command.options.cancel_signal 573 574 try: 575 # If not already done, send cancel signal. 
576 if self._proc.returncode is None: 577 self._signal(cancel_signal) 578 579 if self._tasks: 580 await harvest(*self._tasks, timeout=cancel_timeout, trustee=self) 581 582 if self._proc.returncode is None: 583 await harvest(self._waiter(), timeout=cancel_timeout, trustee=self) 584 585 except (asyncio.CancelledError, asyncio.TimeoutError) as ex: 586 LOGGER.warning("Runner.kill %r (ex)=%r", self, ex) 587 if _is_cancelled(ex): 588 self._set_cancelled() 589 await self._kill_wait() 590 591 except (Exception, GeneratorExit) as ex: 592 LOGGER.warning("Runner.kill %r ex=%r", self, ex) 593 await self._kill_wait() 594 raise 595 596 def _signal(self, sig: Optional[int]): 597 "Send a signal to the process." 598 assert self._proc is not None # (pyright) 599 600 if LOG_DETAIL: 601 LOGGER.debug("Runner.signal %r signal=%r", self, sig) 602 self._audit_callback("signal", signal=sig) 603 604 if sig is None: 605 self._proc.kill() 606 else: 607 self._proc.send_signal(sig) 608 self._last_signal = sig 609 610 @log_method(LOG_DETAIL) 611 async def _kill_wait(self): 612 "Wait for killed process to exit." 613 assert self._proc 614 615 # Check if process is already done. 616 if self._proc.returncode is not None: 617 return 618 619 try: 620 self._signal(None) 621 await harvest(self._waiter(), timeout=_KILL_TIMEOUT, trustee=self) 622 except asyncio.TimeoutError as ex: 623 # Manually check if the process is still running. 624 if poll_wait_pid(self._proc): 625 LOGGER.warning("%r process reaped manually %r", self, self._proc) 626 else: 627 LOGGER.error("%r failed to kill process %r", self, self._proc) 628 raise RuntimeError(f"Unable to kill process {self._proc!r}") from ex 629 630 @log_method(LOG_DETAIL) 631 async def __aenter__(self): 632 "Set up redirections and launch subprocess." 
633 self._audit_callback("start") 634 try: 635 return await self._start() 636 except BaseException as ex: 637 self._stop_timer() # failsafe just in case 638 self._audit_callback("stop", failure=type(ex).__name__) 639 raise 640 finally: 641 if self._cancelled and self.command.options._catch_cancelled_error: 642 # Raises ResultError instead of CancelledError. 643 self.result() 644 645 @log_method(LOG_DETAIL) 646 async def _start(self): 647 "Set up redirections and launch subprocess." 648 # assert self._proc is None 649 assert not self._tasks 650 651 try: 652 # Set up subprocess arguments and launch subprocess. 653 with self._options as opts: 654 await self._subprocess_spawn(opts) 655 656 assert self._proc is not None 657 stdin = self._proc.stdin 658 stdout = self._proc.stdout 659 stderr = self._proc.stderr 660 661 # Assign pty streams. 662 if opts.pty_fds: 663 assert (stdin, stdout) == (None, None) 664 stdin, stdout = opts.pty_fds.writer, opts.pty_fds.reader 665 666 if stderr is not None: 667 limit = None 668 if opts.error_bytes is not None: 669 error = opts.error_bytes 670 limit = opts.command.options.error_limit 671 elif opts.is_stderr_only: 672 assert stdout is None 673 assert opts.output_bytes is not None 674 error = opts.output_bytes 675 else: 676 error = opts.command.options.error 677 stderr = self._setup_output_sink( 678 stderr, error, opts.encoding, "stderr", limit 679 ) 680 681 if stdout is not None: 682 limit = None 683 if opts.output_bytes is not None: 684 output = opts.output_bytes 685 else: 686 output = opts.command.options.output 687 stdout = self._setup_output_sink( 688 stdout, output, opts.encoding, "stdout", limit 689 ) 690 691 if stdin is not None: 692 stdin = self._setup_input_source(stdin, opts) 693 694 except (Exception, asyncio.CancelledError) as ex: 695 LOGGER.debug("Runner._start %r ex=%r", self, ex) 696 if _is_cancelled(ex): 697 self._set_cancelled() 698 if self._proc: 699 await self._kill() 700 raise 701 702 # Make final streams available. 
These may be different from `self.proc`
        # versions.
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr

        # Add a task to monitor for when the process finishes.
        if not self._is_bsd_pty():
            self.add_task(self._waiter(), "waiter")

        # Set a timer to cancel the current task after a timeout.
        self._start_timer(self.command.options.timeout)

        return self

    @log_method(LOG_DETAIL)
    async def _subprocess_spawn(self, opts: _RunOptions):
        "Start the subprocess."
        assert self._proc is None

        # Second half of pty setup.
        if opts.pty_fds:
            opts.pty_fds = await opts.pty_fds.open_streams()

        # Check for task cancellation and yield right before exec'ing. If the
        # current task is already cancelled, this will raise a CancelledError,
        # and we save ourselves the work of launching and immediately killing
        # a process.
        await asyncio.sleep(0)

        # Launch the subprocess (always completes even if cancelled).
        await uninterrupted(self._subprocess_exec(opts))

        # Launch the process substitution commands (if any).
        for cmd in opts.subcmds:
            self.add_task(cmd.coro(), "procsub")

    @log_method(LOG_DETAIL)
    async def _subprocess_exec(self, opts: _RunOptions):
        "Start the subprocess and assign to `self.proc`."
        with log_timer("asyncio.create_subprocess_exec"):
            sys.audit(EVENT_SHELLOUS_EXEC, opts.pos_args[0])
            with pty_util.set_ignore_child_watcher(
                BSD_DERIVED and opts.pty_fds is not None
            ):
                self._proc = await asyncio.create_subprocess_exec(
                    *opts.pos_args,
                    **opts.kwd_args,
                )

    @log_method(LOG_DETAIL)
    async def _waiter(self):
        "Run task that waits for process to exit."
        assert self._proc is not None  # (pyright)

        try:
            if self._is_bsd_pty():
                await self._wait_pid()
            else:
                await self._proc.wait()
        finally:
            self._stop_timer()

    def _set_cancelled(self):
        "Set the cancelled flag, and cancel any inflight timers."
        self._cancelled = True
        self._stop_timer()

    def _start_timer(self, timeout: Optional[float]):
        "Start an optional timer to cancel the process if `timeout` desired."
        assert self._timer is None
        if timeout is not None:
            loop = asyncio.get_running_loop()
            task = asyncio.current_task()
            assert task is not None
            self._timer = loop.call_later(
                timeout,
                self._set_timer_expired,
                task,
            )

    def _set_timer_expired(self, main_task: asyncio.Task[Any]):
        "Set a flag when the timer expires and cancel the main task."
        self._timed_out = True
        self._timer = None
        main_task.cancel()

    def _stop_timer(self):
        if self._timer:
            self._timer.cancel()
            self._timer = None

    def _setup_input_source(
        self,
        stream: asyncio.StreamWriter,
        opts: _RunOptions,
    ):
        "Set up a task to read from custom input source."
        tag = "stdin"
        eof = opts.pty_fds.eof if opts.pty_fds else None

        if opts.input_bytes is not None:
            self.add_task(redir.write_stream(opts.input_bytes, stream, eof), tag)
            return None

        source = opts.command.options.input

        if isinstance(source, asyncio.StreamReader):
            self.add_task(redir.write_reader(source, stream, eof), tag)
            return None

        if isinstance(source, io.BytesIO):
            self.add_task(redir.write_stream(source.getvalue(), stream, eof), tag)
            return None

        if isinstance(source, io.StringIO):
            input_bytes = encode_bytes(source.getvalue(), opts.encoding)
            self.add_task(redir.write_stream(input_bytes, stream, eof), tag)
            return None

        return stream

    def _setup_output_sink(
        self,
        stream: asyncio.StreamReader,
        sink: Any,
        encoding: str,
        tag: str,
        limit: Optional[int] = None,
    ) -> Optional[asyncio.StreamReader]:
        "Set up a task to write to custom output sink."
        if isinstance(sink, io.StringIO):
            self.add_task(redir.copy_stringio(stream, sink, encoding), tag)
            return None

        if isinstance(sink, io.BytesIO):
            self.add_task(redir.copy_bytesio(stream, sink), tag)
            return None

        if isinstance(sink, bytearray):
            # N.B. `limit` is only supported for bytearray.
            if limit is not None:
                self.add_task(redir.copy_bytearray_limit(stream, sink, limit), tag)
            else:
                self.add_task(redir.copy_bytearray(stream, sink), tag)
            return None

        if isinstance(sink, Logger):
            self.add_task(redir.copy_logger(stream, sink, encoding), tag)
            return None

        if isinstance(sink, asyncio.StreamWriter):
            self.add_task(redir.copy_streamwriter(stream, sink), tag)
            return None

        return stream

    @log_method(LOG_DETAIL)
    async def __aexit__(
        self,
        _exc_type: Union[type[BaseException], None],
        exc_value: Union[BaseException, None],
        _exc_tb: Optional[TracebackType],
    ):
        "Wait for process to exit and handle cancellation."
        suppress = False
        try:
            suppress = await self._finish(exc_value)
        except asyncio.CancelledError:
            LOGGER.debug("Runner cancelled inside _finish %r", self)
            self._set_cancelled()
        finally:
            self._stop_timer()  # failsafe just in case
            self._audit_callback("stop")
        # If `timeout` expired, raise TimeoutError rather than CancelledError.
        if (
            self._cancelled
            and self._timed_out
            and not self.command.options._catch_cancelled_error
        ):
            raise asyncio.TimeoutError()
        return suppress

    @log_method(LOG_DETAIL)
    async def _finish(self, exc_value: Union[BaseException, None]):
        "Finish the run. Return True only if `exc_value` should be suppressed."
        assert self._proc

        try:
            if exc_value is not None:
                if _is_cancelled(exc_value):
                    self._set_cancelled()
                await self._kill()
                return self._cancelled

            await self._wait()
            return False

        finally:
            await self._close()

    @log_method(LOG_DETAIL)
    async def _close(self):
        "Make sure that our resources are properly closed."
        assert self._proc is not None

        if self._options.pty_fds:
            self._options.pty_fds.close()

        # Make sure the transport is closed (for asyncio and uvloop).
        self._proc._transport.close()  # pyright: ignore

        # _close can be called when unwinding exceptions. We need to handle
        # the case that the process has not exited yet.
        if self._proc.returncode is None:
            LOGGER.critical("Runner._close process still running %r", self._proc)
            return

        try:
            # Make sure that original stdin is properly closed. `wait_closed`
            # will raise a BrokenPipeError if not all input was properly written.
            if self._proc.stdin is not None:
                self._proc.stdin.close()
                await harvest(
                    self._proc.stdin.wait_closed(),
                    timeout=_CLOSE_TIMEOUT,
                    cancel_finish=True,  # finish `wait_closed` if cancelled
                    trustee=self,
                )

        except asyncio.TimeoutError:
            LOGGER.critical("Runner._close %r timeout stdin=%r", self, self._proc.stdin)

    def _audit_callback(
        self,
        phase: str,
        *,
        failure: str = "",
        signal: Optional[int] = None,
    ):
        "Call `audit_callback` if there is one."
        callback = self.command.options.audit_callback
        if callback:
            sig = _signame(signal) if phase == "signal" else ""
            info: shellous.AuditEventInfo = {
                "runner": self,
                "failure": failure,
                "signal": sig,
            }
            callback(phase, info)

    def __repr__(self) -> str:
        "Return string representation of Runner."
        cancelled = " cancelled" if self._cancelled else ""
        if self._proc:
            procinfo = f" pid={self._proc.pid} exit_code={self.returncode}"
        else:
            procinfo = " pid=None"
        return f"<Runner {self.name!r}{cancelled}{procinfo}>"

    async def _readlines(self):
        "Iterate over lines in stdout/stderr"
        stream = self.stdout or self.stderr
        if stream:
            async for line in redir.read_lines(stream, self._options.encoding):
                yield line

    def __aiter__(self) -> AsyncIterator[str]:
        "Return asynchronous iterator over stdout/stderr."
        return self._readlines()

    @staticmethod
    async def run_command(
        command: "shellous.Command[Any]",
        *,
        _run_future: Optional[asyncio.Future["Runner"]] = None,
    ) -> Union[str, Result]:
        "Run a command. This is the main entry point for Runner."
        if not _run_future and _is_multiple_capture(command):
            LOGGER.warning("run_command: multiple capture requires 'async with'")
            _cleanup(command)
            raise ValueError("multiple capture requires 'async with'")

        async with Runner(command) as run:
            if _run_future is not None:
                # Return streams to caller in another task.
                _run_future.set_result(run)

        result = run.result()
        if command.options._return_result:
            return result
        return result.output
Runner is an asynchronous context manager that runs a command.
async with Runner(cmd) as run:
    # process streams: run.stdin, run.stdout, run.stderr (if not None)
result = run.result()
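The timeout handling in the Runner source (`_start_timer`, `_set_timer_expired`, and the `TimeoutError` conversion in `__aexit__`) can be illustrated in isolation. The following is a minimal standard-library sketch of that pattern, not shellous code: `TimeoutScope` and `demo_timeout` are hypothetical names, and the process bookkeeping that the real Runner performs is omitted.

```python
import asyncio
from typing import Optional


class TimeoutScope:
    """Hypothetical helper: cancel the enclosing task after `timeout` seconds."""

    def __init__(self, timeout: Optional[float]) -> None:
        self._timeout = timeout
        self._timer: Optional[asyncio.TimerHandle] = None
        self._timed_out = False

    async def __aenter__(self) -> "TimeoutScope":
        if self._timeout is not None:
            loop = asyncio.get_running_loop()
            task = asyncio.current_task()
            assert task is not None
            # Schedule a callback that cancels the task running this block.
            self._timer = loop.call_later(self._timeout, self._expire, task)
        return self

    def _expire(self, task: "asyncio.Task[object]") -> None:
        # Remember that the cancellation came from the timer.
        self._timed_out = True
        self._timer = None
        task.cancel()

    async def __aexit__(self, exc_type, exc_value, exc_tb) -> bool:
        # Always stop the timer so it cannot fire after the block exits.
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        # Report a timer-driven cancellation as a timeout.
        if self._timed_out and isinstance(exc_value, asyncio.CancelledError):
            raise asyncio.TimeoutError()
        return False


async def demo_timeout() -> str:
    try:
        async with TimeoutScope(0.05):
            await asyncio.sleep(10)
    except asyncio.TimeoutError:
        return "timed out"
    return "finished"
```

Recording the `_timed_out` flag before calling `task.cancel()` is what lets the exit handler tell a timeout apart from a cancellation requested by the caller.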
@property
def name(self) -> str:
    "Return name of process being run."
    return self.command.name
Return name of process being run.
@property
def options(self) -> "shellous.Options":
    "Return options for process being run."
    return self.command.options
Return options for process being run.
@property
def command(self) -> "shellous.Command[Any]":
    "Return the command being run."
    return self._options.command
Return the command being run.
@property
def pid(self) -> Optional[int]:
    "Return the command's process ID."
    if not self._proc:
        return None
    return self._proc.pid
Return the command's process ID.
@property
def returncode(self) -> Optional[int]:
    "Process's exit code."
    if not self._proc:
        if self._cancelled:
            # The process was cancelled before starting.
            return CANCELLED_EXIT_CODE
        return None
    code = self._proc.returncode
    if code == _UNKNOWN_EXIT_CODE and self._last_signal is not None:
        # After sending a signal, `waitpid` may fail to locate the child
        # process. In this case, map the status to the last signal we sent.
        # For more on this, see https://github.com/python/cpython/issues/87744
        return -self._last_signal  # pylint: disable=invalid-unary-operand-type
    return code
Process's exit code.
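The signal fallback in `returncode` can be shown as a small pure function. This is only a sketch: `effective_exit_code` is a hypothetical name, and the value 255 is an assumption standing in for the private `_UNKNOWN_EXIT_CODE` constant, whose actual value is not shown in this section.

```python
import signal
from typing import Optional

# Assumption: 255 stands in for the library's private `_UNKNOWN_EXIT_CODE`.
UNKNOWN_EXIT_CODE = 255


def effective_exit_code(
    raw_code: Optional[int],
    last_signal: Optional[int],
) -> Optional[int]:
    """Map an 'unknown' exit status to the negated last signal sent, if any."""
    if raw_code == UNKNOWN_EXIT_CODE and last_signal is not None:
        # Follows the POSIX convention that a negative code means "killed by signal".
        return -last_signal
    return raw_code


# A process whose status was lost after we sent SIGTERM reports -SIGTERM.
lost_status = effective_exit_code(UNKNOWN_EXIT_CODE, signal.SIGTERM)
# A normal exit code passes through unchanged.
normal_status = effective_exit_code(0, signal.SIGTERM)
```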
@property
def cancelled(self) -> bool:
    "Return True if the command was cancelled."
    return self._cancelled
Return True if the command was cancelled.
@property
def pty_fd(self) -> Optional[int]:
    """The file descriptor used to communicate with the child PTY process.

    Returns None if the process is not using a PTY.
    """
    pty_fds = self._options.pty_fds
    if pty_fds is not None:
        return pty_fds.parent_fd
    return None
The file descriptor used to communicate with the child PTY process.
Returns None if the process is not using a PTY.
@property
def pty_eof(self) -> Optional[bytes]:
    """Byte sequence used to indicate EOF when written to the PTY child.

    Returns None if process is not using a PTY.
    """
    pty_fds = self._options.pty_fds
    if pty_fds is not None:
        return pty_fds.eof
    return None
Byte sequence used to indicate EOF when written to the PTY child.
Returns None if process is not using a PTY.
def result(self, *, check: bool = True) -> Result:
    "Check process exit code and raise a ResultError if necessary."
    code = self.returncode
    if code is None:
        raise TypeError("Runner.result(): Process has not exited")

    result = Result(
        exit_code=code,
        output_bytes=bytes(self._options.output_bytes or b""),
        error_bytes=bytes(self._options.error_bytes or b""),
        cancelled=self._cancelled,
        encoding=self._options.encoding,
    )

    if not check:
        return result

    return check_result(
        result,
        self.command.options,
        self._cancelled,
        self._timed_out,
    )
Check process exit code and raise a ResultError if necessary.
def add_task(
    self,
    coro: Coroutine[Any, Any, _T],
    tag: str = "",
) -> asyncio.Task[_T]:
    "Add a background task."
    task_name = f"{self.name}#{tag}"
    task = asyncio.create_task(coro, name=task_name)
    self._tasks.append(task)
    return task
Add a background task.
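The naming scheme used by `add_task` (`<name>#<tag>`) makes background tasks easy to identify when debugging. The sketch below reproduces that bookkeeping with the standard library only; `TaskGroupSketch` and `demo_task_names` are hypothetical names, and `asyncio.sleep(0)` stands in for the real I/O coroutines.

```python
import asyncio
from typing import Any, Coroutine, List, TypeVar

_T = TypeVar("_T")


class TaskGroupSketch:
    """Hypothetical stand-in that tracks named background tasks."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.tasks: List["asyncio.Task[Any]"] = []

    def add_task(
        self,
        coro: Coroutine[Any, Any, _T],
        tag: str = "",
    ) -> "asyncio.Task[_T]":
        # Name the task after its owner and its role.
        task = asyncio.create_task(coro, name=f"{self.name}#{tag}")
        self.tasks.append(task)
        return task


async def demo_task_names() -> List[str]:
    group = TaskGroupSketch("echo")
    group.add_task(asyncio.sleep(0), "stdin")
    group.add_task(asyncio.sleep(0), "waiter")
    await asyncio.gather(*group.tasks)
    return [task.get_name() for task in group.tasks]
```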
def send_signal(self, sig: int) -> None:
    "Send an arbitrary signal to the process if it is running."
    if self.returncode is None:
        self._signal(sig)
Send an arbitrary signal to the process if it is running.
def cancel(self) -> None:
    "Cancel the running process if it is running."
    if self.returncode is None:
        self._signal(self.command.options.cancel_signal)
Cancel the running process if it is running.
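Both `send_signal` and `cancel` check `returncode` first, so a signal is only sent while the process is still running. The sketch below shows the same guard using plain `asyncio` subprocesses rather than shellous. It assumes a POSIX system, uses `SIGTERM` as a stand-in for the configured `cancel_signal`, and `start_and_cancel` is a hypothetical name.

```python
import asyncio
import signal
import sys


async def start_and_cancel() -> int:
    # Start a child process that would otherwise run for 30 seconds.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import time; time.sleep(30)"
    )
    # Only signal the process if it has not exited yet.
    if proc.returncode is None:
        proc.send_signal(signal.SIGTERM)
    # On POSIX, a process killed by a signal reports the negated signal number.
    return await proc.wait()
```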
@log_method(LOG_DETAIL)
async def __aenter__(self):
    "Set up redirections and launch subprocess."
    self._audit_callback("start")
    try:
        return await self._start()
    except BaseException as ex:
        self._stop_timer()  # failsafe just in case
        self._audit_callback("stop", failure=type(ex).__name__)
        raise
    finally:
        if self._cancelled and self.command.options._catch_cancelled_error:
            # Raises ResultError instead of CancelledError.
            self.result()
Set up redirections and launch subprocess.
def __aiter__(self) -> AsyncIterator[str]:
    "Return asynchronous iterator over stdout/stderr."
    return self._readlines()
Return asynchronous iterator over stdout/stderr.
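Iteration yields decoded lines from whichever stream is available (stdout first, then stderr). The implementation of `redir.read_lines` is not shown in this section, so the sketch below uses a hypothetical stand-in, `read_lines_sketch`, built on the fact that `asyncio.StreamReader` is itself an async iterator over raw lines.

```python
import asyncio
from typing import AsyncIterator, List


async def read_lines_sketch(
    stream: asyncio.StreamReader,
    encoding: str,
) -> AsyncIterator[str]:
    """Hypothetical stand-in: decode each raw line read from the stream."""
    async for raw_line in stream:
        yield raw_line.decode(encoding)


async def collect_lines() -> List[str]:
    # Feed the reader by hand instead of attaching it to a real process.
    stream = asyncio.StreamReader()
    stream.feed_data(b"first\nsecond\n")
    stream.feed_eof()
    return [line async for line in read_lines_sketch(stream, "utf-8")]
```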
@staticmethod
async def run_command(
    command: "shellous.Command[Any]",
    *,
    _run_future: Optional[asyncio.Future["Runner"]] = None,
) -> Union[str, Result]:
    "Run a command. This is the main entry point for Runner."
    if not _run_future and _is_multiple_capture(command):
        LOGGER.warning("run_command: multiple capture requires 'async with'")
        _cleanup(command)
        raise ValueError("multiple capture requires 'async with'")

    async with Runner(command) as run:
        if _run_future is not None:
            # Return streams to caller in another task.
            _run_future.set_result(run)

    result = run.result()
    if command.options._return_result:
        return result
    return result.output
Run a command. This is the main entry point for Runner.
class PipeRunner:
    """PipeRunner is an asynchronous context manager that runs a pipeline.

    ```
    async with pipe.run() as run:
        # process run.stdin, run.stdout, run.stderr (if not None)
    result = run.result()
    ```
    """

    stdin: Optional[asyncio.StreamWriter] = None
    "Pipeline standard input."

    stdout: Optional[asyncio.StreamReader] = None
    "Pipeline standard output."

    stderr: Optional[asyncio.StreamReader] = None
    "Pipeline standard error."

    _pipe: "shellous.Pipeline[Any]"
    _capturing: bool
    _tasks: list[asyncio.Task[Any]]
    _encoding: str
    _cancelled: bool = False
    _results: Optional[list[Union[BaseException, Result]]] = None
    _pid: int = -1

    def __init__(self, pipe: "shellous.Pipeline[Any]", *, capturing: bool):
        """`capturing=True` indicates we are within an `async with` block and
        client needs to access `stdin` and `stderr` streams.
        """
        assert len(pipe.commands) > 1

        self._pipe = pipe
        self._cancelled = False
        self._tasks = []
        self._capturing = capturing
        self._encoding = pipe.options.encoding

    @property
    def name(self) -> str:
        "Return name of the pipeline."
        return self._pipe.name

    @property
    def options(self) -> "shellous.Options":
        """Return options for pipeline being run.

        These are the options for the last command in the pipeline.
        """
        return self._pipe.options

    @property
    def pid(self) -> Optional[int]:
        """Return the process ID for the first command in the pipeline.

        The PID is only available when `capturing=True`.
        """
        if self._pid < 0:
            return None
        return self._pid

    def result(self, *, check: bool = True) -> Result:
        "Return `Result` object for PipeRunner."
        assert self._results is not None

        result = convert_result_list(self._results, self._cancelled)
        if not check:
            return result

        return check_result(result, self._pipe.options, self._cancelled)

    def add_task(
        self,
        coro: Coroutine[Any, Any, _T],
        tag: str = "",
    ) -> asyncio.Task[_T]:
        "Add a background task."
        task_name = f"{self.name}#{tag}"
        task = asyncio.create_task(coro, name=task_name)
        self._tasks.append(task)
        return task

    @log_method(LOG_DETAIL)
    async def _wait(self, *, kill: bool = False):
        "Wait for pipeline to finish."
        assert self._results is None

        if kill:
            LOGGER.debug("PipeRunner.wait killing pipe %r", self)
            for task in self._tasks:
                task.cancel()

        cancelled, self._results = await harvest_results(*self._tasks, trustee=self)
        if cancelled:
            self._cancelled = True
        self._tasks.clear()  # clear all tasks when done

    @log_method(LOG_DETAIL)
    async def __aenter__(self) -> "PipeRunner":
        "Set up redirections and launch pipeline."
        try:
            return await self._start()
        except (Exception, asyncio.CancelledError) as ex:
            LOGGER.warning("PipeRunner enter %r ex=%r", self, ex)
            if _is_cancelled(ex):
                self._cancelled = True
            await self._wait(kill=True)
            raise

    @log_method(LOG_DETAIL)
    async def __aexit__(
        self,
        _exc_type: Union[type[BaseException], None],
        exc_value: Union[BaseException, None],
        _exc_tb: Optional[TracebackType],
    ):
        "Wait for pipeline to exit and handle cancellation."
        suppress = False
        try:
            suppress = await self._finish(exc_value)
        except asyncio.CancelledError:
            LOGGER.warning("PipeRunner cancelled inside _finish %r", self)
            self._cancelled = True
        return suppress

    @log_method(LOG_DETAIL)
    async def _finish(self, exc_value: Optional[BaseException]) -> bool:
        "Wait for pipeline to exit and handle cancellation."
        if exc_value is not None:
            LOGGER.warning("PipeRunner._finish exc_value=%r", exc_value)
            if _is_cancelled(exc_value):
                self._cancelled = True
            await self._wait(kill=True)
            return self._cancelled

        await self._wait()
        return False

    @log_method(LOG_DETAIL)
    async def _start(self):
        "Set up redirection and launch pipeline."
        open_fds: list[int] = []

        try:
            stdin = None
            stdout = None
            stderr = None

            cmds = self._setup_pipeline(open_fds)

            if self._capturing:
                stdin, stdout, stderr = await self._setup_capturing(cmds)
            else:
                for cmd in cmds:
                    self.add_task(cmd.coro())

            self.stdin = stdin
            self.stdout = stdout
            self.stderr = stderr

            return self

        except BaseException:  # pylint: disable=broad-except
            # Clean up after any exception *including* CancelledError.
            close_fds(open_fds)
            raise

    def _setup_pipeline(self, open_fds: list[int]):
        """Return the pipeline stitched together with pipe fd's.

        Each created open file descriptor is added to `open_fds` so it can
        be closed if there's an exception later.
        """
        cmds = list(self._pipe.commands)

        cmd_count = len(cmds)
        for i in range(cmd_count - 1):
            read_fd, write_fd = os.pipe()
            open_fds.extend((read_fd, write_fd))

            cmds[i] = cmds[i].stdout(write_fd, close=True)
            cmds[i + 1] = cmds[i + 1].stdin(read_fd, close=True)

        for i in range(cmd_count):
            cmds[i] = cmds[i].set(_return_result=True, _catch_cancelled_error=True)

        return cmds

    @log_method(LOG_DETAIL)
    async def _setup_capturing(self, cmds: "list[shellous.Command[Any]]"):
        """Set up capturing and return (stdin, stdout, stderr) streams."""
        loop = asyncio.get_event_loop()
        first_fut = loop.create_future()
        last_fut = loop.create_future()

        first_coro = cmds[0].coro(_run_future=first_fut)
        last_coro = cmds[-1].coro(_run_future=last_fut)
        middle_coros = [cmd.coro() for cmd in cmds[1:-1]]

        # Tag each task name with the index of the command in the pipe.
        self.add_task(first_coro, "0")
        for i, coro in enumerate(middle_coros):
            self.add_task(coro, str(i + 1))
        self.add_task(last_coro, str(len(cmds) - 1))

        # When capturing, we need the first and last commands in the
        # pipe to signal when they are ready.
        first_ready, last_ready = await asyncio.gather(first_fut, last_fut)

        stdin, stdout, stderr = (
            first_ready.stdin,
            last_ready.stdout,
            last_ready.stderr,
        )
        self._pid = first_ready.pid

        return (stdin, stdout, stderr)

    def __repr__(self) -> str:
        "Return string representation of PipeRunner."
        cancelled_info = ""
        if self._cancelled:
            cancelled_info = " cancelled"
        result_info = ""
        if self._results:
            result_info = f" results={self._results!r}"
        return f"<PipeRunner {self.name!r}{cancelled_info}{result_info}>"

    async def _readlines(self) -> AsyncIterator[str]:
        "Iterate over lines in stdout/stderr"
        stream = self.stdout or self.stderr
        if stream:
            async for line in redir.read_lines(stream, self._encoding):
                yield line

    def __aiter__(self) -> AsyncIterator[str]:
        "Return asynchronous iterator over stdout/stderr."
        return self._readlines()

    @staticmethod
    async def run_pipeline(pipe: "shellous.Pipeline[Any]") -> Union[str, Result]:
        "Run a pipeline. This is the main entry point for PipeRunner."
        run = PipeRunner(pipe, capturing=False)
        async with run:
            pass

        result = run.result()
        if pipe.options._return_result:
            return result
        return result.output
PipeRunner is an asynchronous context manager that runs a pipeline.
async with pipe.run() as run:
    # process run.stdin, run.stdout, run.stderr (if not None)
result = run.result()
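`_setup_pipeline` connects adjacent commands by creating an `os.pipe()` pair and handing the write end to one command and the read end to the next. The sketch below shows that idea with two plain `asyncio` subprocesses instead of shellous commands. It assumes a POSIX system, and `run_two_stage_pipe` is a hypothetical name.

```python
import asyncio
import os
import sys


async def run_two_stage_pipe() -> str:
    # One pipe joins the producer's stdout to the consumer's stdin.
    read_fd, write_fd = os.pipe()

    try:
        producer = await asyncio.create_subprocess_exec(
            sys.executable, "-c", "print('hello')",
            stdout=write_fd,
        )
    finally:
        # The parent must close its copy so the consumer can see EOF.
        os.close(write_fd)

    try:
        consumer = await asyncio.create_subprocess_exec(
            sys.executable, "-c",
            "import sys; print(sys.stdin.read().upper(), end='')",
            stdin=read_fd,
            stdout=asyncio.subprocess.PIPE,
        )
    finally:
        os.close(read_fd)

    output, _ = await consumer.communicate()
    await producer.wait()
    return output.decode()
```

Closing the parent's copies of both descriptors once each child has started plays the same role as the `close=True` arguments in the source: if the write end stays open anywhere, the downstream process never sees end-of-file.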
def __init__(self, pipe: "shellous.Pipeline[Any]", *, capturing: bool):
    """`capturing=True` indicates we are within an `async with` block and
    client needs to access `stdin` and `stderr` streams.
    """
    assert len(pipe.commands) > 1

    self._pipe = pipe
    self._cancelled = False
    self._tasks = []
    self._capturing = capturing
    self._encoding = pipe.options.encoding
@property
def name(self) -> str:
    "Return name of the pipeline."
    return self._pipe.name
Return name of the pipeline.
@property
def options(self) -> "shellous.Options":
    """Return options for pipeline being run.

    These are the options for the last command in the pipeline.
    """
    return self._pipe.options
Return options for pipeline being run.
These are the options for the last command in the pipeline.
@property
def pid(self) -> Optional[int]:
    """Return the process ID for the first command in the pipeline.

    The PID is only available when `capturing=True`.
    """
    if self._pid < 0:
        return None
    return self._pid
Return the process ID for the first command in the pipeline.
The PID is only available when capturing=True.
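The PID is only known when capturing because, in that mode, the first and last commands hand their runner objects back through futures (`_run_future`) while their tasks continue running. The sketch below illustrates that readiness handshake with plain coroutines. `stage` and `start_capturing` are hypothetical names, strings stand in for runner objects, and it uses `asyncio.get_running_loop()` where the source calls `asyncio.get_event_loop()`.

```python
import asyncio
from typing import List, Optional, Tuple


async def stage(name: str, ready: "Optional[asyncio.Future[str]]" = None) -> str:
    # Publish a handle as soon as the stage has started, then keep working.
    if ready is not None:
        ready.set_result(f"{name}-handle")
    await asyncio.sleep(0.01)
    return f"{name}-done"


async def start_capturing() -> Tuple[str, str, List[str]]:
    loop = asyncio.get_running_loop()
    first_ready = loop.create_future()
    last_ready = loop.create_future()

    # Tag each task name with the index of the stage in the pipe.
    tasks = [
        asyncio.create_task(stage("first", first_ready), name="pipe#0"),
        asyncio.create_task(stage("middle"), name="pipe#1"),
        asyncio.create_task(stage("last", last_ready), name="pipe#2"),
    ]

    # Wait only until the first and last stages report that they are ready.
    first, last = await asyncio.gather(first_ready, last_ready)

    # The stages themselves are awaited later, when the pipeline finishes.
    results = await asyncio.gather(*tasks)
    return first, last, list(results)
```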
def result(self, *, check: bool = True) -> Result:
    "Return `Result` object for PipeRunner."
    assert self._results is not None

    result = convert_result_list(self._results, self._cancelled)
    if not check:
        return result

    return check_result(result, self._pipe.options, self._cancelled)
Return Result object for PipeRunner.
def add_task(
    self,
    coro: Coroutine[Any, Any, _T],
    tag: str = "",
) -> asyncio.Task[_T]:
    "Add a background task."
    task_name = f"{self.name}#{tag}"
    task = asyncio.create_task(coro, name=task_name)
    self._tasks.append(task)
    return task
Add a background task.
@log_method(LOG_DETAIL)
async def __aenter__(self) -> "PipeRunner":
    "Set up redirections and launch pipeline."
    try:
        return await self._start()
    except (Exception, asyncio.CancelledError) as ex:
        LOGGER.warning("PipeRunner enter %r ex=%r", self, ex)
        if _is_cancelled(ex):
            self._cancelled = True
        await self._wait(kill=True)
        raise
Set up redirections and launch pipeline.
def __aiter__(self) -> AsyncIterator[str]:
    "Return asynchronous iterator over stdout/stderr."
    return self._readlines()
Return asynchronous iterator over stdout/stderr.
@staticmethod
async def run_pipeline(pipe: "shellous.Pipeline[Any]") -> Union[str, Result]:
    "Run a pipeline. This is the main entry point for PipeRunner."
    run = PipeRunner(pipe, capturing=False)
    async with run:
        pass

    result = run.result()
    if pipe.options._return_result:
        return result
    return result.output
Run a pipeline. This is the main entry point for PipeRunner.
class AuditEventInfo(TypedDict):
    """Info attached to each audit callback event.

    See `audit_callback` in `Command.set` for more information.
    """

    runner: Runner
    "Reference to the Runner object."

    failure: str
    "When phase is 'stop', the name of the exception from starting the process."

    signal: str
    "When phase is 'signal', the signal name/number sent to the process."
Info attached to each audit callback event.
See audit_callback in Command.set for more information.
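Judging from `Runner._audit_callback`, the callback is invoked as `callback(phase, info)`, where `phase` is a string such as "start", "stop" or "signal" and `info` is a dictionary with the three keys listed above. The sketch below shows a callback with that shape. `AuditInfoSketch`, `record_event` and `audit_log` are hypothetical names, the events are fabricated by hand purely for illustration, and `Any` replaces the real `Runner` type so the example runs without shellous installed.

```python
from typing import Any, List, Tuple, TypedDict


class AuditInfoSketch(TypedDict):
    """Hypothetical mirror of AuditEventInfo, with `runner` typed as Any."""

    runner: Any
    failure: str
    signal: str


audit_log: List[Tuple[str, str, str]] = []


def record_event(phase: str, info: AuditInfoSketch) -> None:
    # Keep only the plain-string fields so the log is easy to inspect.
    audit_log.append((phase, info["failure"], info["signal"]))


# Hand-written events in the order a signalled process might produce them.
record_event("start", {"runner": None, "failure": "", "signal": ""})
record_event("signal", {"runner": None, "failure": "", "signal": "SIGTERM"})
record_event("stop", {"runner": None, "failure": "", "signal": ""})
```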