finsy

Finsy P4Runtime Controller Library


Finsy is a P4Runtime controller library written in Python using asyncio. Finsy includes support for gNMI.

Check out the examples directory for some demonstration programs.

Installation

Finsy requires Python 3.10 or later. To install the latest version, type pip install finsy.

P4Runtime Scripts

With Finsy, you can write a Python script that reads/writes P4Runtime entities for a single switch.

Here is a complete example that retrieves the P4Info from a switch:

import finsy as fy

async def main():
    async with fy.Switch("sw1", "127.0.0.1:50001") as sw1:
        # Print out a description of the switch's P4Info, if one is configured.
        print(sw1.p4info)

fy.run(main())

Here is another example that prints out all non-default table entries.

import finsy as fy

async def main():
    async with fy.Switch("sw1", "127.0.0.1:50001") as sw1:
        # Do a wildcard read for table entries.
        async for entry in sw1.read(fy.P4TableEntry()):
            print(entry)

fy.run(main())

P4Runtime Controller

You can also write a P4Runtime controller that manages multiple switches independently. Your controller can react to events from the Switch by changing the contents of P4 tables.

Each switch is managed by an async ready_handler function. Your ready_handler function can read or update various P4Runtime entities in the switch. It can also create tasks to listen for packets or digests.

When you write P4Runtime updates to the switch, you use a unary operator (+, -, ~) to specify the operation: INSERT (+), DELETE (-) or MODIFY (~).

import finsy as fy

async def ready_handler(sw: fy.Switch):
    await sw.delete_all()
    await sw.write(
        [
            # Insert (+) multicast group with ports 1, 2, 3 and CONTROLLER.
            +fy.P4MulticastGroupEntry(1, replicas=[1, 2, 3, 255]),
            # Modify (~) default table entry to flood all unmatched packets.
            ~fy.P4TableEntry(
                "ipv4",
                action=fy.Action("flood"),
                is_default_action=True,
            ),
        ]
    )

    async for packet in sw.read_packets():
        print(f"{sw.name}: {packet}")

Use the SwitchOptions class to specify each switch's settings, including the p4info/p4blob and ready_handler. Use the Controller class to drive multiple switch connections. Each switch will call back into your ready_handler function after the P4Runtime connection is established.

from pathlib import Path

options = fy.SwitchOptions(
    p4info=Path("hello.p4info.txt"),
    p4blob=Path("hello.json"),
    ready_handler=ready_handler,
)

controller = fy.Controller([
    fy.Switch("sw1", "127.0.0.1:50001", options),
    fy.Switch("sw2", "127.0.0.1:50002", options),
    fy.Switch("sw3", "127.0.0.1:50003", options),
])

fy.run(controller.run())

Your ready_handler can spawn concurrent tasks with the Switch.create_task method. Tasks created this way will have their lifetimes managed by the switch object.

If the switch disconnects or its role changes to backup, the task running your ready_handler (and any tasks it spawned) will be cancelled and the ready_handler will begin again.
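The idea of tasks whose lifetimes are tied to an owning object can be sketched with plain asyncio. This is only an illustration of the pattern, not Finsy's implementation: the `TaskOwner` class and its `cancel_all` method are hypothetical names invented for this example.

```python
import asyncio


class TaskOwner:
    """Hypothetical stand-in for an object that owns the tasks it creates."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task] = set()

    def create_task(self, coro) -> asyncio.Task:
        # Track the task so it can be cancelled with its owner, and forget
        # it automatically once it finishes.
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def cancel_all(self) -> None:
        # Cancel every outstanding task and wait for the cancellations to land.
        tasks = list(self._tasks)
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def demo() -> list[str]:
    log: list[str] = []
    owner = TaskOwner()

    async def listen(name: str) -> None:
        try:
            await asyncio.sleep(3600)  # stands in for a long-running listener
        except asyncio.CancelledError:
            log.append(f"{name} cancelled")
            raise

    owner.create_task(listen("packets"))
    owner.create_task(listen("digests"))
    await asyncio.sleep(0)    # let both tasks start
    await owner.cancel_all()  # e.g. the connection dropped
    return sorted(log)


result = asyncio.run(demo())
print(result)
```

Because every task is registered with its owner, a single call is enough to tear down all of the listeners that a handler started.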

For more examples, see the examples directory.

Switch Read/Write API

The Switch class provides the API for interacting with P4Runtime switches. You will control a Switch object with a "ready handler" function. The ready handler is an async function that is called when the switch is ready to accept commands.

Your ready handler will typically write some control entities to the switch, then listen for incoming events and react to them with more writes. You may occasionally read entities from the switch.

When your ready handler is invoked, there is already a P4Runtime channel established, with client arbitration completed, and pipeline configured as specified in SwitchOptions.

Here is an example skeleton program. The ready handler is named ready().

async def ready(switch: fy.Switch):
    # Check if switch is the primary. If not, we may want to proceed
    # in read-only mode. In this example, ignore switch if it's a backup.
    if not switch.is_primary:
        return

    # If we're reconnecting to a switch, it will already have runtime state.
    # In this example, we just delete all entities and start over.
    await switch.delete_all()

    # Provision the pipeline with one or more `write` transactions. Each
    # `write` is a single WriteRequest which may contain multiple updates.
    await switch.write(
        # [Next section will cover what goes here.]
    )

    # Listen for events and respond to them. This "infinite" loop will
    # continue until the Switch disconnects, changes primary/backup status,
    # or the controller is stopped.
    async for packet in switch.read_packets():
        await handle_packet(switch, packet)

The Switch class provides a switch.create_task method to start a managed task. Tasks allow you to perform concurrent operations on the same switch. We could have written the last stanza above that reads packets in an infinite loop as a separate task. It's okay for the ready handler function to return early; any tasks it created will still run.

Writes

Use the write() method to write one or more P4Runtime updates and packets.

A P4Runtime update supports one of three operations: INSERT, MODIFY or DELETE. Some entities support all three operations. Other entities only support MODIFY.

Entity                | Operations Permitted   | Related Classes
P4TableEntry          | INSERT, MODIFY, DELETE | Match, Action, IndirectAction, P4MeterConfig, P4CounterData, P4MeterCounterData
P4ActionProfileMember | INSERT, MODIFY, DELETE |
P4ActionProfileGroup  | INSERT, MODIFY, DELETE | P4Member
P4MulticastGroupEntry | INSERT, MODIFY, DELETE |
P4CloneSessionEntry   | INSERT, MODIFY, DELETE |
P4DigestEntry         | INSERT, MODIFY, DELETE |
P4ExternEntry         | INSERT, MODIFY, DELETE |
P4RegisterEntry       | MODIFY                 |
P4CounterEntry        | MODIFY                 | P4CounterData
P4DirectCounterEntry  | MODIFY                 | P4CounterData
P4MeterEntry          | MODIFY                 | P4MeterConfig, P4MeterCounterData
P4DirectMeterEntry    | MODIFY                 |
P4ValueSetEntry       | MODIFY                 | P4ValueSetMember

Insert/Modify/Delete Updates

To specify the operation, use a unary + (INSERT), ~ (MODIFY), or - (DELETE). If you do not specify the operation, write will raise a ValueError exception.

Here is an example showing how to insert and delete two different entities in the same WriteRequest.

await switch.write([
    +fy.P4TableEntry(          # unary + means INSERT
        "ipv4", 
        match=fy.Match(dest="192.168.1.0/24"),
        action=fy.Action("forward", port=1),
    ),
    -fy.P4TableEntry(          # unary - means DELETE
        "ipv4", 
        match=fy.Match(dest="192.168.2.0/24"),
        action=fy.Action("forward", port=2),
    ),
])

You should not insert, modify or delete the same entry in the same WriteRequest.
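To make the operator convention concrete, here is a minimal standalone sketch of how unary operators can tag an object with an operation in Python. It is not Finsy's code: `Op`, `Update`, `Entity` and `collect_updates` are hypothetical names used only for illustration, and the real library may represent updates differently.

```python
from dataclasses import dataclass
from enum import Enum


class Op(Enum):
    INSERT = "INSERT"
    MODIFY = "MODIFY"
    DELETE = "DELETE"


@dataclass(frozen=True)
class Update:
    op: Op
    entity: "Entity"


@dataclass(frozen=True)
class Entity:
    """Hypothetical entity; only the operator handling matters here."""

    name: str

    def __pos__(self) -> Update:     # +entity
        return Update(Op.INSERT, self)

    def __invert__(self) -> Update:  # ~entity
        return Update(Op.MODIFY, self)

    def __neg__(self) -> Update:     # -entity
        return Update(Op.DELETE, self)


def collect_updates(items) -> list[Update]:
    # Reject any entity that was not tagged with an operation.
    updates = []
    for item in items:
        if not isinstance(item, Update):
            raise ValueError(f"no operation specified for {item!r}")
        updates.append(item)
    return updates


batch = collect_updates([+Entity("a"), ~Entity("b"), -Entity("c")])
ops = [update.op.name for update in batch]
print(ops)
```

Passing a bare `Entity("d")` to `collect_updates` raises `ValueError`, which mirrors the behaviour described above for a write with no operation specified.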

If you are performing the same operation on all entities, you can use the Switch insert, delete, or modify methods.

await switch.insert([
    fy.P4MulticastGroupEntry(1, replicas=[1, 2, 3]),
    fy.P4MulticastGroupEntry(2, replicas=[4, 5, 6]),
])

Modify-Only Updates

For entities that only support the modify operation, you do not need to specify the operation. (You can optionally use ~.)

await switch.write([
    fy.P4RegisterEntry("reg1", index=0, data=0),
    fy.P4RegisterEntry("reg1", index=1, data=1),
    fy.P4RegisterEntry("reg1", index=2, data=2),
])

You can also use the modify method:

await switch.modify([
    fy.P4RegisterEntry("reg1", index=0, data=0),
    fy.P4RegisterEntry("reg1", index=1, data=1),
    fy.P4RegisterEntry("reg1", index=2, data=2),
])

If you pass a modify-only entity to the insert or delete methods, the P4Runtime server will return an error.

Sending Packets

Use the write method to send a packet.

await switch.write([fy.P4PacketOut(b"a payload.....", port=3)])

You can include other entities in the same call. Any non-update objects (e.g. P4PacketOut, P4DigestListAck) will be sent before the WriteRequest.

Listening for Packets

To receive packets, use the async iterator Switch.read_packets(). Each item it yields is a P4PacketIn object (pkt in the example that follows).

read_packets can filter for specific eth_type values through its eth_types argument.

# Read packets filtering only for ARP (eth_type == 0x0806).
async for pkt in switch.read_packets(eth_types={0x0806}):
    # You can access the packet payload `pkt.payload` or any metadata value,
    # e.g. `pkt['ingress_port']`
    print(pkt.payload)
    print(pkt['ingress_port'])
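For readers unfamiliar with the eth_type field, the following standalone sketch shows how an EtherType can be read from a raw Ethernet frame and compared against a set of wanted values. It makes no claim about how Finsy implements its filter; `eth_type_of` and `matches` are hypothetical helpers, and the sketch ignores 802.1Q VLAN tags.

```python
import struct

ETH_TYPE_ARP = 0x0806
ETH_TYPE_IPV4 = 0x0800


def eth_type_of(frame: bytes) -> int:
    # Destination MAC (6 bytes) and source MAC (6 bytes) come first; the
    # EtherType is the big-endian 16-bit field that follows them.
    if len(frame) < 14:
        raise ValueError("frame is shorter than an Ethernet header")
    (eth_type,) = struct.unpack_from("!H", frame, 12)
    return eth_type


def matches(frame: bytes, eth_types: set[int]) -> bool:
    # True when the frame's EtherType is one of the wanted values.
    return eth_type_of(frame) in eth_types


# Two synthetic frames: a broadcast ARP frame and a unicast IPv4 frame.
arp_frame = bytes.fromhex("ffffffffffff" "0a0000000001" "0806") + b"\x00" * 28
ip_frame = bytes.fromhex("0a0000000002" "0a0000000001" "0800") + b"\x00" * 20

print(hex(eth_type_of(arp_frame)), matches(ip_frame, {ETH_TYPE_ARP}))
```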

Listening for Digests

To receive digests, use the async iterator Switch.read_digests. You must specify the name of the digest from your P4 program.

async for digest in switch.read_digests("digest_t"):
    # You can access the digest metadata e.g. `digest['ingress_port']`
    # Your code may need to update table entries based on the digest data.
    # To ack the digest, write `digest.ack()`.
    await switch.write([entry, ...])
    await switch.write([digest.ack()])

To acknowledge the digest entry, you can write digest.ack().

Listening for Idle Timeouts

To receive idle timeout notifications, use the async iterator Switch.read_idle_timeouts. You will receive a P4IdleTimeoutNotification which contains multiple table entries -- one for each entry that timed out.

async for timeout in switch.read_idle_timeouts():
    for entry in timeout.table_entry:
        print(timeout.timestamp, entry)

Other Events

A P4 switch may report other events using the EventEmitter API. See the SwitchEvent class for the event types. Each switch has a switch.ee attribute that lets your code register for event callbacks.
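Callback registration of this kind usually distinguishes persistent handlers from one-shot handlers. The sketch below is a tiny stand-in emitter written for illustration only; `MiniEmitter` and `DemoEvent` are hypothetical, and the real `switch.ee` object may offer a different or richer interface.

```python
from collections import defaultdict
from enum import Enum


class DemoEvent(Enum):
    # Hypothetical event names for this sketch.
    CONTROLLER_ENTER = "controller_enter"
    CONTROLLER_LEAVE = "controller_leave"


class MiniEmitter:
    def __init__(self) -> None:
        # Maps each event to a list of (handler, fire_once) pairs.
        self._handlers = defaultdict(list)

    def on(self, event, handler) -> None:
        self._handlers[event].append((handler, False))

    def once(self, event, handler) -> None:
        self._handlers[event].append((handler, True))

    def emit(self, event, *args) -> None:
        registered = self._handlers[event]
        # Drop one-shot handlers before calling anything, so each fires at most once.
        self._handlers[event] = [pair for pair in registered if not pair[1]]
        for handler, _once in registered:
            handler(*args)


calls = []
ee = MiniEmitter()
ee.on(DemoEvent.CONTROLLER_ENTER, lambda name: calls.append(("enter", name)))
ee.once(DemoEvent.CONTROLLER_LEAVE, lambda name: calls.append(("leave", name)))

ee.emit(DemoEvent.CONTROLLER_ENTER, "sw1")
ee.emit(DemoEvent.CONTROLLER_LEAVE, "sw1")
ee.emit(DemoEvent.CONTROLLER_LEAVE, "sw1")  # ignored: the one-shot handler is gone
ee.emit(DemoEvent.CONTROLLER_ENTER, "sw1")
print(calls)
```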

Development and Testing

Perform these steps to set up your local environment for Finsy development, or try the codespace. Finsy requires Python 3.10 or later. If poetry is not installed, follow these directions to install it.

Clone and Prepare a Virtual Environment

The poetry install command installs all development dependencies into the virtual environment (venv).

$ git clone https://github.com/byllyfish/finsy.git
$ cd finsy
$ python3 -m venv .venv
$ poetry install

Run Unit Tests

When you run pytest from the top level of the repository, you will run the unit tests.

$ poetry run pytest

Run Integration Tests

When you run pytest from within the examples directory, you will run the integration tests instead of the unit tests. The integration tests run the example programs against a Mininet network. Docker or Podman is required.

$ cd examples
$ poetry run pytest

API Documentation

"""
.. include:: ../README.md

# API Documentation
"""

# Copyright (c) 2022-2023 Bill Fisher
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = "0.25.0"

import sys

if sys.version_info < (3, 10):  # pragma: no cover
    raise RuntimeError("Requires Python 3.10 or later.")

from .controller import Controller
from .gnmiclient import GNMIClient, GNMISubscription, GNMIUpdate
from .gnmipath import GNMIPath
from .grpcutil import GRPCCredentialsTLS, GRPCStatusCode
from .log import LoggerAdapter
from .macaddr import MACAddress
from .p4client import P4Client, P4ClientError, P4Error
from .p4entity import (
    P4ActionProfileGroup,
    P4ActionProfileMember,
    P4CloneSessionEntry,
    P4CounterData,
    P4CounterEntry,
    P4DigestEntry,
    P4DigestList,
    P4DigestListAck,
    P4DirectCounterEntry,
    P4DirectMeterEntry,
    P4ExternEntry,
    P4IndirectAction,
    P4Member,
    P4MeterConfig,
    P4MeterCounterData,
    P4MeterEntry,
    P4MulticastGroupEntry,
    P4PacketIn,
    P4PacketOut,
    P4RegisterEntry,
    P4TableAction,
    P4TableEntry,
    P4TableMatch,
    P4ValueSetEntry,
)
from .p4schema import P4ConfigAction, P4CounterUnit, P4Schema
from .ports import SwitchPort, SwitchPortList
from .runner import run
from .switch import Switch, SwitchEvent, SwitchOptions

Match = P4TableMatch
"`Match` is an alias for P4TableMatch."

Action = P4TableAction
"`Action` is an alias for P4TableAction."

IndirectAction = P4IndirectAction
"`IndirectAction` is an alias for P4IndirectAction."

__all__ = [
    "run",
    "Controller",
    "LoggerAdapter",
    "MACAddress",
    "P4ActionProfileGroup",
    "P4ActionProfileMember",
    "P4Client",
    "P4ClientError",
    "P4CloneSessionEntry",
    "P4CounterData",
    "P4CounterEntry",
    "P4CounterUnit",
    "P4DigestEntry",
    "P4DigestList",
    "P4DigestListAck",
    "P4DirectCounterEntry",
    "P4DirectMeterEntry",
    "P4Error",
    "P4ExternEntry",
    "IndirectAction",  # alias for P4IndirectAction
    "P4IndirectAction",
    "P4Member",
    "P4MeterConfig",
    "P4MeterCounterData",
    "P4MeterEntry",
    "P4MulticastGroupEntry",
    "P4PacketIn",
    "P4PacketOut",
    "P4RegisterEntry",
    "Action",  # alias for P4TableAction
    "P4TableAction",
    "P4TableEntry",
    "Match",  # alias for P4TableMatch
    "P4TableMatch",
    "P4ValueSetEntry",
    "P4ConfigAction",
    "P4Schema",
    "Switch",
    "SwitchEvent",
    "SwitchOptions",
    "SwitchPort",
    "SwitchPortList",
    "GNMIClient",
    "GNMIPath",
    "GNMISubscription",
    "GNMIUpdate",
    "GRPCCredentialsTLS",
    "GRPCStatusCode",
]

def run(coro: Coroutine[Any, Any, NoneType]) -> None:
def run(coro: Coroutine[Any, Any, None]) -> None:
    """`finsy.run` provides a useful wrapper around `asyncio.run`.

    This function implements common boilerplate for running a Finsy application.

    - Set up basic logging to stderr at the INFO log level.
    - Set up a signal handler for SIGTERM that shuts down gracefully.
    - Set up caching of P4Info data so common definitions are re-used.

    Example:

    ```python
    import finsy as fy

    async def main():
        async with fy.Switch("sw", "127.0.0.1:50001") as sw:
            print(sw.p4info)

    if __name__ == "__main__":
        fy.run(main())
    ```

    If you choose to use `asyncio.run` instead, your P4Schema/P4Info objects
    will not be eligible for sharing. You can create your own `P4SchemaCache`
    context manager to implement this.
    """
    try:
        asyncio.run(_finsy_boilerplate(coro))
    except (KeyboardInterrupt, asyncio.CancelledError):
        pass

finsy.run provides a useful wrapper around asyncio.run.

This function implements common boilerplate for running a Finsy application.

  • Set up basic logging to stderr at the INFO log level.
  • Set up a signal handler for SIGTERM that shuts down gracefully.
  • Set up caching of P4Info data so common definitions are re-used.

Example:

import finsy as fy

async def main():
    async with fy.Switch("sw", "127.0.0.1:50001") as sw:
        print(sw.p4info)

if __name__ == "__main__":
    fy.run(main())

If you choose to use asyncio.run instead, your P4Schema/P4Info objects will not be eligible for sharing. You can create your own P4SchemaCache context manager to implement this.
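The general shape of such a wrapper can be sketched with the standard library alone. The example below is an assumption-laden illustration, not Finsy's implementation: `run_app` and `_with_sigterm` are hypothetical names, the P4Info caching step is omitted, and the way SIGTERM is handled here (cancelling the main task) is just one plausible approach.

```python
import asyncio
import logging
import signal


async def _with_sigterm(coro):
    # Hypothetical helper: cancel the main task when SIGTERM arrives.
    loop = asyncio.get_running_loop()
    task = asyncio.current_task()
    try:
        loop.add_signal_handler(signal.SIGTERM, task.cancel)
    except (NotImplementedError, RuntimeError, ValueError):
        pass  # signal handlers are unavailable (e.g. Windows, non-main thread)
    return await coro


def run_app(coro) -> None:
    # Hypothetical wrapper around asyncio.run with logging and quiet shutdown.
    logging.basicConfig(level=logging.INFO)  # basicConfig logs to stderr by default
    try:
        asyncio.run(_with_sigterm(coro))
    except (KeyboardInterrupt, asyncio.CancelledError):
        pass  # treat Ctrl-C and cancellation as a normal exit


seen = []


async def main() -> None:
    seen.append("started")
    await asyncio.sleep(0)
    seen.append("finished")


run_app(main())
print(seen)
```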

@final
class Controller:
@final
class Controller:
    """Represents a collection of P4Runtime switches.

    Each `Switch` in the Controller is identified by its name. Each name must
    be unique.

    ```
    switches = [
        fy.Switch("sw1", "10.0.0.1:50000"),
        fy.Switch("sw2", "10.0.0.2:50000"),
    ]
    controller = fy.Controller(switches)
    await controller.run()
    ```

    A Controller can be running or stopped. There are two ways to run a
    Controller. You can use the `Controller.run()` method, or you can use
    a Controller as a context manager.

    ```
    controller = fy.Controller(switches)
    async with controller:
        # Let controller run for 60 seconds.
        await asyncio.sleep(60)
    ```

    You can `add` or `remove` switches regardless of whether a Controller is
    running or not. If the Controller is not running, adding or removing a Switch is
    instantaneous. If the Controller is running, adding a Switch will start
    it running asynchronously. Removing a Switch will schedule the Switch to stop,
    but defer actual removal until the Switch has stopped asynchronously.

    When a switch starts inside a Controller, it fires the `CONTROLLER_ENTER`
    event. When a switch stops inside a Controller, it fires the `CONTROLLER_LEAVE`
    event.

    A Controller supports these methods to access its contents:

    - len(controller): Return number of switches in the controller.
    - controller[name]: Return the switch with the given name.
    - controller.get(name): Return the switch with the given name, or None if not found.

    You can iterate over the switches in a Controller using a for loop:

    ```
    for switch in controller:
        print(switch.name)
    ```

    Any task or sub-task running inside a controller can retrieve its
    Controller object using the `Controller.current()` method.
    """

    _switches: dict[str, Switch]
    _pending_removal: set[Switch]
    _task_count: CountdownFuture

    _control_task: asyncio.Task[Any] | None = None
    "Keep track of controller's main task."

    def __init__(self, switches: Iterable[Switch] = ()):
        self._switches = {}
        self._pending_removal = set()
        self._task_count = CountdownFuture()

        for switch in switches:
            if switch.name in self._switches:
                raise ValueError(f"Switch named {switch.name!r} already exists")
            self._switches[switch.name] = switch

    @property
    def running(self) -> bool:
        "True if Controller is running."
        return self._control_task is not None

    async def run(self) -> None:
        "Run the controller."
        async with self:
            await wait_for_cancel()

    def stop(self) -> None:
        "Stop the controller if it is running."
        if self._control_task is not None:
            self._control_task.cancel()

    async def __aenter__(self) -> Self:
        "Run the controller as a context manager (see also run())."
        assert not self.running, "Controller.__aenter__ is not re-entrant"
        assert self._task_count.value() == 0
        assert not self._pending_removal

        self._control_task = asyncio.current_task()
        _CONTROLLER.set(self)

        try:
            # Start each switch running.
            for switch in self:
                self._start_switch(switch)
        except Exception:
            self._control_task = None
            _CONTROLLER.set(None)
            raise

        return self

    async def __aexit__(
        self,
        _exc_type: type[BaseException] | None,
        _exc_val: BaseException | None,
        _exc_tb: TracebackType | None,
    ) -> bool | None:
        "Run the controller as a context manager (see also run())."
        assert self.running

        try:
            # Stop all the switches.
            for switch in self:
                self._stop_switch(switch)

            # Wait for switch tasks to finish.
            await self._task_count.wait()

        finally:
            self._control_task = None
            _CONTROLLER.set(None)

    def add(self, switch: Switch) -> None:
        """Add a switch to the controller.

        If the controller is running, tell the switch to start.
        """
        if switch.name in self._switches:
            raise ValueError(f"Switch named {switch.name!r} already exists")

        self._switches[switch.name] = switch
        if self.running:
            self._start_switch(switch)

    def remove(self, switch: Switch) -> asyncio.Event:
        """Remove a switch from the controller.

        If the controller is running, tell the switch to stop and schedule it
        for removal when it fully stops.
        """
        name = switch.name
        if self._switches.get(name, None) is not switch:
            raise ValueError(f"Switch named {name!r} not found")

        del self._switches[name]

        event = asyncio.Event()
        if self.running:
            # When controller is running, event will complete when switch
            # is actually stopped.
            self._stop_switch(switch)
            self._pending_removal.add(switch)

            def _controller_leave(sw: Switch):
                self._pending_removal.discard(sw)
                event.set()

            switch.ee.once(SwitchEvent.CONTROLLER_LEAVE, _controller_leave)  # type: ignore
        else:
            # When controller is not running, event completes immediately.
            event.set()

        return event

    def _start_switch(self, switch: Switch):
        "Start the switch's control task."
        LOGGER.debug("Controller._start_switch: %r", switch)
        assert switch._control_task is None  # pyright: ignore[reportPrivateUsage]

        switch.ee.emit(SwitchEvent.CONTROLLER_ENTER, switch)

        task = asyncio.create_task(switch.run(), name=f"fy:{switch.name}")
        switch._control_task = task  # pyright: ignore[reportPrivateUsage]
        self._task_count.increment()

        def _switch_done(done: asyncio.Task[Any]):
            switch._control_task = None  # pyright: ignore[reportPrivateUsage]
            switch.ee.emit(SwitchEvent.CONTROLLER_LEAVE, switch)
            self._task_count.decrement()

            if not done.cancelled():
                ex = done.exception()
                if ex is not None:
                    if not isinstance(ex, SwitchFailFastError):
                        # The `fail_fast` error has already been logged. If
                        # it's any other error, log it. (There shouldn't be
                        # any other error.)
                        LOGGER.critical(
                            "Controller task %r failed",
                            done.get_name(),
                            exc_info=ex,
                        )
                    # Shutdown the program cleanly due to switch failure.
                    raise SystemExit(99)

        task.add_done_callback(_switch_done)

    def _stop_switch(self, switch: Switch):
        "Stop the switch's control task."
        LOGGER.debug("Controller._stop_switch: %r", switch)

        if switch._control_task is not None:  # pyright: ignore[reportPrivateUsage]
            switch._control_task.cancel()  # pyright: ignore[reportPrivateUsage]

    def __len__(self) -> int:
        "Return switch count."
        return len(self._switches)

    def __iter__(self) -> Iterator[Switch]:
        "Iterate over switches."
        return iter(self._switches.values())

    def __getitem__(self, name: str) -> Switch:
        "Retrieve switch by name."
        return self._switches[name]

    def get(self, name: str) -> Switch | None:
        "Retrieve switch by name, or return None if not found."
        return self._switches.get(name)

    @staticmethod
    def current() -> "Controller":
        "Return the current Controller object."
        result = _CONTROLLER.get()
        if result is None:
            raise RuntimeError("controller does not exist")
        return result

Represents a collection of P4Runtime switches.

Each Switch in the Controller is identified by its name. Each name must be unique.

switches = [
    fy.Switch("sw1", "10.0.0.1:50000"),
    fy.Switch("sw2", "10.0.0.2:50000"),
]
controller = fy.Controller(switches)
await controller.run()

A Controller can be running or stopped. There are two ways to run a Controller. You can use the Controller.run() method, or you can use a Controller as a context manager.

controller = fy.Controller(switches)
async with controller:
    # Let controller run for 60 seconds.
    await asyncio.sleep(60)

You can add or remove switches regardless of whether a Controller is running or not. If the Controller is not running, adding or removing a Switch is instantaneous. If the Controller is running, adding a Switch will start it running asynchronously. Removing a Switch will schedule the Switch to stop, but defer actual removal until the Switch has stopped asynchronously.

When a switch starts inside a Controller, it fires the CONTROLLER_ENTER event. When a switch stops inside a Controller, it fires the CONTROLLER_LEAVE event.

A Controller supports these methods to access its contents:

  • len(controller): Return number of switches in the controller.
  • controller[name]: Return the switch with the given name.
  • controller.get(name): Return the switch with the given name, or None if not found.

You can iterate over the switches in a Controller using a for loop:

for switch in controller:
    print(switch.name)

Any task or sub-task running inside a controller can retrieve its Controller object using the Controller.current() method.
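A lookup like `current()` can be built on Python's `contextvars` module, because every asyncio task starts with a copy of its creator's context. The sketch below shows the mechanism in isolation; `Registry` and `_CURRENT` are hypothetical names for this example and are not part of Finsy.

```python
import asyncio
from contextvars import ContextVar

# Holds the active registry for the current context (None when unset).
_CURRENT = ContextVar("current_registry", default=None)


class Registry:
    """Hypothetical stand-in for an object that tasks look up via `current()`."""

    def __init__(self, name: str) -> None:
        self.name = name

    @staticmethod
    def current() -> "Registry":
        result = _CURRENT.get()
        if result is None:
            raise RuntimeError("registry does not exist")
        return result


async def worker() -> str:
    # No reference is passed in; the task inherited a copy of its creator's context.
    return Registry.current().name


async def main() -> str:
    _CURRENT.set(Registry("main"))
    return await asyncio.create_task(worker())


found = asyncio.run(main())
print(found)
```

The value set in `main` is visible in `worker` even though `worker` runs as a separate task, which is what lets sub-tasks find their owner without explicit plumbing.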

Controller(switches: Iterable[Switch] = ())
running: bool

True if Controller is running.

async def run(self) -> None:

Run the controller.

def stop(self) -> None:

Stop the controller if it is running.

async def __aenter__(self) -> typing_extensions.Self:

Run the controller as a context manager (see also run()).

def add(self, switch: Switch) -> None:

Add a switch to the controller.

If the controller is running, tell the switch to start.

def remove(self, switch: Switch) -> asyncio.locks.Event:

Remove a switch from the controller.

If the controller is running, tell the switch to stop and schedule it for removal when it fully stops.
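Since `remove` returns an `asyncio.Event`, a caller can wait on that event to learn when the switch has actually stopped. The standalone sketch below reproduces only the pattern of "request a stop now, signal completion later"; `stop_and_notify` is a hypothetical helper and no Finsy objects are involved.

```python
import asyncio


def stop_and_notify(task: asyncio.Task) -> asyncio.Event:
    # Hypothetical helper: request cancellation now, and return an Event that
    # is set only once the task has actually finished.
    event = asyncio.Event()
    task.add_done_callback(lambda _done: event.set())
    task.cancel()
    return event


async def main() -> tuple[bool, bool]:
    async def slow_shutdown() -> None:
        try:
            await asyncio.sleep(3600)
        except asyncio.CancelledError:
            await asyncio.sleep(0.01)  # simulated cleanup work
            raise

    task = asyncio.create_task(slow_shutdown())
    await asyncio.sleep(0)         # let the task start
    stopped = stop_and_notify(task)
    before = stopped.is_set()      # cancellation requested, not yet complete
    await stopped.wait()           # returns once the task has finished
    return before, task.done()


before, done = asyncio.run(main())
print(before, done)
```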

def __len__(self) -> int:

Return switch count.

def __iter__(self) -> Iterator[Switch]:

Iterate over switches.

def __getitem__(self, name: str) -> Switch:

Retrieve switch by name.

def get(self, name: str) -> Switch | None:

Retrieve switch by name, or return None if not found.

@staticmethod
def current() -> Controller:

Return the current Controller object.

class LoggerAdapter(logging.LoggerAdapter):
class LoggerAdapter(_BaseLoggerAdapter):
    """Custom log adapter to include the name of the current task."""

    def process(
        self,
        msg: Any,
        kwargs: MutableMapping[str, Any],
    ) -> tuple[Any, MutableMapping[str, Any]]:
        """Process the logging message and keyword arguments passed in to a
        logging call to insert contextual information.
        """
        task_name = _get_current_task_name()
        return f"[{task_name}] {msg}", kwargs

    def info(self, msg: Any, *args: Any, **kwargs: Any) -> None:
        """INFO level uses a concise task name representation for readability."""
        if self.logger.isEnabledFor(logging.INFO):
            task_name = _get_current_task_name(True)
            self.logger.info(f"[{task_name}] {msg}", *args, **kwargs)

Custom log adapter to include the name of the current task.

def process(self, msg: Any, kwargs: MutableMapping[str, Any]) -> tuple[typing.Any, typing.MutableMapping[str, typing.Any]]:
    def process(
        self,
        msg: Any,
        kwargs: MutableMapping[str, Any],
    ) -> tuple[Any, MutableMapping[str, Any]]:
        """Process the logging message and keyword arguments passed in to a
        logging call to insert contextual information.
        """
        task_name = _get_current_task_name()
        return f"[{task_name}] {msg}", kwargs

Process the logging message and keyword arguments passed in to a logging call to insert contextual information.

def info(self, msg: Any, *args: Any, **kwargs: Any) -> None:
    def info(self, msg: Any, *args: Any, **kwargs: Any) -> None:
        """INFO level uses a concise task name representation for readability."""
        if self.logger.isEnabledFor(logging.INFO):
            task_name = _get_current_task_name(True)
            self.logger.info(f"[{task_name}] {msg}", *args, **kwargs)

INFO level uses a concise task name representation for readability.
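
As a rough illustration of the same idea using only the standard library, the sketch below prefixes each message with the name of the running asyncio task. The `TaskNameAdapter` class, the `"main"` fallback label, and the `run_demo` helper are assumptions made for this example; Finsy's `_get_current_task_name` helper is not shown in this section and may behave differently.

```python
import asyncio
import io
import logging
from typing import Any, MutableMapping


class TaskNameAdapter(logging.LoggerAdapter):
    """Prefix each log message with the current asyncio task name."""

    def process(
        self, msg: Any, kwargs: MutableMapping[str, Any]
    ) -> tuple[Any, MutableMapping[str, Any]]:
        try:
            task = asyncio.current_task()
        except RuntimeError:
            task = None  # no event loop is running in this thread
        name = task.get_name() if task else "main"  # "main" is an arbitrary label
        return f"[{name}] {msg}", kwargs


def run_demo() -> str:
    """Log one message from a named task and return the captured output."""
    stream = io.StringIO()
    logger = logging.getLogger("tasklog_demo")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    handler = logging.StreamHandler(stream)
    logger.addHandler(handler)
    log = TaskNameAdapter(logger, {})

    async def worker() -> None:
        log.info("hello")

    async def main() -> None:
        await asyncio.create_task(worker(), name="worker-1")

    asyncio.run(main())
    logger.removeHandler(handler)
    return stream.getvalue()
```

Calling `run_demo()` returns the captured log text, in which the message carries the `[worker-1]` prefix.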

Inherited Members
logging.LoggerAdapter: LoggerAdapter, logger, extra, debug, warning, warn, error, exception, critical, log, isEnabledFor, setLevel, getEffectiveLevel, hasHandlers, manager, name

@functools.total_ordering
class MACAddress:
@functools.total_ordering
class MACAddress:
    """Concrete class for a MAC address."""

    __slots__ = ("_mac", "__weakref__")
    _mac: int

    def __init__(self, value: object) -> None:
        "Construct a MAC address from a string, integer or bytes object."
        match value:
            case int():
                self._mac = _from_int(value)
            case bytes():
                self._mac = _from_bytes(value)
            case MACAddress():
                self._mac = value._mac
            case _:
                # Input argument is a MAC string, or an object that is
                # formatted as MAC string (behave like ipaddress module).
                self._mac = _from_string(str(value))

    @property
    def packed(self) -> bytes:
        "Return the MAC address as a byte string."
        return _to_bytes(self._mac)

    @property
    def max_prefixlen(self) -> int:
        "Return the maximum prefix length (48 bits)."
        return _BIT_WIDTH

    @property
    def is_multicast(self) -> bool:
        "Return true if MAC address has the multicast bit set."
        return self._mac & (1 << 40) != 0

    @property
    def is_private(self) -> bool:
        "Return true if MAC address has the locally administered bit set."
        return self._mac & (1 << 41) != 0

    @property
    def is_global(self) -> bool:
        "Return true if the locally administered bit is not set."
        return not self.is_private

    @property
    def is_unspecified(self) -> bool:
        "Return true if MAC address is all zeros."
        return self._mac == 0

    @property
    def is_broadcast(self) -> bool:
        "Return true if MAC address is the broadcast address."
        return self._mac == (1 << _BIT_WIDTH) - 1

    def __int__(self) -> int:
        return self._mac

    def __eq__(self, rhs: object) -> bool:
        if not isinstance(rhs, MACAddress):
            return NotImplemented
        return self._mac == rhs._mac

    def __lt__(self, rhs: object) -> bool:
        if not isinstance(rhs, MACAddress):
            return NotImplemented
        return self._mac < rhs._mac

    def __repr__(self) -> str:
        return f"MACAddress({_to_string(self._mac)!r})"

    def __str__(self) -> str:
        return _to_string(self._mac)

    def __hash__(self) -> int:
        return hash(hex(self._mac))

Concrete class for a MAC address.

MACAddress(value: object)
    def __init__(self, value: object) -> None:
        "Construct a MAC address from a string, integer or bytes object."
        match value:
            case int():
                self._mac = _from_int(value)
            case bytes():
                self._mac = _from_bytes(value)
            case MACAddress():
                self._mac = value._mac
            case _:
                # Input argument is a MAC string, or an object that is
                # formatted as MAC string (behave like ipaddress module).
                self._mac = _from_string(str(value))

Construct a MAC address from a string, integer or bytes object.

packed: bytes
    @property
    def packed(self) -> bytes:
        "Return the MAC address as a byte string."
        return _to_bytes(self._mac)

Return the MAC address as a byte string.

max_prefixlen: int
    @property
    def max_prefixlen(self) -> int:
        "Return the maximum prefix length (48 bits)."
        return _BIT_WIDTH

Return the maximum prefix length (48 bits).

is_multicast: bool
    @property
    def is_multicast(self) -> bool:
        "Return true if MAC address has the multicast bit set."
        return self._mac & (1 << 40) != 0

Return true if MAC address has the multicast bit set.

is_private: bool
    @property
    def is_private(self) -> bool:
        "Return true if MAC address has the locally administered bit set."
        return self._mac & (1 << 41) != 0

Return true if MAC address has the locally administered bit set.

is_global: bool
    @property
    def is_global(self) -> bool:
        "Return true if the locally administered bit is not set."
        return not self.is_private

Return true if the locally administered bit is not set.

is_unspecified: bool
    @property
    def is_unspecified(self) -> bool:
        "Return true if MAC address is all zeros."
        return self._mac == 0

Return true if MAC address is all zeros.

is_broadcast: bool
    @property
    def is_broadcast(self) -> bool:
        "Return true if MAC address is the broadcast address."
        return self._mac == (1 << _BIT_WIDTH) - 1

Return true if MAC address is the broadcast address.
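
The bit tests used by `is_multicast` and `is_private` can be checked with plain integers. With the address stored as a 48-bit integer, bit 40 is the least-significant bit of the first octet (the multicast flag) and bit 41 is the next bit up (the locally administered flag). The sketch below is independent of Finsy; `mac_to_int` and the two predicate functions are hypothetical helpers written for this illustration.

```python
MULTICAST_BIT = 1 << 40  # least-significant bit of the first octet
LOCAL_BIT = 1 << 41  # second least-significant bit of the first octet


def mac_to_int(mac: str) -> int:
    """Parse 'aa:bb:cc:dd:ee:ff' into a 48-bit integer (hypothetical helper)."""
    return int.from_bytes(bytes.fromhex(mac.replace(":", "")), "big")


def is_multicast(mac: str) -> bool:
    """True if the multicast bit of the first octet is set."""
    return mac_to_int(mac) & MULTICAST_BIT != 0


def is_locally_administered(mac: str) -> bool:
    """True if the locally administered bit of the first octet is set."""
    return mac_to_int(mac) & LOCAL_BIT != 0
```

For instance, `01:00:5e:00:00:01` has only the multicast bit set, `02:00:00:00:00:01` has only the locally administered bit set, and the broadcast address `ff:ff:ff:ff:ff:ff` equals `(1 << 48) - 1`.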

@decodable('action_profile_group')
@dataclass(slots=True)
class P4ActionProfileGroup(finsy.p4entity._P4Writable):
@decodable("action_profile_group")
@dataclass(slots=True)
class P4ActionProfileGroup(_P4Writable):
    "Represents a P4Runtime ActionProfileGroup."

    action_profile_id: str = ""
    _: KW_ONLY
    group_id: int = 0
    max_size: int = 0
    members: Sequence[P4Member] | None = None

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4ActionProfileGroup as protobuf."
        if not self.action_profile_id:
            return p4r.Entity(action_profile_group=p4r.ActionProfileGroup())

        profile = schema.action_profiles[self.action_profile_id]

        if self.members is not None:
            members = [member.encode() for member in self.members]
        else:
            members = None

        entry = p4r.ActionProfileGroup(
            action_profile_id=profile.id,
            group_id=self.group_id,
            members=members,
            max_size=self.max_size,
        )
        return p4r.Entity(action_profile_group=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to ActionProfileGroup data."
        entry = msg.action_profile_group
        if entry.action_profile_id == 0:
            return cls()

        profile = schema.action_profiles[entry.action_profile_id]

        if entry.members:
            members = [P4Member.decode(member) for member in entry.members]
        else:
            members = None

        return cls(
            action_profile_id=profile.alias,
            group_id=entry.group_id,
            max_size=entry.max_size,
            members=members,
        )

    def action_str(self, _schema: P4Schema) -> str:
        "Return string representation of the weighted members."
        if not self.members:
            return ""

        return " ".join(
            [f"{member.weight}*{member.member_id:#x}" for member in self.members]
        )

Represents a P4Runtime ActionProfileGroup.

P4ActionProfileGroup(action_profile_id: str = '', *, group_id: int = 0, max_size: int = 0, members: Optional[Sequence[P4Member]] = None)
action_profile_id: str
group_id: int
max_size: int
members: Optional[Sequence[P4Member]]
def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:
    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4ActionProfileGroup as protobuf."
        if not self.action_profile_id:
            return p4r.Entity(action_profile_group=p4r.ActionProfileGroup())

        profile = schema.action_profiles[self.action_profile_id]

        if self.members is not None:
            members = [member.encode() for member in self.members]
        else:
            members = None

        entry = p4r.ActionProfileGroup(
            action_profile_id=profile.id,
            group_id=self.group_id,
            members=members,
            max_size=self.max_size,
        )
        return p4r.Entity(action_profile_group=entry)

Encode P4ActionProfileGroup as protobuf.

@classmethod
def decode(cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:
    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to ActionProfileGroup data."
        entry = msg.action_profile_group
        if entry.action_profile_id == 0:
            return cls()

        profile = schema.action_profiles[entry.action_profile_id]

        if entry.members:
            members = [P4Member.decode(member) for member in entry.members]
        else:
            members = None

        return cls(
            action_profile_id=profile.alias,
            group_id=entry.group_id,
            max_size=entry.max_size,
            members=members,
        )

Decode protobuf to ActionProfileGroup data.

def action_str(self, _schema: P4Schema) -> str:
    def action_str(self, _schema: P4Schema) -> str:
        "Return string representation of the weighted members."
        if not self.members:
            return ""

        return " ".join(
            [f"{member.weight}*{member.member_id:#x}" for member in self.members]
        )

Return string representation of the weighted members.
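
To make the output format concrete, here is a small standalone sketch of the same formatting rule: each member is rendered as `weight*member_id`, with the member ID in hexadecimal, and the members are joined by spaces. The `Member` dataclass is a hypothetical stand-in that carries only the two fields the format string reads; it is not Finsy's `P4Member`.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Member:
    """Hypothetical stand-in with only the fields used by the format string."""

    member_id: int
    weight: int


def action_str(members: list[Member] | None) -> str:
    """Render weighted members as 'weight*0xID', separated by spaces."""
    if not members:
        return ""
    return " ".join(f"{m.weight}*{m.member_id:#x}" for m in members)
```

A group holding member 1 with weight 3 and member 42 with weight 1 would be rendered as `3*0x1 1*0x2a`, while a missing or empty member list gives an empty string.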

Inherited Members
finsy.p4entity._P4Writable: encode_update

@decodable('action_profile_member')
@dataclass(slots=True)
class P4ActionProfileMember(finsy.p4entity._P4Writable):
@decodable("action_profile_member")
@dataclass(slots=True)
class P4ActionProfileMember(_P4Writable):
    "Represents a P4Runtime ActionProfileMember."

    action_profile_id: str = ""
    _: KW_ONLY
    member_id: int = 0
    action: P4TableAction | None = None

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4ActionProfileMember as protobuf."
        if not self.action_profile_id:
            return p4r.Entity(action_profile_member=p4r.ActionProfileMember())

        profile = schema.action_profiles[self.action_profile_id]

        if self.action:
            action = self.action.encode_action(schema)
        else:
            action = None

        entry = p4r.ActionProfileMember(
            action_profile_id=profile.id,
            member_id=self.member_id,
            action=action,
        )
        return p4r.Entity(action_profile_member=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to ActionProfileMember data."
        entry = msg.action_profile_member
        if entry.action_profile_id == 0:
            return cls()

        profile = schema.action_profiles[entry.action_profile_id]

        if entry.HasField("action"):
            action = P4TableAction.decode_action(entry.action, schema)
        else:
            action = None

        return cls(
            action_profile_id=profile.alias,
            member_id=entry.member_id,
            action=action,
        )

    def action_str(self, schema: P4Schema) -> str:
        "Format the action as a human-readable, canonical string."
        if self.action is None:
            return NOACTION_STR
        return self.action.format_str(schema)

Represents a P4Runtime ActionProfileMember.

P4ActionProfileMember(action_profile_id: str = '', *, member_id: int = 0, action: P4TableAction | None = None)
action_profile_id: str
member_id: int
action: P4TableAction | None
def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:
    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4ActionProfileMember as protobuf."
        if not self.action_profile_id:
            return p4r.Entity(action_profile_member=p4r.ActionProfileMember())

        profile = schema.action_profiles[self.action_profile_id]

        if self.action:
            action = self.action.encode_action(schema)
        else:
            action = None

        entry = p4r.ActionProfileMember(
            action_profile_id=profile.id,
            member_id=self.member_id,
            action=action,
        )
        return p4r.Entity(action_profile_member=entry)

Encode P4ActionProfileMember as protobuf.

@classmethod
def decode(cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:
    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to ActionProfileMember data."
        entry = msg.action_profile_member
        if entry.action_profile_id == 0:
            return cls()

        profile = schema.action_profiles[entry.action_profile_id]

        if entry.HasField("action"):
            action = P4TableAction.decode_action(entry.action, schema)
        else:
            action = None

        return cls(
            action_profile_id=profile.alias,
            member_id=entry.member_id,
            action=action,
        )

Decode protobuf to ActionProfileMember data.

def action_str(self, schema: P4Schema) -> str:
    def action_str(self, schema: P4Schema) -> str:
        "Format the action as a human-readable, canonical string."
        if self.action is None:
            return NOACTION_STR
        return self.action.format_str(schema)

Format the action as a human-readable, canonical string.

Inherited Members
finsy.p4entity._P4Writable: encode_update

class P4Client:
class P4Client:
    "Implements a P4Runtime client."

    _address: str
    _credentials: GRPCCredentialsTLS | None
    _wait_for_ready: bool
    _channel: grpc.aio.Channel | None = None
    _stub: p4r_grpc.P4RuntimeStub | None = None
    _stream: _P4StreamTypeAlias | None = None
    _complete_request: Callable[[pbutil.PBMessage], None] | None = None

    _schema: P4Schema | None = None
    "Annotate log messages using this optional P4Info schema."

    def __init__(
        self,
        address: str,
        credentials: GRPCCredentialsTLS | None = None,
        *,
        wait_for_ready: bool = True,
    ) -> None:
        self._address = address
        self._credentials = credentials
        self._wait_for_ready = wait_for_ready

    @property
    def channel(self) -> grpc.aio.Channel | None:
        "Return the GRPC channel object, or None if the channel is not open."
        return self._channel

    async def __aenter__(self) -> Self:
        await self.open()
        return self

    async def __aexit__(self, *args: Any) -> bool | None:
        await self.close()

    async def open(
        self,
        *,
        schema: P4Schema | None = None,
        complete_request: Callable[[pbutil.PBMessage], None] | None = None,
    ) -> None:
        """Open the client channel.

        Note: This method is `async` for forward-compatible reasons.
        """
        assert self._channel is None
        assert self._stub is None

        # Increase max_metadata_size from 8 KB to 64 KB.
        options = GRPCOptions(
            max_metadata_size=64 * 1024,  # 64 kilobytes
            max_reconnect_backoff_ms=15000,  # 15.0 seconds
        )

        self._channel = grpc_channel(
            self._address,
            credentials=self._credentials,
            options=options,
            client_type="P4Client",
        )

        self._stub = p4r_grpc.P4RuntimeStub(self._channel)
        self._schema = schema
        self._complete_request = complete_request

    async def close(self) -> None:
        "Close the client channel."
        if self._channel is not None:
            LOGGER.debug("P4Client: close channel %r", self._address)

            if self._stream is not None:
                self._stream.cancel()
                self._stream = None

            await self._channel.close()
            self._channel = None
            self._stub = None
            self._schema = None
            self._complete_request = None

    async def send(self, msg: p4r.StreamMessageRequest) -> None:
        """Send a message to the stream."""
        assert self._stub is not None

        if not self._stream or self._stream.done():
            self._stream = cast(
                _P4StreamTypeAlias,
                self._stub.StreamChannel(wait_for_ready=self._wait_for_ready),  # type: ignore
            )

        self._log_msg(msg)

        try:
            await self._stream.write(msg)
        except grpc.RpcError as ex:
            raise P4ClientError(ex, "send") from None

    async def receive(self) -> p4r.StreamMessageResponse:
        """Read a message from the stream."""
        assert self._stream is not None

        try:
            msg = cast(
                p4r.StreamMessageResponse,
                await self._stream.read(),  # type: ignore
            )
            if msg is GRPC_EOF:
                # Treat EOF as a protocol violation.
                raise RuntimeError("P4Client.receive got EOF!")

        except grpc.RpcError as ex:
            raise P4ClientError(ex, "receive") from None

        self._log_msg(msg)
        return msg

    @overload
    async def request(
        self, msg: p4r.WriteRequest
    ) -> p4r.WriteResponse: ...  # pragma: no cover

    @overload
    async def request(
        self, msg: p4r.GetForwardingPipelineConfigRequest
    ) -> p4r.GetForwardingPipelineConfigResponse: ...  # pragma: no cover

    @overload
    async def request(
        self, msg: p4r.SetForwardingPipelineConfigRequest
    ) -> p4r.SetForwardingPipelineConfigResponse: ...  # pragma: no cover

    @overload
    async def request(
        self, msg: p4r.CapabilitiesRequest
    ) -> p4r.CapabilitiesResponse: ...  # pragma: no cover

    async def request(self, msg: pbutil.PBMessage) -> pbutil.PBMessage:
        "Send a unary-unary P4Runtime request and wait for the response."
        assert self._stub is not None

        if self._complete_request:
            self._complete_request(msg)

        msg_type = type(msg).__name__
        assert msg_type.endswith("Request")
        rpc_method = getattr(self._stub, msg_type[:-7])

        self._log_msg(msg)
        try:
            reply = await rpc_method(
                msg,
                timeout=_DEFAULT_RPC_TIMEOUT,
            )
        except grpc.RpcError as ex:
            raise P4ClientError(ex, msg_type, msg=msg, schema=self._schema) from None

        self._log_msg(reply)
        return reply

    async def request_iter(
        self, msg: p4r.ReadRequest
    ) -> AsyncIterator[p4r.ReadResponse]:
        "Send a unary-stream P4Runtime read request and wait for the responses."
        assert self._stub is not None

        if self._complete_request:
            self._complete_request(msg)

        msg_type = type(msg).__name__
        assert msg_type.endswith("Request")
        rpc_method = getattr(self._stub, msg_type[:-7])

        self._log_msg(msg)
        try:
            async for reply in rpc_method(
                msg,
                timeout=_DEFAULT_RPC_TIMEOUT,
            ):
                self._log_msg(reply)
                yield reply
        except grpc.RpcError as ex:
            raise P4ClientError(ex, msg_type) from None

    def _log_msg(self, msg: pbutil.PBMessage) -> None:
        "Log a P4Runtime request or response."
        pbutil.log_msg(self._channel, msg, self._schema)

Implements a P4Runtime client.

P4Client(address: str, credentials: GRPCCredentialsTLS | None = None, *, wait_for_ready: bool = True)
    def __init__(
        self,
        address: str,
        credentials: GRPCCredentialsTLS | None = None,
        *,
        wait_for_ready: bool = True,
    ) -> None:
        self._address = address
        self._credentials = credentials
        self._wait_for_ready = wait_for_ready

channel: grpc.aio._base_channel.Channel | None
    @property
    def channel(self) -> grpc.aio.Channel | None:
        "Return the GRPC channel object, or None if the channel is not open."
        return self._channel

Return the GRPC channel object, or None if the channel is not open.

async def __aenter__(self) -> typing_extensions.Self:
    async def __aenter__(self) -> Self:
        await self.open()
        return self

async def open(self, *, schema: P4Schema | None = None, complete_request: Optional[Callable[[google.protobuf.message.Message], NoneType]] = None) -> None:
    async def open(
        self,
        *,
        schema: P4Schema | None = None,
        complete_request: Callable[[pbutil.PBMessage], None] | None = None,
    ) -> None:
        """Open the client channel.

        Note: This method is `async` for forward-compatible reasons.
        """
        assert self._channel is None
        assert self._stub is None

        # Increase max_metadata_size from 8 KB to 64 KB.
        options = GRPCOptions(
            max_metadata_size=64 * 1024,  # 64 kilobytes
            max_reconnect_backoff_ms=15000,  # 15.0 seconds
        )

        self._channel = grpc_channel(
            self._address,
            credentials=self._credentials,
            options=options,
            client_type="P4Client",
        )

        self._stub = p4r_grpc.P4RuntimeStub(self._channel)
        self._schema = schema
        self._complete_request = complete_request

Open the client channel.

Note: This method is async for forward-compatible reasons.

async def close(self) -> None:
    async def close(self) -> None:
        "Close the client channel."
        if self._channel is not None:
            LOGGER.debug("P4Client: close channel %r", self._address)

            if self._stream is not None:
                self._stream.cancel()
                self._stream = None

            await self._channel.close()
            self._channel = None
            self._stub = None
            self._schema = None
            self._complete_request = None

Close the client channel.

async def send(self, msg: p4.v1.p4runtime_pb2.StreamMessageRequest) -> None:
    async def send(self, msg: p4r.StreamMessageRequest) -> None:
        """Send a message to the stream."""
        assert self._stub is not None

        if not self._stream or self._stream.done():
            self._stream = cast(
                _P4StreamTypeAlias,
                self._stub.StreamChannel(wait_for_ready=self._wait_for_ready),  # type: ignore
            )

        self._log_msg(msg)

        try:
            await self._stream.write(msg)
        except grpc.RpcError as ex:
            raise P4ClientError(ex, "send") from None

Send a message to the stream.

async def receive(self) -> p4.v1.p4runtime_pb2.StreamMessageResponse:
    async def receive(self) -> p4r.StreamMessageResponse:
        """Read a message from the stream."""
        assert self._stream is not None

        try:
            msg = cast(
                p4r.StreamMessageResponse,
                await self._stream.read(),  # type: ignore
            )
            if msg is GRPC_EOF:
                # Treat EOF as a protocol violation.
                raise RuntimeError("P4Client.receive got EOF!")

        except grpc.RpcError as ex:
            raise P4ClientError(ex, "receive") from None

        self._log_msg(msg)
        return msg

Read a message from the stream.

async def request(self, msg: google.protobuf.message.Message) -> google.protobuf.message.Message:
    async def request(self, msg: pbutil.PBMessage) -> pbutil.PBMessage:
        "Send a unary-unary P4Runtime request and wait for the response."
        assert self._stub is not None

        if self._complete_request:
            self._complete_request(msg)

        msg_type = type(msg).__name__
        assert msg_type.endswith("Request")
        rpc_method = getattr(self._stub, msg_type[:-7])

        self._log_msg(msg)
        try:
            reply = await rpc_method(
                msg,
                timeout=_DEFAULT_RPC_TIMEOUT,
            )
        except grpc.RpcError as ex:
            raise P4ClientError(ex, msg_type, msg=msg, schema=self._schema) from None

        self._log_msg(reply)
        return reply

Send a unary-unary P4Runtime request and wait for the response.
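
The dispatch in `request` relies on a naming convention: the RPC method on the stub is looked up by stripping the `Request` suffix from the message class name, so a `WriteRequest` is routed to `Write`. The sketch below reproduces that lookup with stand-in classes. `FakeStub`, the two empty message classes, the string return values, and the 5-second timeout are all invented for the illustration; they are not part of gRPC or Finsy.

```python
import asyncio


class WriteRequest:
    """Stand-in for the protobuf WriteRequest message."""


class CapabilitiesRequest:
    """Stand-in for the protobuf CapabilitiesRequest message."""


class FakeStub:
    """Hypothetical stub whose method names match the RPC names."""

    async def Write(self, msg, timeout=None):
        return "WriteResponse"

    async def Capabilities(self, msg, timeout=None):
        return "CapabilitiesResponse"


def rpc_name(msg: object) -> str:
    """Derive the RPC name by removing the 'Request' suffix from the type name."""
    msg_type = type(msg).__name__
    if not msg_type.endswith("Request"):
        raise ValueError(f"unexpected message type: {msg_type}")
    return msg_type[: -len("Request")]


async def request(stub: FakeStub, msg: object) -> str:
    """Look up the RPC by name on the stub and await the reply."""
    rpc_method = getattr(stub, rpc_name(msg))
    return await rpc_method(msg, timeout=5.0)  # timeout value is arbitrary
```

Here `asyncio.run(request(FakeStub(), WriteRequest()))` resolves to the stub's `Write` coroutine. One consequence of the convention is that supporting another unary RPC needs no new dispatch code, only a message type whose name ends in `Request`.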

async def request_iter(self, msg: p4.v1.p4runtime_pb2.ReadRequest) -> AsyncIterator[p4.v1.p4runtime_pb2.ReadResponse]:
    async def request_iter(
        self, msg: p4r.ReadRequest
    ) -> AsyncIterator[p4r.ReadResponse]:
        "Send a unary-stream P4Runtime read request and wait for the responses."
        assert self._stub is not None

        if self._complete_request:
            self._complete_request(msg)

        msg_type = type(msg).__name__
        assert msg_type.endswith("Request")
        rpc_method = getattr(self._stub, msg_type[:-7])

        self._log_msg(msg)
        try:
            async for reply in rpc_method(
                msg,
                timeout=_DEFAULT_RPC_TIMEOUT,
            ):
                self._log_msg(reply)
                yield reply
        except grpc.RpcError as ex:
            raise P4ClientError(ex, msg_type) from None

Send a unary-stream P4Runtime read request and wait for the responses.

class P4ClientError(builtins.Exception):
class P4ClientError(Exception):
    "Wrap `grpc.RpcError`."

    _operation: str
    _status: P4RpcStatus
    _outer_code: GRPCStatusCode
    _outer_message: str
    _schema: P4Schema | None = None  # for annotating sub-value details

    def __init__(
        self,
        error: grpc.RpcError,
        operation: str,
        *,
        msg: pbutil.PBMessage | None = None,
        schema: P4Schema | None = None,
    ):
        super().__init__()
        assert isinstance(error, grpc.aio.AioRpcError)

        self._operation = operation
        self._status = P4RpcStatus.from_rpc_error(error)
        self._outer_code = GRPCStatusCode.from_status_code(error.code())
        self._outer_message = error.details() or ""

        if msg is not None and self.details:
            self._attach_details(msg)
            self._schema = schema

        LOGGER.debug("%s failed: %s", operation, self)

    @property
    def code(self) -> GRPCStatusCode:
        "GRPC status code."
        return self._status.code

    @property
    def message(self) -> str:
        "GRPC status message."
        return self._status.message

    @property
    def details(self) -> dict[int, P4Error]:
        "Optional details about P4Runtime Write updates that failed."
        return self._status.details

    @property
    def is_not_found_only(self) -> bool:
        """Return True if the only sub-errors are NOT_FOUND."""
        if self.code != GRPCStatusCode.UNKNOWN:
            return False

        for err in self.details.values():
            if err.canonical_code != GRPCStatusCode.NOT_FOUND:
                return False
        return True

    @property
    def is_election_id_used(self) -> bool:
        """Return true if error is that election ID is in use."""
        return (
            self.code == GRPCStatusCode.INVALID_ARGUMENT
            and _ELECTION_ID_EXISTS.search(self.message) is not None
        )

    @property
    def is_pipeline_missing(self) -> bool:
        "Return true if error is that no pipeline config is set."
        return (
            self.code == GRPCStatusCode.FAILED_PRECONDITION
            and _NO_PIPELINE_CONFIG.search(self.message) is not None
        )

    def _attach_details(self, msg: pbutil.PBMessage):
        "Attach the subvalue(s) from the message that caused the error."
        if isinstance(msg, p4r.WriteRequest):
            for key, value in self.details.items():
                value.subvalue = msg.updates[key]

    def __str__(self) -> str:
        "Return string representation of P4ClientError object."
        if self.details:

            def _show(value: P4Error):
                s = repr(value)
                if self._schema:
                    s = pbutil.log_annotate(s, self._schema)
                s = s.replace("\n}\n)", "\n})")  # tidy multiline repr
                return s.replace("\n", "\n" + " " * 6)  # indent 6 spaces

            items = [""] + [
                f"  [details.{key}] {_show(val)}" for key, val in self.details.items()
            ]
            details = "\n".join(items)
        else:
            details = ""

        if self.code == self._outer_code and self.message == self._outer_message:
            return (
                f"operation={self._operation} code={self.code!r} "
                f"message={self.message!r} {details}"
            )
        return (
            f"code={self.code!r} message={self.message!r} "
            f"details={self.details!r} operation={self._operation} "
            f"_outer_message={self._outer_message!r} _outer_code={self._outer_code!r}"
        )

Wrap grpc.RpcError.

P4ClientError( error: grpc.RpcError, operation: str, *, msg: google.protobuf.message.Message | None = None, schema: P4Schema | None = None)
    def __init__(
        self,
        error: grpc.RpcError,
        operation: str,
        *,
        msg: pbutil.PBMessage | None = None,
        schema: P4Schema | None = None,
    ):
        super().__init__()
        assert isinstance(error, grpc.aio.AioRpcError)

        self._operation = operation
        self._status = P4RpcStatus.from_rpc_error(error)
        self._outer_code = GRPCStatusCode.from_status_code(error.code())
        self._outer_message = error.details() or ""

        if msg is not None and self.details:
            self._attach_details(msg)
            self._schema = schema

        LOGGER.debug("%s failed: %s", operation, self)
code: GRPCStatusCode
    @property
    def code(self) -> GRPCStatusCode:
        "GRPC status code."
        return self._status.code

GRPC status code.

message: str
    @property
    def message(self) -> str:
        "GRPC status message."
        return self._status.message

GRPC status message.

details: dict[int, P4Error]
    @property
    def details(self) -> dict[int, P4Error]:
        "Optional details about P4Runtime Write updates that failed."
        return self._status.details

Optional details about P4Runtime Write updates that failed.

is_not_found_only: bool
    @property
    def is_not_found_only(self) -> bool:
        """Return True if the only sub-errors are NOT_FOUND."""
        if self.code != GRPCStatusCode.UNKNOWN:
            return False

        for err in self.details.values():
            if err.canonical_code != GRPCStatusCode.NOT_FOUND:
                return False
        return True

Return True if the only sub-errors are NOT_FOUND.
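
The `details` mapping is keyed by the index of the failed update within the write request (`_attach_details` looks up `msg.updates[key]`), and `is_not_found_only` inspects only those sub-errors. The following is a minimal sketch of that check, not Finsy's implementation: `StatusCode` and `SubError` are hypothetical stand-ins for `GRPCStatusCode` and `P4Error`, with values taken from the canonical gRPC status codes.

```python
from dataclasses import dataclass
from enum import IntEnum
from typing import Dict


class StatusCode(IntEnum):
    # Stand-in for GRPCStatusCode; values follow the canonical gRPC codes.
    OK = 0
    UNKNOWN = 2
    NOT_FOUND = 5
    ALREADY_EXISTS = 6


@dataclass
class SubError:
    # Stand-in for P4Error, reduced to the one field the check reads.
    canonical_code: StatusCode


def is_not_found_only(code: StatusCode, details: Dict[int, SubError]) -> bool:
    "Same logic as the property, written as a free function."
    if code != StatusCode.UNKNOWN:
        return False
    return all(e.canonical_code == StatusCode.NOT_FOUND for e in details.values())


# Updates 0 and 2 of the batch failed, both with NOT_FOUND.
only_missing = is_not_found_only(
    StatusCode.UNKNOWN,
    {0: SubError(StatusCode.NOT_FOUND), 2: SubError(StatusCode.NOT_FOUND)},
)
# Update 1 failed for a different reason.
mixed = is_not_found_only(
    StatusCode.UNKNOWN,
    {0: SubError(StatusCode.NOT_FOUND), 1: SubError(StatusCode.ALREADY_EXISTS)},
)
print(only_missing, mixed)
```

Note that an `UNKNOWN` status with an empty `details` mapping also satisfies the check, because the loop has nothing to reject.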

is_election_id_used: bool
    @property
    def is_election_id_used(self) -> bool:
        """Return true if error is that election ID is in use."""
        return (
            self.code == GRPCStatusCode.INVALID_ARGUMENT
            and _ELECTION_ID_EXISTS.search(self.message) is not None
        )

Return true if error is that election ID is in use.

is_pipeline_missing: bool
    @property
    def is_pipeline_missing(self) -> bool:
        "Return true if error is that no pipeline config is set."
        return (
            self.code == GRPCStatusCode.FAILED_PRECONDITION
            and _NO_PIPELINE_CONFIG.search(self.message) is not None
        )

Return true if error is that no pipeline config is set.

Inherited Members
builtins.BaseException
with_traceback
args
@decodable('clone_session_entry')
@dataclass(slots=True)
class P4CloneSessionEntry(finsy.p4entity._P4Writable):
@decodable("clone_session_entry")
@dataclass(slots=True)
class P4CloneSessionEntry(_P4Writable):
    "Represents a P4Runtime CloneSessionEntry."

    session_id: int = 0
    _: KW_ONLY
    class_of_service: int = 0
    packet_length_bytes: int = 0
    replicas: Sequence[_ReplicaType] = ()

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode CloneSessionEntry data as protobuf."
        entry = p4r.CloneSessionEntry(
            session_id=self.session_id,
            class_of_service=self.class_of_service,
            packet_length_bytes=self.packet_length_bytes,
            replicas=[encode_replica(replica) for replica in self.replicas],
        )
        return p4r.Entity(
            packet_replication_engine_entry=p4r.PacketReplicationEngineEntry(
                clone_session_entry=entry
            )
        )

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to CloneSessionEntry data."
        entry = msg.packet_replication_engine_entry.clone_session_entry
        return cls(
            session_id=entry.session_id,
            class_of_service=entry.class_of_service,
            packet_length_bytes=entry.packet_length_bytes,
            replicas=tuple(decode_replica(replica) for replica in entry.replicas),
        )

    def replicas_str(self) -> str:
        "Format the replicas as a human-readable, canonical string."
        return " ".join(format_replica(rep) for rep in self.replicas)

Represents a P4Runtime CloneSessionEntry.

P4CloneSessionEntry( session_id: int = 0, *, class_of_service: int = 0, packet_length_bytes: int = 0, replicas: Sequence[tuple[int, int] | int] = ())
session_id: int
class_of_service: int
packet_length_bytes: int
replicas: Sequence[tuple[int, int] | int]
def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:
    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode CloneSessionEntry data as protobuf."
        entry = p4r.CloneSessionEntry(
            session_id=self.session_id,
            class_of_service=self.class_of_service,
            packet_length_bytes=self.packet_length_bytes,
            replicas=[encode_replica(replica) for replica in self.replicas],
        )
        return p4r.Entity(
            packet_replication_engine_entry=p4r.PacketReplicationEngineEntry(
                clone_session_entry=entry
            )
        )

Encode CloneSessionEntry data as protobuf.

@classmethod
def decode( cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:
    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to CloneSessionEntry data."
        entry = msg.packet_replication_engine_entry.clone_session_entry
        return cls(
            session_id=entry.session_id,
            class_of_service=entry.class_of_service,
            packet_length_bytes=entry.packet_length_bytes,
            replicas=tuple(decode_replica(replica) for replica in entry.replicas),
        )

Decode protobuf to CloneSessionEntry data.

def replicas_str(self) -> str:
    def replicas_str(self) -> str:
        "Format the replicas as a human-readable, canonical string."
        return " ".join(format_replica(rep) for rep in self.replicas)

Format the replicas as a human-readable, canonical string.
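
Each element of `replicas` is typed as `tuple[int, int] | int`, so code that consumes the field may want a single canonical form. The sketch below shows one way to normalize the two forms. It rests on an assumption that the listing above does not confirm: that a bare int is shorthand for an egress port with instance 0. `normalize_replica` and `normalize_all` are hypothetical helpers, not part of Finsy.

```python
from typing import List, Sequence, Tuple, Union

Replica = Union[Tuple[int, int], int]


def normalize_replica(value: Replica) -> Tuple[int, int]:
    # ASSUMPTION: a bare int means (egress_port, instance=0).
    if isinstance(value, int):
        return (value, 0)
    port, instance = value
    return (port, instance)


def normalize_all(replicas: Sequence[Replica]) -> List[Tuple[int, int]]:
    "Convert a mixed sequence of replicas to (port, instance) tuples."
    return [normalize_replica(r) for r in replicas]


print(normalize_all([1, 2, (3, 1)]))
```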

Inherited Members
finsy.p4entity._P4Writable
encode_update
@dataclass(kw_only=True, slots=True)
class P4CounterData:
@dataclass(kw_only=True, slots=True)
class P4CounterData:
    """Represents a P4Runtime object that keeps statistics of bytes and packets.

    Attributes:
        byte_count (int): the number of octets
        packet_count (int): the number of packets

    See Also:
        - P4TableEntry
        - P4MeterCounterData
        - P4CounterEntry
        - P4DirectCounterEntry
    """

    byte_count: int = 0
    "The number of octets."
    packet_count: int = 0
    "The number of packets."

    def encode(self) -> p4r.CounterData:
        "Encode object as CounterData."
        return p4r.CounterData(
            byte_count=self.byte_count, packet_count=self.packet_count
        )

    @classmethod
    def decode(cls, msg: p4r.CounterData) -> Self:
        "Decode CounterData."
        return cls(byte_count=msg.byte_count, packet_count=msg.packet_count)

Represents a P4Runtime object that keeps statistics of bytes and packets.

Attributes:
    byte_count (int): the number of octets
    packet_count (int): the number of packets

See Also:
    - P4TableEntry
    - P4MeterCounterData
    - P4CounterEntry
    - P4DirectCounterEntry

P4CounterData(*, byte_count: int = 0, packet_count: int = 0)
byte_count: int

The number of octets.

packet_count: int

The number of packets.

def encode(self) -> p4.v1.p4runtime_pb2.CounterData:
    def encode(self) -> p4r.CounterData:
        "Encode object as CounterData."
        return p4r.CounterData(
            byte_count=self.byte_count, packet_count=self.packet_count
        )

Encode object as CounterData.

@classmethod
def decode(cls, msg: p4.v1.p4runtime_pb2.CounterData) -> typing_extensions.Self:
    @classmethod
    def decode(cls, msg: p4r.CounterData) -> Self:
        "Decode CounterData."
        return cls(byte_count=msg.byte_count, packet_count=msg.packet_count)

Decode CounterData.

@decodable('counter_entry')
@dataclass(slots=True)
class P4CounterEntry(finsy.p4entity._P4ModifyOnly):
@decodable("counter_entry")
@dataclass(slots=True)
class P4CounterEntry(_P4ModifyOnly):
    "Represents a P4Runtime CounterEntry."

    counter_id: str = ""
    _: KW_ONLY
    index: int | None = None
    data: P4CounterData | None = None

    @property
    def packet_count(self) -> int:
        "Packet count from counter data (or 0 if there is no data)."
        if self.data is not None:
            return self.data.packet_count
        return 0

    @property
    def byte_count(self) -> int:
        "Byte count from counter data (or 0 if there is no data)."
        if self.data is not None:
            return self.data.byte_count
        return 0

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4CounterEntry as protobuf."
        if not self.counter_id:
            return p4r.Entity(counter_entry=p4r.CounterEntry())

        counter = schema.counters[self.counter_id]

        if self.index is not None:
            index = p4r.Index(index=self.index)
        else:
            index = None

        if self.data is not None:
            data = self.data.encode()
        else:
            data = None

        entry = p4r.CounterEntry(
            counter_id=counter.id,
            index=index,
            data=data,
        )
        return p4r.Entity(counter_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4CounterEntry."
        entry = msg.counter_entry
        if not entry.counter_id:
            return cls()

        counter = schema.counters[entry.counter_id]

        if entry.HasField("index"):
            index = entry.index.index
        else:
            index = None

        if entry.HasField("data"):
            data = P4CounterData.decode(entry.data)
        else:
            data = None

        return cls(counter_id=counter.alias, index=index, data=data)

Represents a P4Runtime CounterEntry.

P4CounterEntry( counter_id: str = '', *, index: int | None = None, data: P4CounterData | None = None)
counter_id: str
index: int | None
data: P4CounterData | None
packet_count: int
    @property
    def packet_count(self) -> int:
        "Packet count from counter data (or 0 if there is no data)."
        if self.data is not None:
            return self.data.packet_count
        return 0

Packet count from counter data (or 0 if there is no data).

byte_count: int
    @property
    def byte_count(self) -> int:
        "Byte count from counter data (or 0 if there is no data)."
        if self.data is not None:
            return self.data.byte_count
        return 0

Byte count from counter data (or 0 if there is no data).
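
Both convenience properties fall back to 0 when `data` is `None`, so callers can read counts without a separate `None` check. The sketch below reproduces that behavior with stand-in dataclasses: `CounterData` and `CounterEntry` are simplified substitutes for `P4CounterData` and `P4CounterEntry` with the protobuf methods left out, and `"my_counter"` is a hypothetical counter name.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CounterData:
    # Stand-in for P4CounterData.
    byte_count: int = 0
    packet_count: int = 0


@dataclass
class CounterEntry:
    # Stand-in for P4CounterEntry, without encode/decode.
    counter_id: str = ""
    index: Optional[int] = None
    data: Optional[CounterData] = None

    @property
    def packet_count(self) -> int:
        return self.data.packet_count if self.data is not None else 0

    @property
    def byte_count(self) -> int:
        return self.data.byte_count if self.data is not None else 0


# An entry with no counter data attached reports zero for both counts.
empty = CounterEntry("my_counter", index=3)
filled = CounterEntry(
    "my_counter", index=3, data=CounterData(byte_count=1500, packet_count=2)
)
print(empty.packet_count, empty.byte_count)
print(filled.packet_count, filled.byte_count)
```

One consequence of this design: the properties alone cannot distinguish "no data" from a genuine reading of zero. Code that needs the distinction should test `data is None` directly.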

def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:
    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4CounterEntry as protobuf."
        if not self.counter_id:
            return p4r.Entity(counter_entry=p4r.CounterEntry())

        counter = schema.counters[self.counter_id]

        if self.index is not None:
            index = p4r.Index(index=self.index)
        else:
            index = None

        if self.data is not None:
            data = self.data.encode()
        else:
            data = None

        entry = p4r.CounterEntry(
            counter_id=counter.id,
            index=index,
            data=data,
        )
        return p4r.Entity(counter_entry=entry)

Encode P4CounterEntry as protobuf.

@classmethod
def decode( cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:
    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4CounterEntry."
        entry = msg.counter_entry
        if not entry.counter_id:
            return cls()

        counter = schema.counters[entry.counter_id]

        if entry.HasField("index"):
            index = entry.index.index
        else:
            index = None

        if entry.HasField("data"):
            data = P4CounterData.decode(entry.data)
        else:
            data = None

        return cls(counter_id=counter.alias, index=index, data=data)

Decode protobuf to P4CounterEntry.

Inherited Members
finsy.p4entity._P4ModifyOnly
encode_update
class P4CounterUnit(finsy.grpcutil._EnumBase):
class P4CounterUnit(_EnumBase):
    "IntEnum equivalent to `p4i.CounterSpec.Unit`."
    UNSPECIFIED = p4i.CounterSpec.Unit.UNSPECIFIED
    BYTES = p4i.CounterSpec.Unit.BYTES
    PACKETS = p4i.CounterSpec.Unit.PACKETS
    BOTH = p4i.CounterSpec.Unit.BOTH

IntEnum equivalent to p4i.CounterSpec.Unit.

Inherited Members
enum.Enum
name
value
builtins.int
conjugate
bit_length
bit_count
to_bytes
from_bytes
as_integer_ratio
real
imag
numerator
denominator
@decodable('digest_entry')
@dataclass(slots=True)
class P4DigestEntry(finsy.p4entity._P4Writable):
@decodable("digest_entry")
@dataclass(slots=True)
class P4DigestEntry(_P4Writable):
    "Represents a P4Runtime DigestEntry."

    digest_id: str = ""
    _: KW_ONLY
    max_list_size: int = 0
    max_timeout_ns: int = 0
    ack_timeout_ns: int = 0

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode DigestEntry data as protobuf."
        if not self.digest_id:
            return p4r.Entity(digest_entry=p4r.DigestEntry())

        digest = schema.digests[self.digest_id]

        if self.max_list_size == self.max_timeout_ns == self.ack_timeout_ns == 0:
            config = None
        else:
            config = p4r.DigestEntry.Config(
                max_timeout_ns=self.max_timeout_ns,
                max_list_size=self.max_list_size,
                ack_timeout_ns=self.ack_timeout_ns,
            )

        entry = p4r.DigestEntry(digest_id=digest.id, config=config)
        return p4r.Entity(digest_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to DigestEntry data."
        entry = msg.digest_entry
        if entry.digest_id == 0:
            return cls()

        digest = schema.digests[entry.digest_id]

        config = entry.config
        return cls(
            digest.alias,
            max_list_size=config.max_list_size,
            max_timeout_ns=config.max_timeout_ns,
            ack_timeout_ns=config.ack_timeout_ns,
        )

Represents a P4Runtime DigestEntry.

P4DigestEntry( digest_id: str = '', *, max_list_size: int = 0, max_timeout_ns: int = 0, ack_timeout_ns: int = 0)
digest_id: str
max_list_size: int
max_timeout_ns: int
ack_timeout_ns: int
def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:
    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode DigestEntry data as protobuf."
        if not self.digest_id:
            return p4r.Entity(digest_entry=p4r.DigestEntry())

        digest = schema.digests[self.digest_id]

        if self.max_list_size == self.max_timeout_ns == self.ack_timeout_ns == 0:
            config = None
        else:
            config = p4r.DigestEntry.Config(
                max_timeout_ns=self.max_timeout_ns,
                max_list_size=self.max_list_size,
                ack_timeout_ns=self.ack_timeout_ns,
            )

        entry = p4r.DigestEntry(digest_id=digest.id, config=config)
        return p4r.Entity(digest_entry=entry)

Encode DigestEntry data as protobuf.
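
`encode` omits the `config` sub-message only when all three settings are zero; the chained comparison `a == b == c == 0` is what expresses that. The sketch below isolates the rule. `DigestConfig` and `build_config` are hypothetical stand-ins for `p4r.DigestEntry.Config` and the branch inside `encode`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DigestConfig:
    # Stand-in for p4r.DigestEntry.Config.
    max_timeout_ns: int
    max_list_size: int
    ack_timeout_ns: int


def build_config(
    max_list_size: int = 0, max_timeout_ns: int = 0, ack_timeout_ns: int = 0
) -> Optional[DigestConfig]:
    # The chained comparison is true only when all three values equal 0.
    if max_list_size == max_timeout_ns == ack_timeout_ns == 0:
        return None
    return DigestConfig(
        max_timeout_ns=max_timeout_ns,
        max_list_size=max_list_size,
        ack_timeout_ns=ack_timeout_ns,
    )


print(build_config())
print(build_config(max_list_size=1))
```

Three equal but non-zero values still produce a config, because the chain ends with `== 0`.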

@classmethod
def decode( cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:
    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to DigestEntry data."
        entry = msg.digest_entry
        if entry.digest_id == 0:
            return cls()

        digest = schema.digests[entry.digest_id]

        config = entry.config
        return cls(
            digest.alias,
            max_list_size=config.max_list_size,
            max_timeout_ns=config.max_timeout_ns,
            ack_timeout_ns=config.ack_timeout_ns,
        )

Decode protobuf to DigestEntry data.

Inherited Members
finsy.p4entity._P4Writable
encode_update
@decodable('digest')
@dataclass(slots=True)
class P4DigestList:
@decodable("digest")
@dataclass(slots=True)
class P4DigestList:
    "Represents a P4Runtime DigestList."

    digest_id: str
    _: KW_ONLY
    list_id: int
    timestamp: int
    data: list[_DataValueType]

    @classmethod
    def decode(cls, msg: p4r.StreamMessageResponse, schema: P4Schema) -> Self:
        "Decode protobuf to DigestList data."
        digest_list = msg.digest
        digest = schema.digests[digest_list.digest_id]

        type_spec = digest.type_spec
        return cls(
            digest_id=digest.alias,
            list_id=digest_list.list_id,
            timestamp=digest_list.timestamp,
            data=[type_spec.decode_data(item) for item in digest_list.data],
        )

    def __len__(self) -> int:
        "Return number of values in digest list."
        return len(self.data)

    def __getitem__(self, key: int) -> _DataValueType:
        "Retrieve value at given index from digest list."
        return self.data[key]

    def __iter__(self) -> Iterator[_DataValueType]:
        "Iterate over values in digest list."
        return iter(self.data)

    def ack(self) -> "P4DigestListAck":
        "Return the corresponding DigestListAck message."
        return P4DigestListAck(self.digest_id, self.list_id)

Represents a P4Runtime DigestList.

P4DigestList( digest_id: str, *, list_id: int, timestamp: int, data: list[typing.Any])
digest_id: str
list_id: int
timestamp: int
data: list[typing.Any]
@classmethod
def decode( cls, msg: p4.v1.p4runtime_pb2.StreamMessageResponse, schema: P4Schema) -> typing_extensions.Self:
    @classmethod
    def decode(cls, msg: p4r.StreamMessageResponse, schema: P4Schema) -> Self:
        "Decode protobuf to DigestList data."
        digest_list = msg.digest
        digest = schema.digests[digest_list.digest_id]

        type_spec = digest.type_spec
        return cls(
            digest_id=digest.alias,
            list_id=digest_list.list_id,
            timestamp=digest_list.timestamp,
            data=[type_spec.decode_data(item) for item in digest_list.data],
        )

Decode protobuf to DigestList data.

def __len__(self) -> int:
    def __len__(self) -> int:
        "Return number of values in digest list."
        return len(self.data)

Return number of values in digest list.

def __getitem__(self, key: int) -> Any:
    def __getitem__(self, key: int) -> _DataValueType:
        "Retrieve value at given index from digest list."
        return self.data[key]

Retrieve value at given index from digest list.

def __iter__(self) -> Iterator[Any]:
    def __iter__(self) -> Iterator[_DataValueType]:
        "Iterate over values in digest list."
        return iter(self.data)

Iterate over values in digest list.

def ack(self) -> P4DigestListAck:
    def ack(self) -> "P4DigestListAck":
        "Return the corresponding DigestListAck message."
        return P4DigestListAck(self.digest_id, self.list_id)

Return the corresponding DigestListAck message.
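
Because `P4DigestList` defines `__len__`, `__getitem__` and `__iter__`, a digest list can be treated like a read-only sequence of decoded values, and `ack()` builds the acknowledgement from the same `digest_id` and `list_id`. The sketch below shows that pattern with stand-in dataclasses. `DigestList` and `DigestListAck` are simplified substitutes with no protobuf decoding, and the digest name and payload values are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Iterator, List


@dataclass
class DigestListAck:
    # Stand-in for P4DigestListAck.
    digest_id: str
    list_id: int


@dataclass
class DigestList:
    # Stand-in for P4DigestList, without decode().
    digest_id: str
    list_id: int
    timestamp: int
    data: List[Any]

    def __len__(self) -> int:
        return len(self.data)

    def __getitem__(self, key: int) -> Any:
        return self.data[key]

    def __iter__(self) -> Iterator[Any]:
        return iter(self.data)

    def ack(self) -> DigestListAck:
        return DigestListAck(self.digest_id, self.list_id)


digest = DigestList(
    "digest_t", list_id=7, timestamp=0, data=[{"port": 1}, {"port": 2}]
)
# Iterate over the values directly; no need to reach into .data.
ports = [item["port"] for item in digest]
print(len(digest), digest[0], ports, digest.ack())
```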

@dataclass(slots=True)
class P4DigestListAck:
@dataclass(slots=True)
class P4DigestListAck:
    "Represents a P4Runtime DigestListAck."

    digest_id: str
    list_id: int

    def encode_update(self, schema: P4Schema) -> p4r.StreamMessageRequest:
        "Encode DigestListAck data as protobuf."
        digest = schema.digests[self.digest_id]

        return p4r.StreamMessageRequest(
            digest_ack=p4r.DigestListAck(
                digest_id=digest.id,
                list_id=self.list_id,
            )
        )

Represents a P4Runtime DigestListAck.

P4DigestListAck(digest_id: str, list_id: int)
digest_id: str
list_id: int
def encode_update( self, schema: P4Schema) -> p4.v1.p4runtime_pb2.StreamMessageRequest:
    def encode_update(self, schema: P4Schema) -> p4r.StreamMessageRequest:
        "Encode DigestListAck data as protobuf."
        digest = schema.digests[self.digest_id]

        return p4r.StreamMessageRequest(
            digest_ack=p4r.DigestListAck(
                digest_id=digest.id,
                list_id=self.list_id,
            )
        )

Encode DigestListAck data as protobuf.

@decodable('direct_counter_entry')
@dataclass(slots=True)
class P4DirectCounterEntry(finsy.p4entity._P4ModifyOnly):
@decodable("direct_counter_entry")
@dataclass(slots=True)
class P4DirectCounterEntry(_P4ModifyOnly):
    "Represents a P4Runtime DirectCounterEntry."

    counter_id: str = ""
    _: KW_ONLY
    table_entry: P4TableEntry | None = None
    data: P4CounterData | None = None

    @property
    def table_id(self) -> str:
        "Return table_id of related table."
        if self.table_entry is None:
            return ""
        return self.table_entry.table_id

    @property
    def packet_count(self) -> int:
        "Packet count from counter data (or 0 if there is no data)."
        if self.data is not None:
            return self.data.packet_count
        return 0

    @property
    def byte_count(self) -> int:
        "Byte count from counter data (or 0 if there is no data)."
        if self.data is not None:
            return self.data.byte_count
        return 0

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4DirectCounterEntry as protobuf."
        if self.table_entry is None:
            # Use `counter_id` to construct a `P4TableEntry` with the proper
            # table name.
            if self.counter_id:
                tb_name = schema.direct_counters[self.counter_id].direct_table_name
                table_entry = P4TableEntry(tb_name)
            else:
                table_entry = P4TableEntry()
        else:
            table_entry = self.table_entry

        if self.data is not None:
            data = self.data.encode()
        else:
            data = None

        entry = p4r.DirectCounterEntry(
            table_entry=table_entry.encode_entry(schema),
            data=data,
        )
        return p4r.Entity(direct_counter_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4DirectCounterEntry."
        entry = msg.direct_counter_entry

        if entry.HasField("table_entry"):
            table_entry = P4TableEntry.decode_entry(entry.table_entry, schema)
        else:
            table_entry = None

        if entry.HasField("data"):
            data = P4CounterData.decode(entry.data)
        else:
            data = None

        # Determine `counter_id` from table_entry.
        counter_id = ""
        if table_entry is not None and table_entry.table_id:
            direct_counter = schema.tables[table_entry.table_id].direct_counter
            assert direct_counter is not None
            counter_id = direct_counter.alias

        return cls(counter_id, table_entry=table_entry, data=data)

Represents a P4Runtime DirectCounterEntry.

P4DirectCounterEntry( counter_id: str = '', *, table_entry: P4TableEntry | None = None, data: P4CounterData | None = None)
counter_id: str
table_entry: P4TableEntry | None
data: P4CounterData | None
table_id: str
    @property
    def table_id(self) -> str:
        "Return table_id of related table."
        if self.table_entry is None:
            return ""
        return self.table_entry.table_id

Return table_id of related table.

packet_count: int
    @property
    def packet_count(self) -> int:
        "Packet count from counter data (or 0 if there is no data)."
        if self.data is not None:
            return self.data.packet_count
        return 0

Packet count from counter data (or 0 if there is no data).

byte_count: int
    @property
    def byte_count(self) -> int:
        "Byte count from counter data (or 0 if there is no data)."
        if self.data is not None:
            return self.data.byte_count
        return 0

Byte count from counter data (or 0 if there is no data).

def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:
    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4DirectCounterEntry as protobuf."
        if self.table_entry is None:
            # Use `counter_id` to construct a `P4TableEntry` with the proper
            # table name.
            if self.counter_id:
                tb_name = schema.direct_counters[self.counter_id].direct_table_name
                table_entry = P4TableEntry(tb_name)
            else:
                table_entry = P4TableEntry()
        else:
            table_entry = self.table_entry

        if self.data is not None:
            data = self.data.encode()
        else:
            data = None

        entry = p4r.DirectCounterEntry(
            table_entry=table_entry.encode_entry(schema),
            data=data,
        )
        return p4r.Entity(direct_counter_entry=entry)

Encode P4DirectCounterEntry as protobuf.

@classmethod
def decode( cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:
    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4DirectCounterEntry."
        entry = msg.direct_counter_entry

        if entry.HasField("table_entry"):
            table_entry = P4TableEntry.decode_entry(entry.table_entry, schema)
        else:
            table_entry = None

        if entry.HasField("data"):
            data = P4CounterData.decode(entry.data)
        else:
            data = None

        # Determine `counter_id` from table_entry.
        counter_id = ""
        if table_entry is not None and table_entry.table_id:
            direct_counter = schema.tables[table_entry.table_id].direct_counter
            assert direct_counter is not None
            counter_id = direct_counter.alias

        return cls(counter_id, table_entry=table_entry, data=data)

Decode protobuf to P4DirectCounterEntry.

Inherited Members
finsy.p4entity._P4ModifyOnly
encode_update
@decodable('direct_meter_entry')
@dataclass(kw_only=True, slots=True)
class P4DirectMeterEntry(finsy.p4entity._P4ModifyOnly):
@decodable("direct_meter_entry")
@dataclass(kw_only=True, slots=True)
class P4DirectMeterEntry(_P4ModifyOnly):
    "Represents a P4Runtime DirectMeterEntry."

    table_entry: P4TableEntry | None = None
    config: P4MeterConfig | None = None
    counter_data: P4MeterCounterData | None = None

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4DirectMeterEntry as protobuf."
        if self.table_entry is not None:
            table_entry = self.table_entry.encode_entry(schema)
        else:
            table_entry = None

        if self.config is not None:
            config = self.config.encode()
        else:
            config = None

        if self.counter_data is not None:
            counter_data = self.counter_data.encode()
        else:
            counter_data = None

        entry = p4r.DirectMeterEntry(
            table_entry=table_entry,
            config=config,
            counter_data=counter_data,
        )
        return p4r.Entity(direct_meter_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4DirectMeterEntry."
        entry = msg.direct_meter_entry

        if entry.HasField("table_entry"):
            table_entry = P4TableEntry.decode_entry(entry.table_entry, schema)
        else:
            table_entry = None

        if entry.HasField("config"):
            config = P4MeterConfig.decode(entry.config)
        else:
            config = None

        if entry.HasField("counter_data"):
            counter_data = P4MeterCounterData.decode(entry.counter_data)
        else:
            counter_data = None

        return cls(
            table_entry=table_entry,
            config=config,
            counter_data=counter_data,
        )

Represents a P4Runtime DirectMeterEntry.

P4DirectMeterEntry( *, table_entry: P4TableEntry | None = None, config: P4MeterConfig | None = None, counter_data: P4MeterCounterData | None = None)
table_entry: P4TableEntry | None
config: P4MeterConfig | None
counter_data: P4MeterCounterData | None
def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:
    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4DirectMeterEntry as protobuf."
        if self.table_entry is not None:
            table_entry = self.table_entry.encode_entry(schema)
        else:
            table_entry = None

        if self.config is not None:
            config = self.config.encode()
        else:
            config = None

        if self.counter_data is not None:
            counter_data = self.counter_data.encode()
        else:
            counter_data = None

        entry = p4r.DirectMeterEntry(
            table_entry=table_entry,
            config=config,
            counter_data=counter_data,
        )
        return p4r.Entity(direct_meter_entry=entry)

Encode P4DirectMeterEntry as protobuf.

@classmethod
def decode( cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:
    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4DirectMeterEntry."
        entry = msg.direct_meter_entry

        if entry.HasField("table_entry"):
            table_entry = P4TableEntry.decode_entry(entry.table_entry, schema)
        else:
            table_entry = None

        if entry.HasField("config"):
            config = P4MeterConfig.decode(entry.config)
        else:
            config = None

        if entry.HasField("counter_data"):
            counter_data = P4MeterCounterData.decode(entry.counter_data)
        else:
            counter_data = None

        return cls(
            table_entry=table_entry,
            config=config,
            counter_data=counter_data,
        )

Decode protobuf to P4DirectMeterEntry.

Inherited Members
finsy.p4entity._P4ModifyOnly
encode_update
@dataclass
class P4Error:
@dataclass
class P4Error:
    "P4Runtime Error message used to report a single P4-entity error."

    canonical_code: GRPCStatusCode
    message: str
    space: str
    code: int
    subvalue: pbutil.PBMessage | None = None

P4Runtime Error message used to report a single P4-entity error.

P4Error(canonical_code: GRPCStatusCode, message: str, space: str, code: int, subvalue: google.protobuf.message.Message | None = None)
canonical_code: GRPCStatusCode
message: str
space: str
code: int
subvalue: google.protobuf.message.Message | None = None
@decodable('extern_entry')
@dataclass(kw_only=True, slots=True)
class P4ExternEntry(finsy.p4entity._P4Writable):
@decodable("extern_entry")
@dataclass(kw_only=True, slots=True)
class P4ExternEntry(_P4Writable):
    "Represents a P4Runtime ExternEntry."

    extern_type_id: str
    extern_id: str
    entry: pbutil.PBAny

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode ExternEntry data as protobuf."
        extern = schema.externs[self.extern_type_id, self.extern_id]
        entry = p4r.ExternEntry(
            extern_type_id=extern.extern_type_id,
            extern_id=extern.id,
            entry=self.entry,
        )
        return p4r.Entity(extern_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to ExternEntry data."
        entry = msg.extern_entry
        extern = schema.externs[entry.extern_type_id, entry.extern_id]
        return cls(
            extern_type_id=extern.extern_type_name,
            extern_id=extern.name,
            entry=entry.entry,
        )

Represents a P4Runtime ExternEntry.

P4ExternEntry(*, extern_type_id: str, extern_id: str, entry: google.protobuf.any_pb2.Any)
extern_type_id: str
extern_id: str
entry: google.protobuf.any_pb2.Any
def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:

Encode ExternEntry data as protobuf.

@classmethod
def decode(cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:

Decode protobuf to ExternEntry data.

Inherited Members
finsy.p4entity._P4Writable
encode_update
IndirectAction = <class 'P4IndirectAction'>

IndirectAction is an alias for P4IndirectAction.

@dataclass(slots=True)
class P4IndirectAction:
@dataclass(slots=True)
class P4IndirectAction:
    """Represents a P4Runtime Action reference for an indirect table.

    An indirect action can be either:

    1. a "one-shot" action (action_set)
    2. a reference to an action profile member (member_id)
    3. a reference to an action profile group (group_id)

    Only one of action_set, member_id or group_id may be configured. The other
    values must be None.

    Attributes:
        action_set: sequence of weighted actions for one-shot
        member_id: ID of action profile member
        group_id: ID of action profile group

    Examples:

    ```python
    # Construct a one-shot action profile.
    one_shot = P4IndirectAction(
        2 * P4TableAction("forward", port=1),
        1 * P4TableAction("forward", port=2),
    )

    # Refer to an action profile member by ID.
    member_action = P4IndirectAction(member_id=1)

    # Refer to an action profile group by ID.
    group_action = P4IndirectAction(group_id=2)
    ```

    References:
        - "9.1.2. Action Specification",
        - "9.2.3. One Shot Action Selector Programming"

    TODO: Refactor into three classes? P4OneShotAction, P4MemberAction, and
        P4GroupAction.

    See Also:
        - P4TableEntry
    """

    action_set: Sequence[P4WeightedAction] | None = None
    "Sequence of weighted actions defining one-shot action profile."
    _: KW_ONLY
    member_id: int | None = None
    "ID of action profile member."
    group_id: int | None = None
    "ID of action profile group."

    def __post_init__(self) -> None:
        if not self._check_invariant():
            raise ValueError(
                "exactly one of action_set, member_id, or group_id must be set"
            )

    def _check_invariant(self) -> bool:
        "Return true if instance satisfies class invariant."
        if self.action_set is not None:
            return self.member_id is None and self.group_id is None
        if self.member_id is not None:
            return self.group_id is None
        return self.group_id is not None

    def encode_table_action(self, table: P4Table) -> p4r.TableAction:
        "Encode object as a TableAction."
        if self.action_set is not None:
            return p4r.TableAction(
                action_profile_action_set=self.encode_action_set(table)
            )

        if self.member_id is not None:
            return p4r.TableAction(action_profile_member_id=self.member_id)

        assert self.group_id is not None
        return p4r.TableAction(action_profile_group_id=self.group_id)

    def encode_action_set(self, table: P4Table) -> p4r.ActionProfileActionSet:
        "Encode object as an ActionProfileActionSet."
        assert self.action_set is not None

        profile_actions: list[p4r.ActionProfileAction] = []
        for weight, table_action in self.action_set:
            action = table_action.encode_action(table)

            match weight:
                case int(weight_value):
                    watch_port = None
                case (weight_value, int(watch)):
                    watch_port = encode_watch_port(watch)
                case _:  # pyright: ignore[reportUnnecessaryComparison]
                    raise ValueError(f"unexpected action weight: {weight!r}")

            profile = p4r.ActionProfileAction(action=action, weight=weight_value)
            if watch_port is not None:
                profile.watch_port = watch_port
            profile_actions.append(profile)

        return p4r.ActionProfileActionSet(action_profile_actions=profile_actions)

    @classmethod
    def decode_action_set(cls, msg: p4r.ActionProfileActionSet, table: P4Table) -> Self:
        "Decode ActionProfileActionSet."
        action_set = list[P4WeightedAction]()

        for action in msg.action_profile_actions:
            match action.WhichOneof("watch_kind"):
                case "watch_port":
                    weight = (action.weight, decode_watch_port(action.watch_port))
                case None:
                    weight = action.weight
                case other:
                    # "watch" (deprecated) is not supported
                    raise ValueError(f"unexpected oneof: {other!r}")

            table_action = P4TableAction.decode_action(action.action, table)
            action_set.append((weight, table_action))

        return cls(action_set)

    def format_str(self, table: P4Table) -> str:
        """Format the indirect table action as a human-readable string."""
        if self.action_set is not None:
            weighted_actions = [
                f"{weight}*{action.format_str(table)}"
                for weight, action in self.action_set
            ]
            return " ".join(weighted_actions)

        # Use the name of the action_profile, if we can get it. If not, just
        # use the placeholder "__indirect".
        if table.action_profile is not None:
            profile_name = f"@{table.action_profile.alias}"
        else:
            profile_name = "__indirect"

        if self.member_id is not None:
            return f"{profile_name}[[{self.member_id:#x}]]"

        return f"{profile_name}[{self.group_id:#x}]"

    def __repr__(self) -> str:
        "Customize representation to make it more concise."
        if self.action_set is not None:
            return f"P4IndirectAction(action_set={self.action_set!r})"
        if self.member_id is not None:
            return f"P4IndirectAction(member_id={self.member_id!r})"
        return f"P4IndirectAction(group_id={self.group_id!r})"

Represents a P4Runtime Action reference for an indirect table.

An indirect action can be either:

  1. a "one-shot" action (action_set)
  2. a reference to an action profile member (member_id)
  3. a reference to an action profile group (group_id)

Exactly one of action_set, member_id, or group_id must be set. The other two must be None.

Attributes:
  - action_set: sequence of weighted actions for one-shot
  - member_id: ID of action profile member
  - group_id: ID of action profile group

Examples:

# Construct a one-shot action profile.
one_shot = P4IndirectAction(
    2 * P4TableAction("forward", port=1),
    1 * P4TableAction("forward", port=2),
)

# Refer to an action profile member by ID.
member_action = P4IndirectAction(member_id=1)

# Refer to an action profile group by ID.
group_action = P4IndirectAction(group_id=2)

References:
  - "9.1.2. Action Specification"
  - "9.2.3. One Shot Action Selector Programming"

TODO: Refactor into three classes? P4OneShotAction, P4MemberAction, and P4GroupAction.

See Also: - P4TableEntry

P4IndirectAction(action_set: Optional[Sequence[tuple[int | tuple[int, int], P4TableAction]]] = None, *, member_id: int | None = None, group_id: int | None = None)
action_set: Optional[Sequence[tuple[int | tuple[int, int], P4TableAction]]]

Sequence of weighted actions defining one-shot action profile.

member_id: int | None

ID of action profile member.

group_id: int | None

ID of action profile group.

def encode_table_action(self, table: finsy.p4schema.P4Table) -> p4.v1.p4runtime_pb2.TableAction:

Encode object as a TableAction.

def encode_action_set(self, table: finsy.p4schema.P4Table) -> p4.v1.p4runtime_pb2.ActionProfileActionSet:

Encode object as an ActionProfileActionSet.

@classmethod
def decode_action_set(cls, msg: p4.v1.p4runtime_pb2.ActionProfileActionSet, table: finsy.p4schema.P4Table) -> typing_extensions.Self:

Decode ActionProfileActionSet.

def format_str(self, table: finsy.p4schema.P4Table) -> str:

Format the indirect table action as a human-readable string.

@dataclass(slots=True)
class P4Member:
@dataclass(slots=True)
class P4Member:
    """Represents an ActionProfileGroup Member.

    See Also:
        - P4ActionProfileGroup
    """

    member_id: int
    _: KW_ONLY
    weight: P4Weight

    def encode(self) -> p4r.ActionProfileGroup.Member:
        "Encode P4Member as protobuf."
        match self.weight:
            case int(weight):
                watch_port = None
            case (int(weight), int(watch)):
                watch_port = encode_watch_port(watch)
            case other:  # pyright: ignore[reportUnnecessaryComparison]
                raise ValueError(f"unexpected weight: {other!r}")

        member = p4r.ActionProfileGroup.Member(
            member_id=self.member_id,
            weight=weight,
        )

        if watch_port is not None:
            member.watch_port = watch_port
        return member

    @classmethod
    def decode(cls, msg: p4r.ActionProfileGroup.Member) -> Self:
        "Decode protobuf to P4Member."
        match msg.WhichOneof("watch_kind"):
            case "watch_port":
                weight = (msg.weight, decode_watch_port(msg.watch_port))
            case None:
                weight = msg.weight
            case other:
                # "watch" (deprecated) is not supported
                raise ValueError(f"unknown oneof: {other!r}")

        return cls(member_id=msg.member_id, weight=weight)

Represents an ActionProfileGroup Member.

See Also: - P4ActionProfileGroup

P4Member(member_id: int, *, weight: int | tuple[int, int])
member_id: int
weight: int | tuple[int, int]
def encode(self) -> p4.v1.p4runtime_pb2.ActionProfileGroup.Member:

Encode P4Member as protobuf.

@classmethod
def decode(cls, msg: p4.v1.p4runtime_pb2.ActionProfileGroup.Member) -> typing_extensions.Self:

Decode protobuf to P4Member.

@dataclass(kw_only=True, slots=True)
class P4MeterConfig:
@dataclass(kw_only=True, slots=True)
class P4MeterConfig:
    """Represents a P4Runtime MeterConfig.

    Attributes:
        cir (int): Committed information rate (units/sec).
        cburst (int): Committed burst size.
        pir (int): Peak information rate (units/sec).
        pburst (int): Peak burst size.

    Example:
    ```
    config = P4MeterConfig(cir=10, cburst=20, pir=10, pburst=20)
    ```

    See Also:
        - P4TableEntry
        - P4MeterEntry
        - P4DirectMeterEntry
    """

    cir: int
    "Committed information rate (units/sec)."
    cburst: int
    "Committed burst size."
    pir: int
    "Peak information rate (units/sec)."
    pburst: int
    "Peak burst size."

    def encode(self) -> p4r.MeterConfig:
        "Encode object as MeterConfig."
        return p4r.MeterConfig(
            cir=self.cir, cburst=self.cburst, pir=self.pir, pburst=self.pburst
        )

    @classmethod
    def decode(cls, msg: p4r.MeterConfig) -> Self:
        "Decode MeterConfig."
        return cls(cir=msg.cir, cburst=msg.cburst, pir=msg.pir, pburst=msg.pburst)

Represents a P4Runtime MeterConfig.

Attributes:
  - cir (int): Committed information rate (units/sec).
  - cburst (int): Committed burst size.
  - pir (int): Peak information rate (units/sec).
  - pburst (int): Peak burst size.

Example:

config = P4MeterConfig(cir=10, cburst=20, pir=10, pburst=20)

See Also:
  - P4TableEntry
  - P4MeterEntry
  - P4DirectMeterEntry

P4MeterConfig(*, cir: int, cburst: int, pir: int, pburst: int)
cir: int

Committed information rate (units/sec).

cburst: int

Committed burst size.

pir: int

Peak information rate (units/sec).

pburst: int

Peak burst size.

def encode(self) -> p4.v1.p4runtime_pb2.MeterConfig:

Encode object as MeterConfig.

@classmethod
def decode(cls, msg: p4.v1.p4runtime_pb2.MeterConfig) -> typing_extensions.Self:

Decode MeterConfig.

@dataclass(kw_only=True, slots=True)
class P4MeterCounterData:
@dataclass(kw_only=True, slots=True)
class P4MeterCounterData:
    """Represents a P4Runtime MeterCounterData that stores per-color counters.

    Attributes:
        green (CounterData): counter data for packets marked GREEN.
        yellow (CounterData): counter data for packets marked YELLOW.
        red (CounterData): counter data for packets marked RED.

    See Also:
        - P4TableEntry
        - P4MeterEntry
        - P4DirectMeterEntry
    """

    green: P4CounterData
    "Counter of packets marked GREEN."
    yellow: P4CounterData
    "Counter of packets marked YELLOW."
    red: P4CounterData
    "Counter of packets marked RED."

    def encode(self) -> p4r.MeterCounterData:
        "Encode object as MeterCounterData."
        return p4r.MeterCounterData(
            green=self.green.encode(),
            yellow=self.yellow.encode(),
            red=self.red.encode(),
        )

    @classmethod
    def decode(cls, msg: p4r.MeterCounterData) -> Self:
        "Decode MeterCounterData."
        return cls(
            green=P4CounterData.decode(msg.green),
            yellow=P4CounterData.decode(msg.yellow),
            red=P4CounterData.decode(msg.red),
        )

Represents a P4Runtime MeterCounterData that stores per-color counters.

Attributes:
  - green (P4CounterData): counter data for packets marked GREEN.
  - yellow (P4CounterData): counter data for packets marked YELLOW.
  - red (P4CounterData): counter data for packets marked RED.

See Also:
  - P4TableEntry
  - P4MeterEntry
  - P4DirectMeterEntry

P4MeterCounterData(*, green: P4CounterData, yellow: P4CounterData, red: P4CounterData)
green: P4CounterData

Counter of packets marked GREEN.

yellow: P4CounterData

Counter of packets marked YELLOW.

red: P4CounterData

Counter of packets marked RED.

def encode(self) -> p4.v1.p4runtime_pb2.MeterCounterData:

Encode object as MeterCounterData.

@classmethod
def decode(cls, msg: p4.v1.p4runtime_pb2.MeterCounterData) -> typing_extensions.Self:

Decode MeterCounterData.

@decodable('meter_entry')
@dataclass(slots=True)
class P4MeterEntry(finsy.p4entity._P4ModifyOnly):
@decodable("meter_entry")
@dataclass(slots=True)
class P4MeterEntry(_P4ModifyOnly):
    "Represents a P4Runtime MeterEntry."

    meter_id: str = ""
    _: KW_ONLY
    index: int | None = None
    config: P4MeterConfig | None = None
    counter_data: P4MeterCounterData | None = None

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4MeterEntry to protobuf."
        if not self.meter_id:
            return p4r.Entity(meter_entry=p4r.MeterEntry())

        meter = schema.meters[self.meter_id]

        if self.index is not None:
            index = p4r.Index(index=self.index)
        else:
            index = None

        if self.config is not None:
            config = self.config.encode()
        else:
            config = None

        if self.counter_data is not None:
            counter_data = self.counter_data.encode()
        else:
            counter_data = None

        entry = p4r.MeterEntry(
            meter_id=meter.id,
            index=index,
            config=config,
            counter_data=counter_data,
        )
        return p4r.Entity(meter_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4MeterEntry."
        entry = msg.meter_entry
        if not entry.meter_id:
            return cls()

        meter = schema.meters[entry.meter_id]

        if entry.HasField("index"):
            index = entry.index.index
        else:
            index = None

        if entry.HasField("config"):
            config = P4MeterConfig.decode(entry.config)
        else:
            config = None

        if entry.HasField("counter_data"):
            counter_data = P4MeterCounterData.decode(entry.counter_data)
        else:
            counter_data = None

        return cls(
            meter_id=meter.alias,
            index=index,
            config=config,
            counter_data=counter_data,
        )

Represents a P4Runtime MeterEntry.

P4MeterEntry(meter_id: str = '', *, index: int | None = None, config: P4MeterConfig | None = None, counter_data: P4MeterCounterData | None = None)
meter_id: str
index: int | None
config: P4MeterConfig | None
counter_data: P4MeterCounterData | None
def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:

Encode P4MeterEntry to protobuf.

@classmethod
def decode(cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:

Decode protobuf to P4MeterEntry.

Inherited Members
finsy.p4entity._P4ModifyOnly
encode_update
@decodable('multicast_group_entry')
@dataclass(slots=True)
class P4MulticastGroupEntry(finsy.p4entity._P4Writable):
@decodable("multicast_group_entry")
@dataclass(slots=True)
class P4MulticastGroupEntry(_P4Writable):
    "Represents a P4Runtime MulticastGroupEntry."

    multicast_group_id: int = 0
    _: KW_ONLY
    replicas: Sequence[_ReplicaType] = ()
    metadata: bytes = b""

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode MulticastGroupEntry data as protobuf."
        entry = p4r.MulticastGroupEntry(
            multicast_group_id=self.multicast_group_id,
            replicas=[encode_replica(replica) for replica in self.replicas],
            metadata=self.metadata,
        )
        return p4r.Entity(
            packet_replication_engine_entry=p4r.PacketReplicationEngineEntry(
                multicast_group_entry=entry
            )
        )

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to MulticastGroupEntry data."
        entry = msg.packet_replication_engine_entry.multicast_group_entry
        return cls(
            multicast_group_id=entry.multicast_group_id,
            replicas=tuple(decode_replica(replica) for replica in entry.replicas),
            metadata=entry.metadata,
        )

    def replicas_str(self) -> str:
        "Format the replicas as a human-readable, canonical string."
        return " ".join(format_replica(rep) for rep in self.replicas)

Represents a P4Runtime MulticastGroupEntry.

P4MulticastGroupEntry(multicast_group_id: int = 0, *, replicas: Sequence[tuple[int, int] | int] = (), metadata: bytes = b'')
multicast_group_id: int
replicas: Sequence[tuple[int, int] | int]
metadata: bytes
def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:

Encode MulticastGroupEntry data as protobuf.

@classmethod
def decode(cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:

Decode protobuf to MulticastGroupEntry data.

def replicas_str(self) -> str:

Format the replicas as a human-readable, canonical string.

Inherited Members
finsy.p4entity._P4Writable
encode_update
@decodable('packet')
@dataclass(slots=True)
class P4PacketIn:
@decodable("packet")
@dataclass(slots=True)
class P4PacketIn:
    "Represents a P4Runtime PacketIn."

    payload: bytes
    _: KW_ONLY
    metadata: _MetadataDictType

    @classmethod
    def decode(cls, msg: p4r.StreamMessageResponse, schema: P4Schema) -> Self:
        "Decode protobuf to PacketIn data."
        packet = msg.packet
        cpm = schema.controller_packet_metadata.get("packet_in")
        if cpm is None:
            # There is no controller metadata. Warn if message has any.
            pkt_meta = packet.metadata
            if pkt_meta:
                LOGGER.warning("P4PacketIn unexpected metadata: %r", pkt_meta)
            return cls(packet.payload, metadata={})

        return cls(
            packet.payload,
            metadata=cpm.decode(packet.metadata),
        )

    def __getitem__(self, key: str) -> Any:
        "Retrieve metadata value."
        return self.metadata[key]

    def __repr__(self) -> str:
        "Return friendlier hexadecimal description of packet."
        if self.metadata:
            return f"P4PacketIn(metadata={self.metadata!r}, payload=h'{self.payload.hex()}')"
        return f"P4PacketIn(payload=h'{self.payload.hex()}')"

Represents a P4Runtime PacketIn.

P4PacketIn(payload: bytes, *, metadata: dict[str, typing.Any])
payload: bytes
metadata: dict[str, typing.Any]
@classmethod
def decode(cls, msg: p4.v1.p4runtime_pb2.StreamMessageResponse, schema: P4Schema) -> typing_extensions.Self:

Decode protobuf to PacketIn data.
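
The two branches of decode can be sketched without protobuf. This is a minimal stand-in, not Finsy's implementation: `decode_packet_in`, the `(id, bytes)` metadata pairs, and the `decoder` callable are hypothetical substitutes for the protobuf message and for the schema's "packet_in" controller packet metadata.

```python
from __future__ import annotations

import logging
from typing import Any, Callable

LOGGER = logging.getLogger("packet_in_sketch")

RawMetadata = list[tuple[int, bytes]]  # hypothetical (metadata_id, value) pairs


def decode_packet_in(
    payload: bytes,
    raw_metadata: RawMetadata,
    decoder: Callable[[RawMetadata], dict[str, Any]] | None,
) -> tuple[bytes, dict[str, Any]]:
    "Return (payload, metadata), following the same two branches as decode."
    if decoder is None:
        # No "packet_in" metadata is declared; warn if the message has any.
        if raw_metadata:
            LOGGER.warning("unexpected metadata: %r", raw_metadata)
        return payload, {}
    return payload, decoder(raw_metadata)


def port_decoder(items: RawMetadata) -> dict[str, Any]:
    "Hypothetical decoder: metadata id 1 is a big-endian ingress port."
    names = {1: "ingress_port"}
    return {names[mid]: int.from_bytes(value, "big") for mid, value in items}


with_schema = decode_packet_in(b"\x0a\x0b", [(1, b"\x00\x02")], port_decoder)
without_schema = decode_packet_in(b"\x0a\x0b", [(1, b"\x00\x02")], None)
```

With a decoder the metadata comes back keyed by name; without one the metadata is dropped after a warning and the payload is still returned.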

def __getitem__(self, key: str) -> Any:

Retrieve metadata value.

@dataclass(slots=True)
class P4PacketOut:
@dataclass(slots=True)
class P4PacketOut:
    "Represents a P4Runtime PacketOut."

    payload: bytes
    _: KW_ONLY
    metadata: _MetadataDictType

    def __init__(self, __payload: bytes, /, **metadata: Any):
        self.payload = __payload
        self.metadata = metadata

    def encode_update(self, schema: P4Schema) -> p4r.StreamMessageRequest:
        "Encode PacketOut data as protobuf."
        cpm = schema.controller_packet_metadata["packet_out"]
        return p4r.StreamMessageRequest(
            packet=p4r.PacketOut(
                payload=self.payload,
                metadata=cpm.encode(self.metadata),
            )
        )

    def __getitem__(self, key: str) -> Any:
        "Retrieve metadata value."
        return self.metadata[key]

    def __repr__(self) -> str:
        "Return friendlier hexadecimal description of packet."
        if self.metadata:
            return f"P4PacketOut(metadata={self.metadata!r}, payload=h'{self.payload.hex()}')"
        return f"P4PacketOut(payload=h'{self.payload.hex()}')"

Represents a P4Runtime PacketOut.

P4PacketOut(__payload: bytes, /, **metadata: Any)
payload: bytes
metadata: dict[str, typing.Any]
def encode_update(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.StreamMessageRequest:

Encode PacketOut data as protobuf.

def __getitem__(self, key: str) -> Any:

Retrieve metadata value.
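
The constructor takes the payload as a positional-only argument and collects every keyword argument as metadata. The sketch below copies only that constructor, the `[]` accessor, and the repr into a stand-in class (`PacketOutSketch` is a hypothetical name, and `egress_port` is an illustrative metadata field), so it runs without Finsy or a P4Info schema.

```python
from typing import Any


class PacketOutSketch:
    "Stand-in with the same constructor shape, [] accessor and repr as above."

    def __init__(self, payload: bytes, /, **metadata: Any):
        self.payload = payload
        self.metadata = metadata

    def __getitem__(self, key: str) -> Any:
        "Retrieve metadata value."
        return self.metadata[key]

    def __repr__(self) -> str:
        if self.metadata:
            return f"PacketOutSketch(metadata={self.metadata!r}, payload=h'{self.payload.hex()}')"
        return f"PacketOutSketch(payload=h'{self.payload.hex()}')"


packet = PacketOutSketch(b"\x0a\x0b", egress_port=2)
port = packet["egress_port"]

# Because the payload is positional-only, a metadata field may itself be
# named "payload" without colliding with the first argument.
shadow = PacketOutSketch(b"\x01", payload=5)
```

Making the first parameter positional-only leaves the whole keyword namespace free for metadata field names.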

@decodable('register_entry')
@dataclass(slots=True)
class P4RegisterEntry(finsy.p4entity._P4ModifyOnly):
@decodable("register_entry")
@dataclass(slots=True)
class P4RegisterEntry(_P4ModifyOnly):
    "Represents a P4Runtime RegisterEntry."

    register_id: str = ""
    _: KW_ONLY
    index: int | None = None
    data: _DataValueType | None = None

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode RegisterEntry data as protobuf."
        if not self.register_id:
            return p4r.Entity(register_entry=p4r.RegisterEntry())

        register = schema.registers[self.register_id]

        if self.index is not None:
            index = p4r.Index(index=self.index)
        else:
            index = None

        if self.data is not None:
            data = register.type_spec.encode_data(self.data)
        else:
            data = None

        entry = p4r.RegisterEntry(
            register_id=register.id,
            index=index,
            data=data,
        )
        return p4r.Entity(register_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to RegisterEntry data."
        entry = msg.register_entry
        if entry.register_id == 0:
            return cls()

        register = schema.registers[entry.register_id]

        if entry.HasField("index"):
            index = entry.index.index
        else:
            index = None

        if entry.HasField("data"):
            data = register.type_spec.decode_data(entry.data)
        else:
            data = None

        return cls(
            register.alias,
            index=index,
            data=data,
        )

Represents a P4Runtime RegisterEntry.

P4RegisterEntry(register_id: str = '', *, index: int | None = None, data: Optional[Any] = None)
register_id: str
index: int | None
data: Optional[Any]
def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:

Encode RegisterEntry data as protobuf.

@classmethod
def decode(cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:

Decode protobuf to RegisterEntry data.

Inherited Members
finsy.p4entity._P4ModifyOnly
encode_update
Action = <class 'P4TableAction'>

Action is an alias for P4TableAction.

@dataclass(init=False, slots=True)
class P4TableAction:
@dataclass(init=False, slots=True)
class P4TableAction:
    """Represents a P4Runtime Action reference for a direct table.

    Attributes:
        name (str): the name of the action.
        args (dict[str, Any]): the action's arguments as a dictionary.

    Example:
        If the name of the action is "ipv4_forward" and it takes a single
        "port" parameter, you can construct the action as:

        ```
        action = P4TableAction("ipv4_forward", port=1)
        ```

    Reference "9.1.2 Action Specification":
        The Action Protobuf has fields: (action_id, params). Finsy translates
        `name` to the appropriate `action_id` as determined by P4Info. It also
        translates each named argument in `args` to the appropriate `param_id`.

    See Also:
        To specify an action for an indirect table, use `P4IndirectAction`.
        Note that P4TableAction will automatically be promoted to an "indirect"
        action if needed.

    Operators:
        A `P4TableAction` supports the multiplication operator (*) for
        constructing "weighted actions". A weighted action is used in specifying
        indirect actions. Here is an action with a weight of 3:

        ```
        weighted_action = 3 * P4TableAction("ipv4_forward", port=1)
        ```

        To specify a weight with a `watch_port`, use a tuple `(weight, port)`.
        The weight is always a positive integer.

    See Also:
        - P4TableEntry
    """

    name: str
    "The name of the action."
    args: dict[str, Any]
    "The action's arguments as a dictionary."

    def __init__(self, __name: str, /, **args: Any):
        self.name = __name
        self.args = args

    def encode_table_action(self, table: P4Table) -> p4r.TableAction:
        """Encode TableAction data as protobuf.

        If the table is indirect, promote the action to a "one-shot" indirect
        action.
        """
        try:
            action = table.actions[self.name]
        except Exception as ex:
            raise ValueError(f"{table.name!r}: {ex}") from ex

        action_p4 = self._encode_action(action)

        if table.action_profile is not None:
            # Promote action to ActionProfileActionSet entry with weight=1.
            return p4r.TableAction(
                action_profile_action_set=p4r.ActionProfileActionSet(
                    action_profile_actions=[
                        p4r.ActionProfileAction(action=action_p4, weight=1)
                    ]
                )
            )

        return p4r.TableAction(action=action_p4)

    def _fail_missing_params(self, action: P4ActionRef | P4Action) -> NoReturn:
        "Report missing parameters."
        seen = {param.name for param in action.params}
        for name in self.args:
            param = action.params[name]
            seen.remove(param.name)

        raise ValueError(f"Action {action.alias!r}: missing parameters {seen}")

    def encode_action(self, schema: P4Schema | P4Table) -> p4r.Action:
        "Encode Action data as protobuf."
        action = schema.actions[self.name]
        return self._encode_action(action)

    def _encode_action(self, action: P4ActionRef | P4Action) -> p4r.Action:
        "Helper to encode an action."
        aps = action.params
        try:
            params = [
                aps[name].encode_param(value) for name, value in self.args.items()
            ]
        except ValueError as ex:
            raise ValueError(f"{action.alias!r}: {ex}") from ex

        # Check for missing action parameters. We always accept an action with
        # no parameters (for wildcard ReadRequests).
        param_count = len(params)
        if param_count > 0 and param_count != len(aps):
            self._fail_missing_params(action)

        return p4r.Action(action_id=action.id, params=params)

    @classmethod
    def decode_table_action(
        cls, msg: p4r.TableAction, table: P4Table
    ) -> Self | "P4IndirectAction":
        "Decode protobuf to TableAction data."
        match msg.WhichOneof("type"):
            case "action":
                return cls.decode_action(msg.action, table)
            case "action_profile_member_id":
                return P4IndirectAction(member_id=msg.action_profile_member_id)
            case "action_profile_group_id":
                return P4IndirectAction(group_id=msg.action_profile_group_id)
            case "action_profile_action_set":
                return P4IndirectAction.decode_action_set(
                    msg.action_profile_action_set, table
                )
            case other:
                raise ValueError(f"unknown oneof: {other!r}")

    @classmethod
    def decode_action(cls, msg: p4r.Action, parent: P4Schema | P4Table) -> Self:
        "Decode protobuf to Action data."
        action = parent.actions[msg.action_id]
        args = {}
        for param in msg.params:
            action_param = action.params[param.param_id]
            value = action_param.decode_param(param)
            args[action_param.name] = value

        return cls(action.alias, **args)

    def format_str(self, schema: P4Schema | P4Table) -> str:
        """Format the table action as a human-readable string.

        The result is formatted to look like a function call:

        ```
        name(param1=value1, ...)
        ```

        Where `name` is the action name, and `(param<N>, value<N>)` are the
        action parameters. The format of `value<N>` is schema-dependent.
        """
        aps = schema.actions[self.name].params
        args = [
            f"{key}={aps[key].format_param(value)}" for key, value in self.args.items()
        ]

        return f"{self.name}({', '.join(args)})"

    def __mul__(self, weight: P4Weight) -> P4WeightedAction:
        "Make a weighted action."
        if not isinstance(
            weight, (int, tuple)
        ):  # pyright: ignore[reportUnnecessaryIsInstance]
            raise NotImplementedError("expected P4Weight")
        return (weight, self)

    def __rmul__(self, weight: P4Weight) -> P4WeightedAction:
        "Make a weighted action."
        if not isinstance(
            weight, (int, tuple)
        ):  # pyright: ignore[reportUnnecessaryIsInstance]
            raise NotImplementedError("expected P4Weight")
        return (weight, self)

    def __call__(self, **params: Any) -> Self:
        "Return a new action with the updated parameters."
        return self.__class__(self.name, **(self.args | params))

Represents a P4Runtime Action reference for a direct table.

Attributes:
    name (str): the name of the action.
    args (dict[str, Any]): the action's arguments as a dictionary.

Example: If the name of the action is "ipv4_forward" and it takes a single "port" parameter, you can construct the action as:

action = P4TableAction("ipv4_forward", port=1)

Reference "9.1.2 Action Specification": The Action Protobuf has fields: (action_id, params). Finsy translates name to the appropriate action_id as determined by P4Info. It also translates each named argument in args to the appropriate param_id.

See Also: To specify an action for an indirect table, use P4IndirectAction. Note that P4TableAction will automatically be promoted to an "indirect" action if needed.

Operators: A P4TableAction supports the multiplication operator (*) for constructing "weighted actions". A weighted action is used in specifying indirect actions. Here is an action with a weight of 3:

weighted_action = 3 * P4TableAction("ipv4_forward", port=1)

To specify a weight with a watch_port, use a tuple (weight, port). The weight is always a positive integer.
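
The weighting operator can be tried in isolation. The sketch below uses a stand-in class (`ActionSketch` is a hypothetical name) that keeps only the name, the arguments, and the multiplication behaviour shown in the source listing, where multiplying pairs the weight with the action in a tuple. The watch port value 7 is purely illustrative.

```python
from typing import Any


class ActionSketch:
    "Stand-in for P4TableAction: name, args and the * operator only."

    def __init__(self, name: str, /, **args: Any):
        self.name = name
        self.args = args

    def __mul__(self, weight):
        "Pair the weight with this action."
        if not isinstance(weight, (int, tuple)):
            raise NotImplementedError("expected an int or a (weight, port) tuple")
        return (weight, self)

    # `3 * action` and `action * 3` should build the same pair.
    __rmul__ = __mul__


forward = ActionSketch("ipv4_forward", port=1)

plain = 3 * forward          # weight 3, no watch port
reversed_order = forward * 3  # same pair, operands swapped
watched = (3, 7) * forward    # weight 3 with watch_port 7
```

Each result is a `(weight, action)` tuple, so a list of such pairs can describe the members of an indirect action.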

See Also: P4TableEntry

P4TableAction(__name: str, /, **args: Any)
name: str

The name of the action.

args: dict[str, typing.Any]

The action's arguments as a dictionary.

def encode_table_action(self, table: finsy.p4schema.P4Table) -> p4.v1.p4runtime_pb2.TableAction:

Encode TableAction data as protobuf.

If the table is indirect, promote the action to a "one-shot" indirect action.
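
The promotion rule is easier to see with plain dictionaries in place of protobuf messages. This is only a sketch of the message shape: `encode_table_action_sketch` and its `table_is_indirect` flag are hypothetical, and the dictionary keys mirror the protobuf field names used in the source listing.

```python
from typing import Any


def encode_table_action_sketch(
    action: dict[str, Any], table_is_indirect: bool
) -> dict[str, Any]:
    "Wrap the action in a one-member action set when the table is indirect."
    if table_is_indirect:
        # One-shot form: a single action profile action with weight 1.
        return {
            "action_profile_action_set": {
                "action_profile_actions": [{"action": action, "weight": 1}]
            }
        }
    return {"action": action}


action = {"action_id": 1, "params": []}  # illustrative values
direct = encode_table_action_sketch(action, table_is_indirect=False)
one_shot = encode_table_action_sketch(action, table_is_indirect=True)
```

The same action description therefore works for both kinds of table; only the wrapper around it changes.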

def encode_action(self, schema: P4Schema | finsy.p4schema.P4Table) -> p4.v1.p4runtime_pb2.Action:

Encode Action data as protobuf.

@classmethod
def decode_table_action(cls, msg: p4.v1.p4runtime_pb2.TableAction, table: finsy.p4schema.P4Table) -> Union[typing_extensions.Self, P4IndirectAction]:

Decode protobuf to TableAction data.

@classmethod
def decode_action(cls, msg: p4.v1.p4runtime_pb2.Action, parent: P4Schema | finsy.p4schema.P4Table) -> typing_extensions.Self:

Decode protobuf to Action data.

def format_str(self, schema: P4Schema | finsy.p4schema.P4Table) -> str:

Format the table action as a human-readable string.

The result is formatted to look like a function call:

name(param1=value1, ...)

Where name is the action name, and (param<N>, value<N>) are the action parameters. The format of value<N> is schema-dependent.
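
The function-call layout can be reproduced with a small helper. In this sketch `format_action` and its `formatters` mapping are hypothetical: the mapping stands in for the per-parameter, schema-dependent formatting that the real method takes from P4Info, and `hex` is used only as an example formatter.

```python
from typing import Any, Callable


def format_action(
    name: str,
    args: dict[str, Any],
    formatters: dict[str, Callable[[Any], str]],
) -> str:
    "Render an action as name(param1=value1, ...)."
    parts = [f"{key}={formatters[key](value)}" for key, value in args.items()]
    return f"{name}({', '.join(parts)})"


text = format_action("ipv4_forward", {"port": 1}, {"port": hex})
empty = format_action("drop", {}, {})
```

Parameters appear in the order they were given, and an action without arguments renders with empty parentheses.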

@decodable('table_entry')
@dataclass(slots=True)
class P4TableEntry(finsy.p4entity._P4Writable):
@decodable("table_entry")
@dataclass(slots=True)
class P4TableEntry(_P4Writable):
    """Represents a P4Runtime table entry.

    Attributes:
        table_id (str): Name of the table.
        match (P4TableMatch | None): Entry's match fields.
        action (P4TableAction | P4IndirectAction | None): Entry's action.
        is_default_action (bool): True if entry is the default table entry.
        priority (int): Priority of a table entry when match implies TCAM lookup.
        metadata (bytes): Arbitrary controller cookie (1.2.0).
        controller_metadata (int): Deprecated controller cookie (< 1.2.0).
        meter_config (P4MeterConfig | None): Meter configuration.
        counter_data (P4CounterData | None): Counter data for table entry.
        meter_counter_data (P4MeterCounterData | None): Meter counter data (1.4.0).
        idle_timeout_ns (int): Idle timeout in nanoseconds.
        time_since_last_hit (int | None): Nanoseconds since entry last matched.
        is_const (bool): True if entry is constant (1.4.0).

    The most commonly used fields are table_id, match, action, is_default_action,
    and priority. See the P4Runtime Spec for usage examples regarding the other
    attributes.

    When writing a P4TableEntry, you can specify the type of update using '+',
    '-', and '~'.

    Examples:
    ```
    # Specify all tables when using "read".
    entry = fy.P4TableEntry()

    # Specify the table named "ipv4" when using "read".
    entry = fy.P4TableEntry("ipv4")

    # Specify the default entry in the "ipv4" table when using "read".
    entry = fy.P4TableEntry("ipv4", is_default_action=True)

    # Insert an entry into the "ipv4" table.
    update = +fy.P4TableEntry(
        "ipv4",
        match=fy.Match(ipv4_dst="10.0.0.0/8"),
        action=fy.Action("forward", port=1),
    )

    # Modify the default action in the "ipv4" table.
    update = ~fy.P4TableEntry(
        "ipv4",
        action=fy.Action("forward", port=5),
        is_default_action=True
    )
    ```

    Operators:
        You can retrieve a match field from a table entry using `[]`. For
        example, `entry["ipv4_dst"]` is the same as `entry.match["ipv4_dst"]`.

    Formatting Helpers:
        The `match_str` and `action_str` methods provide P4Info-aware formatting
        of the match and action attributes.
    """

    table_id: str = ""
    "Name of the table."
    _: KW_ONLY
    match: P4TableMatch | None = None
    "Entry's match fields."
    action: P4TableAction | P4IndirectAction | None = None
    "Entry's action."
    is_default_action: bool = False
    "True if entry is the default table entry."
    priority: int = 0
    "Priority of a table entry when match implies TCAM lookup."
    metadata: bytes = b""
    "Arbitrary controller cookie. (1.2.0)."
    controller_metadata: int = 0
    "Deprecated controller cookie (< 1.2.0)."
    meter_config: P4MeterConfig | None = None
    "Meter configuration."
    counter_data: P4CounterData | None = None
    "Counter data for table entry."
    meter_counter_data: P4MeterCounterData | None = None
    "Meter counter data (1.4.0)."
    idle_timeout_ns: int = 0
    "Idle timeout in nanoseconds."
    time_since_last_hit: int | None = None
    "Nanoseconds since entry last matched."
    is_const: bool = False
    "True if entry is constant (1.4.0)."

    def __getitem__(self, key: str) -> Any:
        "Convenience accessor to retrieve a value from the `match` property."
        if self.match is not None:
            return self.match[key]
        raise KeyError(key)

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode TableEntry data as protobuf."
        return p4r.Entity(table_entry=self.encode_entry(schema))

    def encode_entry(self, schema: P4Schema) -> p4r.TableEntry:
        "Encode TableEntry data as protobuf."
        if not self.table_id:
            return self._encode_empty()

        table = schema.tables[self.table_id]

        if self.match:
            match = self.match.encode(table)
        else:
            match = None

        if self.action:
            action = self.action.encode_table_action(table)
        else:
            action = None

        if self.meter_config:
            meter_config = self.meter_config.encode()
        else:
            meter_config = None

        if self.counter_data:
            counter_data = self.counter_data.encode()
        else:
            counter_data = None

        if self.meter_counter_data:
            meter_counter_data = self.meter_counter_data.encode()
        else:
            meter_counter_data = None

        if self.time_since_last_hit is not None:
            time_since_last_hit = p4r.TableEntry.IdleTimeout(
                elapsed_ns=self.time_since_last_hit
            )
        else:
            time_since_last_hit = None

        return p4r.TableEntry(
            table_id=table.id,
            match=match,
            action=action,
            priority=self.priority,
            controller_metadata=self.controller_metadata,
            meter_config=meter_config,
            counter_data=counter_data,
            meter_counter_data=meter_counter_data,
            is_default_action=self.is_default_action,
            idle_timeout_ns=self.idle_timeout_ns,
            time_since_last_hit=time_since_last_hit,
            metadata=self.metadata,
            is_const=self.is_const,
        )

    def _encode_empty(self) -> p4r.TableEntry:
        "Encode an empty wildcard request."
        if self.counter_data is not None:
            counter_data = self.counter_data.encode()
        else:
            counter_data = None

        # FIXME: time_since_last_hit not supported for wildcard reads?
        if self.time_since_last_hit is not None:
            time_since_last_hit = p4r.TableEntry.IdleTimeout(
                elapsed_ns=self.time_since_last_hit
            )
        else:
            time_since_last_hit = None

        return p4r.TableEntry(
            counter_data=counter_data,
            time_since_last_hit=time_since_last_hit,
        )

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to TableEntry data."
        return cls.decode_entry(msg.table_entry, schema)

    @classmethod
    def decode_entry(cls, entry: p4r.TableEntry, schema: P4Schema) -> Self:
        "Decode protobuf to TableEntry data."
        if entry.table_id == 0:
            return cls("")

        table = schema.tables[entry.table_id]

        if entry.match:
            match = P4TableMatch.decode(entry.match, table)
        else:
            match = None

        if entry.HasField("action"):
            action = P4TableAction.decode_table_action(entry.action, table)
        else:
            action = None

        if entry.HasField("time_since_last_hit"):
            last_hit = entry.time_since_last_hit.elapsed_ns
        else:
            last_hit = None

        if entry.HasField("meter_config"):
            meter_config = P4MeterConfig.decode(entry.meter_config)
        else:
            meter_config = None

        if entry.HasField("counter_data"):
            counter_data = P4CounterData.decode(entry.counter_data)
        else:
            counter_data = None

        if entry.HasField("meter_counter_data"):
            meter_counter_data = P4MeterCounterData.decode(entry.meter_counter_data)
        else:
            meter_counter_data = None

        return cls(
            table_id=table.alias,
            match=match,
            action=action,
            priority=entry.priority,
            controller_metadata=entry.controller_metadata,
            meter_config=meter_config,
            counter_data=counter_data,
            meter_counter_data=meter_counter_data,
            is_default_action=entry.is_default_action,
            idle_timeout_ns=entry.idle_timeout_ns,
            time_since_last_hit=last_hit,
            metadata=entry.metadata,
            is_const=entry.is_const,
        )

    def match_dict(
        self,
        schema: P4Schema,
        *,
        wildcard: str | None = None,
    ) -> dict[str, str]:
        """Format the match fields as a dictionary of strings.

        If `wildcard` is None, only include match fields that have values. If
        `wildcard` is set, include all field names but replace unset values with
        given wildcard value (e.g. "*")
        """
        table = schema.tables[self.table_id]
        if self.match is not None:
            return self.match.format_dict(table, wildcard=wildcard)
        return P4TableMatch().format_dict(table, wildcard=wildcard)

    def match_str(
        self,
        schema: P4Schema,
        *,
        wildcard: str | None = None,
    ) -> str:
        "Format the match fields as a human-readable, canonical string."
        table = schema.tables[self.table_id]
        if self.match is not None:
            return self.match.format_str(table, wildcard=wildcard)
        return P4TableMatch().format_str(table, wildcard=wildcard)

    def action_str(self, schema: P4Schema) -> str:
        "Format the actions as a human-readable, canonical string."
        table = schema.tables[self.table_id]
        if self.action is None:
            return NOACTION_STR
        return self.action.format_str(table)

Represents a P4Runtime table entry.

Attributes:
    table_id (str): Name of the table.
    match (P4TableMatch | None): Entry's match fields.
    action (P4TableAction | P4IndirectAction | None): Entry's action.
    is_default_action (bool): True if entry is the default table entry.
    priority (int): Priority of a table entry when match implies TCAM lookup.
    metadata (bytes): Arbitrary controller cookie (1.2.0).
    controller_metadata (int): Deprecated controller cookie (< 1.2.0).
    meter_config (P4MeterConfig | None): Meter configuration.
    counter_data (P4CounterData | None): Counter data for table entry.
    meter_counter_data (P4MeterCounterData | None): Meter counter data (1.4.0).
    idle_timeout_ns (int): Idle timeout in nanoseconds.
    time_since_last_hit (int | None): Nanoseconds since entry last matched.
    is_const (bool): True if entry is constant (1.4.0).

The most commonly used fields are table_id, match, action, is_default_action, and priority. See the P4Runtime Spec for usage examples regarding the other attributes.

When writing a P4TableEntry, you can specify the type of update using the unary operators '+' (INSERT), '-' (DELETE), and '~' (MODIFY).

Examples:

# Specify all tables when using "read".
entry = fy.P4TableEntry()

# Specify the table named "ipv4" when using "read".
entry = fy.P4TableEntry("ipv4")

# Specify the default entry in the "ipv4" table when using "read".
entry = fy.P4TableEntry("ipv4", is_default_action=True)

# Insert an entry into the "ipv4" table.
update = +fy.P4TableEntry(
    "ipv4",
    match=fy.Match(ipv4_dst="10.0.0.0/8"),
    action=fy.Action("forward", port=1),
)

# Modify the default action in the "ipv4" table.
update = ~fy.P4TableEntry(
    "ipv4",
    action=fy.Action("forward", port=5),
    is_default_action=True
)
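
The unary-operator convention can be illustrated without a switch. The following is a minimal sketch, not Finsy's implementation: `Entry` and `UpdateType` are hypothetical stand-ins that only show how Python's `__pos__`, `__neg__`, and `__invert__` hooks can tag an entity with an update type.

```python
from dataclasses import dataclass
from enum import Enum


class UpdateType(Enum):
    "Hypothetical update types, named after the P4Runtime operations."
    INSERT = "INSERT"
    MODIFY = "MODIFY"
    DELETE = "DELETE"


@dataclass
class Entry:
    "Hypothetical stand-in for a writable entity such as P4TableEntry."
    table_id: str

    def __pos__(self):
        # +entry tags the entity as an INSERT.
        return (UpdateType.INSERT, self)

    def __neg__(self):
        # -entry tags the entity as a DELETE.
        return (UpdateType.DELETE, self)

    def __invert__(self):
        # ~entry tags the entity as a MODIFY.
        return (UpdateType.MODIFY, self)


insert_update = +Entry("ipv4")
modify_update = ~Entry("ipv4")
delete_update = -Entry("ipv4")
```

Each expression evaluates to a tagged pair here; how Finsy itself represents the tagged update is not shown in this section.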

Operators: You can retrieve a match field from a table entry using []. For example, entry["ipv4_dst"] is the same as entry.match["ipv4_dst"].

Formatting Helpers: The match_str and action_str methods provide P4Info-aware formatting of the match and action attributes.

P4TableEntry( table_id: str = '', *, match: P4TableMatch | None = None, action: P4TableAction | P4IndirectAction | None = None, is_default_action: bool = False, priority: int = 0, metadata: bytes = b'', controller_metadata: int = 0, meter_config: P4MeterConfig | None = None, counter_data: P4CounterData | None = None, meter_counter_data: P4MeterCounterData | None = None, idle_timeout_ns: int = 0, time_since_last_hit: int | None = None, is_const: bool = False)
table_id: str

Name of the table.

match: P4TableMatch | None

Entry's match fields.

action: P4TableAction | P4IndirectAction | None

Entry's action.

is_default_action: bool

True if entry is the default table entry.

priority: int

Priority of a table entry when match implies TCAM lookup.

metadata: bytes

Arbitrary controller cookie (1.2.0).

controller_metadata: int

Deprecated controller cookie (< 1.2.0).

meter_config: P4MeterConfig | None

Meter configuration.

counter_data: P4CounterData | None

Counter data for table entry.

meter_counter_data: P4MeterCounterData | None

Meter counter data (1.4.0).

idle_timeout_ns: int

Idle timeout in nanoseconds.

time_since_last_hit: int | None

Nanoseconds since entry last matched.

is_const: bool

True if entry is constant (1.4.0).

def __getitem__(self, key: str) -> Any:
    def __getitem__(self, key: str) -> Any:
        "Convenience accessor to retrieve a value from the `match` property."
        if self.match is not None:
            return self.match[key]
        raise KeyError(key)

Convenience accessor to retrieve a value from the match property.
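
One consequence of this accessor is that an entry without a match raises `KeyError` rather than returning None. The sketch below reproduces the same logic on a hypothetical `Entry` stand-in (a plain dict plays the role of the match) so the behavior can be tried without a P4Info file.

```python
class Entry:
    "Hypothetical stand-in that mirrors the accessor logic shown above."

    def __init__(self, match=None):
        self.match = match

    def __getitem__(self, key):
        if self.match is not None:
            return self.match[key]
        raise KeyError(key)


with_match = Entry({"ipv4_dst": "10.0.0.0/8"})
same_value = with_match["ipv4_dst"] == with_match.match["ipv4_dst"]

# An entry with no match at all raises KeyError for any field name.
try:
    Entry()["ipv4_dst"]
    missing = None
except KeyError as ex:
    missing = ex.args[0]
```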

def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:
    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode TableEntry data as protobuf."
        return p4r.Entity(table_entry=self.encode_entry(schema))

Encode TableEntry data as protobuf.

def encode_entry(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.TableEntry:
    def encode_entry(self, schema: P4Schema) -> p4r.TableEntry:
        "Encode TableEntry data as protobuf."
        if not self.table_id:
            return self._encode_empty()

        table = schema.tables[self.table_id]

        if self.match:
            match = self.match.encode(table)
        else:
            match = None

        if self.action:
            action = self.action.encode_table_action(table)
        else:
            action = None

        if self.meter_config:
            meter_config = self.meter_config.encode()
        else:
            meter_config = None

        if self.counter_data:
            counter_data = self.counter_data.encode()
        else:
            counter_data = None

        if self.meter_counter_data:
            meter_counter_data = self.meter_counter_data.encode()
        else:
            meter_counter_data = None

        if self.time_since_last_hit is not None:
            time_since_last_hit = p4r.TableEntry.IdleTimeout(
                elapsed_ns=self.time_since_last_hit
            )
        else:
            time_since_last_hit = None

        return p4r.TableEntry(
            table_id=table.id,
            match=match,
            action=action,
            priority=self.priority,
            controller_metadata=self.controller_metadata,
            meter_config=meter_config,
            counter_data=counter_data,
            meter_counter_data=meter_counter_data,
            is_default_action=self.is_default_action,
            idle_timeout_ns=self.idle_timeout_ns,
            time_since_last_hit=time_since_last_hit,
            metadata=self.metadata,
            is_const=self.is_const,
        )

Encode TableEntry data as protobuf.

@classmethod
def decode( cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:
    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to TableEntry data."
        return cls.decode_entry(msg.table_entry, schema)

Decode protobuf to TableEntry data.

@classmethod
def decode_entry( cls, entry: p4.v1.p4runtime_pb2.TableEntry, schema: P4Schema) -> typing_extensions.Self:
    @classmethod
    def decode_entry(cls, entry: p4r.TableEntry, schema: P4Schema) -> Self:
        "Decode protobuf to TableEntry data."
        if entry.table_id == 0:
            return cls("")

        table = schema.tables[entry.table_id]

        if entry.match:
            match = P4TableMatch.decode(entry.match, table)
        else:
            match = None

        if entry.HasField("action"):
            action = P4TableAction.decode_table_action(entry.action, table)
        else:
            action = None

        if entry.HasField("time_since_last_hit"):
            last_hit = entry.time_since_last_hit.elapsed_ns
        else:
            last_hit = None

        if entry.HasField("meter_config"):
            meter_config = P4MeterConfig.decode(entry.meter_config)
        else:
            meter_config = None

        if entry.HasField("counter_data"):
            counter_data = P4CounterData.decode(entry.counter_data)
        else:
            counter_data = None

        if entry.HasField("meter_counter_data"):
            meter_counter_data = P4MeterCounterData.decode(entry.meter_counter_data)
        else:
            meter_counter_data = None

        return cls(
            table_id=table.alias,
            match=match,
            action=action,
            priority=entry.priority,
            controller_metadata=entry.controller_metadata,
            meter_config=meter_config,
            counter_data=counter_data,
            meter_counter_data=meter_counter_data,
            is_default_action=entry.is_default_action,
            idle_timeout_ns=entry.idle_timeout_ns,
            time_since_last_hit=last_hit,
            metadata=entry.metadata,
            is_const=entry.is_const,
        )

Decode protobuf to TableEntry data.
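
Taken together, encode_entry and decode_entry translate between names and numeric IDs: encoding looks a table up by name and writes `table.id`, while decoding looks it up by ID and returns `table.alias`. The sketch below shows that round trip with hypothetical stand-ins (`Table`, `EntityMap`, and the ID value are made up for illustration; the real `P4EntityMap` is not shown in this section).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Table:
    "Hypothetical table definition with a numeric ID and a short name."
    id: int
    alias: str


class EntityMap:
    "Hypothetical lookup that accepts either a numeric ID or a name."

    def __init__(self, tables):
        self._by_id = {t.id: t for t in tables}
        self._by_name = {t.alias: t for t in tables}

    def __getitem__(self, key):
        if isinstance(key, int):
            return self._by_id[key]
        return self._by_name[key]


tables = EntityMap([Table(id=33574068, alias="ipv4")])

wire_id = tables["ipv4"].id    # encode direction: name -> numeric ID
name = tables[wire_id].alias   # decode direction: numeric ID -> name
```

Note that the real methods special-case the wildcard entry: an empty `table_id` is encoded without a schema lookup, and a protobuf `table_id` of 0 decodes to an entry with an empty name.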

def match_dict( self, schema: P4Schema, *, wildcard: str | None = None) -> dict[str, str]:
    def match_dict(
        self,
        schema: P4Schema,
        *,
        wildcard: str | None = None,
    ) -> dict[str, str]:
        """Format the match fields as a dictionary of strings.

        If `wildcard` is None, only include match fields that have values. If
        `wildcard` is set, include all field names but replace unset values with
        given wildcard value (e.g. "*")
        """
        table = schema.tables[self.table_id]
        if self.match is not None:
            return self.match.format_dict(table, wildcard=wildcard)
        return P4TableMatch().format_dict(table, wildcard=wildcard)

Format the match fields as a dictionary of strings.

If wildcard is None, only include match fields that have values. If wildcard is set, include all field names but replace unset values with the given wildcard value (e.g. "*").
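
The effect of the wildcard argument is easiest to see side by side. This is a minimal sketch under stated assumptions: `format_match` is a hypothetical helper that takes a plain list of field names in place of the table's P4Info, and it uses `str()` where Finsy formats values according to the P4Info type.

```python
def format_match(field_names, match, wildcard=None):
    "Hypothetical helper mirroring the documented wildcard behavior."
    result = {}
    for name in field_names:
        value = match.get(name)
        if value is not None:
            result[name] = str(value)
        elif wildcard is not None:
            # Unset field: only reported when a wildcard string is given.
            result[name] = wildcard
    return result


fields = ["ipv4_dst", "in_port"]
match = {"ipv4_dst": "10.0.0.0/8"}

sparse = format_match(fields, match)
full = format_match(fields, match, wildcard="*")
```

Here `sparse` contains only `ipv4_dst`, while `full` also lists `in_port` with the value "*".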

def match_str( self, schema: P4Schema, *, wildcard: str | None = None) -> str:
    def match_str(
        self,
        schema: P4Schema,
        *,
        wildcard: str | None = None,
    ) -> str:
        "Format the match fields as a human-readable, canonical string."
        table = schema.tables[self.table_id]
        if self.match is not None:
            return self.match.format_str(table, wildcard=wildcard)
        return P4TableMatch().format_str(table, wildcard=wildcard)

Format the match fields as a human-readable, canonical string.

def action_str(self, schema: P4Schema) -> str:
    def action_str(self, schema: P4Schema) -> str:
        "Format the actions as a human-readable, canonical string."
        table = schema.tables[self.table_id]
        if self.action is None:
            return NOACTION_STR
        return self.action.format_str(table)

Format the actions as a human-readable, canonical string.

Inherited Members
finsy.p4entity._P4Writable
encode_update
Match = <class 'P4TableMatch'>

Match is an alias for P4TableMatch.

class P4TableMatch(dict[str, typing.Any]):
class P4TableMatch(dict[str, Any]):
    """Represents a set of P4Runtime field matches.

    Each match field is stored as a dictionary key, where the key is the name
    of the match field. The field's value should be appropriate for the type
    of match (EXACT, LPM, TERNARY, etc.)

    Construct a match similar to a dictionary.

    Example:
    ```
    # Keyword arguments:
    match = P4TableMatch(ipv4_dst="10.0.0.1")

    # Dictionary argument:
    match = P4TableMatch({"ipv4_dst": "10.0.0.1"})

    # List of 2-tuples:
    match = P4TableMatch([("ipv4_dst", "10.0.0.1")])
    ```

    P4TableMatch is implemented as a subclass of `dict`. It supports all of the
    standard dictionary methods:
    ```
    match = P4TableMatch()
    match["ipv4_dst"] = "10.0.0.1"
    assert len(match) == 1
    ```

    Reference "9.1.1 Match Format":
        Each match field is translated to a FieldMatch Protobuf message by
        translating the entry key to a `field_id`. The type of match (EXACT,
        LPM, TERNARY, OPTIONAL, or RANGE) is determined by the P4Info, and the
        value is converted to the Protobuf representation.

    Supported String Values:
    ```
    EXACT: "255", "0xFF", "10.0.0.1", "2000::1", "00:00:00:00:00:01"

    LPM: "255/8", "0xFF/8", "10.0.0.1/32", "2000::1/128",
            "00:00:00:00:00:01/48"
        (+ all exact formats are promoted to all-1 masks)

    TERNARY: "255/&255", "0xFF/&0xFF", "10.0.0.1/&255.255.255.255",
        "2000::1/&128", "00:00:00:00:00:01/&48"
        (+ all exact formats are promoted to all-1 masks)
        (+ all lpm formats are promoted to the specified contiguous mask)

    RANGE: "0...255", "0x00...0xFF", "10.0.0.1...10.0.0.9",
        "2000::1...2001::9", "00:00:00:00:00:01...00:00:00:00:00:09"
        (+ all exact formats are promoted to single-value ranges)

    OPTIONAL: Same as exact format.
    ```

    See the `p4values.py` module for all supported value classes.

    TODO:
        - Change range delimiter to '-' (and drop '-' delimited MAC's).
        - Consider supporting ternary values with just '/' (and drop support
          for decimal masks; mask must be hexadecimal number).

    See Also:
        - P4TableEntry
    """

    def encode(self, table: P4Table) -> list[p4r.FieldMatch]:
        "Encode TableMatch data as a list of Protobuf fields."
        result: list[p4r.FieldMatch] = []
        match_fields = table.match_fields

        for key, value in self.items():
            try:
                field = match_fields[key].encode_field(value)
                if field is not None:
                    result.append(field)
            except Exception as ex:
                raise ValueError(f"{table.name!r}: Match field {key!r}: {ex}") from ex

        return result

    @classmethod
    def decode(cls, msgs: Iterable[p4r.FieldMatch], table: P4Table) -> Self:
        "Decode Protobuf fields as TableMatch data."
        result: dict[str, Any] = {}
        match_fields = table.match_fields

        for field in msgs:
            fld = match_fields[field.field_id]
            result[fld.alias] = fld.decode_field(field)

        return cls(result)

    def format_dict(
        self,
        table: P4Table,
        *,
        wildcard: str | None = None,
    ) -> dict[str, str]:
        """Format the table match fields as a human-readable dictionary.

        The result is a dictionary showing the TableMatch data for fields
        included in the match. If `wildcard` is specified, all fields defined
        in P4Info will be included with their value set to the wildcard string.

        Values are formatted using the format/type specified in P4Info.
        """
        result: dict[str, str] = {}

        for fld in table.match_fields:
            value = self.get(fld.alias, None)
            if value is not None:
                result[fld.alias] = fld.format_field(value)
            elif wildcard is not None:
                result[fld.alias] = wildcard

        return result

    def format_str(
        self,
        table: P4Table,
        *,
        wildcard: str | None = None,
    ) -> str:
        """Format the table match fields as a human-readable string.

        The result is a string showing the TableMatch data for fields included
        in the match. If `wildcard` is specified, all fields defined in P4Info
        will be included with their value set to the wildcard string.

        All fields are formatted as "name=value" and they are delimited by
        spaces.

        Values are formatted using the format/type specified in P4Info.
        """
        result: list[str] = []

        for fld in table.match_fields:
            value = self.get(fld.alias, None)
            if value is not None:
                result.append(f"{fld.alias}={fld.format_field(value)}")
            elif wildcard is not None:
                result.append(f"{fld.alias}={wildcard}")

        return " ".join(result)

Represents a set of P4Runtime field matches.

Each match field is stored as a dictionary key, where the key is the name of the match field. The field's value should be appropriate for the type of match (EXACT, LPM, TERNARY, etc.)

Construct a match similar to a dictionary.

Example:

# Keyword arguments:
match = P4TableMatch(ipv4_dst="10.0.0.1")

# Dictionary argument:
match = P4TableMatch({"ipv4_dst": "10.0.0.1"})

# List of 2-tuples:
match = P4TableMatch([("ipv4_dst", "10.0.0.1")])

P4TableMatch is implemented as a subclass of dict. It supports all of the standard dictionary methods:

match = P4TableMatch()
match["ipv4_dst"] = "10.0.0.1"
assert len(match) == 1

Reference "9.1.1 Match Format": Each match field is translated to a FieldMatch Protobuf message by translating the entry key to a field_id. The type of match (EXACT, LPM, TERNARY, OPTIONAL, or RANGE) is determined by the P4Info, and the value is converted to the Protobuf representation.

Supported String Values:

EXACT: "255", "0xFF", "10.0.0.1", "2000::1", "00:00:00:00:00:01"

LPM: "255/8", "0xFF/8", "10.0.0.1/32", "2000::1/128",
        "00:00:00:00:00:01/48"
    (+ all exact formats are promoted to all-1 masks)

TERNARY: "255/&255", "0xFF/&0xFF", "10.0.0.1/&255.255.255.255",
    "2000::1/&128", "00:00:00:00:00:01/&48"
    (+ all exact formats are promoted to all-1 masks)
    (+ all lpm formats are promoted to the specified contiguous mask)

RANGE: "0...255", "0x00...0xFF", "10.0.0.1...10.0.0.9",
    "2000::1...2001::9", "00:00:00:00:00:01...00:00:00:00:00:09"
    (+ all exact formats are promoted to single-value ranges)

OPTIONAL: Same as exact format.

See the p4values.py module for all supported value classes.
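
The delimiters in the table above ("/", "/&", "...") can be told apart with a few string checks. The sketch below is only an illustration of the syntax, not Finsy's parser: `classify_match_string` and `ip_prefix_len` are hypothetical helpers, and in Finsy the actual match type comes from the P4Info rather than from the string. The second helper uses the standard `ipaddress` module to show what "promoted to all-1 masks" means for a bare IP address.

```python
import ipaddress


def classify_match_string(text: str) -> tuple[str, tuple[str, ...]]:
    "Classify a match string by its delimiter (illustrative only)."
    if "..." in text:
        low, high = text.split("...", 1)
        return "range", (low, high)
    if "/&" in text:
        value, mask = text.split("/&", 1)
        return "ternary", (value, mask)
    if "/" in text:
        value, prefix_len = text.split("/", 1)
        return "lpm", (value, prefix_len)
    return "exact", (text,)


def ip_prefix_len(text: str) -> int:
    "Prefix length of an IP match string; a bare address gets a full-length prefix."
    return ipaddress.ip_network(text, strict=False).prefixlen


kind, parts = classify_match_string("10.0.0.1...10.0.0.9")
full_mask = ip_prefix_len("10.0.0.1")
```

The "..." check comes first because dotted IPv4 addresses also contain single dots, and "/&" is checked before "/" so ternary masks are not mistaken for prefix lengths.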

TODO:
    - Change range delimiter to '-' (and drop '-' delimited MAC's).
    - Consider supporting ternary values with just '/' (and drop support for decimal masks; mask must be hexadecimal number).

See Also:
    - P4TableEntry

def encode( self, table: finsy.p4schema.P4Table) -> list[p4.v1.p4runtime_pb2.FieldMatch]:
    def encode(self, table: P4Table) -> list[p4r.FieldMatch]:
        "Encode TableMatch data as a list of Protobuf fields."
        result: list[p4r.FieldMatch] = []
        match_fields = table.match_fields

        for key, value in self.items():
            try:
                field = match_fields[key].encode_field(value)
                if field is not None:
                    result.append(field)
            except Exception as ex:
                raise ValueError(f"{table.name!r}: Match field {key!r}: {ex}") from ex

        return result

Encode TableMatch data as a list of Protobuf fields.

@classmethod
def decode( cls, msgs: Iterable[p4.v1.p4runtime_pb2.FieldMatch], table: finsy.p4schema.P4Table) -> typing_extensions.Self:
    @classmethod
    def decode(cls, msgs: Iterable[p4r.FieldMatch], table: P4Table) -> Self:
        "Decode Protobuf fields as TableMatch data."
        result: dict[str, Any] = {}
        match_fields = table.match_fields

        for field in msgs:
            fld = match_fields[field.field_id]
            result[fld.alias] = fld.decode_field(field)

        return cls(result)

Decode Protobuf fields as TableMatch data.

def format_dict( self, table: finsy.p4schema.P4Table, *, wildcard: str | None = None) -> dict[str, str]:
    def format_dict(
        self,
        table: P4Table,
        *,
        wildcard: str | None = None,
    ) -> dict[str, str]:
        """Format the table match fields as a human-readable dictionary.

        The result is a dictionary showing the TableMatch data for fields
        included in the match. If `wildcard` is specified, all fields defined
        in P4Info will be included with their value set to the wildcard string.

        Values are formatted using the format/type specified in P4Info.
        """
        result: dict[str, str] = {}

        for fld in table.match_fields:
            value = self.get(fld.alias, None)
            if value is not None:
                result[fld.alias] = fld.format_field(value)
            elif wildcard is not None:
                result[fld.alias] = wildcard

        return result

Format the table match fields as a human-readable dictionary.

The result is a dictionary showing the TableMatch data for fields included in the match. If wildcard is specified, all fields defined in P4Info will be included with their value set to the wildcard string.

Values are formatted using the format/type specified in P4Info.

def format_str( self, table: finsy.p4schema.P4Table, *, wildcard: str | None = None) -> str:
    def format_str(
        self,
        table: P4Table,
        *,
        wildcard: str | None = None,
    ) -> str:
        """Format the table match fields as a human-readable string.

        The result is a string showing the TableMatch data for fields included
        in the match. If `wildcard` is specified, all fields defined in P4Info
        will be included with their value set to the wildcard string.

        All fields are formatted as "name=value" and they are delimited by
        spaces.

        Values are formatted using the format/type specified in P4Info.
        """
        result: list[str] = []

        for fld in table.match_fields:
            value = self.get(fld.alias, None)
            if value is not None:
                result.append(f"{fld.alias}={fld.format_field(value)}")
            elif wildcard is not None:
                result.append(f"{fld.alias}={wildcard}")

        return " ".join(result)

Format the table match fields as a human-readable string.

The result is a string showing the TableMatch data for fields included in the match. If wildcard is specified, all fields defined in P4Info will be included with their value set to the wildcard string.

All fields are formatted as "name=value" and they are delimited by spaces.

Values are formatted using the format/type specified in P4Info.
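
The "name=value" layout can be sketched in one line. `join_fields` below is a hypothetical helper that takes already-formatted values; it assumes the dictionary is in the table's field order, which is the order format_str iterates in.

```python
def join_fields(formatted: dict[str, str]) -> str:
    "Join formatted fields as space-delimited name=value pairs (illustrative)."
    return " ".join(f"{name}={value}" for name, value in formatted.items())


line = join_fields({"ipv4_dst": "10.0.0.0/8", "in_port": "*"})
```

A single-line form like this is convenient for log messages, where one entry per line is easier to scan than a dictionary repr.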

Inherited Members
builtins.dict
__iter__
__len__
__getitem__
__contains__
get
setdefault
pop
popitem
keys
items
values
update
fromkeys
clear
copy
@decodable('value_set_entry')
@dataclass(slots=True)
class P4ValueSetEntry(finsy.p4entity._P4ModifyOnly):
@decodable("value_set_entry")
@dataclass(slots=True)
class P4ValueSetEntry(_P4ModifyOnly):
    "Represents a P4Runtime ValueSetEntry."

    value_set_id: str
    _: KW_ONLY
    members: list[P4ValueSetMember]

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4ValueSetEntry as protobuf."
        value_set = schema.value_sets[self.value_set_id]
        members = [
            p4r.ValueSetMember(match=member.encode(value_set))
            for member in self.members
        ]

        return p4r.Entity(
            value_set_entry=p4r.ValueSetEntry(
                value_set_id=value_set.id, members=members
            )
        )

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4ValueSetEntry."
        entry = msg.value_set_entry
        value_set = schema.value_sets[entry.value_set_id]

        members = [
            P4ValueSetMember.decode(member.match, value_set) for member in entry.members
        ]

        return cls(value_set.alias, members=members)

Represents a P4Runtime ValueSetEntry.

P4ValueSetEntry(value_set_id: str, *, members: list[finsy.p4entity.P4ValueSetMember])
value_set_id: str
members: list[finsy.p4entity.P4ValueSetMember]
def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:
    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4ValueSetEntry as protobuf."
        value_set = schema.value_sets[self.value_set_id]
        members = [
            p4r.ValueSetMember(match=member.encode(value_set))
            for member in self.members
        ]

        return p4r.Entity(
            value_set_entry=p4r.ValueSetEntry(
                value_set_id=value_set.id, members=members
            )
        )

Encode P4ValueSetEntry as protobuf.

@classmethod
def decode( cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:
    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4ValueSetEntry."
        entry = msg.value_set_entry
        value_set = schema.value_sets[entry.value_set_id]

        members = [
            P4ValueSetMember.decode(member.match, value_set) for member in entry.members
        ]

        return cls(value_set.alias, members=members)

Decode protobuf to P4ValueSetEntry.

Inherited Members
finsy.p4entity._P4ModifyOnly
encode_update
class P4ConfigAction(finsy.grpcutil._EnumBase):
class P4ConfigAction(_EnumBase):
    "IntEnum equivalent to `p4r.SetForwardingPipelineConfigRequest.Action`."
    UNSPECIFIED = p4r.SetForwardingPipelineConfigRequest.Action.UNSPECIFIED
    VERIFY = p4r.SetForwardingPipelineConfigRequest.Action.VERIFY
    VERIFY_AND_SAVE = p4r.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_SAVE
    VERIFY_AND_COMMIT = p4r.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT
    COMMIT = p4r.SetForwardingPipelineConfigRequest.Action.COMMIT
    RECONCILE_AND_COMMIT = (
        p4r.SetForwardingPipelineConfigRequest.Action.RECONCILE_AND_COMMIT
    )

    def vt(self) -> p4r.SetForwardingPipelineConfigRequest.Action.ValueType:
        "Cast `self` to `ValueType`."
        return cast(p4r.SetForwardingPipelineConfigRequest.Action.ValueType, self)

IntEnum equivalent to p4r.SetForwardingPipelineConfigRequest.Action.
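
To show what "IntEnum equivalent" buys, here is a standalone sketch that does not import the protobuf module. `ConfigAction` is a hypothetical stand-in, and the numeric values are written out by hand from the P4Runtime protobuf definition as best recalled, so treat them as an assumption and check them against `p4runtime.proto`.

```python
from enum import IntEnum


class ConfigAction(IntEnum):
    "Hypothetical mirror of the protobuf Action enum (values assumed)."
    UNSPECIFIED = 0
    VERIFY = 1
    VERIFY_AND_SAVE = 2
    VERIFY_AND_COMMIT = 3
    COMMIT = 4
    RECONCILE_AND_COMMIT = 5


action = ConfigAction.VERIFY_AND_COMMIT

# An IntEnum member is an int, so it can be passed where a raw
# protobuf enum value is expected, while still printing a readable name.
as_int = int(action)
label = action.name
```

The real class takes its values directly from the protobuf enum instead of hard-coding them, which keeps the two definitions from drifting apart.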

VERIFY_AND_SAVE = P4ConfigAction.VERIFY_AND_SAVE
VERIFY_AND_COMMIT = P4ConfigAction.VERIFY_AND_COMMIT
RECONCILE_AND_COMMIT = P4ConfigAction.RECONCILE_AND_COMMIT
def vt(self) -> int:
    def vt(self) -> p4r.SetForwardingPipelineConfigRequest.Action.ValueType:
        "Cast `self` to `ValueType`."
        return cast(p4r.SetForwardingPipelineConfigRequest.Action.ValueType, self)

Cast self to ValueType.

Inherited Members
enum.Enum
name
value
builtins.int
conjugate
bit_length
bit_count
to_bytes
from_bytes
as_integer_ratio
real
imag
numerator
denominator
class P4Schema(finsy.p4schema._ReprMixin):
class P4Schema(_ReprMixin):
    """Represents a P4Info file and its associated P4 blob (optional).

    ```
    p4 = P4Schema(Path("basic.p4info.txtpb"))
    ```

    This class parses the P4Info contents to produce an in-memory representation
    of the tables, actions, types, etc. inside. This in-memory graph of the
    contents of the P4Info file may be shared when we parse identical
    P4Info files. The sharing of P4Info data is controlled by the
    `P4SchemaCache` class.
    """

    _p4info: p4i.P4Info | None
    _p4blob: Path | bytes | SupportsBytes | None
    _p4defs: _P4Defs  # possibly shared in-memory representation
    _p4cookie: int = 0

    def __init__(
        self,
        p4info: p4i.P4Info | Path | None = None,
        p4blob: Path | bytes | SupportsBytes | None = None,
    ):
        self._p4blob = p4blob
        self._p4info, self._p4defs, self._p4cookie = P4SchemaCache.load_p4info(
            p4info,
            self._p4blob,
        )

    @property
    def exists(self) -> bool:
        "True if p4info is configured."
        return self._p4info is not None

    @property
    def is_authoritative(self) -> bool:
        "True if both p4info and p4blob are configured."
        return self._p4info is not None and self._p4blob is not None

    @property
    def p4info(self) -> p4i.P4Info:
        "P4Info value."
        if self._p4info is None:
            raise ValueError("No P4Info configured.")
        return self._p4info

    def set_p4info(self, p4info: p4i.P4Info) -> None:
        "Set P4Info using value returned from switch."
        self._p4info, self._p4defs, self._p4cookie = P4SchemaCache.load_p4info(
            p4info,
            self._p4blob,
        )

    def has_p4info(self, p4info: p4i.P4Info) -> bool:
        "Return true if the current P4Info equals the given P4Info."
        if self._p4info is None:
            return False
        return self._p4info.SerializeToString(
            deterministic=True
        ) == p4info.SerializeToString(deterministic=True)

    @property
    def p4blob(self) -> bytes:
        "P4Blob value a.k.a p4_device_config."
        return _blob_bytes(self._p4blob)

    @property
    def p4cookie(self) -> int:
        """Cookie value for p4info and p4blob."""
        return self._p4cookie

    def get_pipeline_config(self) -> p4r.ForwardingPipelineConfig:
        """The forwarding pipeline configuration."""
        return p4r.ForwardingPipelineConfig(
            p4info=self.p4info,
            p4_device_config=self.p4blob,
            cookie=p4r.ForwardingPipelineConfig.Cookie(cookie=self.p4cookie),
        )

    def get_pipeline_info(self) -> str:
        "Concise string description of the pipeline (suitable for logging)."
        if self.exists:
            pipeline = self.name
            version = self.version
            arch = self.arch
            return f"{pipeline=} {version=} {arch=}"

        return "<No pipeline exists>"

    @property
    def name(self) -> str:
        "Name from pkg_info."
        if self._p4info is None:
            return ""
        return self._p4info.pkg_info.name

    @property
    def version(self) -> str:
        "Version from pkg_info."
        if self._p4info is None:
            return ""
        return self._p4info.pkg_info.version

    @property
    def arch(self) -> str:
        "Arch from pkg_info."
        if self._p4info is None:
            return ""
        return self._p4info.pkg_info.arch

    @property
    def pkg_info(self) -> p4i.PkgInfo:
        """Protobuf message containing original `PkgInfo` header.

        Use this to access less frequently used fields like `contact`, `url`,
        and `platform_properties`.
        """
        if self._p4info is None:
            raise ValueError("P4Info: No pipeline configured")
        return self._p4info.pkg_info

    @property
    def tables(self) -> P4EntityMap["P4Table"]:
        "Collection of P4 tables."
        return self._p4defs.tables

    @property
    def actions(self) -> P4EntityMap["P4Action"]:
        "Collection of P4 actions."
        return self._p4defs.actions

    @property
    def action_profiles(self) -> P4EntityMap["P4ActionProfile"]:
        "Collection of P4 action profiles."
        return self._p4defs.action_profiles

    @property
    def controller_packet_metadata(self) -> P4EntityMap["P4ControllerPacketMetadata"]:
        "Collection of P4 controller packet metadata."
        return self._p4defs.controller_packet_metadata

    @property
    def direct_counters(self) -> P4EntityMap["P4DirectCounter"]:
        "Collection of P4 direct counters."
        return self._p4defs.direct_counters

    @property
    def direct_meters(self) -> P4EntityMap["P4DirectMeter"]:
        "Collection of P4 direct meters."
        return self._p4defs.direct_meters

    @property
    def counters(self) -> P4EntityMap["P4Counter"]:
        "Collection of P4 counters."
        return self._p4defs.counters

    @property
    def meters(self) -> P4EntityMap["P4Meter"]:
        "Collection of P4 meters."
        return self._p4defs.meters

    @property
    def registers(self) -> P4EntityMap["P4Register"]:
        "Collection of P4 registers."
        return self._p4defs.registers

    @property
    def digests(self) -> P4EntityMap["P4Digest"]:
        "Collection of P4 digests."
        return self._p4defs.digests

    @property
    def value_sets(self) -> P4EntityMap["P4ValueSet"]:
        "Collection of P4 value sets."
        return self._p4defs.value_sets

    @property
    def type_info(self) -> "P4TypeInfo":
        "Type Info object."
        return self._p4defs.type_info

    @property
    def externs(self) -> "P4ExternMap":
        "Collection of P4 extern instances."
        return self._p4defs.externs

    def __str__(self) -> str:
        if self._p4info is None:
            return "<P4Info: No pipeline configured>"
        return str(P4SchemaDescription(self))

Represents a P4Info file and its associated P4 blob (optional).

p4 = P4Schema(Path("basic.p4info.txtpb"))

This class parses the P4Info contents to produce an in-memory representation of the tables, actions, types, etc. inside. This in-memory graph of the contents of the P4Info file may be shared when we parse identical P4Info files. The sharing of P4Info data is controlled by the P4SchemaCache class.
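The sharing idea can be sketched as follows. This is only an illustration, not Finsy's actual P4SchemaCache: it assumes parsed definitions are keyed by a hash of the raw P4Info bytes, and `SchemaCacheSketch`, `load` and `parse_defs` are hypothetical names.

```python
import hashlib
from typing import Callable, Dict


class SchemaCacheSketch:
    """Hypothetical cache: identical inputs share one parsed object."""

    def __init__(self, parse: Callable[[bytes], dict]) -> None:
        self._parse = parse
        self._entries: Dict[str, dict] = {}

    def load(self, data: bytes) -> dict:
        # Key the cache on a digest of the content, not on object identity.
        key = hashlib.sha256(data).hexdigest()
        if key not in self._entries:
            self._entries[key] = self._parse(data)
        return self._entries[key]


def parse_defs(data: bytes) -> dict:
    # Stand-in for the expensive step of building the in-memory graph.
    return {"size": len(data)}


cache = SchemaCacheSketch(parse_defs)
first = cache.load(b"tables { ... }")
second = cache.load(b"tables { ... }")   # identical content: shared object
other = cache.load(b"actions { ... }")   # different content: separate object
```

In this sketch, two loads of identical bytes return the same object, so the parsed graph is built only once.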

P4Schema(p4info: p4.config.v1.p4info_pb2.P4Info | pathlib.Path | None = None, p4blob: pathlib.Path | bytes | typing.SupportsBytes | None = None)
    def __init__(
        self,
        p4info: p4i.P4Info | Path | None = None,
        p4blob: Path | bytes | SupportsBytes | None = None,
    ):
        self._p4blob = p4blob
        self._p4info, self._p4defs, self._p4cookie = P4SchemaCache.load_p4info(
            p4info,
            self._p4blob,
        )

exists: bool
    @property
    def exists(self) -> bool:
        "True if p4info is configured."
        return self._p4info is not None

True if p4info is configured.

is_authoritative: bool
    @property
    def is_authoritative(self) -> bool:
        "True if both p4info and p4blob are configured."
        return self._p4info is not None and self._p4blob is not None

True if both p4info and p4blob are configured.

p4info: p4.config.v1.p4info_pb2.P4Info
    @property
    def p4info(self) -> p4i.P4Info:
        "P4Info value."
        if self._p4info is None:
            raise ValueError("No P4Info configured.")
        return self._p4info

P4Info value.
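Because `p4info` raises `ValueError` when nothing is configured, callers may prefer to check `exists` first. The sketch below uses a stand-in class (`SchemaSketch`, hypothetical) that mirrors only the three accessors shown above; it is not the real P4Schema, and `describe` is an illustrative helper.

```python
class SchemaSketch:
    "Stand-in mirroring the accessors shown above; not the real P4Schema."

    def __init__(self, p4info=None, p4blob=None):
        self._p4info = p4info
        self._p4blob = p4blob

    @property
    def exists(self) -> bool:
        return self._p4info is not None

    @property
    def is_authoritative(self) -> bool:
        return self._p4info is not None and self._p4blob is not None

    @property
    def p4info(self):
        if self._p4info is None:
            raise ValueError("No P4Info configured.")
        return self._p4info


def describe(schema: SchemaSketch) -> str:
    # Guard with `exists` instead of catching ValueError from `p4info`.
    if not schema.exists:
        return "no pipeline"
    kind = "authoritative" if schema.is_authoritative else "p4info only"
    return f"{kind}: {schema.p4info}"
```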

def set_p4info(self, p4info: p4.config.v1.p4info_pb2.P4Info) -> None:
    def set_p4info(self, p4info: p4i.P4Info) -> None:
        "Set P4Info using value returned from switch."
        self._p4info, self._p4defs, self._p4cookie = P4SchemaCache.load_p4info(
            p4info,
            self._p4blob,
        )

Set P4Info using value returned from switch.

def has_p4info(self, p4info: p4.config.v1.p4info_pb2.P4Info) -> bool:
    def has_p4info(self, p4info: p4i.P4Info) -> bool:
        "Return true if the current P4Info equals the given P4Info."
        if self._p4info is None:
            return False
        return self._p4info.SerializeToString(
            deterministic=True
        ) == p4info.SerializeToString(deterministic=True)

Return true if the current P4Info equals the given P4Info.
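The comparison works by serializing both messages deterministically and comparing the bytes. As an analogy only, the sketch below uses JSON in place of protobuf to show why a canonical encoding matters; `canonical_bytes` and `same_content` are hypothetical helpers, not part of Finsy.

```python
import json
from typing import Optional


def canonical_bytes(value: dict) -> bytes:
    # sort_keys yields the same bytes regardless of insertion order.
    return json.dumps(value, sort_keys=True, separators=(",", ":")).encode()


def same_content(current: Optional[dict], other: dict) -> bool:
    # Mirrors the shape of has_p4info: nothing configured means "not equal".
    if current is None:
        return False
    return canonical_bytes(current) == canonical_bytes(other)


a = {"name": "basic", "arch": "v1model"}
b = {"arch": "v1model", "name": "basic"}  # same content, different order
```

Here `same_content(a, b)` is true even though a plain `json.dumps` of each dict would produce different strings.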

p4blob: bytes
    @property
    def p4blob(self) -> bytes:
        "P4Blob value a.k.a p4_device_config."
        return _blob_bytes(self._p4blob)

P4Blob value a.k.a p4_device_config.

p4cookie: int
    @property
    def p4cookie(self) -> int:
        """Cookie value for p4info and p4blob."""
        return self._p4cookie

Cookie value for p4info and p4blob.

def get_pipeline_config(self) -> p4.v1.p4runtime_pb2.ForwardingPipelineConfig:
    def get_pipeline_config(self) -> p4r.ForwardingPipelineConfig:
        """The forwarding pipeline configuration."""
        return p4r.ForwardingPipelineConfig(
            p4info=self.p4info,
            p4_device_config=self.p4blob,
            cookie=p4r.ForwardingPipelineConfig.Cookie(cookie=self.p4cookie),
        )

The forwarding pipeline configuration.

def get_pipeline_info(self) -> str:
    def get_pipeline_info(self) -> str:
        "Concise string description of the pipeline (suitable for logging)."
        if self.exists:
            pipeline = self.name
            version = self.version
            arch = self.arch
            return f"{pipeline=} {version=} {arch=}"

        return "<No pipeline exists>"

Concise string description of the pipeline (suitable for logging).
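The description is built with the f-string `=` specifier, which prints each variable name followed by the `repr` of its value. A standalone sketch of the same formatting, where `pipeline_info` and the sample values are hypothetical:

```python
def pipeline_info(name: str, version: str, arch: str) -> str:
    # Same formatting as the source above: "{var=}" expands to "var=<repr>".
    pipeline = name
    return f"{pipeline=} {version=} {arch=}"


info = pipeline_info("basic", "1.0.0", "v1model")
print(info)  # pipeline='basic' version='1.0.0' arch='v1model'
```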

name: str
    @property
    def name(self) -> str:
        "Name from pkg_info."
        if self._p4info is None:
            return ""
        return self._p4info.pkg_info.name

Name from pkg_info.

version: str
    @property
    def version(self) -> str:
        "Version from pkg_info."
        if self._p4info is None:
            return ""
        return self._p4info.pkg_info.version

Version from pkg_info.

arch: str
    @property
    def arch(self) -> str:
        "Arch from pkg_info."
        if self._p4info is None:
            return ""
        return self._p4info.pkg_info.arch

Arch from pkg_info.

pkg_info: p4.config.v1.p4info_pb2.PkgInfo
    @property
    def pkg_info(self) -> p4i.PkgInfo:
        """Protobuf message containing original `PkgInfo` header.

        Use this to access less frequently used fields like `contact`, `url`,
        and `platform_properties`.
        """
        if self._p4info is None:
            raise ValueError("P4Info: No pipeline configured")
        return self._p4info.pkg_info

Protobuf message containing original PkgInfo header.

Use this to access less frequently used fields like contact, url, and platform_properties.

tables: finsy.p4schema.P4EntityMap[finsy.p4schema.P4Table]
    @property
    def tables(self) -> P4EntityMap["P4Table"]:
        "Collection of P4 tables."
        return self._p4defs.tables

Collection of P4 tables.

actions: finsy.p4schema.P4EntityMap[finsy.p4schema.P4Action]
    @property
    def actions(self) -> P4EntityMap["P4Action"]:
        "Collection of P4 actions."
        return self._p4defs.actions

Collection of P4 actions.

action_profiles: finsy.p4schema.P4EntityMap[finsy.p4schema.P4ActionProfile]
    @property
    def action_profiles(self) -> P4EntityMap["P4ActionProfile"]:
        "Collection of P4 action profiles."
        return self._p4defs.action_profiles

Collection of P4 action profiles.

controller_packet_metadata: finsy.p4schema.P4EntityMap[finsy.p4schema.P4ControllerPacketMetadata]
    @property
    def controller_packet_metadata(self) -> P4EntityMap["P4ControllerPacketMetadata"]:
        "Collection of P4 controller packet metadata."
        return self._p4defs.controller_packet_metadata

Collection of P4 controller packet metadata.

direct_counters: finsy.p4schema.P4EntityMap[finsy.p4schema.P4DirectCounter]
    @property
    def direct_counters(self) -> P4EntityMap["P4DirectCounter"]:
        "Collection of P4 direct counters."
        return self._p4defs.direct_counters

Collection of P4 direct counters.

direct_meters: finsy.p4schema.P4EntityMap[finsy.p4schema.P4DirectMeter]
    @property
    def direct_meters(self) -> P4EntityMap["P4DirectMeter"]:
        "Collection of P4 direct meters."
        return self._p4defs.direct_meters

Collection of P4 direct meters.

counters: finsy.p4schema.P4EntityMap[finsy.p4schema.P4Counter]
    @property
    def counters(self) -> P4EntityMap["P4Counter"]:
        "Collection of P4 counters."
        return self._p4defs.counters

Collection of P4 counters.

meters: finsy.p4schema.P4EntityMap[finsy.p4schema.P4Meter]
    @property
    def meters(self) -> P4EntityMap["P4Meter"]:
        "Collection of P4 meters."
        return self._p4defs.meters

Collection of P4 meters.

registers: finsy.p4schema.P4EntityMap[finsy.p4schema.P4Register]
    @property
    def registers(self) -> P4EntityMap["P4Register"]:
        "Collection of P4 registers."
        return self._p4defs.registers

Collection of P4 registers.

digests: finsy.p4schema.P4EntityMap[finsy.p4schema.P4Digest]
    @property
    def digests(self) -> P4EntityMap["P4Digest"]:
        "Collection of P4 digests."
        return self._p4defs.digests

Collection of P4 digests.

value_sets: finsy.p4schema.P4EntityMap[finsy.p4schema.P4ValueSet]
    @property
    def value_sets(self) -> P4EntityMap["P4ValueSet"]:
        "Collection of P4 value sets."
        return self._p4defs.value_sets

Collection of P4 value sets.

type_info: finsy.p4schema.P4TypeInfo
    @property
    def type_info(self) -> "P4TypeInfo":
        "Type Info object."
        return self._p4defs.type_info

Type Info object.

externs: finsy.p4schema.P4ExternMap
    @property
    def externs(self) -> "P4ExternMap":
        "Collection of P4 extern instances."
        return self._p4defs.externs

Collection of P4 extern instances.

@final
class Switch:
@final
class Switch:
    """Represents a P4Runtime Switch.

    A `Switch` is constructed with a `name`, `address` and an optional
    `SwitchOptions` configuration.

    The `name` is up to the user but should uniquely identify the switch.

    The `address` identifies the target endpoint of the gRPC channel. It should
    have the format "<address>:<port>" where <address> can be a domain name,
    IPv4 address, or IPv6 address in square brackets.

    The `options` is a `SwitchOptions` object that specifies how the `Switch`
    will behave.

    ```
    opts = SwitchOptions(p4info=..., p4blob=...)
    sw1 = Switch('sw1', '10.0.0.1:50000', opts)
    ```

    Each switch object has an event emitter `ee`. Use the EventEmitter to listen
    for port change events like PORT_UP and PORT_DOWN. See the `SwitchEvent`
    class for a list of supported switch events.
    """

    _name: str
    _address: str
    _options: SwitchOptions
    _stash: dict[str, Any]
    _ee: "SwitchEmitter"
    _p4client: P4Client | None
    _p4schema: P4Schema
    _tasks: "SwitchTasks | None"
    _packet_queues: list[tuple[Callable[[bytes], bool], Queue[p4entity.P4PacketIn]]]
    _digest_queues: dict[str, Queue[p4entity.P4DigestList]]
    _timeout_queue: Queue[p4entity.P4IdleTimeoutNotification] | None
    _arbitrator: "Arbitrator"
    _gnmi_client: GNMIClient | None
    _ports: SwitchPortList
    _is_channel_up: bool = False
    _api_version: ApiVersion = ApiVersion(1, 0, 0, "")
    _control_task: asyncio.Task[Any] | None = None

    def __init__(
        self,
        name: str,
        address: str,
        options: SwitchOptions | None = None,
    ) -> None:
        if options is None:
            options = SwitchOptions()

        self._name = name
        self._address = address
        self._options = options
        self._stash = {}
        self._ee = SwitchEmitter(self)
        self._p4client = None
        self._p4schema = P4Schema(options.p4info, options.p4blob)
        self._tasks = None
        self._packet_queues = []
        self._digest_queues = {}
        self._timeout_queue = None
        self._arbitrator = Arbitrator(
            options.initial_election_id, options.role_name, options.role_config
        )
        self._gnmi_client = None
        self._ports = SwitchPortList()

    @property
    def name(self) -> str:
        "Name of the switch."
        return self._name

    @property
    def address(self) -> str:
        "Address of the switch."
        return self._address

    @property
    def options(self) -> SwitchOptions:
        "Switch options."
        return self._options

    @options.setter
    def options(self, opts: SwitchOptions) -> None:
        "Set switch options to a new value."
        if self._p4client is not None:
            raise RuntimeError("Cannot change switch options while client is open.")

        self._options = opts
        self._p4schema = P4Schema(opts.p4info, opts.p4blob)
        self._arbitrator = Arbitrator(
            opts.initial_election_id, opts.role_name, opts.role_config
        )

    @property
    def stash(self) -> dict[str, Any]:
        "Switch stash, may be used to store per-switch data for any purpose."
        return self._stash

    @property
    def ee(self) -> "SwitchEmitter":
        "Switch event emitter. See `SwitchEvent` for more details on events."
        return self._ee

    @property
    def device_id(self) -> int:
        "Switch's device ID."
        return self._options.device_id

    @property
    def is_up(self) -> bool:
        "True if switch is UP."
        return self._is_channel_up

    @property
    def is_primary(self) -> bool:
        "True if switch is primary."
        return self._arbitrator.is_primary

    @property
    def primary_id(self) -> int:
        "Election ID of switch that is currently primary."
        return self._arbitrator.primary_id

    @property
    def election_id(self) -> int:
        "Switch's current election ID."
        return self._arbitrator.election_id

    @property
    def role_name(self) -> str:
        "Switch's current role name."
        return self._arbitrator.role_name

    @property
    def p4info(self) -> P4Schema:
        "Switch's P4 schema."
        return self._p4schema

    @property
    def gnmi_client(self) -> GNMIClient | None:
        "Switch's gNMI client."
        return self._gnmi_client

    @property
    def ports(self) -> SwitchPortList:
        "Switch's list of interfaces."
        return self._ports

    @property
    def api_version(self) -> ApiVersion:
        "P4Runtime protocol version."
        return self._api_version

    @overload
    async def read(
        self,
        entities: _ET,
    ) -> AsyncGenerator[_ET, None]:
        "Overload for read of a single P4Entity subtype."
        ...  # pragma: no cover

    @overload
    async def read(
        self,
        entities: Iterable[_ET],
    ) -> AsyncGenerator[_ET, None]:
        "Overload for read of an iterable of the same P4Entity subtype."
        ...  # pragma: no cover

    @overload
    async def read(
        self,
        entities: Iterable[p4entity.P4EntityList],
    ) -> AsyncGenerator[p4entity.P4Entity, None]:
        "Most general overload: we can't determine the return type exactly."
        ...  # pragma: no cover

    async def read(
        self,
        entities: Iterable[p4entity.P4EntityList] | p4entity.P4Entity,
    ) -> AsyncGenerator[p4entity.P4Entity, None]:
        "Async iterator that reads entities from the switch."
        assert self._p4client is not None

        if not entities:
            return

        if isinstance(entities, p4entity.P4Entity):
            entities = [entities]

        request = p4r.ReadRequest(
            device_id=self.device_id,
            entities=p4entity.encode_entities(entities, self.p4info),
        )

        async for reply in self._p4client.request_iter(request):
            for ent in reply.entities:
                yield p4entity.decode_entity(ent, self.p4info)

    async def read_packets(
        self,
        *,
        queue_size: int = _DEFAULT_QUEUE_SIZE,
        eth_types: Iterable[int] | None = None,
    ) -> AsyncIterator["p4entity.P4PacketIn"]:
        "Async iterator for incoming packets (P4PacketIn)."
        LOGGER.debug("read_packets: opening queue: eth_types=%r", eth_types)

        if eth_types is None:

            def _pkt_filter(_payload: bytes) -> bool:
                return True

        else:
            _filter = {eth.to_bytes(2, "big") for eth in eth_types}

            def _pkt_filter(_payload: bytes) -> bool:
                return _payload[12:14] in _filter

        queue = Queue[p4entity.P4PacketIn](queue_size)
        queue_filter = (_pkt_filter, queue)
        self._packet_queues.append(queue_filter)

        try:
            while True:
                yield await queue.get()
        finally:
            LOGGER.debug("read_packets: closing queue: eth_types=%r", eth_types)
            self._packet_queues.remove(queue_filter)

    async def read_digests(
        self,
        digest_id: str,
        *,
        queue_size: int = _DEFAULT_QUEUE_SIZE,
    ) -> AsyncIterator["p4entity.P4DigestList"]:
        "Async iterator for incoming digest lists (P4DigestList)."
        LOGGER.debug("read_digests: opening queue: digest_id=%r", digest_id)

        if digest_id in self._digest_queues:
            raise ValueError(f"queue for digest_id {digest_id!r} already open")

        queue = Queue[p4entity.P4DigestList](queue_size)
        self._digest_queues[digest_id] = queue
        try:
            while True:
                yield await queue.get()
        finally:
            LOGGER.debug("read_digests: closing queue: digest_id=%r", digest_id)
            del self._digest_queues[digest_id]

    async def read_idle_timeouts(
        self,
        *,
        queue_size: int = _DEFAULT_QUEUE_SIZE,
    ) -> AsyncIterator["p4entity.P4IdleTimeoutNotification"]:
        "Async iterator for incoming idle timeouts (P4IdleTimeoutNotification)."
        LOGGER.debug("read_idle_timeouts: opening queue")

        if self._timeout_queue is not None:
            raise ValueError("timeout queue already open")

        queue = Queue[p4entity.P4IdleTimeoutNotification](queue_size)
        self._timeout_queue = queue
        try:
            while True:
                yield await queue.get()
        finally:
            LOGGER.debug("read_idle_timeouts: closing queue")
            self._timeout_queue = None

    async def write(
        self,
        entities: Iterable[p4entity.P4UpdateList],
        *,
        strict: bool = True,
        warn_only: bool = False,
    ) -> None:
        """Write updates and stream messages to the switch.

        If `strict` is False, MODIFY and DELETE operations will NOT raise an
        error if the entity does not exist (NOT_FOUND).

        If `warn_only` is True, no operations will raise an error. Instead,
        the exception will be logged as a WARNING and the method will return
        normally.
        """
        assert self._p4client is not None

        if not entities:
            return

        msgs = p4entity.encode_updates(entities, self.p4info)

        updates: list[p4r.Update] = []
        for msg in msgs:
            if isinstance(msg, p4r.StreamMessageRequest):
                # StreamMessageRequests are transmitted immediately.
                # TODO: Understand what happens with backpressure?
                await self._p4client.send(msg)
            else:
                updates.append(msg)

        if updates:
            await self._write_request(updates, strict, warn_only)

    async def insert(
        self,
        entities: Iterable[p4entity.P4EntityList],
        *,
        warn_only: bool = False,
    ) -> None:
        """Insert the specified entities.

        If `warn_only` is True, errors will be logged as warnings instead of
        raising an exception.
        """
        if entities:
            await self._write_request(
                [
                    p4r.Update(type=p4r.Update.INSERT, entity=ent)
                    for ent in p4entity.encode_entities(entities, self.p4info)
                ],
                True,
                warn_only,
            )

    async def modify(
        self,
        entities: Iterable[p4entity.P4EntityList],
        *,
        strict: bool = True,
        warn_only: bool = False,
    ) -> None:
        """Modify the specified entities.

        If `strict` is False, NOT_FOUND errors will be ignored.

        If `warn_only` is True, errors will be logged as warnings instead of
        raising an exception.
        """
        if entities:
            await self._write_request(
                [
                    p4r.Update(type=p4r.Update.MODIFY, entity=ent)
                    for ent in p4entity.encode_entities(entities, self.p4info)
                ],
                strict,
                warn_only,
            )

    async def delete(
        self,
        entities: Iterable[p4entity.P4EntityList],
        *,
        strict: bool = True,
        warn_only: bool = False,
    ) -> None:
        """Delete the specified entities.

        If `strict` is False, NOT_FOUND errors will be ignored.

        If `warn_only` is True, errors will be logged as warnings instead of
        raising an exception.
        """
        if entities:
            await self._write_request(
                [
                    p4r.Update(type=p4r.Update.DELETE, entity=ent)
                    for ent in p4entity.encode_entities(entities, self.p4info)
                ],
                strict,
                warn_only,
            )

    async def delete_all(self) -> None:
        """Delete all table entries, multicast groups, clone sessions, action
        profile groups/members and digest entries, and reset default entries.

        This method does not attempt to delete entries in const tables.

        TODO: This method does not affect indirect counters, meters or
        value_sets.
        """
        await self.delete_many(
            [
                p4entity.P4TableEntry(),
                p4entity.P4MulticastGroupEntry(),
                p4entity.P4CloneSessionEntry(),
            ]
        )

        # Reset all default table entries.
        default_entries = [
            p4entity.P4TableEntry(table.alias, is_default_action=True)
            for table in self.p4info.tables
            if table.const_default_action is None and table.action_profile is None
        ]
        if default_entries:
            await self.modify(default_entries)

        # Delete all P4ActionProfileGroup's and P4ActionProfileMember's.
        # We do this after deleting the P4TableEntry's in case a client is using
        # "one-shot" references; these are incompatible with separate
        # action profiles.
        await self.delete_many(
            [
                p4entity.P4ActionProfileGroup(),
                p4entity.P4ActionProfileMember(),
            ]
        )

        # Delete DigestEntry separately. Wildcard reads are not supported.
        digest_entries = [
            p4entity.P4DigestEntry(digest.alias) for digest in self.p4info.digests
        ]
        if digest_entries:
            await self.delete(digest_entries, strict=False)

    async def delete_many(self, entities: Iterable[p4entity.P4EntityList]) -> None:
        """Delete entities that match a wildcard read.

        This method always skips over entries in const tables. It is an error
        to attempt to delete those.
        """
        assert self._p4client is not None

        request = p4r.ReadRequest(
            device_id=self.device_id,
            entities=p4entity.encode_entities(entities, self.p4info),
        )

        # Compute set of all const table ID's (may be empty).
        to_skip = {table.id for table in self.p4info.tables if table.is_const}

        async for reply in self._p4client.request_iter(request):
            if reply.entities:
                if to_skip:
                    await self.delete(
                        reply
                        for reply in reply.entities
                        if reply.HasField("table_entry")
                        and reply.table_entry.table_id not in to_skip
                    )
                else:
                    await self.delete(reply.entities)

    async def run(self) -> None:
        "Run the switch's lifecycle repeatedly."
        assert self._p4client is None
        assert self._tasks is None

        self._tasks = SwitchTasks(self._options.fail_fast)
        self._p4client = P4Client(self._address, self._options.channel_credentials)
        self._switch_start()

        try:
            while True:
                # If the switch fails and restarts too quickly, slow it down.
                async with _throttle_failure():
                    self.create_task(self._run(), background=True)
                    await self._tasks.wait()
                    self._arbitrator.reset()

        finally:
            self._p4client = None
            self._tasks = None
            self._switch_stop()

    def create_task(
        self,
        coro: Coroutine[Any, Any, _T],
        *,
        background: bool = False,
        name: str | None = None,
    ) -> asyncio.Task[_T]:
        "Create an asyncio task tied to the Switch's lifecycle."
        assert self._tasks is not None

        return self._tasks.create_task(
            coro,
            switch=self,
            background=background,
            name=name,
        )

    async def _run(self):
        "Main Switch task runs the stream."
        assert not self._is_channel_up
        assert self._p4client is not None

        try:
            await self._p4client.open(
                schema=self.p4info,
                complete_request=self._arbitrator.complete_request,
            )
            await self._arbitrator.handshake(self)
            await self._fetch_capabilities()
            await self._start_gnmi()
            self._channel_up()

            # Receive messages from the stream until it closes.
            await self._receive_until_closed()

        finally:
            await self._stop_gnmi()
            await self._p4client.close()
            self._channel_down()

    async def _receive_until_closed(self):
        "Receive messages from stream until EOF."
        assert self._p4client is not None

        client = self._p4client

        while True:
            try:
                msg = await client.receive()
            except P4ClientError as ex:
                if not ex.is_election_id_used:
                    raise
                # Handle "election ID in use" error.
                await self._arbitrator.handshake(self, conflict=True)
            else:
                await self._handle_stream_message(msg)

    async def _handle_stream_message(self, msg: p4r.StreamMessageResponse):
        "Handle a P4Runtime StreamMessageResponse."
        match msg.WhichOneof("update"):
            case "packet":
                self._stream_packet_message(msg)
            case "digest":
                self._stream_digest_message(msg)
            case "idle_timeout_notification":
                self._stream_timeout_message(msg)
            case "arbitration":
                await self._arbitrator.update(self, msg.arbitration)
            case "error":
                self._stream_error_message(msg)
            case other:
                LOGGER.error("_handle_stream_message: unknown update %r", other)

    async def __aenter__(self) -> Self:
        "Similar to run() but provides a one-time context manager interface."
        assert self._p4client is None
        assert self._tasks is None

        self._tasks = SwitchTasks(self._options.fail_fast)
        self._p4client = P4Client(
            self._address,
            self._options.channel_credentials,
            wait_for_ready=False,
        )
        self._switch_start()

        try:
            # Start the switch's `_run` task in the background. Then, wait for
            # `_run` task to fire the CHANNEL_READY event. If the `_run` task
            # cannot connect or fails in some other way, it will finish before
            # the `ready` future. We need to handle the error in this case.

            run = self.create_task(self._run(), background=True)
            ready = self.ee.event_future(SwitchEvent.CHANNEL_READY)
            done, _ = await asyncio.wait(
                [run, ready], return_when=asyncio.FIRST_COMPLETED
            )
            if run in done:
                await run

        except BaseException:
            await self.__aexit__(None, None, None)
            raise

        return self

    async def __aexit__(
        self,
        _exc_type: type[BaseException] | None,
        _exc_val: BaseException | None,
        _exc_tb: TracebackType | None,
    ) -> bool | None:
        "Similar to run() but provides a one-time context manager interface."
        assert self._p4client is not None
        assert self._tasks is not None

        self._tasks.cancel_all()
        await self._tasks.wait()
        self._arbitrator.reset()
        self._p4client = None
        self._tasks = None
        self._switch_stop()

    def _switch_start(self):
        "Called when switch starts its run() cycle."
        assert not self._is_channel_up

        LOGGER.info(
            "Switch start (name=%r, address=%r, device_id=%r, role_name=%r, initial_election_id=%r)",
            self.name,
            self.address,
            self.device_id,
            self.role_name,
            self.options.initial_election_id,
        )
        self.ee.emit(SwitchEvent.SWITCH_START)

    def _switch_stop(self):
        "Called when switch stops its run() cycle."
        assert not self._is_channel_up

        LOGGER.info(
            "Switch stop (name=%r, address=%r, device_id=%r, role_name=%r)",
            self.name,
            self.address,
            self.device_id,
            self.role_name,
        )
        self.ee.emit(SwitchEvent.SWITCH_STOP)

    def _channel_up(self):
        "Called when P4Runtime channel is UP."
        assert not self._is_channel_up

        ports = " ".join(f"({port.id}){port.name}" for port in self.ports)
        LOGGER.info(
            "Channel up (is_primary=%r, role_name=%r, p4r=%s): %s",
            self.is_primary,
            self.role_name,
            self.api_version,
            ports,
        )
        self._is_channel_up = True
        self.create_task(self._ready())

        self.ee.emit(SwitchEvent.CHANNEL_UP, self)

    def _channel_down(self):
        "Called when P4Runtime channel is DOWN."
        if not self._is_channel_up:
            return  # do nothing!

        LOGGER.info(
            "Channel down (is_primary=%r, role_name=%r)",
            self.is_primary,
            self.role_name,
        )
        self._is_channel_up = False

        self.ee.emit(SwitchEvent.CHANNEL_DOWN, self)

    def _become_primary(self):
        "Called when a P4Runtime backup channel becomes the primary."
        assert self._tasks is not None

        LOGGER.info(
            "Become primary (is_primary=%r, role_name=%r)",
            self.is_primary,
            self.role_name,
        )

        self._tasks.cancel_primary()
        self.create_task(self._ready())

        self.ee.emit(SwitchEvent.BECOME_PRIMARY, self)

    def _become_backup(self):
        "Called when a P4Runtime primary channel becomes a backup."
        assert self._tasks is not None

        LOGGER.info(
            "Become backup (is_primary=%r, role_name=%r)",
            self.is_primary,
            self.role_name,
        )

        self._tasks.cancel_primary()
        self.create_task(self._ready())

        self.ee.emit(SwitchEvent.BECOME_BACKUP, self)

    def _channel_ready(self):
        "Called when a P4Runtime channel is READY."
        LOGGER.info(
            "Channel ready (is_primary=%r, role_name=%r): %s",
            self.is_primary,
            self.role_name,
            self.p4info.get_pipeline_info(),
        )

        if self._options.ready_handler:
            self.create_task(self._options.ready_handler(self))

        self.ee.emit(SwitchEvent.CHANNEL_READY, self)

 855    def _stream_packet_message(self, msg: p4r.StreamMessageResponse):
 856        "Called when a P4Runtime packet-in response is received."
 857        packet = p4entity.decode_stream(msg, self.p4info)
 858
 859        was_queued = False
 860        for pkt_filter, queue in self._packet_queues:
 861            if not queue.full() and pkt_filter(packet.payload):
 862                queue.put_nowait(packet)
 863                was_queued = True
 864
 865        if not was_queued:
 866            LOGGER.warning("packet ignored: %r", packet)
 867
 868    def _stream_digest_message(self, msg: p4r.StreamMessageResponse):
 869        "Called when a P4Runtime digest response is received."
 870        try:
 871            # Decode the digest list message.
 872            digest: p4entity.P4DigestList = p4entity.decode_stream(msg, self.p4info)
 873        except ValueError as ex:
 874            # It's possible to receive a digest for a different P4Info file, or
 875            # even before a P4Info is fetched from the switch.
 876            LOGGER.warning("digest decode failed: %s", ex)
 877        else:
 878            # Place the decoded digest list in a queue, if one is waiting.
 879            queue = self._digest_queues.get(digest.digest_id)
 880            if queue is not None and not queue.full():
 881                queue.put_nowait(digest)
 882            else:
 883                LOGGER.warning("digest ignored: %r", digest)
 884
 885    def _stream_timeout_message(self, msg: p4r.StreamMessageResponse):
 886        "Called when a P4Runtime timeout notification is received."
 887        timeout: p4entity.P4IdleTimeoutNotification = p4entity.decode_stream(
 888            msg, self.p4info
 889        )
 890        queue = self._timeout_queue
 891
 892        if queue is not None and not queue.full():
 893            queue.put_nowait(timeout)
 894        else:
 895            LOGGER.warning("timeout ignored: %r", timeout)
 896
 897    def _stream_error_message(self, msg: p4r.StreamMessageResponse):
 898        "Called when a P4Runtime stream error response is received."
 899        assert self._p4client is not None
 900
 901        # Log the message at the ERROR level.
 902        pbutil.log_msg(self._p4client.channel, msg, self.p4info, level=logging.ERROR)
 903
 904        self.ee.emit(SwitchEvent.STREAM_ERROR, self, msg)
 905
 906    async def _ready(self):
 907        "Prepare the pipeline."
 908        if self.p4info.is_authoritative and self.is_primary:
 909            await self._set_pipeline()
 910        else:
 911            await self._get_pipeline()
 912
 913        self._channel_ready()
 914
 915    async def _get_pipeline(self):
 916        "Get the switch's P4Info."
 917        has_pipeline = False
 918
 919        try:
 920            reply = await self._get_pipeline_config_request(
 921                response_type=P4ConfigResponseType.P4INFO_AND_COOKIE
 922            )
 923
 924            if reply.config.HasField("p4info"):
 925                has_pipeline = True
 926                p4info = reply.config.p4info
 927                if not self.p4info.exists:
 928                    # If we don't have P4Info yet, set it.
 929                    self.p4info.set_p4info(p4info)
 930                elif not self.p4info.has_p4info(p4info):
 931                    # If P4Info is not identical, log a warning message.
 932                    LOGGER.warning("Retrieved P4Info is different than expected!")
 933
 934        except P4ClientError as ex:
 935            if not ex.is_pipeline_missing:
 936                raise
 937
 938        if not has_pipeline and self.p4info.exists:
 939            LOGGER.warning("Forwarding pipeline is not configured")
 940
 941    async def _set_pipeline(self):
 942        """Set up the pipeline.
 943
 944        If `p4force` is false (the default), we first retrieve the cookie for
 945        the current pipeline and see if it matches the new pipeline's cookie.
 946        If the cookies match, we are done; there is no need to set the pipeline.
 947
 948        If `p4force` is true, we always load the new pipeline.
 949        """
 950        cookie = -1
 951        try:
 952            if not self.options.p4force:
 953                reply = await self._get_pipeline_config_request()
 954                if reply.config.HasField("cookie"):
 955                    cookie = reply.config.cookie.cookie
 956
 957        except P4ClientError as ex:
 958            if not ex.is_pipeline_missing:
 959                raise
 960
 961        if cookie != self.p4info.p4cookie:
 962            LOGGER.debug(
 963                "cookie %#x does not match expected %#x", cookie, self.p4info.p4cookie
 964            )
 965            await self._set_pipeline_config_request(
 966                config=self.p4info.get_pipeline_config()
 967            )
 968            LOGGER.info("Pipeline installed: %s", self.p4info.get_pipeline_info())
 969
 970    async def _get_pipeline_config_request(
 971        self,
 972        *,
 973        response_type: P4ConfigResponseType = P4ConfigResponseType.COOKIE_ONLY,
 974    ) -> p4r.GetForwardingPipelineConfigResponse:
 975        "Send a GetForwardingPipelineConfigRequest and await the response."
 976        assert self._p4client is not None
 977
 978        return await self._p4client.request(
 979            p4r.GetForwardingPipelineConfigRequest(
 980                device_id=self.device_id,
 981                response_type=response_type.vt(),
 982            )
 983        )
 984
 985    async def _set_pipeline_config_request(
 986        self,
 987        *,
 988        action: P4ConfigAction = P4ConfigAction.VERIFY_AND_COMMIT,
 989        config: p4r.ForwardingPipelineConfig,
 990    ) -> p4r.SetForwardingPipelineConfigResponse:
 991        "Send a SetForwardingPipelineConfigRequest and await the response."
 992        assert self._p4client is not None
 993
 994        return await self._p4client.request(
 995            p4r.SetForwardingPipelineConfigRequest(
 996                device_id=self.device_id,
 997                action=action.vt(),
 998                config=config,
 999            )
1000        )
1001
1002    async def _write_request(
1003        self,
1004        updates: list[p4r.Update],
1005        strict: bool,
1006        warn_only: bool,
1007    ):
1008        "Send a P4Runtime WriteRequest."
1009        assert self._p4client is not None
1010
1011        try:
1012            await self._p4client.request(
1013                p4r.WriteRequest(
1014                    device_id=self.device_id,
1015                    updates=updates,
1016                )
1017            )
1018        except P4ClientError as ex:
1019            if strict or not ex.is_not_found_only:
1020                if warn_only:
1021                    LOGGER.warning(
1022                        "WriteRequest with `warn_only=True` failed",
1023                        exc_info=True,
1024                    )
1025                else:
1026                    raise
1027
1028            assert (not strict and ex.is_not_found_only) or warn_only
1029
1030    async def _fetch_capabilities(self):
1031        "Check the P4Runtime protocol version supported by the other end."
1032        assert self._p4client is not None
1033
1034        try:
1035            reply = await self._p4client.request(p4r.CapabilitiesRequest())
1036            self._api_version = ApiVersion.parse(reply.p4runtime_api_version)
1037
1038        except P4ClientError as ex:
1039            if ex.code != GRPCStatusCode.UNIMPLEMENTED:
1040                raise
1041            LOGGER.warning("CapabilitiesRequest is not implemented")
1042
1043    async def _start_gnmi(self):
1044        "Start the associated gNMI client."
1045        assert self._gnmi_client is None
1046        assert self._p4client is not None
1047
1048        self._gnmi_client = GNMIClient(self._address, self._options.channel_credentials)
1049        await self._gnmi_client.open(channel=self._p4client.channel)
1050
1051        try:
1052            await self._ports.subscribe(self._gnmi_client)
1053            if self._ports:
1054                self.create_task(self._ports.listen(), background=True, name="_ports")
1055
1056        except GNMIClientError as ex:
1057            if ex.code != GRPCStatusCode.UNIMPLEMENTED:
1058                raise
1059            LOGGER.warning("gNMI is not implemented")
1060            await self._gnmi_client.close()
1061            self._gnmi_client = None
1062
1063    async def _stop_gnmi(self):
1064        "Stop the associated gNMI client."
1065        if self._gnmi_client is not None:
1066            self._ports.close()
1067            await self._gnmi_client.close()
1068            self._gnmi_client = None
1069
1070    def __repr__(self) -> str:
1071        "Return string representation of switch."
1072        return f"Switch(name={self._name!r}, address={self._address!r})"

Represents a P4Runtime Switch.

A Switch is constructed with a name, address and an optional SwitchOptions configuration.

The name is up to the user but should uniquely identify the switch.

The address identifies the target endpoint of the gRPC channel. It should have the format "host:port", where host can be a domain name, an IPv4 address, or an IPv6 address in square brackets.
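As a rough illustration of the address format, the sketch below splits a "host:port" endpoint string into its two parts. `split_endpoint` is a hypothetical helper written for this example; it is not part of Finsy, and gRPC applies its own parsing to the target string.

```python
def split_endpoint(address: str) -> tuple[str, int]:
    """Split "host:port" into (host, port).

    Hypothetical helper for illustration only; not part of Finsy.
    """
    host, sep, port = address.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(f"expected 'host:port', got {address!r}")

    if host.startswith("[") and host.endswith("]"):
        # IPv6 addresses are wrapped in square brackets, e.g. "[::1]:50001".
        host = host[1:-1]
    elif ":" in host:
        # A bare IPv6 address would be ambiguous with the port separator.
        raise ValueError(f"IPv6 address must be in square brackets: {address!r}")

    return host, int(port)


ipv4_endpoint = split_endpoint("10.0.0.1:50000")
ipv6_endpoint = split_endpoint("[::1]:50001")
```

The bracket requirement exists because an IPv6 address already contains colons, so without brackets the final colon could not be told apart from the port separator.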

The options argument is a SwitchOptions object that specifies how the Switch will behave.

opts = SwitchOptions(p4info=..., p4blob=...)
sw1 = Switch('sw1', '10.0.0.1:50000', opts)

Each switch object has an event emitter ee. Use the EventEmitter to listen for port change events like PORT_UP and PORT_DOWN. See the SwitchEvent class for a list of supported switch events.

Switch( name: str, address: str, options: SwitchOptions | None = None)
    def __init__(
        self,
        name: str,
        address: str,
        options: SwitchOptions | None = None,
    ) -> None:
        if options is None:
            options = SwitchOptions()

        self._name = name
        self._address = address
        self._options = options
        self._stash = {}
        self._ee = SwitchEmitter(self)
        self._p4client = None
        self._p4schema = P4Schema(options.p4info, options.p4blob)
        self._tasks = None
        self._packet_queues = []
        self._digest_queues = {}
        self._timeout_queue = None
        self._arbitrator = Arbitrator(
            options.initial_election_id, options.role_name, options.role_config
        )
        self._gnmi_client = None
        self._ports = SwitchPortList()
name: str
    @property
    def name(self) -> str:
        "Name of the switch."
        return self._name

Name of the switch.

address: str
    @property
    def address(self) -> str:
        "Address of the switch."
        return self._address

Address of the switch.

options: SwitchOptions
    @property
    def options(self) -> SwitchOptions:
        "Switch options."
        return self._options

Switch options.

stash: dict[str, typing.Any]
    @property
    def stash(self) -> dict[str, Any]:
        "Switch stash, may be used to store per-switch data for any purpose."
        return self._stash

Switch stash, may be used to store per-switch data for any purpose.

ee: finsy.switch.SwitchEmitter
    @property
    def ee(self) -> "SwitchEmitter":
        "Switch event emitter. See `SwitchEvent` for more details on events."
        return self._ee

Switch event emitter. See SwitchEvent for more details on events.

device_id: int
    @property
    def device_id(self) -> int:
        "Switch's device ID."
        return self._options.device_id

Switch's device ID.

is_up: bool
    @property
    def is_up(self) -> bool:
        "True if switch is UP."
        return self._is_channel_up

True if switch is UP.

is_primary: bool
    @property
    def is_primary(self) -> bool:
        "True if switch is primary."
        return self._arbitrator.is_primary

True if switch is primary.

primary_id: int
    @property
    def primary_id(self) -> int:
        "Election ID of switch that is currently primary."
        return self._arbitrator.primary_id

Election ID of switch that is currently primary.

election_id: int
    @property
    def election_id(self) -> int:
        "Switch's current election ID."
        return self._arbitrator.election_id

Switch's current election ID.

role_name: str
    @property
    def role_name(self) -> str:
        "Switch's current role name."
        return self._arbitrator.role_name

Switch's current role name.

p4info: P4Schema
    @property
    def p4info(self) -> P4Schema:
        "Switch's P4 schema."
        return self._p4schema

Switch's P4 schema.

gnmi_client: GNMIClient | None
    @property
    def gnmi_client(self) -> GNMIClient | None:
        "Switch's gNMI client."
        return self._gnmi_client

Switch's gNMI client.

ports: SwitchPortList
    @property
    def ports(self) -> SwitchPortList:
        "Switch's list of interfaces."
        return self._ports

Switch's list of interfaces.

api_version: finsy.switch.ApiVersion
    @property
    def api_version(self) -> ApiVersion:
        "P4Runtime protocol version."
        return self._api_version

P4Runtime protocol version.

async def read( self, entities: Union[Iterable[Union[p4.v1.p4runtime_pb2.Entity, finsy.p4entity._SupportsEncodeEntity, Iterable[ForwardRef('P4EntityList')]]], finsy.p4entity.P4Entity]) -> AsyncGenerator[finsy.p4entity.P4Entity, NoneType]:
    async def read(
        self,
        entities: Iterable[p4entity.P4EntityList] | p4entity.P4Entity,
    ) -> AsyncGenerator[p4entity.P4Entity, None]:
        "Async iterator that reads entities from the switch."
        assert self._p4client is not None

        if not entities:
            return

        if isinstance(entities, p4entity.P4Entity):
            entities = [entities]

        request = p4r.ReadRequest(
            device_id=self.device_id,
            entities=p4entity.encode_entities(entities, self.p4info),
        )

        async for reply in self._p4client.request_iter(request):
            for ent in reply.entities:
                yield p4entity.decode_entity(ent, self.p4info)

Async iterator that reads entities from the switch.

async def read_packets( self, *, queue_size: int = 50, eth_types: Optional[Iterable[int]] = None) -> AsyncIterator[P4PacketIn]:
    async def read_packets(
        self,
        *,
        queue_size: int = _DEFAULT_QUEUE_SIZE,
        eth_types: Iterable[int] | None = None,
    ) -> AsyncIterator["p4entity.P4PacketIn"]:
        "Async iterator for incoming packets (P4PacketIn)."
        LOGGER.debug("read_packets: opening queue: eth_types=%r", eth_types)

        if eth_types is None:

            def _pkt_filter(_payload: bytes) -> bool:
                return True

        else:
            _filter = {eth.to_bytes(2, "big") for eth in eth_types}

            def _pkt_filter(_payload: bytes) -> bool:
                return _payload[12:14] in _filter

        queue = Queue[p4entity.P4PacketIn](queue_size)
        queue_filter = (_pkt_filter, queue)
        self._packet_queues.append(queue_filter)

        try:
            while True:
                yield await queue.get()
        finally:
            LOGGER.debug("read_packets: closing queue: eth_types=%r", eth_types)
            self._packet_queues.remove(queue_filter)

Async iterator for incoming packets (P4PacketIn).
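The `eth_types` filter in the listing above compares bytes 12 and 13 of each payload, which hold the EtherType field of an untagged Ethernet frame, against the requested values. A minimal standalone sketch of that check follows; `make_eth_type_filter` and the sample frame are names invented for this example and are not part of Finsy.

```python
from __future__ import annotations

from collections.abc import Callable, Iterable


def make_eth_type_filter(eth_types: Iterable[int] | None) -> Callable[[bytes], bool]:
    "Return a predicate that accepts payloads with one of the given EtherTypes."
    if eth_types is None:
        # No filter requested: accept every packet.
        return lambda _payload: True

    # Pre-encode each EtherType as 2 big-endian bytes for a cheap set lookup.
    wanted = {eth.to_bytes(2, "big") for eth in eth_types}
    return lambda payload: payload[12:14] in wanted


# Sample frame: 6-byte destination MAC, 6-byte source MAC, EtherType 0x0806 (ARP).
arp_frame = bytes(6) + bytes(6) + (0x0806).to_bytes(2, "big") + b"arp-body"

arp_only = make_eth_type_filter([0x0806])
ipv4_only = make_eth_type_filter([0x0800])
accept_all = make_eth_type_filter(None)
```

A frame carrying an 802.1Q VLAN tag has 0x8100 at this offset, so under this check it matches only when 0x8100 is among the requested `eth_types`.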

async def read_digests( self, digest_id: str, *, queue_size: int = 50) -> AsyncIterator[P4DigestList]:
    async def read_digests(
        self,
        digest_id: str,
        *,
        queue_size: int = _DEFAULT_QUEUE_SIZE,
    ) -> AsyncIterator["p4entity.P4DigestList"]:
        "Async iterator for incoming digest lists (P4DigestList)."
        LOGGER.debug("read_digests: opening queue: digest_id=%r", digest_id)

        if digest_id in self._digest_queues:
            raise ValueError(f"queue for digest_id {digest_id!r} already open")

        queue = Queue[p4entity.P4DigestList](queue_size)
        self._digest_queues[digest_id] = queue
        try:
            while True:
                yield await queue.get()
        finally:
            LOGGER.debug("read_digests: closing queue: digest_id=%r", digest_id)
            del self._digest_queues[digest_id]

Async iterator for incoming digest lists (P4DigestList).
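Going by the listings, each `digest_id` can have at most one open reader, and a digest that arrives when no reader is open (or when the reader's queue is full) is dropped with a warning. The sketch below reproduces that bookkeeping with a plain `asyncio.Queue`; `DigestRouter`, `deliver` and the sample digest names are hypothetical and exist only for this example.

```python
from __future__ import annotations

import asyncio
from typing import Any, AsyncIterator


class DigestRouter:
    "Hypothetical stand-in for the per-digest queue bookkeeping; not Finsy code."

    def __init__(self) -> None:
        self._queues: dict[str, asyncio.Queue[Any]] = {}

    async def read(self, digest_id: str, queue_size: int = 50) -> AsyncIterator[Any]:
        if digest_id in self._queues:
            raise ValueError(f"queue for digest_id {digest_id!r} already open")
        queue: asyncio.Queue[Any] = asyncio.Queue(queue_size)
        self._queues[digest_id] = queue
        try:
            while True:
                yield await queue.get()
        finally:
            # Runs when the reader is closed or garbage-collected.
            del self._queues[digest_id]

    def deliver(self, digest_id: str, digest: Any) -> bool:
        "Queue a digest; return False if it had to be dropped."
        queue = self._queues.get(digest_id)
        if queue is None or queue.full():
            return False
        queue.put_nowait(digest)
        return True


async def _next(reader: AsyncIterator[Any]) -> Any:
    return await reader.__anext__()


async def demo() -> list[Any]:
    router = DigestRouter()
    results: list[Any] = [router.deliver("digest_t", "early")]  # no reader yet

    reader = router.read("digest_t")
    pending = asyncio.create_task(_next(reader))
    await asyncio.sleep(0)  # let the reader start and register its queue
    results.append(router.deliver("digest_t", "learned-mac"))
    results.append(await pending)

    try:
        await _next(router.read("digest_t"))  # a second reader for the same ID
    except ValueError:
        results.append("second reader rejected")

    await reader.aclose()  # closing the reader removes its queue
    results.append(router.deliver("digest_t", "late"))
    return results
```

Running `asyncio.run(demo())` walks through the four cases: a drop before any reader exists, a successful delivery, a rejected second reader, and a drop after the reader is closed.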

async def read_idle_timeouts( self, *, queue_size: int = 50) -> AsyncIterator[finsy.p4entity.P4IdleTimeoutNotification]:
    async def read_idle_timeouts(
        self,
        *,
        queue_size: int = _DEFAULT_QUEUE_SIZE,
    ) -> AsyncIterator["p4entity.P4IdleTimeoutNotification"]:
        "Async iterator for incoming idle timeouts (P4IdleTimeoutNotification)."
        LOGGER.debug("read_idle_timeouts: opening queue")

        if self._timeout_queue is not None:
            raise ValueError("timeout queue already open")

        queue = Queue[p4entity.P4IdleTimeoutNotification](queue_size)
        self._timeout_queue = queue
        try:
            while True:
                yield await queue.get()
        finally:
            LOGGER.debug("read_idle_timeouts: closing queue")
            self._timeout_queue = None

Async iterator for incoming idle timeouts (P4IdleTimeoutNotification).

async def write( self, entities: Iterable[Union[p4.v1.p4runtime_pb2.Update, p4.v1.p4runtime_pb2.StreamMessageRequest, finsy.p4entity._SupportsEncodeUpdate, Iterable[ForwardRef('P4UpdateList')]]], *, strict: bool = True, warn_only: bool = False) -> None:
    async def write(
        self,
        entities: Iterable[p4entity.P4UpdateList],
        *,
        strict: bool = True,
        warn_only: bool = False,
    ) -> None:
        """Write updates and stream messages to the switch.

        If `strict` is False, MODIFY and DELETE operations will NOT raise an
        error if the entity does not exist (NOT_FOUND).

        If `warn_only` is True, no operations will raise an error. Instead,
        the exception will be logged as a WARNING and the method will return
        normally.
        """
        assert self._p4client is not None

        if not entities:
            return

        msgs = p4entity.encode_updates(entities, self.p4info)

        updates: list[p4r.Update] = []
        for msg in msgs:
            if isinstance(msg, p4r.StreamMessageRequest):
                # StreamMessageRequests are transmitted immediately.
                # TODO: Understand what happens with backpressure?
                await self._p4client.send(msg)
            else:
                updates.append(msg)

        if updates:
            await self._write_request(updates, strict, warn_only)

Write updates and stream messages to the switch.

If strict is False, MODIFY and DELETE operations will NOT raise an error if the entity does not exist (NOT_FOUND).

If warn_only is True, no operations will raise an error. Instead, the exception will be logged as a WARNING and the method will return normally.
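The interaction between `strict` and `warn_only` can be summarized as a small decision function. The sketch below mirrors the branching in the `_write_request` listing, where `is_not_found_only` indicates that every failure in the response was NOT_FOUND; `write_error_action` and its string results are invented for this example.

```python
def write_error_action(*, strict: bool, warn_only: bool, not_found_only: bool) -> str:
    """Classify what happens to a failed write: "ignore", "warn" or "raise".

    Hypothetical helper that restates the branching in `_write_request`.
    """
    if not strict and not_found_only:
        # Non-strict mode silently tolerates failures that are all NOT_FOUND.
        return "ignore"
    # Every other failure is either logged as a warning or re-raised.
    return "warn" if warn_only else "raise"


default_behavior = write_error_action(strict=True, warn_only=False, not_found_only=True)
lenient_delete = write_error_action(strict=False, warn_only=False, not_found_only=True)
```

Note that `strict=False` only relaxes NOT_FOUND failures. Any other error still raises unless `warn_only` is also set.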

async def insert( self, entities: Iterable[Union[p4.v1.p4runtime_pb2.Entity, finsy.p4entity._SupportsEncodeEntity, Iterable[ForwardRef('P4EntityList')]]], *, warn_only: bool = False) -> None:
    async def insert(
        self,
        entities: Iterable[p4entity.P4EntityList],
        *,
        warn_only: bool = False,
    ) -> None:
        """Insert the specified entities.

        If `warn_only` is True, errors will be logged as warnings instead of
        raising an exception.
        """
        if entities:
            await self._write_request(
                [
                    p4r.Update(type=p4r.Update.INSERT, entity=ent)
                    for ent in p4entity.encode_entities(entities, self.p4info)
                ],
                True,
                warn_only,
            )

Insert the specified entities.

If warn_only is True, errors will be logged as warnings instead of raising an exception.

async def modify( self, entities: Iterable[Union[p4.v1.p4runtime_pb2.Entity, finsy.p4entity._SupportsEncodeEntity, Iterable[ForwardRef('P4EntityList')]]], *, strict: bool = True, warn_only: bool = False) -> None:
    async def modify(
        self,
        entities: Iterable[p4entity.P4EntityList],
        *,
        strict: bool = True,
        warn_only: bool = False,
    ) -> None:
        """Modify the specified entities.

        If `strict` is False, NOT_FOUND errors will be ignored.

        If `warn_only` is True, errors will be logged as warnings instead of
        raising an exception.
        """
        if entities:
            await self._write_request(
                [
                    p4r.Update(type=p4r.Update.MODIFY, entity=ent)
                    for ent in p4entity.encode_entities(entities, self.p4info)
                ],
                strict,
                warn_only,
            )

Modify the specified entities.

If strict is False, NOT_FOUND errors will be ignored.

If warn_only is True, errors will be logged as warnings instead of raising an exception.

async def delete( self, entities: Iterable[Union[p4.v1.p4runtime_pb2.Entity, finsy.p4entity._SupportsEncodeEntity, Iterable[ForwardRef('P4EntityList')]]], *, strict: bool = True, warn_only: bool = False) -> None:
    async def delete(
        self,
        entities: Iterable[p4entity.P4EntityList],
        *,
        strict: bool = True,
        warn_only: bool = False,
    ) -> None:
        """Delete the specified entities.

        If `strict` is False, NOT_FOUND errors will be ignored.

        If `warn_only` is True, errors will be logged as warnings instead of
        raising an exception.
        """
        if entities:
            await self._write_request(
                [
                    p4r.Update(type=p4r.Update.DELETE, entity=ent)
                    for ent in p4entity.encode_entities(entities, self.p4info)
                ],
                strict,
                warn_only,
            )

Delete the specified entities.

If strict is False, NOT_FOUND errors will be ignored.

If warn_only is True, errors will be logged as warnings instead of raising an exception.

async def delete_all(self) -> None:
    async def delete_all(self) -> None:
        """Delete all table entries, multicast groups, clone sessions, action
        profile groups and members, and digest entries.

        Default table entries are reset, except in tables that have a const
        default action or an action profile.

        This method does not attempt to delete entries in const tables.

        TODO: This method does not affect indirect counters, meters or
        value_sets.
        """
        await self.delete_many(
            [
                p4entity.P4TableEntry(),
                p4entity.P4MulticastGroupEntry(),
                p4entity.P4CloneSessionEntry(),
            ]
        )

        # Reset all default table entries.
        default_entries = [
            p4entity.P4TableEntry(table.alias, is_default_action=True)
            for table in self.p4info.tables
            if table.const_default_action is None and table.action_profile is None
        ]
        if default_entries:
            await self.modify(default_entries)

        # Delete all P4ActionProfileGroup's and P4ActionProfileMember's.
        # We do this after deleting the P4TableEntry's in case a client is using
        # "one-shot" references; these are incompatible with separate
        # action profiles.
        await self.delete_many(
            [
                p4entity.P4ActionProfileGroup(),
                p4entity.P4ActionProfileMember(),
            ]
        )

        # Delete DigestEntry separately. Wildcard reads are not supported.
        digest_entries = [
            p4entity.P4DigestEntry(digest.alias) for digest in self.p4info.digests
        ]
        if digest_entries:
            await self.delete(digest_entries, strict=False)

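Delete all table entries, multicast groups, clone sessions, action profile groups and members, and digest entries. Default table entries are reset, except in tables that have a const default action or an action profile.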

This method does not attempt to delete entries in const tables.

TODO: This method does not affect indirect counters, meters or value_sets.

async def delete_many( self, entities: Iterable[Union[p4.v1.p4runtime_pb2.Entity, finsy.p4entity._SupportsEncodeEntity, Iterable[ForwardRef('P4EntityList')]]]) -> None:
    async def delete_many(self, entities: Iterable[p4entity.P4EntityList]) -> None:
        """Delete entities that match a wildcard read.

        This method always skips over entries in const tables. It is an error
        to attempt to delete those.
        """
        assert self._p4client is not None

        request = p4r.ReadRequest(
            device_id=self.device_id,
            entities=p4entity.encode_entities(entities, self.p4info),
        )

        # Compute set of all const table ID's (may be empty).
        to_skip = {table.id for table in self.p4info.tables if table.is_const}

        async for reply in self._p4client.request_iter(request):
            if reply.entities:
                if to_skip:
                    await self.delete(
                        reply
                        for reply in reply.entities
                        if reply.HasField("table_entry")
                        and reply.table_entry.table_id not in to_skip
                    )
                else:
                    await self.delete(reply.entities)

Delete entities that match a wildcard read.

This method always skips over entries in const tables. It is an error to attempt to delete those.

async def run(self) -> None:
    async def run(self) -> None:
        "Run the switch's lifecycle repeatedly."
        assert self._p4client is None
        assert self._tasks is None

        self._tasks = SwitchTasks(self._options.fail_fast)
        self._p4client = P4Client(self._address, self._options.channel_credentials)
        self._switch_start()

        try:
            while True:
                # If the switch fails and restarts too quickly, slow it down.
                async with _throttle_failure():
                    self.create_task(self._run(), background=True)
                    await self._tasks.wait()
                    self._arbitrator.reset()

        finally:
            self._p4client = None
            self._tasks = None
            self._switch_stop()

Run the switch's lifecycle repeatedly.

def create_task( self, coro: Coroutine[Any, Any, ~_T], *, background: bool = False, name: str | None = None) -> _asyncio.Task[~_T]:
    def create_task(
        self,
        coro: Coroutine[Any, Any, _T],
        *,
        background: bool = False,
        name: str | None = None,
    ) -> asyncio.Task[_T]:
        "Create an asyncio task tied to the Switch's lifecycle."
        assert self._tasks is not None

        return self._tasks.create_task(
            coro,
            switch=self,
            background=background,
            name=name,
        )

Create an asyncio task tied to the Switch's lifecycle.

async def __aenter__(self) -> typing_extensions.Self:
    async def __aenter__(self) -> Self:
        "Similar to run() but provides a one-time context manager interface."
        assert self._p4client is None
        assert self._tasks is None

        self._tasks = SwitchTasks(self._options.fail_fast)
        self._p4client = P4Client(
            self._address,
            self._options.channel_credentials,
            wait_for_ready=False,
        )
        self._switch_start()

        try:
            # Start the switch's `_run` task in the background. Then, wait for
            # `_run` task to fire the CHANNEL_READY event. If the `_run` task
            # cannot connect or fails in some other way, it will finish before
            # the `ready` future. We need to handle the error in this case.

            run = self.create_task(self._run(), background=True)
            ready = self.ee.event_future(SwitchEvent.CHANNEL_READY)
            done, _ = await asyncio.wait(
                [run, ready], return_when=asyncio.FIRST_COMPLETED
            )
            if run in done:
                await run

        except BaseException:
            await self.__aexit__(None, None, None)
            raise

        return self

Similar to run() but provides a one-time context manager interface.
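The startup logic in `__aenter__` waits for whichever finishes first: the background run task or the future tied to CHANNEL_READY. If the run task finishes first, awaiting it re-raises its failure. Here is a minimal standalone sketch of that pattern using only `asyncio`; `wait_until_ready`, `connect_fails` and `runs_forever` are hypothetical names for this example.

```python
import asyncio


async def wait_until_ready(run: asyncio.Task, ready: asyncio.Future) -> None:
    "Return once `ready` completes; re-raise the error if `run` ends first."
    done, _ = await asyncio.wait([run, ready], return_when=asyncio.FIRST_COMPLETED)
    if run in done:
        # The background task ended before the ready signal: surface its result.
        await run


async def demo() -> list[str]:
    loop = asyncio.get_running_loop()
    results: list[str] = []

    async def connect_fails() -> None:
        raise ConnectionError("cannot connect")

    async def runs_forever() -> None:
        await asyncio.Event().wait()

    # Case 1: the background task fails before the ready signal arrives.
    run = asyncio.create_task(connect_fails())
    ready = loop.create_future()
    try:
        await wait_until_ready(run, ready)
    except ConnectionError as ex:
        results.append(f"failed: {ex}")
    ready.cancel()

    # Case 2: the ready signal arrives while the background task keeps running.
    run = asyncio.create_task(runs_forever())
    ready = loop.create_future()
    loop.call_soon(ready.set_result, None)
    await wait_until_ready(run, ready)
    results.append("ready")

    # Clean up the still-running background task.
    run.cancel()
    await asyncio.gather(run, return_exceptions=True)
    return results
```

Waiting on both awaitables avoids hanging forever on a ready signal that can never arrive once the connection attempt has already failed.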

class SwitchEvent(builtins.str, enum.Enum):
class SwitchEvent(str, enum.Enum):
    "Events for Switch class."

    CONTROLLER_ENTER = "controller_enter"  # (switch)
    CONTROLLER_LEAVE = "controller_leave"  # (switch)
    SWITCH_START = "switch_start"  # (switch)
    SWITCH_STOP = "switch_stop"  # (switch)
    CHANNEL_UP = "channel_up"  # (switch)
    CHANNEL_DOWN = "channel_down"  # (switch)
    CHANNEL_READY = "channel_ready"  # (switch)
    BECOME_PRIMARY = "become_primary"  # (switch)
    BECOME_BACKUP = "become_backup"  # (switch)
    PORT_UP = "port_up"  # (switch, port)
    PORT_DOWN = "port_down"  # (switch, port)
    STREAM_ERROR = "stream_error"  # (switch, p4r.StreamMessageResponse)

Events for Switch class.

CONTROLLER_ENTER = <SwitchEvent.CONTROLLER_ENTER: 'controller_enter'>
CONTROLLER_LEAVE = <SwitchEvent.CONTROLLER_LEAVE: 'controller_leave'>
SWITCH_START = <SwitchEvent.SWITCH_START: 'switch_start'>
SWITCH_STOP = <SwitchEvent.SWITCH_STOP: 'switch_stop'>
CHANNEL_UP = <SwitchEvent.CHANNEL_UP: 'channel_up'>
CHANNEL_DOWN = <SwitchEvent.CHANNEL_DOWN: 'channel_down'>
CHANNEL_READY = <SwitchEvent.CHANNEL_READY: 'channel_ready'>
BECOME_PRIMARY = <SwitchEvent.BECOME_PRIMARY: 'become_primary'>
BECOME_BACKUP = <SwitchEvent.BECOME_BACKUP: 'become_backup'>
PORT_UP = <SwitchEvent.PORT_UP: 'port_up'>
PORT_DOWN = <SwitchEvent.PORT_DOWN: 'port_down'>
STREAM_ERROR = <SwitchEvent.STREAM_ERROR: 'stream_error'>
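
Because `SwitchEvent` derives from both `str` and `enum.Enum`, its members behave like their string values. The sketch below illustrates this with a hypothetical two-member stand-in named `Event`. It does not import Finsy.

```python
import enum


class Event(str, enum.Enum):
    "Hypothetical stand-in that mirrors two members of SwitchEvent."

    PORT_UP = "port_up"
    PORT_DOWN = "port_down"


# A member compares equal to its plain string value.
IS_EQUAL = Event.PORT_UP == "port_up"

# A member can be looked up from its string value.
LOOKED_UP = Event("port_down")

# Inherited `str` methods operate on the value.
UPPER = Event.PORT_UP.upper()
```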
Inherited Members
enum.Enum: name, value
builtins.str: __iter__, __len__, __getitem__, __contains__, encode, replace, split, rsplit, join, capitalize, casefold, title, center, count, expandtabs, find, partition, index, ljust, lower, lstrip, rfind, rindex, rjust, rstrip, rpartition, splitlines, strip, swapcase, translate, upper, startswith, endswith, removeprefix, removesuffix, isascii, islower, isupper, istitle, isspace, isdecimal, isdigit, isnumeric, isalpha, isalnum, isidentifier, isprintable, zfill, format, format_map, maketrans
@final
@dataclasses.dataclass(frozen=True)
class SwitchOptions:
@final
@dataclasses.dataclass(frozen=True)
class SwitchOptions:
    """Represents the configuration options for a `Switch`.

    ```
    opts = SwitchOptions(
        p4info=Path("basic.p4info.txtpb"),
        p4blob=Path("basic.json"),
        ready_handler=on_ready,
    )
    ```

    Each `SwitchOptions` object is immutable and may be shared by multiple
    switches. You should treat all values as read-only.

    You can use function call syntax to return a copy of a `SwitchOptions` with
    one or more properties altered.

    ```
    new_opts = opts(device_id=6)
    ```
    """

    p4info: Path | None = None
    "Path to P4Info protobuf text file."

    p4blob: Path | SupportsBytes | None = None
    "Path to P4Blob file, or an object that can provide the bytes value."

    p4force: bool = False
    "If true, always load the P4 program after initial handshake."

    device_id: int = 1
    "Default P4Runtime device ID."

    initial_election_id: int = 10
    "Initial P4Runtime election ID."

    channel_credentials: GRPCCredentialsTLS | None = None
    "P4Runtime channel credentials. Used for TLS support."

    role_name: str = ""
    "P4Runtime role name."

    role_config: pbutil.PBMessage | None = None
    "P4Runtime role configuration."

    ready_handler: Callable[["Switch"], Coroutine[Any, Any, None]] | None = None
    "Ready handler async function callback."

    fail_fast: bool = False
    """If true, log switch errors as CRITICAL and immediately abort when the
    switch is running in a Controller."""

    configuration: Any = None
    "Store your app's configuration information here."

    def __call__(self, **kwds: Any) -> Self:
        return dataclasses.replace(self, **kwds)

Represents the configuration options for a Switch.

opts = SwitchOptions(
    p4info=Path("basic.p4info.txtpb"),
    p4blob=Path("basic.json"),
    ready_handler=on_ready,
)

Each SwitchOptions object is immutable and may be shared by multiple switches. You should treat all values as read-only.

You can use function call syntax to return a copy of a SwitchOptions with one or more properties altered.

new_opts = opts(device_id=6)
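
The copy-on-call behavior comes from a frozen dataclass whose `__call__` delegates to `dataclasses.replace`, as the source above shows. Here is a minimal sketch of the same idea with a hypothetical two-field class named `Options`, which is not the real `SwitchOptions`.

```python
import dataclasses
from typing import Any


@dataclasses.dataclass(frozen=True)
class Options:
    "Hypothetical stand-in with two of the fields listed above."

    device_id: int = 1
    fail_fast: bool = False

    def __call__(self, **kwds: Any) -> "Options":
        # Return a modified copy; the original object is left untouched.
        return dataclasses.replace(self, **kwds)


opts = Options()
new_opts = opts(device_id=6)

# Assigning to a field of a frozen dataclass raises FrozenInstanceError.
try:
    opts.device_id = 2  # type: ignore[misc]
    MUTATED = True
except dataclasses.FrozenInstanceError:
    MUTATED = False
```

Because each call returns a new object, one base set of options can be shared safely and specialized per switch.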
SwitchOptions( p4info: pathlib.Path | None = None, p4blob: pathlib.Path | typing.SupportsBytes | None = None, p4force: bool = False, device_id: int = 1, initial_election_id: int = 10, channel_credentials: GRPCCredentialsTLS | None = None, role_name: str = '', role_config: google.protobuf.message.Message | None = None, ready_handler: Optional[Callable[[Switch], Coroutine[Any, Any, NoneType]]] = None, fail_fast: bool = False, configuration: Any = None)
p4info: pathlib.Path | None = None

Path to P4Info protobuf text file.

p4blob: pathlib.Path | typing.SupportsBytes | None = None

Path to P4Blob file, or an object that can provide the bytes value.

p4force: bool = False

If true, always load the P4 program after initial handshake.

device_id: int = 1

Default P4Runtime device ID.

initial_election_id: int = 10

Initial P4Runtime election ID.

channel_credentials: GRPCCredentialsTLS | None = None

P4Runtime channel credentials. Used for TLS support.

role_name: str = ''

P4Runtime role name.

role_config: google.protobuf.message.Message | None = None

P4Runtime role configuration.

ready_handler: Optional[Callable[[Switch], Coroutine[Any, Any, NoneType]]] = None

Ready handler async function callback.

fail_fast: bool = False

If true, log switch errors as CRITICAL and immediately abort when the switch is running in a Controller.

configuration: Any = None

Store your app's configuration information here.

@dataclass
class SwitchPort:
@dataclass
class SwitchPort:
    "Represents a switch port."

    id: int
    name: str
    oper_status: OperStatus = OperStatus.UNKNOWN

    @property
    def up(self) -> bool:
        "Return true if port is basically up."
        return self.oper_status == OperStatus.UP

Represents a switch port.

SwitchPort( id: int, name: str, oper_status: finsy.ports.OperStatus = <OperStatus.UNKNOWN: 'UNKNOWN'>)
id: int
name: str
oper_status: finsy.ports.OperStatus = <OperStatus.UNKNOWN: 'UNKNOWN'>
up: bool

Return true if port is basically up.
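
The sketch below shows how `up` follows from `oper_status`. The `OperStatus` enum here is a stand-in: only the `UNKNOWN` and `UP` members appear in the source above, so the `DOWN` member and the string values of `UP` and `DOWN` are assumptions made for illustration.

```python
import enum
from dataclasses import dataclass


class OperStatus(enum.Enum):
    "Stand-in enum; DOWN and the values of UP/DOWN are assumed."

    UNKNOWN = "UNKNOWN"
    UP = "UP"
    DOWN = "DOWN"


@dataclass
class SwitchPort:
    "Mirror of the dataclass shown above."

    id: int
    name: str
    oper_status: OperStatus = OperStatus.UNKNOWN

    @property
    def up(self) -> bool:
        return self.oper_status == OperStatus.UP


port = SwitchPort(1, "eth1")
INITIAL_UP = port.up  # status is UNKNOWN, so the port is not considered up

port.oper_status = OperStatus.UP
LATER_UP = port.up
```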

class SwitchPortList:
class SwitchPortList:
    "Represents a list of switch ports."

    _ports: dict[str, SwitchPort]
    _subscription: GNMISubscription | None = None

    def __init__(self):
        self._ports = {}

    def __getitem__(self, key: str) -> SwitchPort:
        "Retrieve port by name."
        return self._ports[key]

    def __len__(self) -> int:
        "Return number of switch ports."
        return len(self._ports)

    def __iter__(self) -> Iterator[SwitchPort]:
        "Iterate over switch ports."
        return iter(self._ports.values())

    async def subscribe(self, client: GNMIClient) -> None:
        """Obtain the initial list of ports and subscribe to switch port status
        updates using gNMI."""
        assert self._subscription is None

        self._ports = await self._get_ports(client)
        if self._ports:
            self._subscription = await self._get_subscription(client)
        else:
            LOGGER.warning("No switch ports exist")

    async def listen(self, switch: "_sw.Switch | None" = None) -> None:
        "Listen for switch port updates."
        assert self._subscription is not None

        async for update in self._subscription.updates():
            self._update(update, switch)

    def close(self) -> None:
        "Close the switch port subscription."
        if self._subscription is not None:
            self._subscription.cancel()
            self._subscription = None
            self._ports = {}

    async def _get_ports(self, client: GNMIClient) -> dict[str, SwitchPort]:
        "Retrieve ID and name of each port."
        ports: dict[str, SwitchPort] = {}

        result = await client.get(_ifIndex)
        for update in result:
            path = update.path
            assert path.last == _ifIndex.last

            port = SwitchPort(update.value, path["name"])
            ports[port.name] = port

        return ports

    async def _get_subscription(self, client: GNMIClient) -> GNMISubscription:
        sub = client.subscribe()

        # Subscribe to change notifications.
        for port in self._ports.values():
            sub.on_change(_ifOperStatus.set(name=port.name))

        # Synchronize initial settings for ports.
        async for update in sub.synchronize():
            self._update(update, None)

        return sub

    def _update(self, update: GNMIUpdate, switch: "_sw.Switch | None"):
        path = update.path
        if path.last == _ifOperStatus.last:
            status = OperStatus(update.value)
            self._update_port(path["name"], status, switch)
        else:
            LOGGER.warning(f"PortList: unknown gNMI path: {path}")

    def _update_port(self, name: str, status: OperStatus, switch: "_sw.Switch | None"):
        port = self._ports[name]

        prev_up = port.up
        port.oper_status = status
        curr_up = port.up

        if switch is not None and curr_up != prev_up:
            if curr_up:
                switch.ee.emit(_sw.SwitchEvent.PORT_UP, switch, port)
            else:
                switch.ee.emit(_sw.SwitchEvent.PORT_DOWN, switch, port)

Represents a list of switch ports.
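
In the source above, `_update_port` emits `PORT_UP` or `PORT_DOWN` only when a port's up/down state actually changes, so repeated status reports do not produce duplicate events. Here is a minimal sketch of that edge-triggered logic. `Port` and `PortTracker` are hypothetical names, and a plain list stands in for the switch's event emitter.

```python
from dataclasses import dataclass, field


@dataclass
class Port:
    "Hypothetical simplified port with a boolean state."

    name: str
    up: bool = False


@dataclass
class PortTracker:
    "Hypothetical tracker that records an event only on a state change."

    ports: dict[str, Port]
    events: list[tuple[str, str]] = field(default_factory=list)

    def update(self, name: str, up: bool) -> None:
        port = self.ports[name]
        prev_up = port.up
        port.up = up
        if port.up != prev_up:
            event = "port_up" if port.up else "port_down"
            self.events.append((event, name))


tracker = PortTracker({"eth1": Port("eth1")})
for state in [True, True, False, False, True]:
    tracker.update("eth1", state)

EVENTS = tracker.events
```

Five status reports produce three events because the two repeated reports leave the state unchanged.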

def __getitem__(self, key: str) -> SwitchPort:

Retrieve port by name.

def __len__(self) -> int:

Return number of switch ports.

def __iter__(self) -> Iterator[SwitchPort]:

Iterate over switch ports.

async def subscribe(self, client: GNMIClient) -> None:

Obtain the initial list of ports and subscribe to switch port status updates using gNMI.

async def listen(self, switch: Switch | None = None) -> None:

Listen for switch port updates.

def close(self) -> None:

Close the switch port subscription.

class GNMIClient:
class GNMIClient:
    """Async gNMI client.

    This client implements `get`, `set`, `subscribe` and `capabilities`.

    The API depends on the protobuf definition of `gnmi.TypedValue`.

    Get usage:
    ```
    client = GNMIClient('127.0.0.1:9339')
    await client.open()

    path = GNMIPath("interfaces/interface")
    for update in await client.get(path):
        print(update)
    ```

    Subscribe usage:
    ```
    path = GNMIPath("interfaces/interface[name=eth1]/state/oper-status")
    sub = client.subscribe()
    sub.on_change(path)

    async for initial_state in sub.synchronize():
        print(initial_state)

    async for update in sub.updates():
        print(update)
    ```

    Set usage:
    ```
    enabled = GNMIPath("interfaces/interface[name=eth1]/config/enabled")

    await client.set(update=[
        (enabled, gnmi.TypedValue(bool_val=True)),
    ])
    ```
    """

    _address: str
    _credentials: GRPCCredentialsTLS | None
    _channel: grpc.aio.Channel | None = None
    _stub: gnmi_grpc.gNMIStub | None = None
    _channel_reused: bool = False

    def __init__(
        self,
        address: str,
        credentials: GRPCCredentialsTLS | None = None,
    ):
        self._address = address
        self._credentials = credentials

    async def __aenter__(self) -> Self:
        await self.open()
        return self

    async def __aexit__(self, *_args: Any) -> bool | None:
        await self.close()

    async def open(
        self,
        *,
        channel: grpc.aio.Channel | None = None,
    ) -> None:
        """Open the client channel.

        Note: This method is `async` for forward-compatible reasons.
        """
        if self._channel is not None:
            raise RuntimeError("GNMIClient: client is already open")

        assert self._stub is None

        if channel is not None:
            self._channel = channel
            self._channel_reused = True
        else:
            self._channel = grpc_channel(
                self._address,
                credentials=self._credentials,
                client_type="GNMIClient",
            )

        self._stub = gnmi_grpc.gNMIStub(self._channel)

    async def close(self) -> None:
        "Close the client channel."
        if self._channel is not None:
            if not self._channel_reused:
                LOGGER.debug("GNMIClient: close channel %r", self._address)
                await self._channel.close()

            self._channel = None
            self._stub = None
            self._channel_reused = False

    async def get(
        self,
        *path: GNMIPath,
        prefix: GNMIPath | None = None,
        config: bool = False,
    ) -> Sequence[GNMIUpdate]:
        "Retrieve value(s) using a GetRequest."
        if self._stub is None:
            raise RuntimeError("GNMIClient: client is not open")

        request = gnmi.GetRequest(
            path=(i.path for i in path),
            encoding=gnmi.Encoding.PROTO,
        )

        if prefix is not None:
            request.prefix.CopyFrom(prefix.path)

        if config:
            request.type = gnmi.GetRequest.CONFIG

        self._log_msg(request)
        try:
            reply = cast(
                gnmi.GetResponse,
                await self._stub.Get(request),
            )
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

        self._log_msg(reply)

        result: list[GNMIUpdate] = []
        for notification in reply.notification:
            for update in _read_updates(notification):
                result.append(update)

        return result

    def subscribe(
        self,
        *,
        prefix: GNMIPath | None = None,
    ) -> "GNMISubscription":
        """Subscribe to gNMI change notifications.

        Usage:
        ```
        sub = client.subscribe()
        sub.on_change(path1, ...)
        sub.sample(path3, path4, sample_interval=1000000000)

        async for update in sub.synchronize():
            # do something with initial state

        async for update in sub.updates():
            # do something with updates
        ```

        You can also subscribe in "ONCE" mode:
        ```
        sub = client.subscribe()
        sub.once(path1, path2, ...)

        async for info in sub.synchronize():
            # do something with info
        ```

        The subscription object is not re-entrant, but a fully consumed
        subscription may be reused.
        """
        if self._stub is None:
            raise RuntimeError("GNMIClient: client is not open")

        return GNMISubscription(self, prefix)

    async def capabilities(self) -> gnmi.CapabilityResponse:
        "Issue a CapabilitiesRequest."
        if self._stub is None:
            raise RuntimeError("GNMIClient: client is not open")

        request = gnmi.CapabilityRequest()

        self._log_msg(request)
        try:
            reply = cast(
                gnmi.CapabilityResponse,
                await self._stub.Capabilities(request),
            )
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

        self._log_msg(reply)
        return reply

    async def set(
        self,
        *,
        update: Sequence[tuple[GNMIPath, GNMISetValueType]] | None = None,
        replace: Sequence[tuple[GNMIPath, GNMISetValueType]] | None = None,
        delete: Sequence[GNMIPath] | None = None,
        prefix: GNMIPath | None = None,
    ) -> int:
        """Set value(s) using SetRequest.

        Returns the timestamp from the successful `SetResponse`.
        """
        if self._stub is None:
            raise RuntimeError("GNMIClient: client is not open")

        if update is not None:
            updates = [gnmi_update(path, value) for path, value in update]
        else:
            updates = None

        if replace is not None:
            replaces = [gnmi_update(path, value) for path, value in replace]
        else:
            replaces = None

        if delete is not None:
            deletes = [path.path for path in delete]
        else:
            deletes = None

        request = gnmi.SetRequest(
            update=updates,
            replace=replaces,
            delete=deletes,
        )

        if prefix is not None:
            request.prefix.CopyFrom(prefix.path)

        self._log_msg(request)
        try:
            reply = cast(
                gnmi.SetResponse,
                await self._stub.Set(request),
            )
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

        self._log_msg(reply)

        # According to the comments in the current protobuf, I expect all error
        # results to be raised as an exception. The only useful value in a
        # successful response is the timestamp.

        if reply.HasField("message"):
            raise NotImplementedError("SetResponse error not supported")

        for result in reply.response:
            if result.HasField("message"):
                raise NotImplementedError("SetResponse suberror not supported")

        return reply.timestamp

    def _log_msg(self, msg: "pbutil.PBMessage"):
        "Log a gNMI message."
        pbutil.log_msg(self._channel, msg, None)

Async gNMI client.

This client implements get, set, subscribe and capabilities.

The API depends on the protobuf definition of gnmi.TypedValue.

Get usage:

client = GNMIClient('127.0.0.1:9339')
await client.open()

path = GNMIPath("interfaces/interface")
for update in await client.get(path):
    print(update)

Subscribe usage:

path = GNMIPath("interfaces/interface[name=eth1]/state/oper-status")
sub = client.subscribe()
sub.on_change(path)

async for initial_state in sub.synchronize():
    print(initial_state)

async for update in sub.updates():
    print(update)

Set usage:

enabled = GNMIPath("interfaces/interface[name=eth1]/config/enabled")

await client.set(update=[
    (enabled, gnmi.TypedValue(bool_val=True)),
])
GNMIClient( address: str, credentials: GRPCCredentialsTLS | None = None)
async def __aenter__(self) -> typing_extensions.Self:
async def open(self, *, channel: grpc.aio._base_channel.Channel | None = None) -> None:

Open the client channel.

Note: This method is async for forward-compatible reasons.

async def close(self) -> None:

Close the client channel.

async def get( self, *path: GNMIPath, prefix: GNMIPath | None = None, config: bool = False) -> Sequence[GNMIUpdate]:

Retrieve value(s) using a GetRequest.

def subscribe( self, *, prefix: GNMIPath | None = None) -> GNMISubscription:

Subscribe to gNMI change notifications.

Usage:

sub = client.subscribe()
sub.on_change(path1, ...)
sub.sample(path3, path4, sample_interval=1000000000)

async for update in sub.synchronize():
    # do something with initial state

async for update in sub.updates():
    # do something with updates

You can also subscribe in "ONCE" mode:

sub = client.subscribe()
sub.once(path1, path2, ...)

async for info in sub.synchronize():
    # do something with info

The subscription object is not re-entrant, but a fully consumed subscription may be reused.
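
The usage above passes `sample_interval=1000000000`. Assuming this value is forwarded unchanged to the gNMI subscription's `sample_interval` field, which the gNMI specification expresses in nanoseconds, that corresponds to one sample per second. A small hypothetical helper (not part of Finsy) can make the unit explicit:

```python
NANOSECONDS_PER_SECOND = 1_000_000_000


def seconds_to_sample_interval(seconds: float) -> int:
    "Convert seconds to a nanosecond interval (hypothetical helper)."
    return round(seconds * NANOSECONDS_PER_SECOND)


ONE_SECOND = seconds_to_sample_interval(1)
HALF_SECOND = seconds_to_sample_interval(0.5)
```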

async def capabilities(self) -> gnmi1.gnmi_pb2.CapabilityResponse:

Issue a CapabilitiesRequest.

async def set( self, *, update: Optional[Sequence[tuple[GNMIPath, bool | int | float | str | bytes | gnmi1.gnmi_pb2.TypedValue]]] = None, replace: Optional[Sequence[tuple[GNMIPath, bool | int | float | str | bytes | gnmi1.gnmi_pb2.TypedValue]]] = None, delete: Optional[Sequence[GNMIPath]] = None, prefix: GNMIPath | None = None) -> int:

Set value(s) using SetRequest.

Returns the timestamp from the successful SetResponse.
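
The returned value is the `timestamp` field of the `SetResponse`. Assuming it follows the gNMI convention of nanoseconds since the Unix epoch, it can be converted to a `datetime` as sketched below. `ns_to_datetime` is a hypothetical helper, not part of Finsy.

```python
from datetime import datetime, timedelta, timezone


def ns_to_datetime(timestamp_ns: int) -> datetime:
    "Convert nanoseconds since the Unix epoch to an aware UTC datetime."
    seconds, nanos = divmod(timestamp_ns, 1_000_000_000)
    # Integer arithmetic avoids float rounding; datetime resolves to microseconds.
    base = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return base + timedelta(microseconds=nanos // 1000)


EPOCH = ns_to_datetime(0)
LATER = ns_to_datetime(1_500_000_000)
```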

class GNMIPath:
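class GNMIPath:
    """Concrete class for working with `gnmi.Path` objects.

    A `GNMIPath` should be treated as an immutable object. You can access
    the wrapped `gnmi.Path` protobuf class using the `.path` property.
    `GNMIPath` objects are hashable, but you must be careful that you do not
    mutate them via the underlying `gnmi.Path`.

    You can construct a GNMIPath from a string:
    ```
    path = GNMIPath("interfaces/interface[name=eth1]/state")
    ```

    You can construct a GNMIPath object from a `gnmi.Path` directly.
    ```
    path = GNMIPath(update.path)
    ```

    You can create paths by using an existing path as a template, without
    modifying the original path. Use the `set` method:
    ```
    operStatus = GNMIPath("interfaces/interface/state/oper-status")
    path = operStatus.set("interface", name="eth1")
    ```

    Use [] to access the name/key of path elements:
    ```
    path[1] == "interface"
    path["interface", "name"] == "eth1"
    path["name"] == "eth1"
    ```
    """

    path: gnmi.Path
    "The underlying `gnmi.Path` protobuf representation."

    def __init__(
        self,
        path: "gnmi.Path | str | GNMIPath" = "",
        *,
        origin: str = "",
        target: str = "",
    ):
        "Construct a `GNMIPath` from a string or `gnmi.Path`."
        if isinstance(path, str):
            path = gnmistring.parse(path)
        elif isinstance(path, GNMIPath):
            path = _copy_path(path.path)

        assert isinstance(path, gnmi.Path)

        if origin:
            path.origin = origin

        if target:
            path.target = target

        self.path = path

    @property
    def first(self) -> str:
        "Return the first element of the path."
        return self.path.elem[0].name

    @property
    def last(self) -> str:
        "Return the last element of the path."
        return self.path.elem[-1].name

    @property
    def origin(self) -> str:
        "Return the path's origin."
        return self.path.origin

    @property
    def target(self) -> str:
        "Return the path's target."
        return self.path.target

    def set(self, __elem: str | int | None = None, **kwds: Any) -> "GNMIPath":
        "Construct a new GNMIPath with keys set for the given elem."
        if __elem is None:
            return self._rekey(kwds)

        if isinstance(__elem, str):
            elem = _find_index(__elem, self.path)
        else:
            elem = __elem

        result = self.copy()
        keys = result.path.elem[elem].key
        keys.clear()
        keys.update({key: str(val) for key, val in kwds.items()})
        return result

    def copy(self) -> "GNMIPath":
        "Return a copy of the path."
        return GNMIPath(_copy_path(self.path))

    @overload
    def __getitem__(
        self, key: int | str | tuple[int | str, str]
    ) -> str: ...  # pragma: no cover

    @overload
    def __getitem__(self, key: slice) -> "GNMIPath": ...  # pragma: no cover

    def __getitem__(
        self,
        key: int | str | tuple[int | str, str] | slice,
    ) -> "str | GNMIPath":
        "Return the specified element or key value."
        match key:
            case int(idx):
                return self.path.elem[idx].name
            case str(name):
                return _retrieve_key(name, self.path)
            case (int(idx), str(k)):
                result = self.path.elem[idx].key.get(k)
                if result is None:
                    raise KeyError(k)
                return result
            case (str(name), str(k)):
                result = self.path.elem[_find_index(name, self.path)].key.get(k)
                if result is None:
                    raise KeyError(k)
                return result
            case slice() as s:
                return self._slice(s.start, s.stop, s.step)
            case other:  # pyright: ignore[reportUnnecessaryComparison]
                raise TypeError(f"invalid key type: {other!r}")

    def __eq__(self, rhs: Any) -> bool:
        "Return True if paths are equal."
        if not isinstance(rhs, GNMIPath):
            return False
        return self.path == rhs.path  # pyright: ignore[reportUnknownVariableType]

    def __hash__(self) -> int:
        "Return hash of path."
        return hash(tuple(_to_tuple(elem) for elem in self.path.elem))

    def __contains__(self, name: str) -> bool:
        "Return True if element name is in path."
        for elem in self.path.elem:
            if elem.name == name:
                return True
        return False

    def __repr__(self) -> str:
        "Return string representation of path."
        path = gnmistring.to_str(self.path)
        if self.target or self.origin:
            return f"GNMIPath({path!r}, origin={self.origin!r}, target={self.target!r})"
        return f"GNMIPath({path!r})"

    def __str__(self) -> str:
        "Return path as string."
        return gnmistring.to_str(self.path)

    def __len__(self) -> int:
        "Return the length of the path."
        return len(self.path.elem)

    def __truediv__(self, rhs: "GNMIPath | str") -> "GNMIPath":
        "Append values to end of path."
        if not isinstance(rhs, GNMIPath):
            rhs = GNMIPath(rhs)

        result = gnmi.Path(
            elem=itertools.chain(self.path.elem, rhs.path.elem),
            origin=self.origin,
            target=self.target,
        )
        return GNMIPath(result)

    def __rtruediv__(self, lhs: str) -> "GNMIPath":
        "Prepend values to the beginning of the path."
        path = GNMIPath(lhs)

        result = gnmi.Path(elem=itertools.chain(path.path.elem, self.path.elem))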
116        keys.clear()
117        keys.update({key: str(val) for key, val in kwds.items()})
118        return result
119
120    def copy(self) -> "GNMIPath":
121        "Return a copy of the path."
122        return GNMIPath(_copy_path(self.path))
123
124    @overload
125    def __getitem__(
126        self, key: int | str | tuple[int | str, str]
127    ) -> str: ...  # pragma: no cover
128
129    @overload
130    def __getitem__(self, key: slice) -> "GNMIPath": ...  # pragma: no cover
131
132    def __getitem__(
133        self,
134        key: int | str | tuple[int | str, str] | slice,
135    ) -> "str | GNMIPath":
136        "Return the specified element or key value."
137        match key:
138            case int(idx):
139                return self.path.elem[idx].name
140            case str(name):
141                return _retrieve_key(name, self.path)
142            case (int(idx), str(k)):
143                result = self.path.elem[idx].key.get(k)
144                if result is None:
145                    raise KeyError(k)
146                return result
147            case (str(name), str(k)):
148                result = self.path.elem[_find_index(name, self.path)].key.get(k)
149                if result is None:
150                    raise KeyError(k)
151                return result
152            case slice() as s:
153                return self._slice(s.start, s.stop, s.step)
154            case other:  # pyright: ignore[reportUnnecessaryComparison]
155                raise TypeError(f"invalid key type: {other!r}")
156
157    def __eq__(self, rhs: Any) -> bool:
158        "Return True if paths are equal."
159        if not isinstance(rhs, GNMIPath):
160            return False
161        return self.path == rhs.path  # pyright: ignore[reportUnknownVariableType]
162
163    def __hash__(self) -> int:
164        "Return hash of path."
165        return hash(tuple(_to_tuple(elem) for elem in self.path.elem))
166
167    def __contains__(self, name: str) -> bool:
168        "Return True if element name is in path."
169        for elem in self.path.elem:
170            if elem.name == name:
171                return True
172        return False
173
174    def __repr__(self) -> str:
175        "Return string representation of path."
176        path = gnmistring.to_str(self.path)
177        if self.target or self.origin:
178            return f"GNMIPath({path!r}, origin={self.origin!r}, target={self.target!r})"
179        return f"GNMIPath({path!r})"
180
181    def __str__(self) -> str:
182        "Return path as string."
183        return gnmistring.to_str(self.path)
184
185    def __len__(self) -> int:
186        "Return the length of the path."
187        return len(self.path.elem)
188
189    def __truediv__(self, rhs: "GNMIPath | str") -> "GNMIPath":
190        "Append values to end of path."
191        if not isinstance(rhs, GNMIPath):
192            rhs = GNMIPath(rhs)
193
194        result = gnmi.Path(
195            elem=itertools.chain(self.path.elem, rhs.path.elem),
196            origin=self.origin,
197            target=self.target,
198        )
199        return GNMIPath(result)
200
201    def __rtruediv__(self, lhs: str) -> "GNMIPath":
202        "Prepend values to the beginning of the path."
203        path = GNMIPath(lhs)
204
205        result = gnmi.Path(elem=itertools.chain(path.path.elem, self.path.elem))
206        return GNMIPath(result)
207
208    def _slice(
209        self, start: int | None, stop: int | None, step: int | None
210    ) -> "GNMIPath":
211        "Return specified slice of GNMIPath."
212        if start is None:
213            start = 0
214
215        if stop is None:
216            stop = len(self)
217        elif stop < 0:
218            stop = len(self) + stop
219
220        if step is None:
221            step = 1
222
223        path = gnmi.Path()
224        for i in range(start, stop, step):
225            elem = self.path.elem[i]
226            path.elem.append(gnmi.PathElem(name=elem.name, key=elem.key))
227
228        return GNMIPath(path)
229
230    def _rekey(self, keys: dict[str, Any]) -> "GNMIPath":
231        """Construct a new path with specified keys replaced."""
232        if not keys:
233            raise ValueError("empty keys")
234
235        result = self.copy()
236
237        found = False
238        for elem in result.path.elem:
239            for key, value in keys.items():
240                if key in elem.key:
241                    elem.key[key] = str(value)
242                    found = True
243
244        if not found:
245            raise ValueError(f"no keys found in path: {keys!r}")
246
247        return result

Concrete class for working with gnmi.Path objects.

A GNMIPath should be treated as an immutable object. You can access the wrapped gnmi.Path protobuf class using the .path property. GNMIPath objects are hashable, but you must be careful that you do not mutate them via the underlying gnmi.Path.
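The hashing caution can be illustrated without finsy. The sketch below uses a hypothetical stand-in class, `FakePath`, whose hash is derived from mutable contents (the same shape of hazard as mutating a wrapped gnmi.Path); it is not finsy's implementation.

```python
class FakePath:
    """Hypothetical stand-in: a hashable wrapper around a mutable list of names."""

    def __init__(self, elems):
        self.elems = list(elems)

    def __eq__(self, other):
        return isinstance(other, FakePath) and self.elems == other.elems

    def __hash__(self):
        # The hash is computed from the current (mutable) contents.
        return hash(tuple(self.elems))


original = FakePath(["interfaces", "interface", "state"])
cache = {original: "cached value"}

# An equal path finds the entry while the wrapped data is unchanged.
probe_old = FakePath(["interfaces", "interface", "state"])
found_before = probe_old in cache

# Mutate the wrapped data after the object has been used as a dict key.
original.elems.append("oper-status")
probe_new = FakePath(["interfaces", "interface", "state", "oper-status"])

# The old contents no longer compare equal to the stored key.
found_old = probe_old in cache
# The new contents compare equal, but the entry is stored under the old hash.
found_new = probe_new in cache
```

After the mutation, neither the old nor the new contents locate the entry, which is why a path used as a dict key or set member should not be changed through its underlying protobuf.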

You can construct a GNMIPath from a string:

path = GNMIPath("interfaces/interface[name=eth1]/state")

You can also construct a GNMIPath directly from a gnmi.Path:

path = GNMIPath(update.path)

You can create paths by using an existing path as a template, without modifying the original path. Use the set method:

operStatus = GNMIPath("interfaces/interface/state/oper-status")
path = operStatus.set("interface", name="eth1")

Use [] to access the name/key of path elements:

path[1] == "interface"
path["interface", "name"] == "eth1"
path["name"] == "eth1"
GNMIPath(path: gnmi1.gnmi_pb2.Path | str | GNMIPath = '', *, origin: str = '', target: str = '')
    def __init__(
        self,
        path: "gnmi.Path | str | GNMIPath" = "",
        *,
        origin: str = "",
        target: str = "",
    ):
        "Construct a `GNMIPath` from a string or `gnmi.Path`."
        if isinstance(path, str):
            path = gnmistring.parse(path)
        elif isinstance(path, GNMIPath):
            path = _copy_path(path.path)

        assert isinstance(path, gnmi.Path)

        if origin:
            path.origin = origin

        if target:
            path.target = target

        self.path = path

Construct a GNMIPath from a string or gnmi.Path.

path: gnmi1.gnmi_pb2.Path

The underlying gnmi.Path protobuf representation.

first: str
    @property
    def first(self) -> str:
        "Return the first element of the path."
        return self.path.elem[0].name

Return the first element of the path.

last: str
    @property
    def last(self) -> str:
        "Return the last element of the path."
        return self.path.elem[-1].name

Return the last element of the path.

origin: str
    @property
    def origin(self) -> str:
        "Return the path's origin."
        return self.path.origin

Return the path's origin.

target: str
    @property
    def target(self) -> str:
        "Return the path's target."
        return self.path.target

Return the path's target.

def set(self, _GNMIPath__elem: str | int | None = None, **kwds: Any) -> GNMIPath:
    def set(self, __elem: str | int | None = None, **kwds: Any) -> "GNMIPath":
        "Construct a new GNMIPath with keys set for the given elem."
        if __elem is None:
            return self._rekey(kwds)

        if isinstance(__elem, str):
            elem = _find_index(__elem, self.path)
        else:
            elem = __elem

        result = self.copy()
        keys = result.path.elem[elem].key
        keys.clear()
        keys.update({key: str(val) for key, val in kwds.items()})
        return result

Construct a new GNMIPath with keys set for the given elem.
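The listing above has two modes: with an element given, all keys on that element are replaced; with only keyword arguments, existing keys of the same name are updated wherever they occur. The sketch below models that distinction on a simplified path (a list of `(name, keys)` pairs). The function names `set_elem_keys` and `rekey` are hypothetical, and the name lookup assumes the first matching element is chosen, since `_find_index` is not shown here.

```python
import copy


def set_elem_keys(path, elem, **kwds):
    "Return a new path with ALL keys on one element replaced."
    result = copy.deepcopy(path)
    if isinstance(elem, str):
        # Assumption: the first element with a matching name is selected.
        index = next(i for i, (name, _) in enumerate(result) if name == elem)
    else:
        index = elem
    keys = result[index][1]
    keys.clear()
    keys.update({key: str(val) for key, val in kwds.items()})
    return result


def rekey(path, **kwds):
    "Return a new path with only already-present keys updated."
    if not kwds:
        raise ValueError("empty keys")
    result = copy.deepcopy(path)
    found = False
    for _, keys in result:
        for key, value in kwds.items():
            if key in keys:
                keys[key] = str(value)
                found = True
    if not found:
        raise ValueError(f"no keys found in path: {kwds!r}")
    return result


template = [("interfaces", {}), ("interface", {}), ("state", {})]
eth1 = set_elem_keys(template, "interface", name="eth1")
eth2 = rekey(eth1, name="eth2")
```

In this model, calling `rekey(template, name="eth1")` raises `ValueError` because the template has no `name` key yet; the key has to be introduced by naming the element first.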

def copy(self) -> GNMIPath:
    def copy(self) -> "GNMIPath":
        "Return a copy of the path."
        return GNMIPath(_copy_path(self.path))

Return a copy of the path.

def __getitem__(self, key: int | str | tuple[int | str, str] | slice) -> str | GNMIPath:
    def __getitem__(
        self,
        key: int | str | tuple[int | str, str] | slice,
    ) -> "str | GNMIPath":
        "Return the specified element or key value."
        match key:
            case int(idx):
                return self.path.elem[idx].name
            case str(name):
                return _retrieve_key(name, self.path)
            case (int(idx), str(k)):
                result = self.path.elem[idx].key.get(k)
                if result is None:
                    raise KeyError(k)
                return result
            case (str(name), str(k)):
                result = self.path.elem[_find_index(name, self.path)].key.get(k)
                if result is None:
                    raise KeyError(k)
                return result
            case slice() as s:
                return self._slice(s.start, s.stop, s.step)
            case other:  # pyright: ignore[reportUnnecessaryComparison]
                raise TypeError(f"invalid key type: {other!r}")

Return the specified element or key value.

def __contains__(self, name: str) -> bool:
    def __contains__(self, name: str) -> bool:
        "Return True if element name is in path."
        for elem in self.path.elem:
            if elem.name == name:
                return True
        return False

Return True if element name is in path.

def __len__(self) -> int:
    def __len__(self) -> int:
        "Return the length of the path."
        return len(self.path.elem)

Return the length of the path.

class GNMISubscription:
class GNMISubscription:
    """Represents a gNMI subscription stream.

    Returned from `GNMIClient.subscribe()`.
    """

    _client: GNMIClient
    _stream: _StreamTypeAlias | None

    def __init__(self, client: GNMIClient, prefix: GNMIPath | None = None):
        "Initialize a GNMISubscription."
        self._client = client
        self._stream = None
        self._sublist = gnmi.SubscriptionList(
            mode=gnmi.SubscriptionList.Mode.STREAM,
        )
        if prefix is not None:
            self._sublist.prefix.CopyFrom(prefix.path)

    def once(self, *paths: GNMIPath) -> None:
        "Subscribe in `ONCE` mode to given paths."
        self._sublist.mode = gnmi.SubscriptionList.Mode.ONCE

        for path in paths:
            sub = gnmi.Subscription(path=path.path)
            self._sublist.subscription.append(sub)

    def on_change(self, *paths: GNMIPath) -> None:
        "Subscribe in `ON_CHANGE` mode to given paths."
        assert self._sublist.mode == gnmi.SubscriptionList.Mode.STREAM

        for path in paths:
            sub = gnmi.Subscription(
                path=path.path,
                mode=gnmi.SubscriptionMode.ON_CHANGE,
            )
            self._sublist.subscription.append(sub)

    def sample(
        self,
        *paths: GNMIPath,
        sample_interval: int,
        suppress_redundant: bool = False,
        heartbeat_interval: int = 0,
    ) -> None:
        "Subscribe in `SAMPLE` mode to given paths."
        assert self._sublist.mode == gnmi.SubscriptionList.Mode.STREAM

        for path in paths:
            sub = gnmi.Subscription(
                path=path.path,
                mode=gnmi.SubscriptionMode.SAMPLE,
                sample_interval=sample_interval,
                suppress_redundant=suppress_redundant,
                heartbeat_interval=heartbeat_interval,
            )
            self._sublist.subscription.append(sub)

    async def synchronize(self) -> AsyncIterator[GNMIUpdate]:
        """Async iterator for initial subscription updates.

        Retrieve all updates up to `sync_response` message.
        """
        if self._stream is None:
            await self._subscribe()

        try:
            async for result in self._read(True):
                yield result
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

    async def updates(self) -> AsyncIterator[GNMIUpdate]:
        "Async iterator for all remaining subscription updates."
        if self._stream is None:
            await self._subscribe()

        try:
            async for result in self._read(False):
                yield result
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

    def cancel(self) -> None:
        "Cancel the subscription."
        if self._stream is not None:
            self._stream.cancel()
            self._stream = None

    async def _subscribe(self):
        assert self._stream is None
        assert self._client._stub is not None

        self._stream = cast(
            _StreamTypeAlias,
            self._client._stub.Subscribe(wait_for_ready=True),  # type: ignore
        )

        request = gnmi.SubscribeRequest(subscribe=self._sublist)
        self._client._log_msg(request)
        try:
            await self._stream.write(request)
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

    async def _read(self, stop_at_sync: bool) -> AsyncIterator[GNMIUpdate]:
        assert self._stream is not None

        while True:
            msg = cast(
                gnmi.SubscribeResponse,
                await self._stream.read(),  # type: ignore
            )
            if msg is GRPC_EOF:
                LOGGER.warning("gNMI _read: unexpected EOF")
                return

            self._client._log_msg(msg)

            match msg.WhichOneof("response"):
                case "update":
                    for update in _read_updates(msg.update):
                        yield update
                case "sync_response":
                    if stop_at_sync:
                        if self._is_once():
                            # FIXME? I expected Stratum to issue an EOF after
                            # the sync_response in "ONCE" mode, but it doesn't.
                            # For now, cancel the stream explicitly.
                            self.cancel()
                            # Let grpc react to the cancellation immediately.
                            await asyncio.sleep(0)
                        return  # All done!
                    LOGGER.warning("gNMI _read: ignored sync_response")
                case other:
                    LOGGER.warning("gNMI _read: unexpected oneof: %s", other)

    def _is_once(self) -> bool:
        "Return true if the subscription is in ONCE mode."
        return self._sublist.mode == gnmi.SubscriptionList.Mode.ONCE

Represents a gNMI subscription stream.

Returned from GNMIClient.subscribe().

GNMISubscription(client: GNMIClient, prefix: GNMIPath | None = None)
    def __init__(self, client: GNMIClient, prefix: GNMIPath | None = None):
        "Initialize a GNMISubscription."
        self._client = client
        self._stream = None
        self._sublist = gnmi.SubscriptionList(
            mode=gnmi.SubscriptionList.Mode.STREAM,
        )
        if prefix is not None:
            self._sublist.prefix.CopyFrom(prefix.path)

Initialize a GNMISubscription.

def once(self, *paths: GNMIPath) -> None:
    def once(self, *paths: GNMIPath) -> None:
        "Subscribe in `ONCE` mode to given paths."
        self._sublist.mode = gnmi.SubscriptionList.Mode.ONCE

        for path in paths:
            sub = gnmi.Subscription(path=path.path)
            self._sublist.subscription.append(sub)

Subscribe in ONCE mode to given paths.

def on_change(self, *paths: GNMIPath) -> None:
    def on_change(self, *paths: GNMIPath) -> None:
        "Subscribe in `ON_CHANGE` mode to given paths."
        assert self._sublist.mode == gnmi.SubscriptionList.Mode.STREAM

        for path in paths:
            sub = gnmi.Subscription(
                path=path.path,
                mode=gnmi.SubscriptionMode.ON_CHANGE,
            )
            self._sublist.subscription.append(sub)

Subscribe in ON_CHANGE mode to given paths.

def sample(self, *paths: GNMIPath, sample_interval: int, suppress_redundant: bool = False, heartbeat_interval: int = 0) -> None:
    def sample(
        self,
        *paths: GNMIPath,
        sample_interval: int,
        suppress_redundant: bool = False,
        heartbeat_interval: int = 0,
    ) -> None:
        "Subscribe in `SAMPLE` mode to given paths."
        assert self._sublist.mode == gnmi.SubscriptionList.Mode.STREAM

        for path in paths:
            sub = gnmi.Subscription(
                path=path.path,
                mode=gnmi.SubscriptionMode.SAMPLE,
                sample_interval=sample_interval,
                suppress_redundant=suppress_redundant,
                heartbeat_interval=heartbeat_interval,
            )
            self._sublist.subscription.append(sub)

Subscribe in SAMPLE mode to given paths.
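In the listings above, `once()` switches the whole subscription list to ONCE mode, while `on_change()` and `sample()` assert that the list is still in STREAM mode, so the two styles cannot be mixed on one subscription. The sketch below models that constraint with a hypothetical stand-in class (`FakeSubscriptionBuilder`, not part of finsy) that raises explicitly instead of using `assert`. The gNMI specification defines `sample_interval` in nanoseconds; `NANOS_PER_SECOND` is a helper constant introduced here for illustration.

```python
NANOS_PER_SECOND = 1_000_000_000


class FakeSubscriptionBuilder:
    """Hypothetical stand-in that records subscriptions as plain dicts."""

    def __init__(self):
        self.mode = "STREAM"
        self.subscriptions = []

    def _require_stream(self):
        # The listed source uses an assert for this check.
        if self.mode != "STREAM":
            raise AssertionError("subscription list is not in STREAM mode")

    def once(self, *paths):
        self.mode = "ONCE"
        for path in paths:
            self.subscriptions.append({"path": path})

    def on_change(self, *paths):
        self._require_stream()
        for path in paths:
            self.subscriptions.append({"path": path, "mode": "ON_CHANGE"})

    def sample(self, *paths, sample_interval):
        self._require_stream()
        for path in paths:
            self.subscriptions.append(
                {"path": path, "mode": "SAMPLE", "sample_interval": sample_interval}
            )


# STREAM mode: ON_CHANGE and SAMPLE subscriptions can be combined.
stream = FakeSubscriptionBuilder()
stream.on_change("interfaces/interface/state/oper-status")
stream.sample("interfaces/interface/state/counters", sample_interval=5 * NANOS_PER_SECOND)

# ONCE mode: adding a streaming subscription afterwards is rejected.
once = FakeSubscriptionBuilder()
once.once("interfaces/interface/state/oper-status")
try:
    once.on_change("interfaces/interface/state/counters")
    mixing_allowed = True
except AssertionError:
    mixing_allowed = False
```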

async def synchronize(self) -> AsyncIterator[GNMIUpdate]:
    async def synchronize(self) -> AsyncIterator[GNMIUpdate]:
        """Async iterator for initial subscription updates.

        Retrieve all updates up to `sync_response` message.
        """
        if self._stream is None:
            await self._subscribe()

        try:
            async for result in self._read(True):
                yield result
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

Async iterator for initial subscription updates.

Retrieve all updates up to sync_response message.

async def updates(self) -> AsyncIterator[GNMIUpdate]:
    async def updates(self) -> AsyncIterator[GNMIUpdate]:
        "Async iterator for all remaining subscription updates."
        if self._stream is None:
            await self._subscribe()

        try:
            async for result in self._read(False):
                yield result
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

Async iterator for all remaining subscription updates.
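`synchronize()` and `updates()` read from the same stream: the first stops at the `sync_response` marker, the second continues with whatever follows. The sketch below reproduces that two-phase read with a hypothetical stand-in (`FakeSubscription`) that pulls tuples from an `asyncio.Queue` instead of a gRPC stream. The message shapes and the `None` end-of-stream sentinel are assumptions made for the illustration.

```python
import asyncio


class FakeSubscription:
    """Hypothetical stand-in: reads (kind, payload) messages from a queue."""

    def __init__(self, messages):
        self._queue = asyncio.Queue()
        for msg in messages:
            self._queue.put_nowait(msg)

    async def _read(self, stop_at_sync):
        while True:
            msg = await self._queue.get()
            if msg is None:  # stand-in for end of stream
                return
            kind, payload = msg
            if kind == "update":
                yield payload
            elif kind == "sync_response" and stop_at_sync:
                return  # initial state fully delivered

    def synchronize(self):
        "Yield updates up to the sync_response marker."
        return self._read(True)

    def updates(self):
        "Yield all remaining updates."
        return self._read(False)


async def main():
    sub = FakeSubscription(
        [
            ("update", "eth1=UP"),
            ("update", "eth2=DOWN"),
            ("sync_response", None),
            ("update", "eth2=UP"),
            None,
        ]
    )
    initial = [update async for update in sub.synchronize()]
    later = [update async for update in sub.updates()]
    return initial, later


initial, later = asyncio.run(main())
```

Here `initial` holds the two updates that precede the marker and `later` holds the one that follows it, mirroring the usual pattern of loading initial state first and then reacting to changes.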

def cancel(self) -> None:
    def cancel(self) -> None:
        "Cancel the subscription."
        if self._stream is not None:
            self._stream.cancel()
            self._stream = None

Cancel the subscription.

@dataclass
class GNMIUpdate:
@dataclass
class GNMIUpdate:
    "Represents a gNMI update returned from GNMIClient."

    timestamp: int
    path: GNMIPath
    typed_value: gnmi.TypedValue | None

    @property
    def value(self) -> Any:
        "Return the value as a Python value."
        if self.typed_value is None:
            return None

        attr = self.typed_value.WhichOneof("value")
        if attr is None:
            raise ValueError("typed_value is not set")
        return getattr(self.typed_value, attr)

    def __repr__(self) -> str:
        "Override repr to strip newline from end of `TypedValue`."
        value = repr(self.typed_value).rstrip()
        return f"GNMIUpdate(timestamp={self.timestamp!r}, path={self.path!r}, typed_value=`{value}`)"

Represents a gNMI update returned from GNMIClient.

GNMIUpdate(timestamp: int, path: GNMIPath, typed_value: gnmi1.gnmi_pb2.TypedValue | None)
timestamp: int
path: GNMIPath
typed_value: gnmi1.gnmi_pb2.TypedValue | None
value: Any
    @property
    def value(self) -> Any:
        "Return the value as a Python value."
        if self.typed_value is None:
            return None

        attr = self.typed_value.WhichOneof("value")
        if attr is None:
            raise ValueError("typed_value is not set")
        return getattr(self.typed_value, attr)

Return the value as a Python value.

@dataclass(kw_only=True)
class GRPCCredentialsTLS:
@dataclass(kw_only=True)
class GRPCCredentialsTLS:
    "GRPC channel credentials for Transport Layer Security (TLS)."

    cacert: Path | bytes | None
    """Certificate authority used to authenticate the certificate at the other
    end of the connection."""

    cert: Path | bytes | None
    "Certificate that identifies this side of the connection."

    private_key: Path | bytes | None
    "Private key associated with this side's certificate identity."

    target_name_override: str = ""
    "Override the target name used for TLS host name checking (useful for testing)."

    call_credentials: grpc.AuthMetadataPlugin | None = None
    """Optional GRPC call credentials for the client channel. Be aware that the
    auth plugin's callback takes place in a different thread."""

    def to_client_credentials(self) -> grpc.ChannelCredentials:
        "Create native SSL client credentials object."
        root_certificates = _coerce_tls_path(self.cacert)
        certificate_chain = _coerce_tls_path(self.cert)
        private_key = _coerce_tls_path(self.private_key)

        return self._compose_credentials(
            grpc.ssl_channel_credentials(  # pyright: ignore[reportUnknownMemberType]
                root_certificates=root_certificates,
                private_key=private_key,
                certificate_chain=certificate_chain,
            )
        )

    def to_server_credentials(self) -> grpc.ServerCredentials:
        """Create native SSL server credentials object.

        On the server side, we ignore the `call_credentials`.
        """
        root_certificates = _coerce_tls_path(self.cacert)
        certificate_chain = _coerce_tls_path(self.cert)
        private_key = _coerce_tls_path(self.private_key)

        return grpc.ssl_server_credentials(  # pyright: ignore[reportUnknownMemberType]
            private_key_certificate_chain_pairs=[(private_key, certificate_chain)],
            root_certificates=root_certificates,
            require_client_auth=True,
        )

    def _compose_credentials(
        self, channel_cred: grpc.ChannelCredentials
    ) -> grpc.ChannelCredentials:
        "Compose call credentials with channel credentials."
        if not self.call_credentials:
            return channel_cred

        call_cred = (
            grpc.metadata_call_credentials(  # pyright: ignore[reportUnknownMemberType]
                self.call_credentials
            )
        )
        return grpc.composite_channel_credentials(  # pyright: ignore[reportUnknownMemberType]
            channel_cred,
            call_cred,
        )

GRPC channel credentials for Transport Layer Security (TLS).

GRPCCredentialsTLS(*, cacert: pathlib.Path | bytes | None, cert: pathlib.Path | bytes | None, private_key: pathlib.Path | bytes | None, target_name_override: str = '', call_credentials: grpc.AuthMetadataPlugin | None = None)
cacert: pathlib.Path | bytes | None

Certificate authority used to authenticate the certificate at the other end of the connection.

cert: pathlib.Path | bytes | None

Certificate that identifies this side of the connection.

private_key: pathlib.Path | bytes | None

Private key associated with this side's certificate identity.

target_name_override: str = ''

Override the target name used for TLS host name checking (useful for testing).

call_credentials: grpc.AuthMetadataPlugin | None = None

Optional GRPC call credentials for the client channel. Be aware that the auth plugin's callback takes place in a different thread.

def to_client_credentials(self) -> grpc.ChannelCredentials:
    def to_client_credentials(self) -> grpc.ChannelCredentials:
        "Create native SSL client credentials object."
        root_certificates = _coerce_tls_path(self.cacert)
        certificate_chain = _coerce_tls_path(self.cert)
        private_key = _coerce_tls_path(self.private_key)

        return self._compose_credentials(
            grpc.ssl_channel_credentials(  # pyright: ignore[reportUnknownMemberType]
                root_certificates=root_certificates,
                private_key=private_key,
                certificate_chain=certificate_chain,
            )
        )

Create native SSL client credentials object.

def to_server_credentials(self) -> grpc.ServerCredentials:
    def to_server_credentials(self) -> grpc.ServerCredentials:
        """Create native SSL server credentials object.

        On the server side, we ignore the `call_credentials`.
        """
        root_certificates = _coerce_tls_path(self.cacert)
        certificate_chain = _coerce_tls_path(self.cert)
        private_key = _coerce_tls_path(self.private_key)

        return grpc.ssl_server_credentials(  # pyright: ignore[reportUnknownMemberType]
            private_key_certificate_chain_pairs=[(private_key, certificate_chain)],
            root_certificates=root_certificates,
            require_client_auth=True,
        )

Create native SSL server credentials object.

On the server side, we ignore the call_credentials.

class GRPCStatusCode(finsy.grpcutil._EnumBase):
class GRPCStatusCode(_EnumBase):
    "IntEnum equivalent to `grpc.StatusCode`."

    OK = rpc_code.OK
    CANCELLED = rpc_code.CANCELLED
    UNKNOWN = rpc_code.UNKNOWN
    FAILED_PRECONDITION = rpc_code.FAILED_PRECONDITION
    INVALID_ARGUMENT = rpc_code.INVALID_ARGUMENT
    DEADLINE_EXCEEDED = rpc_code.DEADLINE_EXCEEDED
    NOT_FOUND = rpc_code.NOT_FOUND
    ALREADY_EXISTS = rpc_code.ALREADY_EXISTS
    PERMISSION_DENIED = rpc_code.PERMISSION_DENIED
    UNAUTHENTICATED = rpc_code.UNAUTHENTICATED
    RESOURCE_EXHAUSTED = rpc_code.RESOURCE_EXHAUSTED
    ABORTED = rpc_code.ABORTED
    OUT_OF_RANGE = rpc_code.OUT_OF_RANGE
    UNIMPLEMENTED = rpc_code.UNIMPLEMENTED
    INTERNAL = rpc_code.INTERNAL
    UNAVAILABLE = rpc_code.UNAVAILABLE
    DATA_LOSS = rpc_code.DATA_LOSS

    @classmethod
    def from_status_code(cls, val: grpc.StatusCode) -> "GRPCStatusCode":
        "Create corresponding GRPCStatusCode from a grpc.StatusCode object."
        n: Any = val.value[0]  # pyright: ignore[reportUnknownMemberType]
        assert isinstance(n, int)
        return GRPCStatusCode(n)

    @staticmethod
    def _validate_enum() -> None:
        "Verify that GRPCStatusCode covers every possible grpc.StatusCode."
        for value in grpc.StatusCode:
            n: Any = value.value[0]  # pyright: ignore[reportUnknownMemberType]
            assert isinstance(n, int)
            assert GRPCStatusCode[value.name].value == n, value.name

IntEnum equivalent to grpc.StatusCode.

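OK = GRPCStatusCode.OK
CANCELLED = GRPCStatusCode.CANCELLED
UNKNOWN = GRPCStatusCode.UNKNOWN
FAILED_PRECONDITION = GRPCStatusCode.FAILED_PRECONDITION
INVALID_ARGUMENT = GRPCStatusCode.INVALID_ARGUMENT
DEADLINE_EXCEEDED = GRPCStatusCode.DEADLINE_EXCEEDED
NOT_FOUND = GRPCStatusCode.NOT_FOUND
ALREADY_EXISTS = GRPCStatusCode.ALREADY_EXISTS
PERMISSION_DENIED = GRPCStatusCode.PERMISSION_DENIED
UNAUTHENTICATED = GRPCStatusCode.UNAUTHENTICATED
RESOURCE_EXHAUSTED = GRPCStatusCode.RESOURCE_EXHAUSTED
ABORTED = GRPCStatusCode.ABORTED
OUT_OF_RANGE = GRPCStatusCode.OUT_OF_RANGE
UNIMPLEMENTED = GRPCStatusCode.UNIMPLEMENTED
INTERNAL = GRPCStatusCode.INTERNAL
UNAVAILABLE = GRPCStatusCode.UNAVAILABLE
DATA_LOSS = GRPCStatusCode.DATA_LOSS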
@classmethod
def from_status_code(cls, val: grpc.StatusCode) -> GRPCStatusCode:
    @classmethod
    def from_status_code(cls, val: grpc.StatusCode) -> "GRPCStatusCode":
        "Create corresponding GRPCStatusCode from a grpc.StatusCode object."
        n: Any = val.value[0]  # pyright: ignore[reportUnknownMemberType]
        assert isinstance(n, int)
        return GRPCStatusCode(n)

Create corresponding GRPCStatusCode from a grpc.StatusCode object.
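The conversion works because each `grpc.StatusCode` member's value is a tuple whose first item is the numeric code. The sketch below shows the same mapping using only the standard library. `FakeStatusCode` is a hypothetical stand-in with three members that follow that tuple layout, and `StatusCode` is a plain `IntEnum` used in place of the library's own base class.

```python
import enum


class FakeStatusCode(enum.Enum):
    """Hypothetical stand-in for grpc.StatusCode: values are (int, str) tuples."""

    OK = (0, "ok")
    CANCELLED = (1, "cancelled")
    NOT_FOUND = (5, "not found")


class StatusCode(enum.IntEnum):
    "Integer-valued counterpart with matching member names."

    OK = 0
    CANCELLED = 1
    NOT_FOUND = 5

    @classmethod
    def from_status_code(cls, val):
        # The numeric code is the first item of the tuple value.
        n = val.value[0]
        assert isinstance(n, int)
        return cls(n)


code = StatusCode.from_status_code(FakeStatusCode.NOT_FOUND)

# Equivalent of the coverage check: every source member maps by name and number.
covered = all(StatusCode[m.name].value == m.value[0] for m in FakeStatusCode)
```

Because the result is an integer enum, it can be compared directly with numeric codes, for example `code == 5`.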

Inherited Members
enum.Enum: name, value
builtins.int: conjugate, bit_length, bit_count, to_bytes, from_bytes, as_integer_ratio, real, imag, numerator, denominator