finsy

Finsy P4Runtime Controller Library


Finsy is a P4Runtime controller library written in Python using asyncio. Finsy includes support for gNMI.

Check out the examples directory for some demonstration programs.

Installation

Finsy requires Python 3.10 or later. To install the latest version, type pip install finsy.

P4Runtime Scripts

With Finsy, you can write a Python script that reads/writes P4Runtime entities for a single switch.

Here is a complete example that retrieves the P4Info from a switch:

import finsy as fy

async def main():
    async with fy.Switch("sw1", "127.0.0.1:50001") as sw1:
        # Print out a description of the switch's P4Info, if one is configured.
        print(sw1.p4info)

fy.run(main())

Here is another example that prints out all non-default table entries.

import finsy as fy

async def main():
    async with fy.Switch("sw1", "127.0.0.1:50001") as sw1:
        # Do a wildcard read for table entries.
        async for entry in sw1.read(fy.P4TableEntry()):
            print(entry)

fy.run(main())

P4Runtime Controller

You can also write a P4Runtime controller that manages multiple switches independently. Your controller can react to events from the Switch by changing the contents of P4 tables.

Each switch is managed by an async ready_handler function. Your ready_handler function can read or update various P4Runtime entities in the switch. It can also create tasks to listen for packets or digests.

When you write P4Runtime updates to the switch, you use a unary operator (+, -, ~) to specify the operation: INSERT (+), DELETE (-) or MODIFY (~).

import finsy as fy

async def ready_handler(sw: fy.Switch):
    await sw.delete_all()
    await sw.write(
        [
            # Insert (+) multicast group with ports 1, 2, 3 and CONTROLLER.
            +fy.P4MulticastGroupEntry(1, replicas=[1, 2, 3, 255]),
            # Modify (~) default table entry to flood all unmatched packets.
            ~fy.P4TableEntry(
                "ipv4",
                action=fy.Action("flood"),
                is_default_action=True,
            ),
        ]
    )

    async for packet in sw.read_packets():
        print(f"{sw.name}: {packet}")

Use the SwitchOptions class to specify each switch's settings, including the p4info/p4blob and ready_handler. Use the Controller class to drive multiple switch connections. Each switch will call back into your ready_handler function after the P4Runtime connection is established.

from pathlib import Path

options = fy.SwitchOptions(
    p4info=Path("hello.p4info.txt"),
    p4blob=Path("hello.json"),
    ready_handler=ready_handler,
)

controller = fy.Controller([
    fy.Switch("sw1", "127.0.0.1:50001", options),
    fy.Switch("sw2", "127.0.0.1:50002", options),
    fy.Switch("sw3", "127.0.0.1:50003", options),
])

fy.run(controller.run())

Your ready_handler can spawn concurrent tasks with the Switch.create_task method. Tasks created this way will have their lifetimes managed by the switch object.

If the switch disconnects or its role changes to backup, the task running your ready_handler (and any tasks it spawned) will be cancelled and the ready_handler will begin again.
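
The lifetime rule described above can be illustrated with plain asyncio. This is only a sketch of the idea, not Finsy's implementation: `ManagedTasks` is a hypothetical stand-in for an object that owns the tasks it creates and cancels them together.

```python
import asyncio


class ManagedTasks:
    """Hypothetical stand-in for an object that owns the tasks it creates."""

    def __init__(self) -> None:
        self._tasks = set()

    def create_task(self, coro) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        # Forget the task once it finishes, however it finishes.
        task.add_done_callback(self._tasks.discard)
        return task

    async def cancel_all(self) -> None:
        tasks = list(self._tasks)
        for task in tasks:
            task.cancel()
        # Wait until every task has processed its cancellation.
        await asyncio.gather(*tasks, return_exceptions=True)


async def demo():
    owner = ManagedTasks()
    tasks = [owner.create_task(asyncio.sleep(3600)) for _ in range(3)]
    await asyncio.sleep(0)  # let the tasks start
    await owner.cancel_all()  # e.g. on disconnect or role change
    return [task.cancelled() for task in tasks]
```

Running `asyncio.run(demo())` shows that every spawned task ends up cancelled once the owner cancels them.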

For more examples, see the examples directory.

Switch Read/Write API

The Switch class provides the API for interacting with P4Runtime switches. You will control a Switch object with a "ready handler" function. The ready handler is an async function that is called when the switch is ready to accept commands.

Your ready handler will typically write some control entities to the switch, then listen for incoming events and react to them with more writes. You may occasionally read entities from the switch.

When your ready handler is invoked, there is already a P4Runtime channel established, with client arbitration completed, and pipeline configured as specified in SwitchOptions.

Here is an example skeleton program. The ready handler is named ready().

async def ready(switch: fy.Switch):
    # Check if switch is the primary. If not, we may want to proceed
    # in read-only mode. In this example, ignore switch if it's a backup.
    if not switch.is_primary:
        return

    # If we're reconnecting to a switch, it will already have runtime state.
    # In this example, we just delete all entities and start over.
    await switch.delete_all()

    # Provision the pipeline with one or more `write` transactions. Each
    # `write` is a single WriteRequest which may contain multiple updates.
    await switch.write(
        # [Next section will cover what goes here.]
    )

    # Listen for events and respond to them. This "infinite" loop will
    # continue until the Switch disconnects, changes primary/backup status,
    # or the controller is stopped.
    async for packet in switch.read_packets():
        await handle_packet(switch, packet)

The Switch class provides a switch.create_task method to start a managed task. Tasks allow you to perform concurrent operations on the same switch. We could have written the last stanza above that reads packets in an infinite loop as a separate task. It's okay for the ready handler function to return early; any tasks it created will still run.
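
As a sketch of that variation, the ready handler below starts the packet loop with `switch.create_task` and then returns. It assumes `create_task` accepts a single coroutine argument; `packet_loop` and `handle_packet` are hypothetical names chosen for this example.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # finsy is only needed here for the type hints
    import finsy as fy


async def handle_packet(switch: "fy.Switch", packet: "fy.P4PacketIn") -> None:
    # Hypothetical handler; replace with your own logic.
    print(f"{switch.name}: {packet}")


async def packet_loop(switch: "fy.Switch") -> None:
    # Same loop as the last stanza of the skeleton above.
    async for packet in switch.read_packets():
        await handle_packet(switch, packet)


async def ready(switch: "fy.Switch") -> None:
    if not switch.is_primary:
        return
    await switch.delete_all()
    # Start the packet loop as a managed task, then return early.
    switch.create_task(packet_loop(switch))
```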

Writes

Use the write() method to write one or more P4Runtime updates and packets.

A P4Runtime update supports one of three operations: INSERT, MODIFY or DELETE. Some entities support all three operations. Other entities only support MODIFY.

| Entity | Operations Permitted | Related Classes |
| ------ | -------------------- | --------------- |
| P4TableEntry | INSERT, MODIFY, DELETE | Match, Action, IndirectAction, P4MeterConfig, P4CounterData, P4MeterCounterData |
| P4ActionProfileMember | INSERT, MODIFY, DELETE | |
| P4ActionProfileGroup | INSERT, MODIFY, DELETE | P4Member |
| P4MulticastGroupEntry | INSERT, MODIFY, DELETE | |
| P4CloneSessionEntry | INSERT, MODIFY, DELETE | |
| P4DigestEntry | INSERT, MODIFY, DELETE | |
| P4ExternEntry | INSERT, MODIFY, DELETE | |
| P4RegisterEntry | MODIFY | |
| P4CounterEntry | MODIFY | P4CounterData |
| P4DirectCounterEntry | MODIFY | P4CounterData |
| P4MeterEntry | MODIFY | P4MeterConfig, P4MeterCounterData |
| P4DirectMeterEntry | MODIFY | |
| P4ValueSetEntry | MODIFY | P4ValueSetMember |

Insert/Modify/Delete Updates

To specify the operation, use a unary + (INSERT), ~ (MODIFY), or - (DELETE). If you do not specify the operation, write will raise a ValueError exception.

Here is an example showing how to insert and delete two different entities in the same WriteRequest.

await switch.write([
    +fy.P4TableEntry(          # unary + means INSERT
        "ipv4", 
        match=fy.Match(dest="192.168.1.0/24"),
        action=fy.Action("forward", port=1),
    ),
    -fy.P4TableEntry(          # unary - means DELETE
        "ipv4", 
        match=fy.Match(dest="192.168.2.0/24"),
        action=fy.Action("forward", port=2),
    ),
])

You should not insert, modify or delete the same entry in the same WriteRequest.
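
To show how unary operators can carry an operation, here is a minimal sketch in plain Python. It is not Finsy's implementation: `Entity` and `check_updates` are hypothetical stand-ins that mimic the documented behavior, including the ValueError when no operation is specified.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Entity:
    """Hypothetical stand-in for a P4Runtime entity; not a Finsy class."""

    name: str
    op: Optional[str] = None  # None means "no operation specified"

    def __pos__(self) -> "Entity":  # +entity
        return replace(self, op="INSERT")

    def __invert__(self) -> "Entity":  # ~entity
        return replace(self, op="MODIFY")

    def __neg__(self) -> "Entity":  # -entity
        return replace(self, op="DELETE")


def check_updates(entities):
    """Return (operation, name) pairs, rejecting entities with no operation."""
    for entity in entities:
        if entity.op is None:
            raise ValueError(f"no operation specified for {entity.name!r}")
    return [(entity.op, entity.name) for entity in entities]
```

Each operator returns a new tagged copy, so the original entity object is left unchanged and can be reused with a different operation.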

If you are performing the same operation on all entities, you can use the Switch insert, delete, or modify methods.

await switch.insert([
    fy.P4MulticastGroupEntry(1, replicas=[1, 2, 3]),
    fy.P4MulticastGroupEntry(2, replicas=[4, 5, 6]),
])

Modify-Only Updates

For entities that only support the modify operation, you do not need to specify the operation. (You can optionally use ~.)

await switch.write([
    fy.P4RegisterEntry("reg1", index=0, data=0),
    fy.P4RegisterEntry("reg1", index=1, data=1),
    fy.P4RegisterEntry("reg1", index=2, data=2),
])

You can also use the modify method:

await switch.modify([
    fy.P4RegisterEntry("reg1", index=0, data=0),
    fy.P4RegisterEntry("reg1", index=1, data=1),
    fy.P4RegisterEntry("reg1", index=2, data=2),
])

If you pass a modify-only entity to the insert or delete methods, the P4Runtime server will return an error.

Sending Packets

Use the write method to send a packet.

await switch.write([fy.P4PacketOut(b"a payload.....", port=3)])

You can include other entities in the same call. Any non-update objects (e.g. P4PacketOut, P4DigestListAck) will be sent before the WriteRequest.

Listening for Packets

To receive packets, use the async iterator Switch.read_packets(). read_packets can filter for specific eth_type values.

In the example below, pkt is a P4PacketIn object.

# Read packets filtering only for ARP (eth_type == 0x0806).
async for pkt in switch.read_packets(eth_types={0x0806}):
    # You can access the packet payload `pkt.payload` or any metadata value,
    # e.g. `pkt['ingress_port']`
    print(pkt.payload)
    print(pkt['ingress_port'])
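
For reference, the eth_type of a frame is the 2-byte field that follows the two 6-byte MAC addresses. The sketch below reads it from a payload such as `pkt.payload`. It assumes an untagged Ethernet II frame (no 802.1Q VLAN tag) and is not how Finsy performs its filtering; the `eth_type` helper is a hypothetical name.

```python
import struct
from typing import Optional

ETH_TYPE_ARP = 0x0806


def eth_type(payload: bytes) -> Optional[int]:
    """Return the EtherType of an untagged Ethernet frame, or None if too short."""
    if len(payload) < 14:
        return None
    # Bytes 0-5: destination MAC, bytes 6-11: source MAC, bytes 12-13: EtherType.
    (value,) = struct.unpack_from("!H", payload, 12)
    return value
```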

Listening for Digests

To receive digests, use the async iterator Switch.read_digests. You must specify the name of the digest from your P4 program.

async for digest in switch.read_digests("digest_t"):
    # You can access the digest metadata e.g. `digest['ingress_port']`
    # Your code may need to update table entries based on the digest data.
    # To ack the digest, write `digest.ack()`.
    await switch.write([entry, ...])
    await switch.write([digest.ack()])

To acknowledge the digest entry, you can write digest.ack().

Listening for Idle Timeouts

To receive idle timeout notifications, use the async iterator Switch.read_idle_timeouts. You will receive a P4IdleTimeoutNotification which contains multiple table entries -- one for each entry that timed out.

async for timeout in switch.read_idle_timeouts():
    for entry in timeout.table_entry:
        print(timeout.timestamp, entry)

Other Events

A P4 switch may report other events using the EventEmitter API. See the SwitchEvent class for the event types. Each switch has a switch.ee attribute that lets your code register for event callbacks.
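
The Controller source later in this document registers a one-shot callback with `switch.ee.once(...)` and fires events with `switch.ee.emit(...)`. The sketch below shows that register-then-emit pattern with a tiny stand-in class. `MiniEmitter` is hypothetical and is not the emitter class Finsy uses.

```python
from collections import defaultdict
from typing import Any, Callable


class MiniEmitter:
    """Hypothetical stand-in for an event emitter with one-shot listeners."""

    def __init__(self) -> None:
        self._listeners = defaultdict(list)

    def once(self, event: Any, callback: Callable[..., None]) -> None:
        self._listeners[event].append(callback)

    def emit(self, event: Any, *args: Any) -> None:
        # Detach the listeners before calling them so each runs only once.
        for callback in self._listeners.pop(event, []):
            callback(*args)
```

A callback registered with `once` runs for the first matching `emit` call and is ignored afterwards.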

Development and Testing

Perform these steps to set up your local environment for Finsy development, or try the codespace. Finsy requires Python 3.10 or later. If poetry is not installed, follow these directions to install it.

Clone and Prepare a Virtual Environment

The poetry install command installs all development dependencies into the virtual environment (venv).

$ git clone https://github.com/byllyfish/finsy.git
$ cd finsy
$ python3 -m venv .venv
$ poetry install

Run Unit Tests

When you run pytest from the top level of the repository, you will run the unit tests.

$ poetry run pytest

Run Integration Tests

When you run pytest from within the examples directory, you will run the integration tests instead of the unit tests. The integration tests run the example programs against a Mininet network. Docker or Podman is required.

$ cd examples
$ poetry run pytest

API Documentation

"""
.. include:: ../README.md

# API Documentation
"""

# Copyright (c) 2022-2023 Bill Fisher
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = "0.27.0"

import sys

if sys.version_info < (3, 10):  # pragma: no cover
    raise RuntimeError("Requires Python 3.10 or later.")

from .controller import Controller
from .gnmiclient import GNMIClient, GNMISubscription, GNMIUpdate
from .gnmipath import GNMIPath
from .grpcutil import GRPCCredentialsTLS, GRPCStatusCode
from .log import LoggerAdapter
from .macaddr import MACAddress
from .p4client import P4Client, P4ClientError, P4Error
from .p4entity import (
    P4ActionProfileGroup,
    P4ActionProfileMember,
    P4CloneSessionEntry,
    P4CounterData,
    P4CounterEntry,
    P4DigestEntry,
    P4DigestList,
    P4DigestListAck,
    P4DirectCounterEntry,
    P4DirectMeterEntry,
    P4ExternEntry,
    P4IndirectAction,
    P4Member,
    P4MeterConfig,
    P4MeterCounterData,
    P4MeterEntry,
    P4MulticastGroupEntry,
    P4PacketIn,
    P4PacketOut,
    P4RegisterEntry,
    P4TableAction,
    P4TableEntry,
    P4TableMatch,
    P4ValueSetEntry,
)
from .p4schema import P4ConfigAction, P4CounterUnit, P4Schema
from .ports import SwitchPort, SwitchPortList
from .runner import run
from .switch import Switch, SwitchEvent, SwitchOptions

Match = P4TableMatch
"`Match` is an alias for P4TableMatch."

Action = P4TableAction
"`Action` is an alias for P4TableAction."

IndirectAction = P4IndirectAction
"`IndirectAction` is an alias for P4IndirectAction."

__all__ = [
    "run",
    "Controller",
    "LoggerAdapter",
    "MACAddress",
    "P4ActionProfileGroup",
    "P4ActionProfileMember",
    "P4Client",
    "P4ClientError",
    "P4CloneSessionEntry",
    "P4CounterData",
    "P4CounterEntry",
    "P4CounterUnit",
    "P4DigestEntry",
    "P4DigestList",
    "P4DigestListAck",
    "P4DirectCounterEntry",
    "P4DirectMeterEntry",
    "P4Error",
    "P4ExternEntry",
    "IndirectAction",  # alias for P4IndirectAction
    "P4IndirectAction",
    "P4Member",
    "P4MeterConfig",
    "P4MeterCounterData",
    "P4MeterEntry",
    "P4MulticastGroupEntry",
    "P4PacketIn",
    "P4PacketOut",
    "P4RegisterEntry",
    "Action",  # alias for P4TableAction
    "P4TableAction",
    "P4TableEntry",
    "Match",  # alias for P4TableMatch
    "P4TableMatch",
    "P4ValueSetEntry",
    "P4ConfigAction",
    "P4Schema",
    "Switch",
    "SwitchEvent",
    "SwitchOptions",
    "SwitchPort",
    "SwitchPortList",
    "GNMIClient",
    "GNMIPath",
    "GNMISubscription",
    "GNMIUpdate",
    "GRPCCredentialsTLS",
    "GRPCStatusCode",
]

def run(coro: Coroutine[Any, Any, NoneType]) -> None:
def run(coro: Coroutine[Any, Any, None]) -> None:
    """`finsy.run` provides a useful wrapper around `asyncio.run`.

    This function implements common boilerplate for running a Finsy application.

    - Set up basic logging to stderr at the INFO log level.
    - Set up a signal handler for SIGTERM that shuts down gracefully.
    - Set up caching of P4Info data so common definitions are re-used.

    Example:

    ```python
    import finsy as fy

    async def main():
        async with fy.Switch("sw", "127.0.0.1:50001") as sw:
            print(sw.p4info)

    if __name__ == "__main__":
        fy.run(main())
    ```

    If you choose to use `asyncio.run` instead, your P4Schema/P4Info objects
    will not be eligible for sharing. You can create your own `P4SchemaCache`
    context manager to implement this.
    """
    try:
        asyncio.run(_finsy_boilerplate(coro))
    except (KeyboardInterrupt, asyncio.CancelledError):
        pass

finsy.run provides a useful wrapper around asyncio.run.

This function implements common boilerplate for running a Finsy application.

  • Set up basic logging to stderr at the INFO log level.
  • Set up a signal handler for SIGTERM that shuts down gracefully.
  • Set up caching of P4Info data so common definitions are re-used.

Example:

import finsy as fy

async def main():
    async with fy.Switch("sw", "127.0.0.1:50001") as sw:
        print(sw.p4info)

if __name__ == "__main__":
    fy.run(main())

If you choose to use asyncio.run instead, your P4Schema/P4Info objects will not be eligible for sharing. You can create your own P4SchemaCache context manager to implement this.
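
To make the first two boilerplate items concrete, here is a rough stdlib-only sketch of a wrapper around `asyncio.run`. It is not Finsy's `_finsy_boilerplate`; the names `run_sketch` and `_boilerplate` are hypothetical, and the P4Info caching step is left out entirely.

```python
import asyncio
import logging
import signal


async def _boilerplate(coro):
    # Basic logging; the default stream handler writes to stderr.
    logging.basicConfig(level=logging.INFO)

    loop = asyncio.get_running_loop()
    task = asyncio.current_task()
    installed = False
    try:
        # On SIGTERM, cancel the main task so cleanup code can run.
        loop.add_signal_handler(signal.SIGTERM, task.cancel)
        installed = True
    except (NotImplementedError, ValueError, RuntimeError):
        pass  # e.g. Windows, or not running in the main thread

    try:
        await coro
    finally:
        if installed:
            loop.remove_signal_handler(signal.SIGTERM)


def run_sketch(coro) -> None:
    """Run `coro` to completion, treating cancellation as a normal exit."""
    try:
        asyncio.run(_boilerplate(coro))
    except (KeyboardInterrupt, asyncio.CancelledError):
        pass
```

Cancelling the main task, rather than exiting from the signal handler, gives `async with` blocks and `finally` clauses a chance to close connections cleanly.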

@final
class Controller:
@final
class Controller:
    """Represents a collection of P4Runtime switches.

    Each `Switch` in the Controller is identified by its name. Each name must
    be unique.

    ```
    switches = [
        fy.Switch("sw1", "10.0.0.1:50000"),
        fy.Switch("sw2", "10.0.0.2:50000"),
    ]
    controller = fy.Controller(switches)
    await controller.run()
    ```

    A Controller can be running or stopped. There are two ways to run a
    Controller. You can use the `Controller.run()` method, or you can use
    a Controller as a context manager.

    ```
    controller = fy.Controller(switches)
    async with controller:
        # Let controller run for 60 seconds.
        await asyncio.sleep(60)
    ```

    You can `add` or `remove` switches regardless of whether a Controller is
    running or not. If the Controller is not running, adding or removing a Switch is
    instantaneous. If the Controller is running, adding a Switch will start
    it running asynchronously. Removing a Switch will schedule the Switch to stop,
    but defer actual removal until the Switch has stopped asynchronously.

    When a switch starts inside a Controller, it fires the `CONTROLLER_ENTER`
    event. When a switch stops inside a Controller, it fires the `CONTROLLER_LEAVE`
    event.

    A Controller supports these methods to access its contents:

    - len(controller): Return number of switches in the controller.
    - controller[name]: Return the switch with the given name.
    - controller.get(name): Return the switch with the given name, or None if not found.

    You can iterate over the switches in a Controller using a for loop:

    ```
    for switch in controller:
        print(switch.name)
    ```

    Any task or sub-task running inside a controller can retrieve its
    Controller object using the `Controller.current()` method.
    """

    _switches: dict[str, Switch]
    _pending_removal: set[Switch]
    _task_count: CountdownFuture

    _control_task: asyncio.Task[Any] | None = None
    "Keep track of controller's main task."

    def __init__(self, switches: Iterable[Switch] = ()):
        """Initialize Controller object with an initial list of switches.

        Args:
            switches: a collection or other iterable of Switch objects.
        """
        self._switches = {}
        self._pending_removal = set()
        self._task_count = CountdownFuture()

        for switch in switches:
            if switch.name in self._switches:
                raise ValueError(f"Switch named {switch.name!r} already exists")
            self._switches[switch.name] = switch

    @property
    def running(self) -> bool:
        "True if Controller is running."
        return self._control_task is not None

    async def run(self) -> None:
        "Run the controller."
        async with self:
            await wait_for_cancel()

    def stop(self) -> None:
        "Stop the controller if it is running."
        if self._control_task is not None:
            self._control_task.cancel()

    async def __aenter__(self) -> Self:
        "Run the controller as a context manager (see also run())."
        assert not self.running, "Controller.__aenter__ is not re-entrant"
        assert self._task_count.value() == 0
        assert not self._pending_removal

        self._control_task = asyncio.current_task()
        _CONTROLLER.set(self)

        try:
            # Start each switch running.
            for switch in self:
                self._start_switch(switch)
        except Exception:
            self._control_task = None
            _CONTROLLER.set(None)
            raise

        return self

    async def __aexit__(
        self,
        _exc_type: type[BaseException] | None,
        _exc_val: BaseException | None,
        _exc_tb: TracebackType | None,
    ) -> bool | None:
        "Run the controller as a context manager (see also run())."
        assert self.running

        try:
            # Stop all the switches.
            for switch in self:
                self._stop_switch(switch)

            # Wait for switch tasks to finish.
            await self._task_count.wait()

        finally:
            self._control_task = None
            _CONTROLLER.set(None)

    def add(self, switch: Switch) -> None:
        """Add a switch to the controller.

        If the controller is running, tell the switch to start.

        Args:
            switch: the Switch object.
        """
        if switch.name in self._switches:
            raise ValueError(f"Switch named {switch.name!r} already exists")

        self._switches[switch.name] = switch
        if self.running:
            self._start_switch(switch)

    def remove(self, switch: Switch) -> asyncio.Event:
        """Remove a switch from the controller.

        If the controller is running, tell the switch to stop and schedule it
        for removal when it fully stops.

        Args:
            switch: the Switch object.
        """
        name = switch.name
        if self._switches.get(name, None) is not switch:
            raise ValueError(f"Switch named {name!r} not found")

        del self._switches[name]

        event = asyncio.Event()
        if self.running:
            # When controller is running, event will complete when switch
            # is actually stopped.
            self._stop_switch(switch)
            self._pending_removal.add(switch)

            def _controller_leave(sw: Switch):
                self._pending_removal.discard(sw)
                event.set()

            switch.ee.once(SwitchEvent.CONTROLLER_LEAVE, _controller_leave)  # type: ignore
        else:
            # When controller is not running, event completes immediately.
            event.set()

        return event

    def _start_switch(self, switch: Switch):
        "Start the switch's control task."
        LOGGER.debug("Controller._start_switch: %r", switch)
        assert switch._control_task is None  # pyright: ignore[reportPrivateUsage]

        switch.ee.emit(SwitchEvent.CONTROLLER_ENTER, switch)

        task = asyncio.create_task(switch.run(), name=f"fy:{switch.name}")
        switch._control_task = task  # pyright: ignore[reportPrivateUsage]
        self._task_count.increment()

        def _switch_done(done: asyncio.Task[Any]):
            switch._control_task = None  # pyright: ignore[reportPrivateUsage]
            switch.ee.emit(SwitchEvent.CONTROLLER_LEAVE, switch)
            self._task_count.decrement()

            if not done.cancelled():
                ex = done.exception()
                if ex is not None:
                    if not isinstance(ex, SwitchFailFastError):
                        # The `fail_fast` error has already been logged. If
                        # it's any other error, log it. (There shouldn't be
                        # any other error.)
                        LOGGER.critical(
                            "Controller task %r failed",
                            done.get_name(),
                            exc_info=ex,
                        )
                    # Shutdown the program cleanly due to switch failure.
                    raise SystemExit(99)

        task.add_done_callback(_switch_done)

    def _stop_switch(self, switch: Switch):
        "Stop the switch's control task."
        LOGGER.debug("Controller._stop_switch: %r", switch)

        if switch._control_task is not None:  # pyright: ignore[reportPrivateUsage]
            switch._control_task.cancel()  # pyright: ignore[reportPrivateUsage]

    def __len__(self) -> int:
        "Return the number of switches."
        return len(self._switches)

    def __iter__(self) -> Iterator[Switch]:
        "Iterate over the switches."
        return iter(self._switches.values())

    def __getitem__(self, name: str) -> Switch:
        "Retrieve a switch by name."
        return self._switches[name]

    def get(self, name: str) -> Switch | None:
        "Retrieve a switch by name, or return None if not found."
        return self._switches.get(name)

    @staticmethod
    def current() -> "Controller":
        "Return the current Controller object."
        result = _CONTROLLER.get()
        if result is None:
            raise RuntimeError("controller does not exist")
        return result

Represents a collection of P4Runtime switches.

Each Switch in the Controller is identified by its name. Each name must be unique.

switches = [
    fy.Switch("sw1", "10.0.0.1:50000"),
    fy.Switch("sw2", "10.0.0.2:50000"),
]
controller = fy.Controller(switches)
await controller.run()

A Controller can be running or stopped. There are two ways to run a Controller. You can use the Controller.run() method, or you can use a Controller as a context manager.

controller = fy.Controller(switches)
async with controller:
    # Let controller run for 60 seconds.
    await asyncio.sleep(60)

You can add or remove switches regardless of whether a Controller is running or not. If the Controller is not running, adding or removing a Switch is instantaneous. If the Controller is running, adding a Switch will start it running asynchronously. Removing a Switch will schedule the Switch to stop, but defer actual removal until the Switch has stopped asynchronously.

When a switch starts inside a Controller, it fires the CONTROLLER_ENTER event. When a switch stops inside a Controller, it fires the CONTROLLER_LEAVE event.

A Controller supports these methods to access its contents:

  • len(controller): Return number of switches in the controller.
  • controller[name]: Return the switch with the given name.
  • controller.get(name): Return the switch with the given name, or None if not found.

You can iterate over the switches in a Controller using a for loop:

for switch in controller:
    print(switch.name)

Any task or sub-task running inside a controller can retrieve its Controller object using the Controller.current() method.
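
One way such a lookup can work is with a context variable, because tasks created with `asyncio.create_task` inherit a copy of their creator's context. The sketch below assumes the holder behaves like `contextvars.ContextVar` (the source above only shows `_CONTROLLER.get()` and `_CONTROLLER.set()`); the names `_CURRENT`, `current`, `child` and `main` are hypothetical.

```python
import asyncio
import contextvars

# Hypothetical holder for the "current controller" value.
_CURRENT = contextvars.ContextVar("current_controller", default=None)


def current():
    """Return the value set by an enclosing task, or raise if there is none."""
    result = _CURRENT.get()
    if result is None:
        raise RuntimeError("controller does not exist")
    return result


async def child():
    # Runs in its own task, but sees the value set before it was created.
    return current()


async def main():
    _CURRENT.set("controller-1")
    return await asyncio.create_task(child())
```

Here `asyncio.run(main())` returns the value set in `main`, while calling `current()` outside of that task raises RuntimeError because the value never leaks into the outer context.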

Controller(switches: Iterable[Switch] = ())

Initialize Controller object with an initial list of switches.

Arguments:
  • switches: a collection or other iterable of Switch objects.
running: bool

True if Controller is running.

async def run(self) -> None:

Run the controller.

def stop(self) -> None:

Stop the controller if it is running.

async def __aenter__(self) -> typing_extensions.Self:

Run the controller as a context manager (see also run()).

def add(self, switch: Switch) -> None:

Add a switch to the controller.

If the controller is running, tell the switch to start.

Arguments:
  • switch: the Switch object.
def remove(self, switch: Switch) -> asyncio.locks.Event:

Remove a switch from the controller.

If the controller is running, tell the switch to stop and schedule it for removal when it fully stops.

Arguments:
  • switch: the Switch object.
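
Because `remove` returns an `asyncio.Event` that is set once the switch has actually stopped, a caller can wait on it. The helper below is a small sketch of that usage; `remove_and_wait` and its `timeout` parameter are hypothetical and not part of Finsy.

```python
import asyncio


async def remove_and_wait(controller, switch, timeout: float = 5.0) -> None:
    """Remove `switch` and wait until the controller reports it has stopped."""
    event = controller.remove(switch)  # returns an asyncio.Event
    # Raises asyncio.TimeoutError if the switch does not stop in time.
    await asyncio.wait_for(event.wait(), timeout)
```

If the controller is not running, the event is already set and the helper returns immediately.
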
def __len__(self) -> int:

Return the number of switches.

def __iter__(self) -> Iterator[Switch]:

Iterate over the switches.

def __getitem__(self, name: str) -> Switch:

Retrieve a switch by name.

def get(self, name: str) -> Switch | None:

Retrieve a switch by name, or return None if not found.

@staticmethod
def current() -> Controller:

Return the current Controller object.

class LoggerAdapter(logging.LoggerAdapter):
class LoggerAdapter(_BaseLoggerAdapter):
    """Custom log adapter to include the name of the current task."""

    def process(
        self,
        msg: Any,
        kwargs: MutableMapping[str, Any],
    ) -> tuple[Any, MutableMapping[str, Any]]:
        """Process the logging message and keyword arguments passed in to a
        logging call to insert contextual information.
        """
        task_name = _get_current_task_name()
        return f"[{task_name}] {msg}", kwargs

    def info(self, msg: Any, *args: Any, **kwargs: Any) -> None:
        """INFO level uses a concise task name representation for readability."""
        if self.logger.isEnabledFor(logging.INFO):
            task_name = _get_current_task_name(True)
            self.logger.info(f"[{task_name}] {msg}", *args, **kwargs)

Custom log adapter to include the name of the current task.

def process(self, msg: Any, kwargs: MutableMapping[str, Any]) -> tuple[typing.Any, typing.MutableMapping[str, typing.Any]]:
    def process(
        self,
        msg: Any,
        kwargs: MutableMapping[str, Any],
    ) -> tuple[Any, MutableMapping[str, Any]]:
        """Process the logging message and keyword arguments passed in to a
        logging call to insert contextual information.
        """
        task_name = _get_current_task_name()
        return f"[{task_name}] {msg}", kwargs

Process the logging message and keyword arguments passed in to a logging call to insert contextual information.

def info(self, msg: Any, *args: Any, **kwargs: Any) -> None:
    def info(self, msg: Any, *args: Any, **kwargs: Any) -> None:
        """INFO level uses a concise task name representation for readability."""
        if self.logger.isEnabledFor(logging.INFO):
            task_name = _get_current_task_name(True)
            self.logger.info(f"[{task_name}] {msg}", *args, **kwargs)

INFO level uses a concise task name representation for readability.
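The adapter relies on a private helper, `_get_current_task_name`, whose body is not shown here. The sketch below illustrates the general technique with the standard library only; `current_task_name` and `TaskNameAdapter` are hypothetical names, and the `"-"` fallback outside a running event loop is an assumption rather than Finsy's behavior.

```python
import asyncio
import logging
from typing import Any, MutableMapping


def current_task_name() -> str:
    """Hypothetical helper: name of the running asyncio task, or '-' if none."""
    try:
        task = asyncio.current_task()
    except RuntimeError:
        # No event loop is running in this thread.
        return "-"
    return task.get_name() if task is not None else "-"


class TaskNameAdapter(logging.LoggerAdapter):
    """Prefix every log message with the current task's name."""

    def process(
        self, msg: Any, kwargs: MutableMapping[str, Any]
    ) -> tuple[Any, MutableMapping[str, Any]]:
        return f"[{current_task_name()}] {msg}", kwargs


adapter = TaskNameAdapter(logging.getLogger("demo"), {})


async def demo() -> str:
    # Give the running task a recognizable name, then format a message.
    asyncio.current_task().set_name("worker-1")
    msg, _ = adapter.process("hello", {})
    return msg


formatted = asyncio.run(demo())
```

Overriding `process` keeps call sites unchanged: code still calls `adapter.info(...)` or `adapter.debug(...)` and the prefix is added in one place.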

@functools.total_ordering
class MACAddress:
@functools.total_ordering
class MACAddress:
    """Concrete class for a MAC address."""

    __slots__ = ("_mac", "__weakref__")
    _mac: int

    def __init__(self, value: object) -> None:
        "Construct a MAC address from a string, integer or bytes object."
        match value:
            case int():
                self._mac = _from_int(value)
            case bytes():
                self._mac = _from_bytes(value)
            case MACAddress():
                self._mac = value._mac
            case _:
                # Input argument is a MAC string, or an object that is
                # formatted as MAC string (behave like ipaddress module).
                self._mac = _from_string(str(value))

    @property
    def packed(self) -> bytes:
        "Return the MAC address as a byte string."
        return _to_bytes(self._mac)

    @property
    def max_prefixlen(self) -> int:
        "Return the maximum prefix length (48 bits)."
        return _BIT_WIDTH

    @property
    def is_multicast(self) -> bool:
        "Return true if MAC address has the multicast bit set."
        return self._mac & (1 << 40) != 0

    @property
    def is_private(self) -> bool:
        "Return true if MAC address has the locally administered bit set."
        return self._mac & (1 << 41) != 0

    @property
    def is_global(self) -> bool:
        "Return true if the locally administered bit is not set."
        return not self.is_private

    @property
    def is_unspecified(self) -> bool:
        "Return true if MAC address is all zeros."
        return self._mac == 0

    @property
    def is_broadcast(self) -> bool:
        "Return true if MAC address is the broadcast address."
        return self._mac == (1 << _BIT_WIDTH) - 1

    def __int__(self) -> int:
        return self._mac

    def __eq__(self, rhs: object) -> bool:
        if not isinstance(rhs, MACAddress):
            return NotImplemented
        return self._mac == rhs._mac

    def __lt__(self, rhs: object) -> bool:
        if not isinstance(rhs, MACAddress):
            return NotImplemented
        return self._mac < rhs._mac

    def __repr__(self) -> str:
        return f"MACAddress({_to_string(self._mac)!r})"

    def __str__(self) -> str:
        return _to_string(self._mac)

    def __hash__(self) -> int:
        return hash(hex(self._mac))

Concrete class for a MAC address.

MACAddress(value: object)
    def __init__(self, value: object) -> None:
        "Construct a MAC address from a string, integer or bytes object."
        match value:
            case int():
                self._mac = _from_int(value)
            case bytes():
                self._mac = _from_bytes(value)
            case MACAddress():
                self._mac = value._mac
            case _:
                # Input argument is a MAC string, or an object that is
                # formatted as MAC string (behave like ipaddress module).
                self._mac = _from_string(str(value))

Construct a MAC address from a string, integer or bytes object.

packed: bytes
    @property
    def packed(self) -> bytes:
        "Return the MAC address as a byte string."
        return _to_bytes(self._mac)

Return the MAC address as a byte string.

max_prefixlen: int
    @property
    def max_prefixlen(self) -> int:
        "Return the maximum prefix length (48 bits)."
        return _BIT_WIDTH

Return the maximum prefix length (48 bits).

is_multicast: bool
    @property
    def is_multicast(self) -> bool:
        "Return true if MAC address has the multicast bit set."
        return self._mac & (1 << 40) != 0

Return true if MAC address has the multicast bit set.

is_private: bool
    @property
    def is_private(self) -> bool:
        "Return true if MAC address has the locally administered bit set."
        return self._mac & (1 << 41) != 0

Return true if MAC address has the locally administered bit set.

is_global: bool
    @property
    def is_global(self) -> bool:
        "Return true if the locally administered bit is not set."
        return not self.is_private

Return true if the locally administered bit is not set.

is_unspecified: bool
    @property
    def is_unspecified(self) -> bool:
        "Return true if MAC address is all zeros."
        return self._mac == 0

Return true if MAC address is all zeros.

is_broadcast: bool
    @property
    def is_broadcast(self) -> bool:
        "Return true if MAC address is the broadcast address."
        return self._mac == (1 << _BIT_WIDTH) - 1

Return true if MAC address is the broadcast address.
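The bit tests in these properties treat the address as a 48-bit integer, so the first octet occupies bits 40 to 47. Bit 40 is that octet's least significant bit (multicast) and bit 41 is the next one (locally administered). The sketch below reproduces the same arithmetic on plain integers; `mac_to_int` is a hypothetical parsing helper, not part of Finsy.

```python
BIT_WIDTH = 48  # a MAC address is 48 bits wide


def mac_to_int(text: str) -> int:
    """Hypothetical helper: parse 'aa:bb:cc:dd:ee:ff' into a 48-bit integer."""
    return int.from_bytes(bytes.fromhex(text.replace(":", "")), "big")


def is_multicast(mac: int) -> bool:
    # Least significant bit of the first octet.
    return mac & (1 << 40) != 0


def is_private(mac: int) -> bool:
    # Second least significant bit of the first octet (locally administered).
    return mac & (1 << 41) != 0


def is_broadcast(mac: int) -> bool:
    # All 48 bits set.
    return mac == (1 << BIT_WIDTH) - 1


multicast = mac_to_int("01:00:5e:00:00:01")
local = mac_to_int("02:00:00:00:00:01")
broadcast = mac_to_int("ff:ff:ff:ff:ff:ff")
```

Because the broadcast address has every bit set, it also reports as multicast and locally administered under these tests.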

@decodable('action_profile_group')
@dataclass(slots=True)
class P4ActionProfileGroup(finsy.p4entity._P4Writable):
@decodable("action_profile_group")
@dataclass(slots=True)
class P4ActionProfileGroup(_P4Writable):
    "Represents a P4Runtime ActionProfileGroup."

    action_profile_id: str = ""
    _: KW_ONLY
    group_id: int = 0
    max_size: int = 0
    members: Sequence[P4Member] | None = None

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4ActionProfileGroup as protobuf."
        if not self.action_profile_id:
            return p4r.Entity(action_profile_group=p4r.ActionProfileGroup())

        profile = schema.action_profiles[self.action_profile_id]

        if self.members is not None:
            members = [member.encode() for member in self.members]
        else:
            members = None

        entry = p4r.ActionProfileGroup(
            action_profile_id=profile.id,
            group_id=self.group_id,
            members=members,
            max_size=self.max_size,
        )
        return p4r.Entity(action_profile_group=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to ActionProfileGroup data."
        entry = msg.action_profile_group
        if entry.action_profile_id == 0:
            return cls()

        profile = schema.action_profiles[entry.action_profile_id]

        if entry.members:
            members = [P4Member.decode(member) for member in entry.members]
        else:
            members = None

        return cls(
            action_profile_id=profile.alias,
            group_id=entry.group_id,
            max_size=entry.max_size,
            members=members,
        )

    def action_str(self, _schema: P4Schema) -> str:
        "Return string representation of the weighted members."
        if not self.members:
            return ""

        return " ".join(
            [f"{member.weight}*{member.member_id:#x}" for member in self.members]
        )

Represents a P4Runtime ActionProfileGroup.

P4ActionProfileGroup(action_profile_id: str = '', *, group_id: int = 0, max_size: int = 0, members: Optional[Sequence[P4Member]] = None)
action_profile_id: str
group_id: int
max_size: int
members: Optional[Sequence[P4Member]]
def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:
    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4ActionProfileGroup as protobuf."
        if not self.action_profile_id:
            return p4r.Entity(action_profile_group=p4r.ActionProfileGroup())

        profile = schema.action_profiles[self.action_profile_id]

        if self.members is not None:
            members = [member.encode() for member in self.members]
        else:
            members = None

        entry = p4r.ActionProfileGroup(
            action_profile_id=profile.id,
            group_id=self.group_id,
            members=members,
            max_size=self.max_size,
        )
        return p4r.Entity(action_profile_group=entry)

Encode P4ActionProfileGroup as protobuf.

@classmethod
def decode(cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:
    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to ActionProfileGroup data."
        entry = msg.action_profile_group
        if entry.action_profile_id == 0:
            return cls()

        profile = schema.action_profiles[entry.action_profile_id]

        if entry.members:
            members = [P4Member.decode(member) for member in entry.members]
        else:
            members = None

        return cls(
            action_profile_id=profile.alias,
            group_id=entry.group_id,
            max_size=entry.max_size,
            members=members,
        )

Decode protobuf to ActionProfileGroup data.

def action_str(self, _schema: P4Schema) -> str:
    def action_str(self, _schema: P4Schema) -> str:
        "Return string representation of the weighted members."
        if not self.members:
            return ""

        return " ".join(
            [f"{member.weight}*{member.member_id:#x}" for member in self.members]
        )

Return string representation of the weighted members.
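To make the output format concrete, here is a minimal sketch of the same formatting rule: each member is rendered as its weight, an asterisk, and the member ID in hexadecimal, with members separated by spaces. The `Member` dataclass and `members_str` function are hypothetical stand-ins; only the `weight` and `member_id` field names come from the listing above.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class Member:
    """Hypothetical stand-in with the two fields the formatter reads."""

    member_id: int
    weight: int = 1


def members_str(members: Optional[Sequence[Member]]) -> str:
    # An absent or empty member list formats as the empty string.
    if not members:
        return ""
    # "#x" renders the ID in lowercase hex with a 0x prefix.
    return " ".join(f"{m.weight}*{m.member_id:#x}" for m in members)


text = members_str([Member(1, weight=2), Member(255, weight=1)])
```

With these inputs, `text` is `"2*0x1 1*0xff"`.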

@decodable('action_profile_member')
@dataclass(slots=True)
class P4ActionProfileMember(finsy.p4entity._P4Writable):
@decodable("action_profile_member")
@dataclass(slots=True)
class P4ActionProfileMember(_P4Writable):
    "Represents a P4Runtime ActionProfileMember."

    action_profile_id: str = ""
    _: KW_ONLY
    member_id: int = 0
    action: P4TableAction | None = None

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4ActionProfileMember as protobuf."
        if not self.action_profile_id:
            return p4r.Entity(action_profile_member=p4r.ActionProfileMember())

        profile = schema.action_profiles[self.action_profile_id]

        if self.action:
            action = self.action.encode_action(schema)
        else:
            action = None

        entry = p4r.ActionProfileMember(
            action_profile_id=profile.id,
            member_id=self.member_id,
            action=action,
        )
        return p4r.Entity(action_profile_member=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to ActionProfileMember data."
        entry = msg.action_profile_member
        if entry.action_profile_id == 0:
            return cls()

        profile = schema.action_profiles[entry.action_profile_id]

        if entry.HasField("action"):
            action = P4TableAction.decode_action(entry.action, schema)
        else:
            action = None

        return cls(
            action_profile_id=profile.alias,
            member_id=entry.member_id,
            action=action,
        )

    def action_str(self, schema: P4Schema) -> str:
        "Format the action as a human-readable, canonical string."
        if self.action is None:
            return NOACTION_STR
        return self.action.format_str(schema)

Represents a P4Runtime ActionProfileMember.

P4ActionProfileMember(action_profile_id: str = '', *, member_id: int = 0, action: P4TableAction | None = None)
action_profile_id: str
member_id: int
action: P4TableAction | None
def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:
    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4ActionProfileMember as protobuf."
        if not self.action_profile_id:
            return p4r.Entity(action_profile_member=p4r.ActionProfileMember())

        profile = schema.action_profiles[self.action_profile_id]

        if self.action:
            action = self.action.encode_action(schema)
        else:
            action = None

        entry = p4r.ActionProfileMember(
            action_profile_id=profile.id,
            member_id=self.member_id,
            action=action,
        )
        return p4r.Entity(action_profile_member=entry)

Encode P4ActionProfileMember as protobuf.

@classmethod
def decode(cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:
    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to ActionProfileMember data."
        entry = msg.action_profile_member
        if entry.action_profile_id == 0:
            return cls()

        profile = schema.action_profiles[entry.action_profile_id]

        if entry.HasField("action"):
            action = P4TableAction.decode_action(entry.action, schema)
        else:
            action = None

        return cls(
            action_profile_id=profile.alias,
            member_id=entry.member_id,
            action=action,
        )

Decode protobuf to ActionProfileMember data.

def action_str(self, schema: P4Schema) -> str:
    def action_str(self, schema: P4Schema) -> str:
        "Format the action as a human-readable, canonical string."
        if self.action is None:
            return NOACTION_STR
        return self.action.format_str(schema)

Format the action as a human-readable, canonical string.

class P4Client:
class P4Client:
    "Implements a P4Runtime client."

    _address: str
    _credentials: GRPCCredentialsTLS | None
    _wait_for_ready: bool
    _channel: grpc.aio.Channel | None = None
    _stub: p4r_grpc.P4RuntimeStub | None = None
    _stream: _P4StreamTypeAlias | None = None
    _complete_request: Callable[[pbutil.PBMessage], None] | None = None

    _schema: P4Schema | None = None
    "Annotate log messages using this optional P4Info schema."

    def __init__(
        self,
        address: str,
        credentials: GRPCCredentialsTLS | None = None,
        *,
        wait_for_ready: bool = True,
    ) -> None:
        self._address = address
        self._credentials = credentials
        self._wait_for_ready = wait_for_ready

    @property
    def channel(self) -> grpc.aio.Channel | None:
        "Return the GRPC channel object, or None if the channel is not open."
        return self._channel

    async def __aenter__(self) -> Self:
        await self.open()
        return self

    async def __aexit__(self, *args: Any) -> bool | None:
        await self.close()

    async def open(
        self,
        *,
        schema: P4Schema | None = None,
        complete_request: Callable[[pbutil.PBMessage], None] | None = None,
    ) -> None:
        """Open the client channel.

        Note: This method is `async` for forward-compatible reasons.
        """
        assert self._channel is None
        assert self._stub is None

        # Increase max_metadata_size from 8 KB to 64 KB.
        options = GRPCOptions(
            max_metadata_size=64 * 1024,  # 64 kilobytes
            max_reconnect_backoff_ms=15000,  # 15.0 seconds
        )

        self._channel = grpc_channel(
            self._address,
            credentials=self._credentials,
            options=options,
            client_type="P4Client",
        )

        self._stub = p4r_grpc.P4RuntimeStub(self._channel)
        self._schema = schema
        self._complete_request = complete_request

    async def close(self) -> None:
        "Close the client channel."
        if self._channel is not None:
            LOGGER.debug("P4Client: close channel %r", self._address)

            if self._stream is not None:
                self._stream.cancel()
                self._stream = None

            await self._channel.close()
            self._channel = None
            self._stub = None
            self._schema = None
            self._complete_request = None

    async def send(self, msg: p4r.StreamMessageRequest) -> None:
        """Send a message to the stream."""
        assert self._stub is not None

        if not self._stream or self._stream.done():
            self._stream = cast(
                _P4StreamTypeAlias,
                self._stub.StreamChannel(wait_for_ready=self._wait_for_ready),  # type: ignore
            )

        self._log_msg(msg)

        try:
            await self._stream.write(msg)
        except grpc.RpcError as ex:
            raise P4ClientError(ex, "send") from None

    async def receive(self) -> p4r.StreamMessageResponse:
        """Read a message from the stream."""
        assert self._stream is not None

        try:
            msg = cast(
                p4r.StreamMessageResponse,
                await self._stream.read(),  # type: ignore
            )
            if msg is GRPC_EOF:
                # Treat EOF as a protocol violation.
                raise RuntimeError("P4Client.receive got EOF!")

        except grpc.RpcError as ex:
            raise P4ClientError(ex, "receive") from None

        self._log_msg(msg)
        return msg

    @overload
    async def request(
        self, msg: p4r.WriteRequest
    ) -> p4r.WriteResponse: ...  # pragma: no cover

    @overload
    async def request(
        self, msg: p4r.GetForwardingPipelineConfigRequest
    ) -> p4r.GetForwardingPipelineConfigResponse: ...  # pragma: no cover

    @overload
    async def request(
        self, msg: p4r.SetForwardingPipelineConfigRequest
    ) -> p4r.SetForwardingPipelineConfigResponse: ...  # pragma: no cover

    @overload
    async def request(
        self, msg: p4r.CapabilitiesRequest
    ) -> p4r.CapabilitiesResponse: ...  # pragma: no cover

    async def request(self, msg: pbutil.PBMessage) -> pbutil.PBMessage:
        "Send a unary-unary P4Runtime request and wait for the response."
        assert self._stub is not None

        if self._complete_request:
            self._complete_request(msg)

        msg_type = type(msg).__name__
        assert msg_type.endswith("Request")
        rpc_method = getattr(self._stub, msg_type[:-7])

        self._log_msg(msg)
        try:
            reply = await rpc_method(
                msg,
                timeout=_DEFAULT_RPC_TIMEOUT,
            )
        except grpc.RpcError as ex:
            raise P4ClientError(ex, msg_type, msg=msg, schema=self._schema) from None

        self._log_msg(reply)
        return reply

    async def request_iter(
        self, msg: p4r.ReadRequest
    ) -> AsyncIterator[p4r.ReadResponse]:
        "Send a unary-stream P4Runtime read request and wait for the responses."
        assert self._stub is not None

        if self._complete_request:
            self._complete_request(msg)

        msg_type = type(msg).__name__
        assert msg_type.endswith("Request")
        rpc_method = getattr(self._stub, msg_type[:-7])

        self._log_msg(msg)
        try:
            async for reply in rpc_method(
                msg,
                timeout=_DEFAULT_RPC_TIMEOUT,
            ):
                self._log_msg(reply)
                yield reply
        except grpc.RpcError as ex:
            raise P4ClientError(ex, msg_type) from None

    def _log_msg(self, msg: pbutil.PBMessage) -> None:
        "Log a P4Runtime request or response."
        pbutil.log_msg(self._channel, msg, self._schema)

Implements a P4Runtime client.

P4Client(address: str, credentials: GRPCCredentialsTLS | None = None, *, wait_for_ready: bool = True)
    def __init__(
        self,
        address: str,
        credentials: GRPCCredentialsTLS | None = None,
        *,
        wait_for_ready: bool = True,
    ) -> None:
        self._address = address
        self._credentials = credentials
        self._wait_for_ready = wait_for_ready
channel: grpc.aio._base_channel.Channel | None
    @property
    def channel(self) -> grpc.aio.Channel | None:
        "Return the GRPC channel object, or None if the channel is not open."
        return self._channel

Return the GRPC channel object, or None if the channel is not open.

async def __aenter__(self) -> typing_extensions.Self:
    async def __aenter__(self) -> Self:
        await self.open()
        return self
async def open(self, *, schema: P4Schema | None = None, complete_request: Optional[Callable[[google.protobuf.message.Message], NoneType]] = None) -> None:
    async def open(
        self,
        *,
        schema: P4Schema | None = None,
        complete_request: Callable[[pbutil.PBMessage], None] | None = None,
    ) -> None:
        """Open the client channel.

        Note: This method is `async` for forward-compatible reasons.
        """
        assert self._channel is None
        assert self._stub is None

        # Increase max_metadata_size from 8 KB to 64 KB.
        options = GRPCOptions(
            max_metadata_size=64 * 1024,  # 64 kilobytes
            max_reconnect_backoff_ms=15000,  # 15.0 seconds
        )

        self._channel = grpc_channel(
            self._address,
            credentials=self._credentials,
            options=options,
            client_type="P4Client",
        )

        self._stub = p4r_grpc.P4RuntimeStub(self._channel)
        self._schema = schema
        self._complete_request = complete_request

Open the client channel.

Note: This method is async for forward compatibility.

async def close(self) -> None:
    async def close(self) -> None:
        "Close the client channel."
        if self._channel is not None:
            LOGGER.debug("P4Client: close channel %r", self._address)

            if self._stream is not None:
                self._stream.cancel()
                self._stream = None

            await self._channel.close()
            self._channel = None
            self._stub = None
            self._schema = None
            self._complete_request = None

Close the client channel.

async def send(self, msg: p4.v1.p4runtime_pb2.StreamMessageRequest) -> None:
    async def send(self, msg: p4r.StreamMessageRequest) -> None:
        """Send a message to the stream."""
        assert self._stub is not None

        if not self._stream or self._stream.done():
            self._stream = cast(
                _P4StreamTypeAlias,
                self._stub.StreamChannel(wait_for_ready=self._wait_for_ready),  # type: ignore
            )

        self._log_msg(msg)

        try:
            await self._stream.write(msg)
        except grpc.RpcError as ex:
            raise P4ClientError(ex, "send") from None

Send a message to the stream.

async def receive(self) -> p4.v1.p4runtime_pb2.StreamMessageResponse:
    async def receive(self) -> p4r.StreamMessageResponse:
        """Read a message from the stream."""
        assert self._stream is not None

        try:
            msg = cast(
                p4r.StreamMessageResponse,
                await self._stream.read(),  # type: ignore
            )
            if msg is GRPC_EOF:
                # Treat EOF as a protocol violation.
                raise RuntimeError("P4Client.receive got EOF!")

        except grpc.RpcError as ex:
            raise P4ClientError(ex, "receive") from None

        self._log_msg(msg)
        return msg

Read a message from the stream.

async def request(self, msg: google.protobuf.message.Message) -> google.protobuf.message.Message:
    async def request(self, msg: pbutil.PBMessage) -> pbutil.PBMessage:
        "Send a unary-unary P4Runtime request and wait for the response."
        assert self._stub is not None

        if self._complete_request:
            self._complete_request(msg)

        msg_type = type(msg).__name__
        assert msg_type.endswith("Request")
        rpc_method = getattr(self._stub, msg_type[:-7])

        self._log_msg(msg)
        try:
            reply = await rpc_method(
                msg,
                timeout=_DEFAULT_RPC_TIMEOUT,
            )
        except grpc.RpcError as ex:
            raise P4ClientError(ex, msg_type, msg=msg, schema=self._schema) from None

        self._log_msg(reply)
        return reply

Send a unary-unary P4Runtime request and wait for the response.

async def request_iter(self, msg: p4.v1.p4runtime_pb2.ReadRequest) -> AsyncIterator[p4.v1.p4runtime_pb2.ReadResponse]:
    async def request_iter(
        self, msg: p4r.ReadRequest
    ) -> AsyncIterator[p4r.ReadResponse]:
        "Send a unary-stream P4Runtime read request and wait for the responses."
        assert self._stub is not None

        if self._complete_request:
            self._complete_request(msg)

        msg_type = type(msg).__name__
        assert msg_type.endswith("Request")
        rpc_method = getattr(self._stub, msg_type[:-7])

        self._log_msg(msg)
        try:
            async for reply in rpc_method(
                msg,
                timeout=_DEFAULT_RPC_TIMEOUT,
            ):
                self._log_msg(reply)
                yield reply
        except grpc.RpcError as ex:
            raise P4ClientError(ex, msg_type) from None

Send a unary-stream P4Runtime read request and wait for the responses.

class P4ClientError(builtins.Exception):
class P4ClientError(Exception):
    "Wrap `grpc.RpcError`."

    _operation: str
    _status: P4RpcStatus
    _outer_code: GRPCStatusCode
    _outer_message: str
    _schema: P4Schema | None = None  # for annotating sub-value details

    def __init__(
        self,
        error: grpc.RpcError,
        operation: str,
        *,
        msg: pbutil.PBMessage | None = None,
        schema: P4Schema | None = None,
    ):
        super().__init__()
        assert isinstance(error, grpc.aio.AioRpcError)

        self._operation = operation
        self._status = P4RpcStatus.from_rpc_error(error)
        self._outer_code = GRPCStatusCode.from_status_code(error.code())
        self._outer_message = error.details() or ""

        if msg is not None and self.details:
            self._attach_details(msg)
            self._schema = schema

        LOGGER.debug("%s failed: %s", operation, self)

    @property
    def code(self) -> GRPCStatusCode:
        "GRPC status code."
        return self._status.code

    @property
    def message(self) -> str:
        "GRPC status message."
        return self._status.message

    @property
    def details(self) -> dict[int, P4Error]:
        "Optional details about P4Runtime Write updates that failed."
        return self._status.details

    @property
    def is_not_found_only(self) -> bool:
        """Return True if the only sub-errors are NOT_FOUND."""
        if self.code != GRPCStatusCode.UNKNOWN:
            return False

        for err in self.details.values():
            if err.canonical_code != GRPCStatusCode.NOT_FOUND:
                return False
        return True

    @property
    def is_election_id_used(self) -> bool:
        """Return true if error is that election ID is in use."""
        return (
            self.code == GRPCStatusCode.INVALID_ARGUMENT
            and _ELECTION_ID_EXISTS.search(self.message) is not None
        )

    @property
    def is_pipeline_missing(self) -> bool:
        "Return true if error is that no pipeline config is set."
        return (
            self.code == GRPCStatusCode.FAILED_PRECONDITION
            and _NO_PIPELINE_CONFIG.search(self.message) is not None
        )

    def _attach_details(self, msg: pbutil.PBMessage):
        "Attach the subvalue(s) from the message that caused the error."
        if isinstance(msg, p4r.WriteRequest):
            for key, value in self.details.items():
                value.subvalue = msg.updates[key]

    def __str__(self) -> str:
        "Return string representation of P4ClientError object."
        if self.details:

            def _show(value: P4Error):
                s = repr(value)
                if self._schema:
                    s = pbutil.log_annotate(s, self._schema)
                s = s.replace("\n}\n)", "\n})")  # tidy multiline repr
                return s.replace("\n", "\n" + " " * 6)  # indent 6 spaces

            items = [""] + [
                f"  [details.{key}] {_show(val)}" for key, val in self.details.items()
            ]
            details = "\n".join(items)
        else:
            details = ""

        if self.code == self._outer_code and self.message == self._outer_message:
            return (
                f"operation={self._operation} code={self.code!r} "
                f"message={self.message!r} {details}"
            )
        return (
            f"code={self.code!r} message={self.message!r} "
            f"details={self.details!r} operation={self._operation} "
            f"_outer_message={self._outer_message!r} _outer_code={self._outer_code!r}"
        )

Wrap grpc.RpcError.

P4ClientError( error: grpc.RpcError, operation: str, *, msg: google.protobuf.message.Message | None = None, schema: P4Schema | None = None)
code: GRPCStatusCode

GRPC status code.

message: str

GRPC status message.

details: dict[int, P4Error]

Optional details about P4Runtime Write updates that failed.

is_not_found_only: bool

Return True if the only sub-errors are NOT_FOUND.
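The check above can be sketched without Finsy or gRPC installed. This is a minimal illustration, not the library's code: `StatusCode` and `SubError` are hypothetical stand-ins for `GRPCStatusCode` and `P4Error`, and the numeric values are the canonical gRPC status codes. The keys of `details` are indexes into the updates of the failed WriteRequest, as `_attach_details` suggests.

```python
from dataclasses import dataclass
from enum import IntEnum


class StatusCode(IntEnum):
    """Stand-in for GRPCStatusCode (canonical gRPC numeric values)."""

    UNKNOWN = 2
    INVALID_ARGUMENT = 3
    NOT_FOUND = 5


@dataclass
class SubError:
    """Stand-in for P4Error: the error reported for one update in a batch."""

    canonical_code: StatusCode
    message: str = ""


def is_not_found_only(code: StatusCode, details: dict[int, SubError]) -> bool:
    """Outer code must be UNKNOWN and every sub-error must be NOT_FOUND."""
    if code != StatusCode.UNKNOWN:
        return False
    return all(
        err.canonical_code == StatusCode.NOT_FOUND for err in details.values()
    )


# Keys are positions of the failed updates in the write batch.
only_missing = {0: SubError(StatusCode.NOT_FOUND), 2: SubError(StatusCode.NOT_FOUND)}
mixed = {0: SubError(StatusCode.NOT_FOUND), 1: SubError(StatusCode.INVALID_ARGUMENT)}

print(is_not_found_only(StatusCode.UNKNOWN, only_missing))
print(is_not_found_only(StatusCode.UNKNOWN, mixed))
```

A caller might use a property like this to treat a batch of deletes as successful when the only failures are entries that were already gone. Note that, as in the listed source, an UNKNOWN error with no details at all also passes the check.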

is_election_id_used: bool

Return true if error is that election ID is in use.

is_pipeline_missing: bool

Return true if error is that no pipeline config is set.
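Both `is_election_id_used` and `is_pipeline_missing` pair a status code with a regex search over the error message. The sketch below shows that pattern in isolation. The two regexes and the sample messages are assumptions made up for illustration; the actual `_ELECTION_ID_EXISTS` and `_NO_PIPELINE_CONFIG` patterns are not shown in this section and may differ.

```python
import re
from enum import IntEnum


class StatusCode(IntEnum):
    """Stand-in for GRPCStatusCode (canonical gRPC numeric values)."""

    INVALID_ARGUMENT = 3
    FAILED_PRECONDITION = 9


# Hypothetical patterns, NOT the library's real regexes.
ELECTION_ID_EXISTS = re.compile(r"election.id.*(exists|in use)", re.IGNORECASE)
NO_PIPELINE_CONFIG = re.compile(r"no (forwarding )?pipeline config", re.IGNORECASE)


def classify(code: StatusCode, message: str) -> str:
    """Classify an error by status code first, then by message text."""
    if code == StatusCode.INVALID_ARGUMENT and ELECTION_ID_EXISTS.search(message):
        return "election_id_used"
    if code == StatusCode.FAILED_PRECONDITION and NO_PIPELINE_CONFIG.search(message):
        return "pipeline_missing"
    return "other"


print(classify(StatusCode.INVALID_ARGUMENT, "Election id already exists"))
print(classify(StatusCode.FAILED_PRECONDITION, "No forwarding pipeline config set"))
print(classify(StatusCode.INVALID_ARGUMENT, "No forwarding pipeline config set"))
```

Requiring both the code and the message to match keeps an unrelated error with the same status code from being misread as one of these two conditions.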

@decodable('clone_session_entry')
@dataclass(slots=True)
class P4CloneSessionEntry(finsy.p4entity._P4Writable):
@decodable("clone_session_entry")
@dataclass(slots=True)
class P4CloneSessionEntry(_P4Writable):
    "Represents a P4Runtime CloneSessionEntry."

    session_id: int = 0
    _: KW_ONLY
    class_of_service: int = 0
    packet_length_bytes: int = 0
    replicas: Sequence[_ReplicaType] = ()

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode CloneSessionEntry data as protobuf."
        entry = p4r.CloneSessionEntry(
            session_id=self.session_id,
            class_of_service=self.class_of_service,
            packet_length_bytes=self.packet_length_bytes,
            replicas=[encode_replica(replica) for replica in self.replicas],
        )
        return p4r.Entity(
            packet_replication_engine_entry=p4r.PacketReplicationEngineEntry(
                clone_session_entry=entry
            )
        )

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to CloneSessionEntry data."
        entry = msg.packet_replication_engine_entry.clone_session_entry
        return cls(
            session_id=entry.session_id,
            class_of_service=entry.class_of_service,
            packet_length_bytes=entry.packet_length_bytes,
            replicas=tuple(decode_replica(replica) for replica in entry.replicas),
        )

    def replicas_str(self) -> str:
        "Format the replicas as a human-readable, canonical string."
        return " ".join(format_replica(rep) for rep in self.replicas)

Represents a P4Runtime CloneSessionEntry.

P4CloneSessionEntry( session_id: int = 0, *, class_of_service: int = 0, packet_length_bytes: int = 0, replicas: Sequence[tuple[int, int] | int] = ())
session_id: int
class_of_service: int
packet_length_bytes: int
replicas: Sequence[tuple[int, int] | int]
def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:

Encode CloneSessionEntry data as protobuf.

@classmethod
def decode( cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:

Decode protobuf to CloneSessionEntry data.

def replicas_str(self) -> str:

Format the replicas as a human-readable, canonical string.
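The `replicas` field accepts a mix of bare integers and 2-tuples. The sketch below normalizes such a sequence to uniform pairs. It rests on an assumption not confirmed by this section: that a bare `int` means an egress port with instance 0, and that a tuple is `(egress_port, instance)`. The helpers `normalize_replica` and `normalize_replicas` are hypothetical and are not part of Finsy; the library's own `encode_replica`, `decode_replica`, and `format_replica` are not shown here.

```python
from collections.abc import Sequence
from typing import Union

# Mirrors the documented element type: tuple[int, int] | int
Replica = Union[tuple[int, int], int]


def normalize_replica(replica: Replica) -> tuple[int, int]:
    """Return (egress_port, instance); assumes a bare int implies instance 0."""
    if isinstance(replica, int):
        return (replica, 0)
    port, instance = replica
    return (port, instance)


def normalize_replicas(replicas: Sequence[Replica]) -> list[tuple[int, int]]:
    """Normalize a mixed sequence of replicas, preserving order."""
    return [normalize_replica(rep) for rep in replicas]


print(normalize_replicas([1, 2, (3, 1)]))
```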

@dataclass(kw_only=True, slots=True)
class P4CounterData:
@dataclass(kw_only=True, slots=True)
class P4CounterData:
    """Represents a P4Runtime object that keeps statistics of bytes and packets.

    Attributes:
        byte_count (int): the number of octets
        packet_count (int): the number of packets

    See Also:
        - P4TableEntry
        - P4MeterCounterData
        - P4CounterEntry
        - P4DirectCounterEntry
    """

    byte_count: int = 0
    "The number of octets."
    packet_count: int = 0
    "The number of packets."

    def encode(self) -> p4r.CounterData:
        "Encode object as CounterData."
        return p4r.CounterData(
            byte_count=self.byte_count, packet_count=self.packet_count
        )

    @classmethod
    def decode(cls, msg: p4r.CounterData) -> Self:
        "Decode CounterData."
        return cls(byte_count=msg.byte_count, packet_count=msg.packet_count)

Represents a P4Runtime object that keeps statistics of bytes and packets.

Attributes:
  • byte_count (int): the number of octets
  • packet_count (int): the number of packets
See Also:
  • P4TableEntry
  • P4MeterCounterData
  • P4CounterEntry
  • P4DirectCounterEntry
P4CounterData(*, byte_count: int = 0, packet_count: int = 0)
byte_count: int

The number of octets.

packet_count: int

The number of packets.

def encode(self) -> p4.v1.p4runtime_pb2.CounterData:

Encode object as CounterData.

@classmethod
def decode(cls, msg: p4.v1.p4runtime_pb2.CounterData) -> typing_extensions.Self:

Decode CounterData.

@decodable('counter_entry')
@dataclass(slots=True)
class P4CounterEntry(finsy.p4entity._P4ModifyOnly):
@decodable("counter_entry")
@dataclass(slots=True)
class P4CounterEntry(_P4ModifyOnly):
    "Represents a P4Runtime CounterEntry."

    counter_id: str = ""
    _: KW_ONLY
    index: int | None = None
    data: P4CounterData | None = None

    @property
    def packet_count(self) -> int:
        "Packet count from counter data (or 0 if there is no data)."
        if self.data is not None:
            return self.data.packet_count
        return 0

    @property
    def byte_count(self) -> int:
        "Byte count from counter data (or 0 if there is no data)."
        if self.data is not None:
            return self.data.byte_count
        return 0

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4CounterEntry as protobuf."
        if not self.counter_id:
            return p4r.Entity(counter_entry=p4r.CounterEntry())

        counter = schema.counters[self.counter_id]

        if self.index is not None:
            index = p4r.Index(index=self.index)
        else:
            index = None

        if self.data is not None:
            data = self.data.encode()
        else:
            data = None

        entry = p4r.CounterEntry(
            counter_id=counter.id,
            index=index,
            data=data,
        )
        return p4r.Entity(counter_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4CounterEntry."
        entry = msg.counter_entry
        if not entry.counter_id:
            return cls()

        counter = schema.counters[entry.counter_id]

        if entry.HasField("index"):
            index = entry.index.index
        else:
            index = None

        if entry.HasField("data"):
            data = P4CounterData.decode(entry.data)
        else:
            data = None

        return cls(counter_id=counter.alias, index=index, data=data)

Represents a P4Runtime CounterEntry.

P4CounterEntry( counter_id: str = '', *, index: int | None = None, data: P4CounterData | None = None)
counter_id: str
index: int | None
data: P4CounterData | None
packet_count: int

Packet count from counter data (or 0 if there is no data).

byte_count: int

Byte count from counter data (or 0 if there is no data).
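Because `packet_count` and `byte_count` fall back to 0 when `data` is `None`, entries can be aggregated without checking each one for missing data. The sketch below illustrates this with stand-in dataclasses (`CounterData` and `CounterEntry` are simplified, hypothetical versions of the classes above with no protobuf or schema involved); the counter name `"ingress_counter"` is made up.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CounterData:
    """Stand-in for P4CounterData."""

    byte_count: int = 0
    packet_count: int = 0


@dataclass
class CounterEntry:
    """Stand-in for P4CounterEntry with the same fallback properties."""

    counter_id: str = ""
    index: Optional[int] = None
    data: Optional[CounterData] = None

    @property
    def packet_count(self) -> int:
        return self.data.packet_count if self.data is not None else 0

    @property
    def byte_count(self) -> int:
        return self.data.byte_count if self.data is not None else 0


entries = [
    CounterEntry("ingress_counter", index=0, data=CounterData(100, 2)),
    CounterEntry("ingress_counter", index=1),  # no data attached
]

# Entries without data contribute 0 to both totals.
total_packets = sum(entry.packet_count for entry in entries)
total_bytes = sum(entry.byte_count for entry in entries)
print(total_packets, total_bytes)
```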

def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:

Encode P4CounterEntry as protobuf.

@classmethod
def decode( cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:

Decode protobuf to P4CounterEntry.

class P4CounterUnit(finsy.grpcutil._EnumBase):
class P4CounterUnit(_EnumBase):
    "IntEnum equivalent to `p4i.CounterSpec.Unit`."
    UNSPECIFIED = p4i.CounterSpec.Unit.UNSPECIFIED
    BYTES = p4i.CounterSpec.Unit.BYTES
    PACKETS = p4i.CounterSpec.Unit.PACKETS
    BOTH = p4i.CounterSpec.Unit.BOTH

IntEnum equivalent to p4i.CounterSpec.Unit.

@decodable('digest_entry')
@dataclass(slots=True)
class P4DigestEntry(finsy.p4entity._P4Writable):
@decodable("digest_entry")
@dataclass(slots=True)
class P4DigestEntry(_P4Writable):
    "Represents a P4Runtime DigestEntry."

    digest_id: str = ""
    _: KW_ONLY
    max_list_size: int = 0
    max_timeout_ns: int = 0
    ack_timeout_ns: int = 0

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode DigestEntry data as protobuf."
        if not self.digest_id:
            return p4r.Entity(digest_entry=p4r.DigestEntry())

        digest = schema.digests[self.digest_id]

        if self.max_list_size == self.max_timeout_ns == self.ack_timeout_ns == 0:
            config = None
        else:
            config = p4r.DigestEntry.Config(
                max_timeout_ns=self.max_timeout_ns,
                max_list_size=self.max_list_size,
                ack_timeout_ns=self.ack_timeout_ns,
            )

        entry = p4r.DigestEntry(digest_id=digest.id, config=config)
        return p4r.Entity(digest_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to DigestEntry data."
        entry = msg.digest_entry
        if entry.digest_id == 0:
            return cls()

        digest = schema.digests[entry.digest_id]

        config = entry.config
        return cls(
            digest.alias,
            max_list_size=config.max_list_size,
            max_timeout_ns=config.max_timeout_ns,
            ack_timeout_ns=config.ack_timeout_ns,
        )

Represents a P4Runtime DigestEntry.

P4DigestEntry( digest_id: str = '', *, max_list_size: int = 0, max_timeout_ns: int = 0, ack_timeout_ns: int = 0)
digest_id: str
max_list_size: int
max_timeout_ns: int
ack_timeout_ns: int
def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:

Encode DigestEntry data as protobuf.

@classmethod
def decode( cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:

Decode protobuf to DigestEntry data.
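One detail of `P4DigestEntry.encode` is easy to miss: the `config` submessage is left unset when all three tuning fields are 0, and is populated as soon as any one of them is nonzero. The sketch below models only that rule, using a plain dict in place of `p4r.DigestEntry.Config`; the `digest_config` helper is hypothetical.

```python
from typing import Optional


def digest_config(
    max_list_size: int = 0,
    max_timeout_ns: int = 0,
    ack_timeout_ns: int = 0,
) -> Optional[dict[str, int]]:
    """Return None when every field is 0, else a dict with all three fields."""
    if max_list_size == max_timeout_ns == ack_timeout_ns == 0:
        return None
    return {
        "max_timeout_ns": max_timeout_ns,
        "max_list_size": max_list_size,
        "ack_timeout_ns": ack_timeout_ns,
    }


ONE_SECOND_NS = 1_000_000_000  # the timeout fields are in nanoseconds

print(digest_config())
print(digest_config(max_timeout_ns=ONE_SECOND_NS))
```

Setting a single field is therefore enough to send a config, with the remaining fields carried along as 0.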

@decodable('digest')
@dataclass(slots=True)
class P4DigestList:
@decodable("digest")
@dataclass(slots=True)
class P4DigestList:
    "Represents a P4Runtime DigestList."

    digest_id: str
    _: KW_ONLY
    list_id: int
    timestamp: int
    data: list[_DataValueType]

    @classmethod
    def decode(cls, msg: p4r.StreamMessageResponse, schema: P4Schema) -> Self:
        "Decode protobuf to DigestList data."
        digest_list = msg.digest
        digest = schema.digests[digest_list.digest_id]

        type_spec = digest.type_spec
        return cls(
            digest_id=digest.alias,
            list_id=digest_list.list_id,
            timestamp=digest_list.timestamp,
            data=[type_spec.decode_data(item) for item in digest_list.data],
        )

    def __len__(self) -> int:
        "Return number of values in digest list."
        return len(self.data)

    def __getitem__(self, key: int) -> _DataValueType:
        "Retrieve value at given index from digest list."
        return self.data[key]

    def __iter__(self) -> Iterator[_DataValueType]:
        "Iterate over values in digest list."
        return iter(self.data)

    def ack(self) -> "P4DigestListAck":
        "Return the corresponding DigestListAck message."
        return P4DigestListAck(self.digest_id, self.list_id)

Represents a P4Runtime DigestList.

P4DigestList( digest_id: str, *, list_id: int, timestamp: int, data: list[typing.Any])
digest_id: str
list_id: int
timestamp: int
data: list[typing.Any]
@classmethod
def decode( cls, msg: p4.v1.p4runtime_pb2.StreamMessageResponse, schema: P4Schema) -> typing_extensions.Self:

Decode protobuf to DigestList data.

def __len__(self) -> int:

Return number of values in digest list.

def __getitem__(self, key: int) -> Any:

Retrieve value at given index from digest list.

def __iter__(self) -> Iterator[Any]:

Iterate over values in digest list.

def ack(self) -> P4DigestListAck:

Return the corresponding DigestListAck message.
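Taken together, `__len__`, `__getitem__`, `__iter__`, and `ack()` let a digest list be handled like an ordinary sequence and then acknowledged by its `digest_id` and `list_id`. The sketch below reproduces that shape with stand-in dataclasses so it runs without a switch. `DigestList` and `DigestListAck` are simplified, hypothetical versions of the classes above, and the digest name and data values are made up.

```python
from dataclasses import dataclass
from typing import Any, Iterator


@dataclass
class DigestListAck:
    """Stand-in for P4DigestListAck."""

    digest_id: str
    list_id: int


@dataclass
class DigestList:
    """Stand-in for P4DigestList with the same sequence methods."""

    digest_id: str
    list_id: int
    timestamp: int
    data: list[Any]

    def __len__(self) -> int:
        return len(self.data)

    def __getitem__(self, key: int) -> Any:
        return self.data[key]

    def __iter__(self) -> Iterator[Any]:
        return iter(self.data)

    def ack(self) -> DigestListAck:
        # The ack carries only the identifiers, not the digest data.
        return DigestListAck(self.digest_id, self.list_id)


digests = DigestList(
    "learn_digest", list_id=7, timestamp=0, data=[{"port": 1}, {"port": 2}]
)

ports = [item["port"] for item in digests]  # iteration yields the decoded values
ack = digests.ack()
print(len(digests), digests[0], ports, ack)
```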

@dataclass(slots=True)
class P4DigestListAck:
@dataclass(slots=True)
class P4DigestListAck:
    "Represents a P4Runtime DigestListAck."

    digest_id: str
    list_id: int

    def encode_update(self, schema: P4Schema) -> p4r.StreamMessageRequest:
        "Encode DigestListAck data as protobuf."
        digest = schema.digests[self.digest_id]

        return p4r.StreamMessageRequest(
            digest_ack=p4r.DigestListAck(
                digest_id=digest.id,
                list_id=self.list_id,
            )
        )

Represents a P4Runtime DigestListAck.

P4DigestListAck(digest_id: str, list_id: int)
digest_id: str
list_id: int
def encode_update( self, schema: P4Schema) -> p4.v1.p4runtime_pb2.StreamMessageRequest:

Encode DigestListAck data as protobuf.

@decodable('direct_counter_entry')
@dataclass(slots=True)
class P4DirectCounterEntry(finsy.p4entity._P4ModifyOnly):
@decodable("direct_counter_entry")
@dataclass(slots=True)
class P4DirectCounterEntry(_P4ModifyOnly):
    "Represents a P4Runtime DirectCounterEntry."

    counter_id: str = ""
    _: KW_ONLY
    table_entry: P4TableEntry | None = None
    data: P4CounterData | None = None

    @property
    def table_id(self) -> str:
        "Return table_id of related table."
        if self.table_entry is None:
            return ""
        return self.table_entry.table_id

    @property
    def packet_count(self) -> int:
        "Packet count from counter data (or 0 if there is no data)."
        if self.data is not None:
            return self.data.packet_count
        return 0

    @property
    def byte_count(self) -> int:
        "Byte count from counter data (or 0 if there is no data)."
        if self.data is not None:
            return self.data.byte_count
        return 0

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4DirectCounterEntry as protobuf."
        if self.table_entry is None:
            # Use `counter_id` to construct a `P4TableEntry` with the proper
            # table name.
            if self.counter_id:
                tb_name = schema.direct_counters[self.counter_id].direct_table_name
                table_entry = P4TableEntry(tb_name)
            else:
                table_entry = P4TableEntry()
        else:
            table_entry = self.table_entry

        if self.data is not None:
            data = self.data.encode()
        else:
            data = None

        entry = p4r.DirectCounterEntry(
            table_entry=table_entry.encode_entry(schema),
            data=data,
        )
        return p4r.Entity(direct_counter_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4DirectCounterEntry."
        entry = msg.direct_counter_entry

        if entry.HasField("table_entry"):
            table_entry = P4TableEntry.decode_entry(entry.table_entry, schema)
        else:
            table_entry = None

        if entry.HasField("data"):
            data = P4CounterData.decode(entry.data)
        else:
            data = None

        # Determine `counter_id` from table_entry.
        counter_id = ""
        if table_entry is not None and table_entry.table_id:
            direct_counter = schema.tables[table_entry.table_id].direct_counter
            assert direct_counter is not None
            counter_id = direct_counter.alias

        return cls(counter_id, table_entry=table_entry, data=data)

Represents a P4Runtime DirectCounterEntry.

P4DirectCounterEntry( counter_id: str = '', *, table_entry: P4TableEntry | None = None, data: P4CounterData | None = None)
counter_id: str
table_entry: P4TableEntry | None
data: P4CounterData | None
table_id: str

Return table_id of related table.

packet_count: int

Packet count from counter data (or 0 if there is no data).

byte_count: int

Byte count from counter data (or 0 if there is no data).

def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:

Encode P4DirectCounterEntry as protobuf.

@classmethod
def decode( cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:

Decode protobuf to P4DirectCounterEntry.
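A direct counter has no ID of its own on the wire, so `encode` and `decode` translate between `counter_id` and the table the counter is attached to. The sketch below models that two-way lookup with plain dictionaries standing in for `schema.direct_counters` and `schema.tables`. The names `ipv4_counter` and `ipv4_lpm` and both helper functions are hypothetical.

```python
from typing import Optional

# Hypothetical schema: direct counter alias -> table it is attached to.
DIRECT_COUNTER_TABLE = {"ipv4_counter": "ipv4_lpm"}
# Reverse mapping: table -> direct counter alias.
TABLE_DIRECT_COUNTER = {
    table: counter for counter, table in DIRECT_COUNTER_TABLE.items()
}


def table_for_encode(counter_id: str, table_id: Optional[str]) -> str:
    """Pick the table to encode: explicit entry, else lookup, else wildcard."""
    if table_id is not None:
        return table_id  # an explicit table entry takes precedence
    if counter_id:
        return DIRECT_COUNTER_TABLE[counter_id]
    return ""  # empty table name, as with a default P4TableEntry()


def counter_for_decode(table_id: str) -> str:
    """Recover the counter alias from the decoded table name."""
    if not table_id:
        return ""
    return TABLE_DIRECT_COUNTER[table_id]


print(table_for_encode("ipv4_counter", None))
print(table_for_encode("", None))
print(counter_for_decode("ipv4_lpm"))
```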

@decodable('direct_meter_entry')
@dataclass(kw_only=True, slots=True)
class P4DirectMeterEntry(finsy.p4entity._P4ModifyOnly):
@decodable("direct_meter_entry")
@dataclass(kw_only=True, slots=True)
class P4DirectMeterEntry(_P4ModifyOnly):
    "Represents a P4Runtime DirectMeterEntry."

    table_entry: P4TableEntry | None = None
    config: P4MeterConfig | None = None
    counter_data: P4MeterCounterData | None = None

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4DirectMeterEntry as protobuf."
        if self.table_entry is not None:
            table_entry = self.table_entry.encode_entry(schema)
        else:
            table_entry = None

        if self.config is not None:
            config = self.config.encode()
        else:
            config = None

        if self.counter_data is not None:
            counter_data = self.counter_data.encode()
        else:
            counter_data = None

        entry = p4r.DirectMeterEntry(
            table_entry=table_entry,
            config=config,
            counter_data=counter_data,
        )
        return p4r.Entity(direct_meter_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4DirectMeterEntry."
        entry = msg.direct_meter_entry

        if entry.HasField("table_entry"):
            table_entry = P4TableEntry.decode_entry(entry.table_entry, schema)
        else:
            table_entry = None

        if entry.HasField("config"):
            config = P4MeterConfig.decode(entry.config)
        else:
            config = None

        if entry.HasField("counter_data"):
            counter_data = P4MeterCounterData.decode(entry.counter_data)
        else:
            counter_data = None

        return cls(
            table_entry=table_entry,
            config=config,
            counter_data=counter_data,
        )

Represents a P4Runtime DirectMeterEntry.

P4DirectMeterEntry( *, table_entry: P4TableEntry | None = None, config: P4MeterConfig | None = None, counter_data: P4MeterCounterData | None = None)
table_entry: P4TableEntry | None
config: P4MeterConfig | None
counter_data: P4MeterCounterData | None
def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:

Encode P4DirectMeterEntry as protobuf.

@classmethod
def decode( cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:

Decode protobuf to P4DirectMeterEntry.

@dataclass
class P4Error:
@dataclass
class P4Error:
    "P4Runtime Error message used to report a single P4-entity error."

    canonical_code: GRPCStatusCode
    message: str
    space: str
    code: int
    subvalue: pbutil.PBMessage | None = None

P4Runtime Error message used to report a single P4-entity error.

P4Error( canonical_code: GRPCStatusCode, message: str, space: str, code: int, subvalue: google.protobuf.message.Message | None = None)
canonical_code: GRPCStatusCode
message: str
space: str
code: int
subvalue: google.protobuf.message.Message | None = None
@decodable('extern_entry')
@dataclass(kw_only=True, slots=True)
class P4ExternEntry(finsy.p4entity._P4Writable):
@decodable("extern_entry")
@dataclass(kw_only=True, slots=True)
class P4ExternEntry(_P4Writable):
    "Represents a P4Runtime ExternEntry."

    extern_type_id: str
    extern_id: str
    entry: pbutil.PBAny

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode ExternEntry data as protobuf."
        extern = schema.externs[self.extern_type_id, self.extern_id]
        entry = p4r.ExternEntry(
            extern_type_id=extern.extern_type_id,
            extern_id=extern.id,
            entry=self.entry,
        )
        return p4r.Entity(extern_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to ExternEntry data."
        entry = msg.extern_entry
        extern = schema.externs[entry.extern_type_id, entry.extern_id]
        return cls(
            extern_type_id=extern.extern_type_name,
            extern_id=extern.name,
            entry=entry.entry,
        )

Represents a P4Runtime ExternEntry.

P4ExternEntry( *, extern_type_id: str, extern_id: str, entry: google.protobuf.any_pb2.Any)
extern_type_id: str
extern_id: str
entry: google.protobuf.any_pb2.Any
def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:

Encode ExternEntry data as protobuf.

@classmethod
def decode( cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:

Decode protobuf to ExternEntry data.

IndirectAction = <class 'P4IndirectAction'>

IndirectAction is an alias for P4IndirectAction.

@dataclass(slots=True)
class P4IndirectAction:
@dataclass(slots=True)
class P4IndirectAction:
    """Represents a P4Runtime Action reference for an indirect table.

    An indirect action can be either:

    1. a "one-shot" action (action_set)
    2. a reference to an action profile member (member_id)
    3. a reference to an action profile group (group_id)

    Exactly one of action_set, member_id, or group_id must be set. The other
    two values must be None.

    Attributes:
        action_set: sequence of weighted actions for one-shot
        member_id: ID of action profile member
        group_id: ID of action profile group

    Examples:

    ```python
    # Construct a one-shot action profile from a list of weighted actions.
    one_shot = P4IndirectAction(
        [
            2 * P4TableAction("forward", port=1),
            1 * P4TableAction("forward", port=2),
        ]
    )

    # Refer to an action profile member by ID.
    member_action = P4IndirectAction(member_id=1)

    # Refer to an action profile group by ID.
    group_action = P4IndirectAction(group_id=2)
    ```

    References:
        - "9.1.2. Action Specification",
        - "9.2.3. One Shot Action Selector Programming"

    TODO: Refactor into three classes? P4OneShotAction, P4MemberAction, and
        P4GroupAction.

    See Also:
        - P4TableEntry
    """

    action_set: Sequence[P4WeightedAction] | None = None
    "Sequence of weighted actions defining one-shot action profile."
    _: KW_ONLY
    member_id: int | None = None
    "ID of action profile member."
    group_id: int | None = None
    "ID of action profile group."

    def __post_init__(self) -> None:
        if not self._check_invariant():
            raise ValueError(
                "exactly one of action_set, member_id, or group_id must be set"
            )

    def _check_invariant(self) -> bool:
        "Return true if instance satisfies class invariant."
        if self.action_set is not None:
            return self.member_id is None and self.group_id is None
        if self.member_id is not None:
            return self.group_id is None
        return self.group_id is not None

    def encode_table_action(self, table: P4Table) -> p4r.TableAction:
        "Encode object as a TableAction."
        if self.action_set is not None:
            return p4r.TableAction(
                action_profile_action_set=self.encode_action_set(table)
            )

        if self.member_id is not None:
            return p4r.TableAction(action_profile_member_id=self.member_id)

        assert self.group_id is not None
        return p4r.TableAction(action_profile_group_id=self.group_id)

    def encode_action_set(self, table: P4Table) -> p4r.ActionProfileActionSet:
        "Encode object as an ActionProfileActionSet."
        assert self.action_set is not None

        profile_actions: list[p4r.ActionProfileAction] = []
        for weight, table_action in self.action_set:
            action = table_action.encode_action(table)

            match weight:
                case int(weight_value):
                    watch_port = None
                case (weight_value, int(watch)):
                    watch_port = encode_watch_port(watch)
                case _:  # pyright: ignore[reportUnnecessaryComparison]
                    raise ValueError(f"unexpected action weight: {weight!r}")

            profile = p4r.ActionProfileAction(action=action, weight=weight_value)
            if watch_port is not None:
                profile.watch_port = watch_port
            profile_actions.append(profile)

        return p4r.ActionProfileActionSet(action_profile_actions=profile_actions)

    @classmethod
    def decode_action_set(cls, msg: p4r.ActionProfileActionSet, table: P4Table) -> Self:
        "Decode ActionProfileActionSet."
        action_set = list[P4WeightedAction]()

        for action in msg.action_profile_actions:
            match action.WhichOneof("watch_kind"):
                case "watch_port":
                    weight = (action.weight, decode_watch_port(action.watch_port))
                case None:
                    weight = action.weight
                case other:
                    # "watch" (deprecated) is not supported
                    raise ValueError(f"unexpected oneof: {other!r}")

            table_action = P4TableAction.decode_action(action.action, table)
            action_set.append((weight, table_action))

        return cls(action_set)

    def format_str(self, table: P4Table) -> str:
        """Format the indirect table action as a human-readable string."""
        if self.action_set is not None:
            weighted_actions = [
                f"{weight}*{action.format_str(table)}"
                for weight, action in self.action_set
            ]
            return " ".join(weighted_actions)

        # Use the name of the action_profile, if we can get it. If not, just
        # use the placeholder "__indirect".
        if table.action_profile is not None:
            profile_name = f"@{table.action_profile.alias}"
        else:
            profile_name = "__indirect"

        if self.member_id is not None:
            return f"{profile_name}[[{self.member_id:#x}]]"

        return f"{profile_name}[{self.group_id:#x}]"

    def __repr__(self) -> str:
        "Customize representation to make it more concise."
        if self.action_set is not None:
            return f"P4IndirectAction(action_set={self.action_set!r})"
        if self.member_id is not None:
            return f"P4IndirectAction(member_id={self.member_id!r})"
        return f"P4IndirectAction(group_id={self.group_id!r})"

Represents a P4Runtime Action reference for an indirect table.

An indirect action can be either:

  1. a "one-shot" action (action_set)
  2. a reference to an action profile member (member_id)
  3. a reference to an action profile group (group_id)

Exactly one of action_set, member_id, or group_id must be set; the other two must be None.

Attributes:
  • action_set: sequence of weighted actions for one-shot
  • member_id: ID of action profile member
  • group_id: ID of action profile group

Examples:

# Construct a one-shot action profile from a list of weighted actions.
one_shot = P4IndirectAction(
    [
        2 * P4TableAction("forward", port=1),
        1 * P4TableAction("forward", port=2),
    ]
)

# Refer to an action profile member by ID.
member_action = P4IndirectAction(member_id=1)

# Refer to an action profile group by ID.
group_action = P4IndirectAction(group_id=2)

References:
  • "9.1.2. Action Specification",
  • "9.2.3. One Shot Action Selector Programming"

TODO: Refactor into three classes? P4OneShotAction, P4MemberAction, and P4GroupAction.

See Also:
  • P4TableEntry
P4IndirectAction( action_set: Optional[Sequence[tuple[int | tuple[int, int], P4TableAction]]] = None, *, member_id: int | None = None, group_id: int | None = None)
action_set: Optional[Sequence[tuple[int | tuple[int, int], P4TableAction]]]

Sequence of weighted actions defining one-shot action profile.

member_id: int | None

ID of action profile member.

group_id: int | None

ID of action profile group.

def encode_table_action(self, table: finsy.p4schema.P4Table) -> p4.v1.p4runtime_pb2.TableAction:

Encode object as a TableAction.

def encode_action_set( self, table: finsy.p4schema.P4Table) -> p4.v1.p4runtime_pb2.ActionProfileActionSet:

Encode object as an ActionProfileActionSet.

@classmethod
def decode_action_set( cls, msg: p4.v1.p4runtime_pb2.ActionProfileActionSet, table: finsy.p4schema.P4Table) -> typing_extensions.Self:

Decode ActionProfileActionSet.

def format_str(self, table: finsy.p4schema.P4Table) -> str:

Format the indirect table action as a human-readable string.

@dataclass(slots=True)
class P4Member:
@dataclass(slots=True)
class P4Member:
    """Represents an ActionProfileGroup Member.

    See Also:
        - P4ActionProfileGroup
    """

    member_id: int
    _: KW_ONLY
    weight: P4Weight

    def encode(self) -> p4r.ActionProfileGroup.Member:
        "Encode P4Member as protobuf."
        match self.weight:
            case int(weight):
                watch_port = None
            case (int(weight), int(watch)):
                watch_port = encode_watch_port(watch)
            case other:  # pyright: ignore[reportUnnecessaryComparison]
                raise ValueError(f"unexpected weight: {other!r}")

        member = p4r.ActionProfileGroup.Member(
            member_id=self.member_id,
            weight=weight,
        )

        if watch_port is not None:
            member.watch_port = watch_port
        return member

    @classmethod
    def decode(cls, msg: p4r.ActionProfileGroup.Member) -> Self:
        "Decode protobuf to P4Member."
        match msg.WhichOneof("watch_kind"):
            case "watch_port":
                weight = (msg.weight, decode_watch_port(msg.watch_port))
            case None:
                weight = msg.weight
            case other:
                # "watch" (deprecated) is not supported
                raise ValueError(f"unknown oneof: {other!r}")

        return cls(member_id=msg.member_id, weight=weight)

Represents an ActionProfileGroup Member.

See Also:
  • P4ActionProfileGroup
P4Member(member_id: int, *, weight: int | tuple[int, int])
member_id: int
weight: int | tuple[int, int]
def encode(self) -> p4.v1.p4runtime_pb2.ActionProfileGroup.Member:

Encode P4Member as protobuf.

@classmethod
def decode(cls, msg: p4.v1.p4runtime_pb2.ActionProfileGroup.Member) -> typing_extensions.Self:

Decode protobuf to P4Member.

@dataclass(kw_only=True, slots=True)
class P4MeterConfig:
@dataclass(kw_only=True, slots=True)
class P4MeterConfig:
    """Represents a P4Runtime MeterConfig.

    Attributes:
        cir (int): Committed information rate (units/sec).
        cburst (int): Committed burst size.
        pir (int): Peak information rate (units/sec).
        pburst (int): Peak burst size.
        eburst (int): Excess burst size (only used by srTCM). [default=0]

    Example:
    ```
    config = P4MeterConfig(cir=10, cburst=20, pir=10, pburst=20)
    ```

    See Also:
        - P4TableEntry
        - P4MeterEntry
        - P4DirectMeterEntry
    """

    cir: int
    "Committed information rate (units/sec)."
    cburst: int
    "Committed burst size."
    pir: int
    "Peak information rate (units/sec)."
    pburst: int
    "Peak burst size."
    eburst: int = 0
    "Excess burst size (only used by srTCM)."

    def encode(self) -> p4r.MeterConfig:
        "Encode object as MeterConfig."
        return p4r.MeterConfig(
            cir=self.cir,
            cburst=self.cburst,
            pir=self.pir,
            pburst=self.pburst,
            eburst=self.eburst,
        )

    @classmethod
    def decode(cls, msg: p4r.MeterConfig) -> Self:
        "Decode MeterConfig."
        return cls(
            cir=msg.cir,
            cburst=msg.cburst,
            pir=msg.pir,
            pburst=msg.pburst,
            eburst=msg.eburst,
        )

Represents a P4Runtime MeterConfig.

Attributes:
  • cir (int): Committed information rate (units/sec).
  • cburst (int): Committed burst size.
  • pir (int): Peak information rate (units/sec).
  • pburst (int): Peak burst size.
  • eburst (int): Excess burst size (only used by srTCM). [default=0]

Example:

config = P4MeterConfig(cir=10, cburst=20, pir=10, pburst=20)

See Also:
  • P4TableEntry
  • P4MeterEntry
  • P4DirectMeterEntry
P4MeterConfig(*, cir: int, cburst: int, pir: int, pburst: int, eburst: int = 0)
cir: int

Committed information rate (units/sec).

cburst: int

Committed burst size.

pir: int

Peak information rate (units/sec).

pburst: int

Peak burst size.

eburst: int

Excess burst size (only used by srTCM).
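
The committed and peak fields correspond to the parameters of a two-rate three-color marker. As a rough illustration of how such parameters are typically applied, the sketch below implements a color-blind marker in the style of RFC 2698. That a given target behaves exactly this way is an assumption; the `TwoRateMarker` class is hypothetical and not part of Finsy, and `eburst` is ignored.

```python
from dataclasses import dataclass, field


@dataclass
class TwoRateMarker:
    "Hypothetical color-blind two-rate three-color marker (RFC 2698 style)."

    cir: int  # committed information rate (units/sec)
    cburst: int  # committed burst size (units)
    pir: int  # peak information rate (units/sec)
    pburst: int  # peak burst size (units)
    _tc: float = field(init=False)
    _tp: float = field(init=False)
    _last: float = field(init=False, default=0.0)

    def __post_init__(self) -> None:
        # Both token buckets start full.
        self._tc = float(self.cburst)
        self._tp = float(self.pburst)

    def mark(self, size: int, now: float) -> str:
        "Return 'GREEN', 'YELLOW' or 'RED' for a packet of `size` units at time `now`."
        # Refill both buckets for the elapsed time, capped at the burst sizes.
        elapsed = now - self._last
        self._last = now
        self._tc = min(self.cburst, self._tc + self.cir * elapsed)
        self._tp = min(self.pburst, self._tp + self.pir * elapsed)

        if self._tp < size:
            return "RED"  # exceeds the peak bucket
        self._tp -= size
        if self._tc < size:
            return "YELLOW"  # fits the peak bucket but not the committed one
        self._tc -= size
        return "GREEN"
```

With `TwoRateMarker(cir=10, cburst=20, pir=20, pburst=30)`, three back-to-back packets of 15, 10 and 10 units at time 0 come out GREEN, YELLOW and RED, because the committed bucket drains first and the peak bucket second.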

def encode(self) -> p4.v1.p4runtime_pb2.MeterConfig:

Encode object as MeterConfig.

@classmethod
def decode(cls, msg: p4.v1.p4runtime_pb2.MeterConfig) -> typing_extensions.Self:

Decode MeterConfig.

@dataclass(kw_only=True, slots=True)
class P4MeterCounterData:
@dataclass(kw_only=True, slots=True)
class P4MeterCounterData:
    """Represents a P4Runtime MeterCounterData that stores per-color counters.

    Attributes:
        green (CounterData): counter data for packets marked GREEN.
        yellow (CounterData): counter data for packets marked YELLOW.
        red (CounterData): counter data for packets marked RED.

    See Also:
        - P4TableEntry
        - P4MeterEntry
        - P4DirectMeterEntry
    """

    green: P4CounterData
    "Counter of packets marked GREEN."
    yellow: P4CounterData
    "Counter of packets marked YELLOW."
    red: P4CounterData
    "Counter of packets marked RED."

    def encode(self) -> p4r.MeterCounterData:
        "Encode object as MeterCounterData."
        return p4r.MeterCounterData(
            green=self.green.encode(),
            yellow=self.yellow.encode(),
            red=self.red.encode(),
        )

    @classmethod
    def decode(cls, msg: p4r.MeterCounterData) -> Self:
        "Decode MeterCounterData."
        return cls(
            green=P4CounterData.decode(msg.green),
            yellow=P4CounterData.decode(msg.yellow),
            red=P4CounterData.decode(msg.red),
        )

Represents a P4Runtime MeterCounterData that stores per-color counters.

Attributes:
  • green (CounterData): counter data for packets marked GREEN.
  • yellow (CounterData): counter data for packets marked YELLOW.
  • red (CounterData): counter data for packets marked RED.
See Also:
  • P4TableEntry
  • P4MeterEntry
  • P4DirectMeterEntry
P4MeterCounterData( *, green: P4CounterData, yellow: P4CounterData, red: P4CounterData)
green: P4CounterData

Counter of packets marked GREEN.

yellow: P4CounterData

Counter of packets marked YELLOW.

red: P4CounterData

Counter of packets marked RED.

def encode(self) -> p4.v1.p4runtime_pb2.MeterCounterData:

Encode object as MeterCounterData.

@classmethod
def decode(cls, msg: p4.v1.p4runtime_pb2.MeterCounterData) -> typing_extensions.Self:

Decode MeterCounterData.

@decodable('meter_entry')
@dataclass(slots=True)
class P4MeterEntry(finsy.p4entity._P4ModifyOnly):
@decodable("meter_entry")
@dataclass(slots=True)
class P4MeterEntry(_P4ModifyOnly):
    "Represents a P4Runtime MeterEntry."

    meter_id: str = ""
    _: KW_ONLY
    index: int | None = None
    config: P4MeterConfig | None = None
    counter_data: P4MeterCounterData | None = None

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4MeterEntry to protobuf."
        if not self.meter_id:
            return p4r.Entity(meter_entry=p4r.MeterEntry())

        meter = schema.meters[self.meter_id]

        if self.index is not None:
            index = p4r.Index(index=self.index)
        else:
            index = None

        if self.config is not None:
            config = self.config.encode()
        else:
            config = None

        if self.counter_data is not None:
            counter_data = self.counter_data.encode()
        else:
            counter_data = None

        entry = p4r.MeterEntry(
            meter_id=meter.id,
            index=index,
            config=config,
            counter_data=counter_data,
        )
        return p4r.Entity(meter_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4MeterEntry."
        entry = msg.meter_entry
        if not entry.meter_id:
            return cls()

        meter = schema.meters[entry.meter_id]

        if entry.HasField("index"):
            index = entry.index.index
        else:
            index = None

        if entry.HasField("config"):
            config = P4MeterConfig.decode(entry.config)
        else:
            config = None

        if entry.HasField("counter_data"):
            counter_data = P4MeterCounterData.decode(entry.counter_data)
        else:
            counter_data = None

        return cls(
            meter_id=meter.alias,
            index=index,
            config=config,
            counter_data=counter_data,
        )

Represents a P4Runtime MeterEntry.

P4MeterEntry( meter_id: str = '', *, index: int | None = None, config: P4MeterConfig | None = None, counter_data: P4MeterCounterData | None = None)
meter_id: str
index: int | None
config: P4MeterConfig | None
counter_data: P4MeterCounterData | None
def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:

Encode P4MeterEntry to protobuf.

@classmethod
def decode( cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:

Decode protobuf to P4MeterEntry.

@decodable('multicast_group_entry')
@dataclass(slots=True)
class P4MulticastGroupEntry(finsy.p4entity._P4Writable):
@decodable("multicast_group_entry")
@dataclass(slots=True)
class P4MulticastGroupEntry(_P4Writable):
    "Represents a P4Runtime MulticastGroupEntry."

    multicast_group_id: int = 0
    _: KW_ONLY
    replicas: Sequence[_ReplicaType] = ()
    metadata: bytes = b""

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode MulticastGroupEntry data as protobuf."
        entry = p4r.MulticastGroupEntry(
            multicast_group_id=self.multicast_group_id,
            replicas=[encode_replica(replica) for replica in self.replicas],
            metadata=self.metadata,
        )
        return p4r.Entity(
            packet_replication_engine_entry=p4r.PacketReplicationEngineEntry(
                multicast_group_entry=entry
            )
        )

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to MulticastGroupEntry data."
        entry = msg.packet_replication_engine_entry.multicast_group_entry
        return cls(
            multicast_group_id=entry.multicast_group_id,
            replicas=tuple(decode_replica(replica) for replica in entry.replicas),
            metadata=entry.metadata,
        )

    def replicas_str(self) -> str:
        "Format the replicas as a human-readable, canonical string."
        return " ".join(format_replica(rep) for rep in self.replicas)

Represents a P4Runtime MulticastGroupEntry.

P4MulticastGroupEntry( multicast_group_id: int = 0, *, replicas: Sequence[tuple[int, int] | int] = (), metadata: bytes = b'')
multicast_group_id: int
replicas: Sequence[tuple[int, int] | int]
metadata: bytes
def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:

Encode MulticastGroupEntry data as protobuf.

@classmethod
def decode( cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:

Decode protobuf to MulticastGroupEntry data.

def replicas_str(self) -> str:

Format the replicas as a human-readable, canonical string.

@decodable('packet')
@dataclass(slots=True)
class P4PacketIn:
@decodable("packet")
@dataclass(slots=True)
class P4PacketIn:
    "Represents a P4Runtime PacketIn."

    payload: bytes
    _: KW_ONLY
    metadata: _MetadataDictType

    @classmethod
    def decode(cls, msg: p4r.StreamMessageResponse, schema: P4Schema) -> Self:
        "Decode protobuf to PacketIn data."
        packet = msg.packet
        cpm = schema.controller_packet_metadata.get("packet_in")
        if cpm is None:
            # There is no controller metadata. Warn if message has any.
            pkt_meta = packet.metadata
            if pkt_meta:
                LOGGER.warning("P4PacketIn unexpected metadata: %r", pkt_meta)
            return cls(packet.payload, metadata={})

        return cls(
            packet.payload,
            metadata=cpm.decode(packet.metadata),
        )

    def __getitem__(self, key: str) -> Any:
        "Retrieve metadata value."
        return self.metadata[key]

    def __repr__(self) -> str:
        "Return friendlier hexadecimal description of packet."
        if self.metadata:
            return f"P4PacketIn(metadata={self.metadata!r}, payload=h'{self.payload.hex()}')"
        return f"P4PacketIn(payload=h'{self.payload.hex()}')"

Represents a P4Runtime PacketIn.

P4PacketIn(payload: bytes, *, metadata: dict[str, typing.Any])
payload: bytes
metadata: dict[str, typing.Any]
@classmethod
def decode( cls, msg: p4.v1.p4runtime_pb2.StreamMessageResponse, schema: P4Schema) -> typing_extensions.Self:

Decode protobuf to PacketIn data.

def __getitem__(self, key: str) -> Any:

Retrieve metadata value.
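
The item access and hexadecimal repr described above can be illustrated with a small stand-in. This is only a sketch: PacketInSketch is a hypothetical class that copies the shape of P4PacketIn without importing Finsy, and the "ingress_port" metadata name is an assumed example, since real names come from the P4Info controller packet metadata.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class PacketInSketch:
    """Hypothetical stand-in for P4PacketIn; not Finsy's class."""

    payload: bytes
    metadata: dict[str, Any] = field(default_factory=dict)

    def __getitem__(self, key: str) -> Any:
        # Item access is delegated to the metadata dictionary.
        return self.metadata[key]

    def __repr__(self) -> str:
        # Show the payload as hex; include metadata only when it is non-empty.
        if self.metadata:
            return f"PacketInSketch(metadata={self.metadata!r}, payload=h'{self.payload.hex()}')"
        return f"PacketInSketch(payload=h'{self.payload.hex()}')"


# "ingress_port" is an assumed metadata name used for illustration.
packet = PacketInSketch(b"\x01\x02", metadata={"ingress_port": 3})
port = packet["ingress_port"]
bare_text = repr(PacketInSketch(b"\xab\xcd"))
```

A missing metadata name raises KeyError, the same as a plain dictionary lookup.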

@dataclass(slots=True)
class P4PacketOut:
@dataclass(slots=True)
class P4PacketOut:
    "Represents a P4Runtime PacketOut."

    payload: bytes
    _: KW_ONLY
    metadata: _MetadataDictType

    def __init__(self, __payload: bytes, /, **metadata: Any):
        self.payload = __payload
        self.metadata = metadata

    def encode_update(self, schema: P4Schema) -> p4r.StreamMessageRequest:
        "Encode PacketOut data as protobuf."
        cpm = schema.controller_packet_metadata["packet_out"]
        return p4r.StreamMessageRequest(
            packet=p4r.PacketOut(
                payload=self.payload,
                metadata=cpm.encode(self.metadata),
            )
        )

    def __getitem__(self, key: str) -> Any:
        "Retrieve metadata value."
        return self.metadata[key]

    def __repr__(self) -> str:
        "Return friendlier hexadecimal description of packet."
        if self.metadata:
            return f"P4PacketOut(metadata={self.metadata!r}, payload=h'{self.payload.hex()}')"
        return f"P4PacketOut(payload=h'{self.payload.hex()}')"

Represents a P4Runtime PacketOut.

P4PacketOut(_P4PacketOut__payload: bytes, /, **metadata: Any)
payload: bytes
metadata: dict[str, typing.Any]
def encode_update( self, schema: P4Schema) -> p4.v1.p4runtime_pb2.StreamMessageRequest:

Encode PacketOut data as protobuf.

def __getitem__(self, key: str) -> Any:

Retrieve metadata value.
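
The constructor signature takes the payload as a positional-only argument and collects every keyword argument as metadata. A minimal sketch of that calling convention follows; PacketOutSketch is a hypothetical stand-in rather than Finsy's class, and "egress_port" is an assumed metadata name.

```python
from typing import Any


class PacketOutSketch:
    """Hypothetical stand-in mirroring the P4PacketOut constructor shape."""

    def __init__(self, payload: bytes, /, **metadata: Any):
        # The payload is positional-only, so every keyword argument,
        # even one named "payload", ends up in the metadata dictionary.
        self.payload = payload
        self.metadata = metadata

    def __getitem__(self, key: str) -> Any:
        # Item access is delegated to the metadata dictionary.
        return self.metadata[key]


# "egress_port" is an assumed metadata name used for illustration.
out = PacketOutSketch(b"\x00\x01", egress_port=2)
shadow = PacketOutSketch(b"", payload=7)
```

Making the first parameter positional-only is what lets a metadata field share a name with it without a conflict.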

@decodable('register_entry')
@dataclass(slots=True)
class P4RegisterEntry(finsy.p4entity._P4ModifyOnly):
@decodable("register_entry")
@dataclass(slots=True)
class P4RegisterEntry(_P4ModifyOnly):
    "Represents a P4Runtime RegisterEntry."

    register_id: str = ""
    _: KW_ONLY
    index: int | None = None
    data: _DataValueType | None = None

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode RegisterEntry data as protobuf."
        if not self.register_id:
            return p4r.Entity(register_entry=p4r.RegisterEntry())

        register = schema.registers[self.register_id]

        if self.index is not None:
            index = p4r.Index(index=self.index)
        else:
            index = None

        if self.data is not None:
            data = register.type_spec.encode_data(self.data)
        else:
            data = None

        entry = p4r.RegisterEntry(
            register_id=register.id,
            index=index,
            data=data,
        )
        return p4r.Entity(register_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to RegisterEntry data."
        entry = msg.register_entry
        if entry.register_id == 0:
            return cls()

        register = schema.registers[entry.register_id]

        if entry.HasField("index"):
            index = entry.index.index
        else:
            index = None

        if entry.HasField("data"):
            data = register.type_spec.decode_data(entry.data)
        else:
            data = None

        return cls(
            register.alias,
            index=index,
            data=data,
        )

Represents a P4Runtime RegisterEntry.

P4RegisterEntry( register_id: str = '', *, index: int | None = None, data: Optional[Any] = None)
register_id: str
index: int | None
data: Optional[Any]
def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:

Encode RegisterEntry data as protobuf.

@classmethod
def decode( cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:

Decode protobuf to RegisterEntry data.
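
The optional fields decide how much of the register state a request addresses: the encode logic for this class emits an empty RegisterEntry when register_id is empty, and leaves the index field unset when index is None. The sketch below summarizes those cases. Reading them as "all registers" and "all cells" is an assumption based on P4Runtime wildcard conventions, and describe_register_request and the "counts" register name are hypothetical, not part of Finsy.

```python
from typing import Optional


def describe_register_request(register_id: str = "", index: Optional[int] = None) -> str:
    """Summarize what a register request selects (illustrative helper only)."""
    if not register_id:
        # An empty register_id encodes to an empty RegisterEntry message.
        return "all registers"
    if index is None:
        # No index field is set on the encoded message.
        return f"all cells of {register_id!r}"
    return f"cell {index} of {register_id!r}"


everything = describe_register_request()
whole = describe_register_request("counts")
single = describe_register_request("counts", index=5)
```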

Action = <class 'P4TableAction'>

Action is an alias for P4TableAction.

@dataclass(init=False, slots=True)
class P4TableAction:
@dataclass(init=False, slots=True)
class P4TableAction:
    """Represents a P4Runtime Action reference for a direct table.

    Attributes:
        name (str): the name of the action.
        args (dict[str, Any]): the action's arguments as a dictionary.

    Example:
        If the name of the action is "ipv4_forward" and it takes a single
        "port" parameter, you can construct the action as:

        ```
        action = P4TableAction("ipv4_forward", port=1)
        ```

    Reference "9.1.2 Action Specification":
        The Action Protobuf has fields: (action_id, params). Finsy translates
        `name` to the appropriate `action_id` as determined by P4Info. It also
        translates each named argument in `args` to the appropriate `param_id`.

    See Also:
        To specify an action for an indirect table, use `P4IndirectAction`.
        Note that P4TableAction will automatically be promoted to an "indirect"
        action if needed.

    Operators:
        A `P4TableAction` supports the multiplication operator (*) for
        constructing "weighted actions". A weighted action is used in specifying
        indirect actions. Here is an action with a weight of 3:

        ```
        weighted_action = 3 * P4TableAction("ipv4_forward", port=1)
        ```

        To specify a weight with a `watch_port`, use a tuple `(weight, port)`.
        The weight is always a positive integer.

    See Also:
        - P4TableEntry
    """

    name: str
    "The name of the action."
    args: dict[str, Any]
    "The action's arguments as a dictionary."

    def __init__(self, __name: str, /, **args: Any):
        self.name = __name
        self.args = args

    def encode_table_action(self, table: P4Table) -> p4r.TableAction:
        """Encode TableAction data as protobuf.

        If the table is indirect, promote the action to a "one-shot" indirect
        action.
        """
        try:
            action = table.actions[self.name]
        except Exception as ex:
            raise ValueError(f"{table.name!r}: {ex}") from ex

        action_p4 = self._encode_action(action)

        if table.action_profile is not None:
            # Promote action to ActionProfileActionSet entry with weight=1.
            return p4r.TableAction(
                action_profile_action_set=p4r.ActionProfileActionSet(
                    action_profile_actions=[
                        p4r.ActionProfileAction(action=action_p4, weight=1)
                    ]
                )
            )

        return p4r.TableAction(action=action_p4)

    def _fail_missing_params(self, action: P4ActionRef | P4Action) -> NoReturn:
        "Report missing parameters."
        seen = {param.name for param in action.params}
        for name in self.args:
            param = action.params[name]
            seen.remove(param.name)

        raise ValueError(f"Action {action.alias!r}: missing parameters {seen}")

    def encode_action(self, schema: P4Schema | P4Table) -> p4r.Action:
        "Encode Action data as protobuf."
        action = schema.actions[self.name]
        return self._encode_action(action)

    def _encode_action(self, action: P4ActionRef | P4Action) -> p4r.Action:
        "Helper to encode an action."
        aps = action.params
        try:
            params = [
                aps[name].encode_param(value) for name, value in self.args.items()
            ]
        except ValueError as ex:
            raise ValueError(f"{action.alias!r}: {ex}") from ex

        # Check for missing action parameters. We always accept an action with
        # no parameters (for wildcard ReadRequests).
        param_count = len(params)
        if param_count > 0 and param_count != len(aps):
            self._fail_missing_params(action)

        return p4r.Action(action_id=action.id, params=params)

    @classmethod
    def decode_table_action(
        cls, msg: p4r.TableAction, table: P4Table
    ) -> Self | "P4IndirectAction":
        "Decode protobuf to TableAction data."
        match msg.WhichOneof("type"):
            case "action":
                return cls.decode_action(msg.action, table)
            case "action_profile_member_id":
                return P4IndirectAction(member_id=msg.action_profile_member_id)
            case "action_profile_group_id":
                return P4IndirectAction(group_id=msg.action_profile_group_id)
            case "action_profile_action_set":
                return P4IndirectAction.decode_action_set(
                    msg.action_profile_action_set, table
                )
            case other:
                raise ValueError(f"unknown oneof: {other!r}")

    @classmethod
    def decode_action(cls, msg: p4r.Action, parent: P4Schema | P4Table) -> Self:
        "Decode protobuf to Action data."
        action = parent.actions[msg.action_id]
        args = {}
        for param in msg.params:
            action_param = action.params[param.param_id]
            value = action_param.decode_param(param)
            args[action_param.name] = value

        return cls(action.alias, **args)

    def format_str(self, schema: P4Schema | P4Table) -> str:
        """Format the table action as a human-readable string.

        The result is formatted to look like a function call:

        ```
        name(param1=value1, ...)
        ```

        Where `name` is the action name, and `(param<N>, value<N>)` are the
        action parameters. The format of `value<N>` is schema-dependent.
        """
        aps = schema.actions[self.name].params
        args = [
            f"{key}={aps[key].format_param(value)}" for key, value in self.args.items()
        ]

        return f"{self.name}({', '.join(args)})"

    def __mul__(self, weight: P4Weight) -> P4WeightedAction:
        "Make a weighted action."
        if not isinstance(
            weight, (int, tuple)
        ):  # pyright: ignore[reportUnnecessaryIsInstance]
            raise NotImplementedError("expected P4Weight")
        return (weight, self)

    def __rmul__(self, weight: P4Weight) -> P4WeightedAction:
        "Make a weighted action."
        if not isinstance(
            weight, (int, tuple)
        ):  # pyright: ignore[reportUnnecessaryIsInstance]
            raise NotImplementedError("expected P4Weight")
        return (weight, self)

    def __call__(self, **params: Any) -> Self:
        "Return a new action with the updated parameters."
        return self.__class__(self.name, **(self.args | params))

Represents a P4Runtime Action reference for a direct table.

Attributes:
  • name (str): the name of the action.
  • args (dict[str, Any]): the action's arguments as a dictionary.

Example:

If the name of the action is "ipv4_forward" and it takes a single "port" parameter, you can construct the action as:

action = P4TableAction("ipv4_forward", port=1)

Reference "9.1.2 Action Specification": The Action Protobuf has fields: (action_id, params). Finsy translates name to the appropriate action_id as determined by P4Info. It also translates each named argument in args to the appropriate param_id.

See Also:

To specify an action for an indirect table, use P4IndirectAction. Note that P4TableAction will automatically be promoted to an "indirect" action if needed.

Operators:

A P4TableAction supports the multiplication operator (*) for constructing "weighted actions". A weighted action is used in specifying indirect actions. Here is an action with a weight of 3:

weighted_action = 3 * P4TableAction("ipv4_forward", port=1)

To specify a weight with a watch_port, use a tuple (weight, port). The weight is always a positive integer.
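
A minimal sketch of how the multiplication operator can yield a (weight, action) pair, including the tuple form of the weight. ActionSketch is a hypothetical stand-in and not Finsy's class. It returns NotImplemented for unsupported operands, where the library listing raises NotImplementedError.

```python
from typing import Any


class ActionSketch:
    """Hypothetical stand-in for the weight operators of P4TableAction."""

    def __init__(self, name: str, /, **args: Any):
        self.name = name
        self.args = args

    def __mul__(self, weight):
        # action * weight -> (weight, action)
        if not isinstance(weight, (int, tuple)):
            return NotImplemented
        return (weight, self)

    # weight * action falls back to the reflected operator, because
    # neither int nor tuple knows how to multiply by an ActionSketch.
    __rmul__ = __mul__


action = ActionSketch("ipv4_forward", port=1)
weighted = 3 * action        # plain weight
watched = (3, 7) * action    # (weight, port) tuple, as described above
```

Both operand orders produce the same pair, so 3 * action and action * 3 are interchangeable in this sketch.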

See Also:
  • P4TableEntry

P4TableAction(_P4TableAction__name: str, /, **args: Any)
name: str

The name of the action.

args: dict[str, typing.Any]

The action's arguments as a dictionary.

def encode_table_action(self, table: finsy.p4schema.P4Table) -> p4.v1.p4runtime_pb2.TableAction:

Encode TableAction data as protobuf.

If the table is indirect, promote the action to a "one-shot" indirect action.
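
The promotion rule can be sketched with plain dictionaries in place of protobuf messages. This mirrors the branch on table.action_profile in encode_table_action. The promote_if_indirect helper, its has_action_profile flag, and the dictionary layout are illustrative assumptions, not Finsy API.

```python
def promote_if_indirect(action: dict, has_action_profile: bool) -> dict:
    """Return a dict shaped like the TableAction message for this action."""
    if has_action_profile:
        # Indirect table: wrap the single action in a one-shot action set
        # containing one member with weight 1.
        return {
            "action_profile_action_set": {
                "action_profile_actions": [{"action": action, "weight": 1}]
            }
        }
    # Direct table: the action is used as-is.
    return {"action": action}


forward = {"name": "ipv4_forward", "params": {"port": 1}}
direct = promote_if_indirect(forward, has_action_profile=False)
one_shot = promote_if_indirect(forward, has_action_profile=True)
```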

def encode_action( self, schema: P4Schema | finsy.p4schema.P4Table) -> p4.v1.p4runtime_pb2.Action:

Encode Action data as protobuf.

@classmethod
def decode_table_action( cls, msg: p4.v1.p4runtime_pb2.TableAction, table: finsy.p4schema.P4Table) -> Union[typing_extensions.Self, P4IndirectAction]:

Decode protobuf to TableAction data.

@classmethod
def decode_action( cls, msg: p4.v1.p4runtime_pb2.Action, parent: P4Schema | finsy.p4schema.P4Table) -> typing_extensions.Self:

Decode protobuf to Action data.

def format_str(self, schema: P4Schema | finsy.p4schema.P4Table) -> str:

Format the table action as a human-readable string.

The result is formatted to look like a function call:

name(param1=value1, ...)

Where name is the action name, and (param<N>, value<N>) are the action parameters. The format of value<N> is schema-dependent.
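
The function-call layout can be sketched without a schema. In this simplified version each value is rendered with str(), whereas format_str delegates value formatting to the schema. The format_action helper is hypothetical.

```python
def format_action(name: str, args: dict) -> str:
    """Render an action as name(param1=value1, ...), using str() for values."""
    rendered = [f"{key}={value}" for key, value in args.items()]
    return f"{name}({', '.join(rendered)})"


with_param = format_action("ipv4_forward", {"port": 1})
no_params = format_action("drop", {})
```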

def __call__(self, **params: Any) -> typing_extensions.Self:

Return a new action with the updated parameters.
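
Calling an action works like a copy-with-overrides: the existing arguments are merged with the new keyword arguments and the original object is left untouched. A minimal sketch of that merge follows, using a hypothetical ActionSketch stand-in rather than Finsy's class; the "ttl" parameter is an assumed example.

```python
from typing import Any


class ActionSketch:
    """Hypothetical stand-in showing the copy-with-overrides call."""

    def __init__(self, name: str, /, **args: Any):
        self.name = name
        self.args = args

    def __call__(self, **params: Any) -> "ActionSketch":
        # Later keys win in a dict union, so params override existing args.
        return self.__class__(self.name, **(self.args | params))


base = ActionSketch("ipv4_forward", port=1)
updated = base(port=2)
extended = base(ttl=64)  # "ttl" is an assumed parameter name
```

This makes it easy to keep one template action and derive per-entry variants from it.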

@decodable('table_entry')
@dataclass(slots=True)
class P4TableEntry(finsy.p4entity._P4Writable):
@decodable("table_entry")
@dataclass(slots=True)
class P4TableEntry(_P4Writable):
    """Represents a P4Runtime table entry.

    Attributes:
        table_id (str): Name of the table.
        match (P4TableMatch | None): Entry's match fields.
        action (P4TableAction | P4IndirectAction | None): Entry's action.
        is_default_action (bool): True if entry is the default table entry.
        priority (int): Priority of a table entry when match implies TCAM lookup.
        metadata (bytes): Arbitrary controller cookie (1.2.0).
        controller_metadata (int): Deprecated controller cookie (< 1.2.0).
        meter_config (P4MeterConfig | None): Meter configuration.
        counter_data (P4CounterData | None): Counter data for table entry.
        meter_counter_data (P4MeterCounterData | None): Meter counter data (1.4.0).
        idle_timeout_ns (int): Idle timeout in nanoseconds.
        time_since_last_hit (int | None): Nanoseconds since entry last matched.
        is_const (bool): True if entry is constant (1.4.0).

    The most commonly used fields are table_id, match, action, is_default_action,
    and priority. See the P4Runtime Spec for usage examples regarding the other
    attributes.

    When writing a P4TableEntry, you can specify the type of update using '+',
    '-', and '~'.

    Examples:
    ```
    # Specify all tables when using "read".
    entry = fy.P4TableEntry()

    # Specify the table named "ipv4" when using "read".
    entry = fy.P4TableEntry("ipv4")

    # Specify the default entry in the "ipv4" table when using "read".
    entry = fy.P4TableEntry("ipv4", is_default_action=True)

    # Insert an entry into the "ipv4" table.
    update = +fy.P4TableEntry(
        "ipv4",
        match=fy.Match(ipv4_dst="10.0.0.0/8"),
        action=fy.Action("forward", port=1),
    )

    # Modify the default action in the "ipv4" table.
    update = ~fy.P4TableEntry(
        "ipv4",
        action=fy.Action("forward", port=5),
        is_default_action=True
    )
    ```

    Operators:
        You can retrieve a match field from a table entry using `[]`. For
        example, `entry["ipv4_dst"]` is the same as `entry.match["ipv4_dst"]`.

    Formatting Helpers:
        The `match_str` and `action_str` methods provide P4Info-aware formatting
        of the match and action attributes.
    """

    table_id: str = ""
    "Name of the table."
    _: KW_ONLY
    match: P4TableMatch | None = None
    "Entry's match fields."
    action: P4TableAction | P4IndirectAction | None = None
    "Entry's action."
    is_default_action: bool = False
    "True if entry is the default table entry."
    priority: int = 0
    "Priority of a table entry when match implies TCAM lookup."
    metadata: bytes = b""
    "Arbitrary controller cookie. (1.2.0)."
    controller_metadata: int = 0
    "Deprecated controller cookie (< 1.2.0)."
    meter_config: P4MeterConfig | None = None
    "Meter configuration."
    counter_data: P4CounterData | None = None
    "Counter data for table entry."
    meter_counter_data: P4MeterCounterData | None = None
    "Meter counter data (1.4.0)."
    idle_timeout_ns: int = 0
    "Idle timeout in nanoseconds."
    time_since_last_hit: int | None = None
    "Nanoseconds since entry last matched."
    is_const: bool = False
    "True if entry is constant (1.4.0)."

    def __getitem__(self, key: str) -> Any:
        "Convenience accessor to retrieve a value from the `match` property."
        if self.match is not None:
            return self.match[key]
        raise KeyError(key)

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode TableEntry data as protobuf."
        return p4r.Entity(table_entry=self.encode_entry(schema))

    def encode_entry(self, schema: P4Schema) -> p4r.TableEntry:
        "Encode TableEntry data as protobuf."
        if not self.table_id:
            return self._encode_empty()

        table = schema.tables[self.table_id]

        if self.match:
            match = self.match.encode(table)
        else:
            match = None

        if self.action:
            action = self.action.encode_table_action(table)
        else:
            action = None

        if self.meter_config:
            meter_config = self.meter_config.encode()
        else:
            meter_config = None

        if self.counter_data:
            counter_data = self.counter_data.encode()
        else:
            counter_data = None

        if self.meter_counter_data:
            meter_counter_data = self.meter_counter_data.encode()
        else:
            meter_counter_data = None

        if self.time_since_last_hit is not None:
            time_since_last_hit = p4r.TableEntry.IdleTimeout(
                elapsed_ns=self.time_since_last_hit
            )
        else:
            time_since_last_hit = None

        return p4r.TableEntry(
            table_id=table.id,
            match=match,
            action=action,
            priority=self.priority,
            controller_metadata=self.controller_metadata,
            meter_config=meter_config,
            counter_data=counter_data,
            meter_counter_data=meter_counter_data,
            is_default_action=self.is_default_action,
            idle_timeout_ns=self.idle_timeout_ns,
            time_since_last_hit=time_since_last_hit,
            metadata=self.metadata,
            is_const=self.is_const,
        )

    def _encode_empty(self) -> p4r.TableEntry:
        "Encode an empty wildcard request."
        if self.counter_data is not None:
            counter_data = self.counter_data.encode()
        else:
            counter_data = None

        # FIXME: time_since_last_hit not supported for wildcard reads?
        if self.time_since_last_hit is not None:
            time_since_last_hit = p4r.TableEntry.IdleTimeout(
                elapsed_ns=self.time_since_last_hit
            )
        else:
            time_since_last_hit = None

        return p4r.TableEntry(
            counter_data=counter_data,
            time_since_last_hit=time_since_last_hit,
        )

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to TableEntry data."
        return cls.decode_entry(msg.table_entry, schema)

    @classmethod
    def decode_entry(cls, entry: p4r.TableEntry, schema: P4Schema) -> Self:
        "Decode protobuf to TableEntry data."
        if entry.table_id == 0:
            return cls("")

        table = schema.tables[entry.table_id]

        if entry.match:
            match = P4TableMatch.decode(entry.match, table)
        else:
            match = None

        if entry.HasField("action"):
            action = P4TableAction.decode_table_action(entry.action, table)
        else:
            action = None

        if entry.HasField("time_since_last_hit"):
            last_hit = entry.time_since_last_hit.elapsed_ns
        else:
            last_hit = None

        if entry.HasField("meter_config"):
            meter_config = P4MeterConfig.decode(entry.meter_config)
        else:
            meter_config = None

        if entry.HasField("counter_data"):
            counter_data = P4CounterData.decode(entry.counter_data)
        else:
            counter_data = None

        if entry.HasField("meter_counter_data"):
            meter_counter_data = P4MeterCounterData.decode(entry.meter_counter_data)
        else:
            meter_counter_data = None

        return cls(
            table_id=table.alias,
            match=match,
            action=action,
            priority=entry.priority,
            controller_metadata=entry.controller_metadata,
            meter_config=meter_config,
            counter_data=counter_data,
            meter_counter_data=meter_counter_data,
            is_default_action=entry.is_default_action,
            idle_timeout_ns=entry.idle_timeout_ns,
            time_since_last_hit=last_hit,
            metadata=entry.metadata,
            is_const=entry.is_const,
        )

    def match_dict(
        self,
        schema: P4Schema,
        *,
        wildcard: str | None = None,
    ) -> dict[str, str]:
        """Format the match fields as a dictionary of strings.

        If `wildcard` is None, only include match fields that have values. If
        `wildcard` is set, include all field names but replace unset values with
        given wildcard value (e.g. "*")
        """
        table = schema.tables[self.table_id]
        if self.match is not None:
            return self.match.format_dict(table, wildcard=wildcard)
        return P4TableMatch().format_dict(table, wildcard=wildcard)

    def match_str(
        self,
        schema: P4Schema,
        *,
        wildcard: str | None = None,
    ) -> str:
        "Format the match fields as a human-readable, canonical string."
        table = schema.tables[self.table_id]
        if self.match is not None:
            return self.match.format_str(table, wildcard=wildcard)
        return P4TableMatch().format_str(table, wildcard=wildcard)

    def action_str(self, schema: P4Schema) -> str:
        "Format the actions as a human-readable, canonical string."
        table = schema.tables[self.table_id]
        if self.action is None:
            return NOACTION_STR
        return self.action.format_str(table)

Represents a P4Runtime table entry.

Attributes:
  • table_id (str): Name of the table.
  • match (P4TableMatch | None): Entry's match fields.
  • action (P4TableAction | P4IndirectAction | None): Entry's action.
  • is_default_action (bool): True if entry is the default table entry.
  • priority (int): Priority of a table entry when match implies TCAM lookup.
  • metadata (bytes): Arbitrary controller cookie (1.2.0).
  • controller_metadata (int): Deprecated controller cookie (< 1.2.0).
  • meter_config (P4MeterConfig | None): Meter configuration.
  • counter_data (P4CounterData | None): Counter data for table entry.
  • meter_counter_data (P4MeterCounterData | None): Meter counter data (1.4.0).
  • idle_timeout_ns (int): Idle timeout in nanoseconds.
  • time_since_last_hit (int | None): Nanoseconds since entry last matched.
  • is_const (bool): True if entry is constant (1.4.0).

The most commonly used fields are table_id, match, action, is_default_action, and priority. See the P4Runtime Spec for usage examples regarding the other attributes.

When writing a P4TableEntry, you can specify the type of update using the unary operators '+' (INSERT), '-' (DELETE), and '~' (MODIFY).

Examples:

# Specify all tables when using "read".
entry = fy.P4TableEntry()

# Specify the table named "ipv4" when using "read".
entry = fy.P4TableEntry("ipv4")

# Specify the default entry in the "ipv4" table when using "read".
entry = fy.P4TableEntry("ipv4", is_default_action=True)

# Insert an entry into the "ipv4" table.
update = +fy.P4TableEntry(
    "ipv4",
    match=fy.Match(ipv4_dst="10.0.0.0/8"),
    action=fy.Action("forward", port=1),
)

# Modify the default action in the "ipv4" table.
update = ~fy.P4TableEntry(
    "ipv4",
    action=fy.Action("forward", port=5),
    is_default_action=True
)
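
The '+', '-' and '~' prefixes used in the examples above are ordinary Python unary operators. The following is a minimal sketch of how a class can map them to update types. `Entry`, `Update` and `UpdateType` are hypothetical names chosen for illustration; this is not Finsy's implementation.

```python
from dataclasses import dataclass
from enum import Enum


class UpdateType(Enum):
    # Hypothetical stand-in for the P4Runtime update types.
    INSERT = "INSERT"
    MODIFY = "MODIFY"
    DELETE = "DELETE"


@dataclass
class Update:
    # Pairs an entity with the operation to perform on it.
    type: UpdateType
    entity: "Entry"


@dataclass
class Entry:
    # Hypothetical, heavily simplified table entry.
    table_id: str = ""

    def __pos__(self) -> Update:  # +entry
        return Update(UpdateType.INSERT, self)

    def __neg__(self) -> Update:  # -entry
        return Update(UpdateType.DELETE, self)

    def __invert__(self) -> Update:  # ~entry
        return Update(UpdateType.MODIFY, self)


updates = [+Entry("ipv4"), ~Entry("ipv4"), -Entry("ipv4")]
update_types = [u.type.name for u in updates]
```

Because the operator returns a separate wrapper object, the entry itself stays unchanged and can be reused with a different operator.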

Operators:

You can retrieve a match field from a table entry using []. For example, entry["ipv4_dst"] is the same as entry.match["ipv4_dst"].
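
As a rough illustration of this shortcut, the sketch below uses a hypothetical `Entry` class (not Finsy's P4TableEntry) whose `__getitem__` forwards the lookup to its `match` mapping and raises KeyError when no match is set.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Entry:
    # Hypothetical stand-in for a table entry with optional match fields.
    match: Optional[dict[str, Any]] = None

    def __getitem__(self, key: str) -> Any:
        # Forward the lookup to `match`; an entry without a match has no keys.
        if self.match is not None:
            return self.match[key]
        raise KeyError(key)


entry = Entry(match={"ipv4_dst": "10.0.0.0/8"})
same = entry["ipv4_dst"] == entry.match["ipv4_dst"]

try:
    Entry()["ipv4_dst"]
    missing_raises = False
except KeyError:
    missing_raises = True
```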

Formatting Helpers:

The match_str and action_str methods provide P4Info-aware formatting of the match and action attributes.

P4TableEntry( table_id: str = '', *, match: P4TableMatch | None = None, action: P4TableAction | P4IndirectAction | None = None, is_default_action: bool = False, priority: int = 0, metadata: bytes = b'', controller_metadata: int = 0, meter_config: P4MeterConfig | None = None, counter_data: P4CounterData | None = None, meter_counter_data: P4MeterCounterData | None = None, idle_timeout_ns: int = 0, time_since_last_hit: int | None = None, is_const: bool = False)
table_id: str

Name of the table.

match: P4TableMatch | None

Entry's match fields.

action: P4TableAction | P4IndirectAction | None

Entry's action.

is_default_action: bool

True if entry is the default table entry.

priority: int

Priority of a table entry when match implies TCAM lookup.

metadata: bytes

Arbitrary controller cookie (1.2.0).

controller_metadata: int

Deprecated controller cookie (< 1.2.0).

meter_config: P4MeterConfig | None

Meter configuration.

counter_data: P4CounterData | None

Counter data for table entry.

meter_counter_data: P4MeterCounterData | None

Meter counter data (1.4.0).

idle_timeout_ns: int

Idle timeout in nanoseconds.

time_since_last_hit: int | None

Nanoseconds since entry last matched.

is_const: bool

True if entry is constant (1.4.0).

def __getitem__(self, key: str) -> Any:
    def __getitem__(self, key: str) -> Any:
        "Convenience accessor to retrieve a value from the `match` property."
        if self.match is not None:
            return self.match[key]
        raise KeyError(key)

Convenience accessor to retrieve a value from the match property.

def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:
    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode TableEntry data as protobuf."
        return p4r.Entity(table_entry=self.encode_entry(schema))

Encode TableEntry data as protobuf.

def encode_entry(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.TableEntry:
    def encode_entry(self, schema: P4Schema) -> p4r.TableEntry:
        "Encode TableEntry data as protobuf."
        if not self.table_id:
            return self._encode_empty()

        table = schema.tables[self.table_id]

        if self.match:
            match = self.match.encode(table)
        else:
            match = None

        if self.action:
            action = self.action.encode_table_action(table)
        else:
            action = None

        if self.meter_config:
            meter_config = self.meter_config.encode()
        else:
            meter_config = None

        if self.counter_data:
            counter_data = self.counter_data.encode()
        else:
            counter_data = None

        if self.meter_counter_data:
            meter_counter_data = self.meter_counter_data.encode()
        else:
            meter_counter_data = None

        if self.time_since_last_hit is not None:
            time_since_last_hit = p4r.TableEntry.IdleTimeout(
                elapsed_ns=self.time_since_last_hit
            )
        else:
            time_since_last_hit = None

        return p4r.TableEntry(
            table_id=table.id,
            match=match,
            action=action,
            priority=self.priority,
            controller_metadata=self.controller_metadata,
            meter_config=meter_config,
            counter_data=counter_data,
            meter_counter_data=meter_counter_data,
            is_default_action=self.is_default_action,
            idle_timeout_ns=self.idle_timeout_ns,
            time_since_last_hit=time_since_last_hit,
            metadata=self.metadata,
            is_const=self.is_const,
        )

Encode TableEntry data as protobuf.

@classmethod
def decode( cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:
    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to TableEntry data."
        return cls.decode_entry(msg.table_entry, schema)

Decode protobuf to TableEntry data.

@classmethod
def decode_entry( cls, entry: p4.v1.p4runtime_pb2.TableEntry, schema: P4Schema) -> typing_extensions.Self:
    @classmethod
    def decode_entry(cls, entry: p4r.TableEntry, schema: P4Schema) -> Self:
        "Decode protobuf to TableEntry data."
        if entry.table_id == 0:
            return cls("")

        table = schema.tables[entry.table_id]

        if entry.match:
            match = P4TableMatch.decode(entry.match, table)
        else:
            match = None

        if entry.HasField("action"):
            action = P4TableAction.decode_table_action(entry.action, table)
        else:
            action = None

        if entry.HasField("time_since_last_hit"):
            last_hit = entry.time_since_last_hit.elapsed_ns
        else:
            last_hit = None

        if entry.HasField("meter_config"):
            meter_config = P4MeterConfig.decode(entry.meter_config)
        else:
            meter_config = None

        if entry.HasField("counter_data"):
            counter_data = P4CounterData.decode(entry.counter_data)
        else:
            counter_data = None

        if entry.HasField("meter_counter_data"):
            meter_counter_data = P4MeterCounterData.decode(entry.meter_counter_data)
        else:
            meter_counter_data = None

        return cls(
            table_id=table.alias,
            match=match,
            action=action,
            priority=entry.priority,
            controller_metadata=entry.controller_metadata,
            meter_config=meter_config,
            counter_data=counter_data,
            meter_counter_data=meter_counter_data,
            is_default_action=entry.is_default_action,
            idle_timeout_ns=entry.idle_timeout_ns,
            time_since_last_hit=last_hit,
            metadata=entry.metadata,
            is_const=entry.is_const,
        )

Decode protobuf to TableEntry data.

def match_dict( self, schema: P4Schema, *, wildcard: str | None = None) -> dict[str, str]:
    def match_dict(
        self,
        schema: P4Schema,
        *,
        wildcard: str | None = None,
    ) -> dict[str, str]:
        """Format the match fields as a dictionary of strings.

        If `wildcard` is None, only include match fields that have values. If
        `wildcard` is set, include all field names but replace unset values with
        given wildcard value (e.g. "*")
        """
        table = schema.tables[self.table_id]
        if self.match is not None:
            return self.match.format_dict(table, wildcard=wildcard)
        return P4TableMatch().format_dict(table, wildcard=wildcard)

Format the match fields as a dictionary of strings.

If wildcard is None, only match fields that have values are included. If wildcard is set, all field names are included and unset values are replaced with the given wildcard value (e.g. "*").

def match_str( self, schema: P4Schema, *, wildcard: str | None = None) -> str:
    def match_str(
        self,
        schema: P4Schema,
        *,
        wildcard: str | None = None,
    ) -> str:
        "Format the match fields as a human-readable, canonical string."
        table = schema.tables[self.table_id]
        if self.match is not None:
            return self.match.format_str(table, wildcard=wildcard)
        return P4TableMatch().format_str(table, wildcard=wildcard)

Format the match fields as a human-readable, canonical string.

def action_str(self, schema: P4Schema) -> str:
    def action_str(self, schema: P4Schema) -> str:
        "Format the actions as a human-readable, canonical string."
        table = schema.tables[self.table_id]
        if self.action is None:
            return NOACTION_STR
        return self.action.format_str(table)

Format the actions as a human-readable, canonical string.

Match = <class 'P4TableMatch'>

Match is an alias for P4TableMatch.

class P4TableMatch(dict[str, typing.Any]):

Represents a set of P4Runtime field matches.

Each match field is stored as a dictionary key, where the key is the name of the match field. The field's value should be appropriate for the type of match (EXACT, LPM, TERNARY, etc.).

Construct a match the same way you would construct a dictionary.

Example:

# Keyword arguments:
match = P4TableMatch(ipv4_dst="10.0.0.1")

# Dictionary argument:
match = P4TableMatch({"ipv4_dst": "10.0.0.1"})

# List of 2-tuples:
match = P4TableMatch([("ipv4_dst", "10.0.0.1")])

P4TableMatch is implemented as a subclass of dict. It supports all of the standard dictionary methods:

match = P4TableMatch()
match["ipv4_dst"] = "10.0.0.1"
assert len(match) == 1

Reference "9.1.1 Match Format": Each match field is translated to a FieldMatch Protobuf message by translating the entry key to a field_id. The type of match (EXACT, LPM, TERNARY, OPTIONAL, or RANGE) is determined by the P4Info, and the value is converted to the Protobuf representation.

Supported String Values:

EXACT: "255", "0xFF", "10.0.0.1", "2000::1", "00:00:00:00:00:01"

LPM: "255/8", "0xFF/8", "10.0.0.1/32", "2000::1/128",
        "00:00:00:00:00:01/48"
    (+ all exact formats are promoted to all-1 masks)

TERNARY: "255/&255", "0xFF/&0xFF", "10.0.0.1/&255.255.255.255",
    "2000::1/&128", "00:00:00:00:00:01/&48"
    (+ all exact formats are promoted to all-1 masks)
    (+ all lpm formats are promoted to the specified contiguous mask)

RANGE: "0...255", "0x00...0xFF", "10.0.0.1...10.0.0.9",
    "2000::1...2001::9", "00:00:00:00:00:01...00:00:00:00:00:09"
    (+ all exact formats are promoted to single-value ranges)

OPTIONAL: Same as exact format.

See the p4values.py module for all supported value classes.
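
To make the string formats above more concrete, here is a minimal sketch of how LPM and RANGE strings could be split into numeric parts, including the promotion of a bare exact value. The helper names `parse_value`, `parse_lpm` and `parse_range` are hypothetical, the sketch handles only numbers and IP addresses (not MAC addresses), and it does not reflect how Finsy's p4values.py module is actually written.

```python
import ipaddress


def parse_value(text: str) -> int:
    "Parse a decimal/hex number or an IPv4/IPv6 address into an int."
    try:
        return int(text, 0)
    except ValueError:
        return int(ipaddress.ip_address(text))


def parse_lpm(text: str, bitwidth: int) -> tuple[int, int]:
    "Return (value, prefix_len); a bare value gets a full-length prefix."
    value, sep, prefix = text.partition("/")
    prefix_len = int(prefix) if sep else bitwidth
    return parse_value(value), prefix_len


def parse_range(text: str) -> tuple[int, int]:
    "Return (low, high); a bare value becomes a single-value range."
    low, sep, high = text.partition("...")
    return parse_value(low), parse_value(high if sep else low)


lpm = parse_lpm("10.0.0.0/8", 32)
exact_as_lpm = parse_lpm("10.0.0.1", 32)
rng = parse_range("0x00...0xFF")
single = parse_range("255")
```

In this sketch the field's bit width has to be passed in explicitly. In the library, that information comes from the P4Info.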

TODO:
  • Change the range delimiter to '-' (and drop support for '-'-delimited MAC addresses).
  • Consider supporting ternary values with just '/' (and drop support for decimal masks; the mask must be a hexadecimal number).

See Also:
  • P4TableEntry

def encode( self, table: finsy.p4schema.P4Table) -> list[p4.v1.p4runtime_pb2.FieldMatch]:
    def encode(self, table: P4Table) -> list[p4r.FieldMatch]:
        "Encode TableMatch data as a list of Protobuf fields."
        result: list[p4r.FieldMatch] = []
        match_fields = table.match_fields

        for key, value in self.items():
            try:
                field = match_fields[key].encode_field(value)
                if field is not None:
                    result.append(field)
            except Exception as ex:
                raise ValueError(f"{table.name!r}: Match field {key!r}: {ex}") from ex

        return result

Encode TableMatch data as a list of Protobuf fields.

@classmethod
def decode( cls, msgs: Iterable[p4.v1.p4runtime_pb2.FieldMatch], table: finsy.p4schema.P4Table) -> typing_extensions.Self:
    @classmethod
    def decode(cls, msgs: Iterable[p4r.FieldMatch], table: P4Table) -> Self:
        "Decode Protobuf fields as TableMatch data."
        result: dict[str, Any] = {}
        match_fields = table.match_fields

        for field in msgs:
            fld = match_fields[field.field_id]
            result[fld.alias] = fld.decode_field(field)

        return cls(result)

Decode Protobuf fields as TableMatch data.
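
The encode and decode methods above translate between field names and numeric field IDs. The sketch below imitates that round trip with plain tuples in place of Protobuf messages. `FIELD_IDS` is a hypothetical schema and the functions are simplified stand-ins, not Finsy's code; only the error-wrapping pattern is taken from the source shown above.

```python
from typing import Any

# Hypothetical schema: match field name -> numeric field_id.
FIELD_IDS = {"ipv4_dst": 1, "in_port": 2}
FIELD_NAMES = {fid: name for name, fid in FIELD_IDS.items()}


def encode(match: dict[str, Any], table_name: str) -> list[tuple[int, Any]]:
    "Translate each field name to its field_id."
    result = []
    for key, value in match.items():
        try:
            result.append((FIELD_IDS[key], value))
        except Exception as ex:
            # Add table and field context to the error before re-raising.
            raise ValueError(f"{table_name!r}: Match field {key!r}: {ex}") from ex
    return result


def decode(fields: list[tuple[int, Any]]) -> dict[str, Any]:
    "Translate each field_id back to its field name."
    return {FIELD_NAMES[fid]: value for fid, value in fields}


encoded = encode({"ipv4_dst": "10.0.0.1"}, "ipv4")
roundtrip = decode(encoded)

try:
    encode({"bogus": 1}, "ipv4")
    error = ""
except ValueError as ex:
    error = str(ex)
```

Wrapping the original exception keeps the table and field name in the message, which makes a bad key easier to locate in a large batch of updates.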

def format_dict( self, table: finsy.p4schema.P4Table, *, wildcard: str | None = None) -> dict[str, str]:
    def format_dict(
        self,
        table: P4Table,
        *,
        wildcard: str | None = None,
    ) -> dict[str, str]:
        """Format the table match fields as a human-readable dictionary.

        The result is a dictionary showing the TableMatch data for fields
        included in the match. If `wildcard` is specified, all fields defined
        in P4Info will be included with their value set to the wildcard string.

        Values are formatted using the format/type specified in P4Info.
        """
        result: dict[str, str] = {}

        for fld in table.match_fields:
            value = self.get(fld.alias, None)
            if value is not None:
                result[fld.alias] = fld.format_field(value)
            elif wildcard is not None:
                result[fld.alias] = wildcard

        return result

Format the table match fields as a human-readable dictionary.

The result is a dictionary showing the TableMatch data for fields included in the match. If wildcard is specified, all fields defined in P4Info will be included with their value set to the wildcard string.

Values are formatted using the format/type specified in P4Info.
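
The effect of the wildcard argument can be sketched without a switch or a P4Info file. In the example below, `field_names` is a hypothetical stand-in for the table's match fields, and values are passed through `str()` instead of the P4Info-aware formatting the real method applies.

```python
from typing import Any, Optional


def format_dict(
    match: dict[str, Any],
    field_names: list[str],
    wildcard: Optional[str] = None,
) -> dict[str, str]:
    # `field_names` is a hypothetical stand-in for the table's match fields.
    result: dict[str, str] = {}
    for name in field_names:
        value = match.get(name)
        if value is not None:
            result[name] = str(value)  # simplified; no P4Info-aware formatting
        elif wildcard is not None:
            result[name] = wildcard
    return result


fields = ["ipv4_dst", "in_port"]
match = {"ipv4_dst": "10.0.0.0/8"}

only_set = format_dict(match, fields)
with_wildcard = format_dict(match, fields, wildcard="*")
```

Without a wildcard the unset `in_port` field is omitted; with `wildcard="*"` it appears with the value "*".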

def format_str( self, table: finsy.p4schema.P4Table, *, wildcard: str | None = None) -> str:
    def format_str(
        self,
        table: P4Table,
        *,
        wildcard: str | None = None,
    ) -> str:
        """Format the table match fields as a human-readable string.

        The result is a string showing the TableMatch data for fields included
        in the match. If `wildcard` is specified, all fields defined in P4Info
        will be included with their value set to the wildcard string.

        All fields are formatted as "name=value" and they are delimited by
        spaces.

        Values are formatted using the format/type specified in P4Info.
        """
        result: list[str] = []

        for fld in table.match_fields:
            value = self.get(fld.alias, None)
            if value is not None:
                result.append(f"{fld.alias}={fld.format_field(value)}")
            elif wildcard is not None:
                result.append(f"{fld.alias}={wildcard}")

        return " ".join(result)

Format the table match fields as a human-readable string.

The result is a string showing the TableMatch data for fields included in the match. If wildcard is specified, all fields defined in P4Info will be included with their value set to the wildcard string.

All fields are formatted as "name=value" and they are delimited by spaces.

Values are formatted using the format/type specified in P4Info.
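
Two properties of this string form are worth illustrating: fields appear in the order the table defines them, not the order they were added to the match, and each value is rendered by a per-field formatter. The sketch below assumes a hypothetical `FIELDS` mapping in place of P4Info, with an IPv4 formatter chosen purely as an example.

```python
import ipaddress
from typing import Any, Callable, Optional

# Hypothetical schema: field name -> formatter, in table-definition order.
FIELDS: dict[str, Callable[[Any], str]] = {
    "in_port": str,
    "ipv4_dst": lambda v: str(ipaddress.IPv4Address(v)),
}


def format_str(match: dict[str, Any], wildcard: Optional[str] = None) -> str:
    parts: list[str] = []
    for name, fmt in FIELDS.items():
        value = match.get(name)
        if value is not None:
            parts.append(f"{name}={fmt(value)}")
        elif wildcard is not None:
            parts.append(f"{name}={wildcard}")
    return " ".join(parts)


# Keys are given out of order; the output follows the schema order.
text = format_str({"ipv4_dst": 0x0A000001, "in_port": 3})
partial = format_str({"ipv4_dst": 0x0A000001}, wildcard="*")
```

Iterating over the schema rather than the match is what keeps the output stable for two matches that hold the same fields in a different insertion order.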

@decodable('value_set_entry')
@dataclass(slots=True)
class P4ValueSetEntry(finsy.p4entity._P4ModifyOnly):

Represents a P4Runtime ValueSetEntry.

P4ValueSetEntry(value_set_id: str, *, members: list[finsy.p4entity.P4ValueSetMember])
value_set_id: str
members: list[finsy.p4entity.P4ValueSetMember]
def encode(self, schema: P4Schema) -> p4.v1.p4runtime_pb2.Entity:
    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4ValueSetEntry as protobuf."
        value_set = schema.value_sets[self.value_set_id]
        members = [
            p4r.ValueSetMember(match=member.encode(value_set))
            for member in self.members
        ]

        return p4r.Entity(
            value_set_entry=p4r.ValueSetEntry(
                value_set_id=value_set.id, members=members
            )
        )

Encode P4ValueSetEntry as protobuf.

@classmethod
def decode( cls, msg: p4.v1.p4runtime_pb2.Entity, schema: P4Schema) -> typing_extensions.Self:
    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4ValueSetEntry."
        entry = msg.value_set_entry
        value_set = schema.value_sets[entry.value_set_id]

        members = [
            P4ValueSetMember.decode(member.match, value_set) for member in entry.members
        ]

        return cls(value_set.alias, members=members)

Decode protobuf to P4ValueSetEntry.

class P4ConfigAction(finsy.grpcutil._EnumBase):
class P4ConfigAction(_EnumBase):
    "IntEnum equivalent to `p4r.SetForwardingPipelineConfigRequest.Action`."
    UNSPECIFIED = p4r.SetForwardingPipelineConfigRequest.Action.UNSPECIFIED
    VERIFY = p4r.SetForwardingPipelineConfigRequest.Action.VERIFY
    VERIFY_AND_SAVE = p4r.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_SAVE
    VERIFY_AND_COMMIT = p4r.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT
    COMMIT = p4r.SetForwardingPipelineConfigRequest.Action.COMMIT
    RECONCILE_AND_COMMIT = (
        p4r.SetForwardingPipelineConfigRequest.Action.RECONCILE_AND_COMMIT
    )

    def vt(self) -> p4r.SetForwardingPipelineConfigRequest.Action.ValueType:
        "Cast `self` to `ValueType`."
        return cast(p4r.SetForwardingPipelineConfigRequest.Action.ValueType, self)

IntEnum equivalent to p4r.SetForwardingPipelineConfigRequest.Action.
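
For readers unfamiliar with the pattern, the sketch below shows what mirroring a Protobuf enum as a Python IntEnum provides. `ConfigAction` is a hypothetical class, and the numeric values are an assumption based on the enum order in p4runtime.proto; the library takes its values from the generated Protobuf module instead of hard-coding them.

```python
from enum import IntEnum


class ConfigAction(IntEnum):
    # Numeric values assumed from p4runtime.proto; not read from Finsy.
    UNSPECIFIED = 0
    VERIFY = 1
    VERIFY_AND_SAVE = 2
    VERIFY_AND_COMMIT = 3
    COMMIT = 4
    RECONCILE_AND_COMMIT = 5


# An IntEnum member compares equal to its wire value...
is_int_compatible = ConfigAction.VERIFY_AND_COMMIT == 3
# ...and a wire value can be converted back to a named member.
from_wire = ConfigAction(4)
label = from_wire.name
```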

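UNSPECIFIED = P4ConfigAction.UNSPECIFIED
VERIFY = P4ConfigAction.VERIFY
VERIFY_AND_SAVE = P4ConfigAction.VERIFY_AND_SAVE
VERIFY_AND_COMMIT = P4ConfigAction.VERIFY_AND_COMMIT
COMMIT = P4ConfigAction.COMMIT
RECONCILE_AND_COMMIT = P4ConfigAction.RECONCILE_AND_COMMIT

def vt(self) -> int: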
    def vt(self) -> p4r.SetForwardingPipelineConfigRequest.Action.ValueType:
        "Cast `self` to `ValueType`."
        return cast(p4r.SetForwardingPipelineConfigRequest.Action.ValueType, self)

Cast self to ValueType.

class P4Schema(finsy.p4schema._ReprMixin):
class P4Schema(_ReprMixin):
    """Represents a P4Info file and its associated P4 blob (optional).

    ```
    p4 = P4Schema(Path("basic.p4info.txtpb"))
    ```

    This class parses the P4Info contents to produce an in-memory representation
    of the tables, actions, types, etc. inside. This in-memory graph of the
    contents of the P4Info file may be shared when we parse identical
    P4Info files. The sharing of P4Info data is controlled by the
    `P4SchemaCache` class.
    """

    _p4info: p4i.P4Info | None
    _p4blob: Path | bytes | SupportsBytes | None
    _p4defs: _P4Defs  # possibly shared in-memory representation
    _p4cookie: int = 0

    def __init__(
        self,
        p4info: p4i.P4Info | Path | None = None,
        p4blob: Path | bytes | SupportsBytes | None = None,
    ):
        self._p4blob = p4blob
        self._p4info, self._p4defs, self._p4cookie = P4SchemaCache.load_p4info(
            p4info,
            self._p4blob,
        )

    @property
    def exists(self) -> bool:
        "True if p4info is configured."
        return self._p4info is not None

    @property
    def is_authoritative(self) -> bool:
        "True if both p4info and p4blob are configured."
        return self._p4info is not None and self._p4blob is not None

    @property
    def p4info(self) -> p4i.P4Info:
        "P4Info value."
        if self._p4info is None:
            raise ValueError("No P4Info configured.")
        return self._p4info

    def set_p4info(self, p4info: p4i.P4Info) -> None:
        "Set P4Info using value returned from switch."
        self._p4info, self._p4defs, self._p4cookie = P4SchemaCache.load_p4info(
            p4info,
            self._p4blob,
        )

    def has_p4info(self, p4info: p4i.P4Info) -> bool:
        "Return true if the current P4Info equals the given P4Info."
        if self._p4info is None:
            return False
        return self._p4info.SerializeToString(
            deterministic=True
        ) == p4info.SerializeToString(deterministic=True)

    @property
    def p4blob(self) -> bytes:
        "P4Blob value a.k.a p4_device_config."
        return _blob_bytes(self._p4blob)

    @property
    def p4cookie(self) -> int:
        """Cookie value for p4info and p4blob."""
        return self._p4cookie

    def get_pipeline_config(self) -> p4r.ForwardingPipelineConfig:
        """The forwarding pipeline configuration."""
        return p4r.ForwardingPipelineConfig(
            p4info=self.p4info,
            p4_device_config=self.p4blob,
            cookie=p4r.ForwardingPipelineConfig.Cookie(cookie=self.p4cookie),
        )

    def get_pipeline_info(self) -> str:
        "Concise string description of the pipeline (suitable for logging)."
        if self.exists:
            pipeline = self.name
            version = self.version
            arch = self.arch
            return f"{pipeline=} {version=} {arch=}"

        return "<No pipeline exists>"

    @property
    def name(self) -> str:
        "Name from pkg_info."
        if self._p4info is None:
            return ""
        return self._p4info.pkg_info.name

    @property
    def version(self) -> str:
        "Version from pkg_info."
        if self._p4info is None:
            return ""
        return self._p4info.pkg_info.version

    @property
    def arch(self) -> str:
        "Arch from pkg_info."
        if self._p4info is None:
            return ""
        return self._p4info.pkg_info.arch

    @property
    def pkg_info(self) -> p4i.PkgInfo:
        """Protobuf message containing original `PkgInfo` header.

        Use this to access less frequently used fields like `contact`, `url`,
        and `platform_properties`.
        """
        if self._p4info is None:
            raise ValueError("P4Info: No pipeline configured")
        return self._p4info.pkg_info

    @property
    def tables(self) -> P4EntityMap["P4Table"]:
        "Collection of P4 tables."
        return self._p4defs.tables

    @property
    def actions(self) -> P4EntityMap["P4Action"]:
        "Collection of P4 actions."
        return self._p4defs.actions

    @property
    def action_profiles(self) -> P4EntityMap["P4ActionProfile"]:
        "Collection of P4 action profiles."
        return self._p4defs.action_profiles

    @property
    def controller_packet_metadata(self) -> P4EntityMap["P4ControllerPacketMetadata"]:
        "Collection of P4 controller packet metadata."
        return self._p4defs.controller_packet_metadata

    @property
    def direct_counters(self) -> P4EntityMap["P4DirectCounter"]:
        "Collection of P4 direct counters."
        return self._p4defs.direct_counters

    @property
    def direct_meters(self) -> P4EntityMap["P4DirectMeter"]:
        "Collection of P4 direct meters."
        return self._p4defs.direct_meters

    @property
    def counters(self) -> P4EntityMap["P4Counter"]:
        "Collection of P4 counters."
        return self._p4defs.counters

    @property
    def meters(self) -> P4EntityMap["P4Meter"]:
        "Collection of P4 meters."
        return self._p4defs.meters

    @property
    def registers(self) -> P4EntityMap["P4Register"]:
        "Collection of P4 registers."
        return self._p4defs.registers

    @property
    def digests(self) -> P4EntityMap["P4Digest"]:
        "Collection of P4 digests."
        return self._p4defs.digests

    @property
    def value_sets(self) -> P4EntityMap["P4ValueSet"]:
        "Collection of P4 value sets."
        return self._p4defs.value_sets

    @property
    def type_info(self) -> "P4TypeInfo":
        "Type Info object."
        return self._p4defs.type_info

    @property
    def externs(self) -> "P4ExternMap":
        "Collection of P4 extern instances."
        return self._p4defs.externs

    def __str__(self) -> str:
        if self._p4info is None:
            return "<P4Info: No pipeline configured>"
        return str(P4SchemaDescription(self))

Represents a P4Info file and its associated P4 blob (optional).

p4 = P4Schema(Path("basic.p4info.txtpb"))

This class parses the P4Info contents to produce an in-memory representation of the tables, actions, types, etc. that it defines. When identical P4Info files are parsed, this in-memory graph may be shared between P4Schema instances. The sharing of P4Info data is controlled by the P4SchemaCache class.

P4Schema(p4info: p4.config.v1.p4info_pb2.P4Info | pathlib.Path | None = None, p4blob: pathlib.Path | bytes | typing.SupportsBytes | None = None)
    def __init__(
        self,
        p4info: p4i.P4Info | Path | None = None,
        p4blob: Path | bytes | SupportsBytes | None = None,
    ):
        self._p4blob = p4blob
        self._p4info, self._p4defs, self._p4cookie = P4SchemaCache.load_p4info(
            p4info,
            self._p4blob,
        )

exists: bool
    @property
    def exists(self) -> bool:
        "True if p4info is configured."
        return self._p4info is not None

True if p4info is configured.

is_authoritative: bool
    @property
    def is_authoritative(self) -> bool:
        "True if both p4info and p4blob are configured."
        return self._p4info is not None and self._p4blob is not None

True if both p4info and p4blob are configured.

p4info: p4.config.v1.p4info_pb2.P4Info
    @property
    def p4info(self) -> p4i.P4Info:
        "P4Info value."
        if self._p4info is None:
            raise ValueError("No P4Info configured.")
        return self._p4info

P4Info value.

def set_p4info(self, p4info: p4.config.v1.p4info_pb2.P4Info) -> None:
    def set_p4info(self, p4info: p4i.P4Info) -> None:
        "Set P4Info using value returned from switch."
        self._p4info, self._p4defs, self._p4cookie = P4SchemaCache.load_p4info(
            p4info,
            self._p4blob,
        )

Set P4Info using value returned from switch.

def has_p4info(self, p4info: p4.config.v1.p4info_pb2.P4Info) -> bool:
    def has_p4info(self, p4info: p4i.P4Info) -> bool:
        "Return true if the current P4Info equals the given P4Info."
        if self._p4info is None:
            return False
        return self._p4info.SerializeToString(
            deterministic=True
        ) == p4info.SerializeToString(deterministic=True)

Return true if the current P4Info equals the given P4Info.
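The comparison is done on serialized bytes, and deterministic serialization is requested so that equal messages produce comparable byte strings (in protobuf, this option mainly makes the ordering of map entries predictable). The sketch below illustrates the same idea with the standard `json` module as a stand-in for protobuf, an assumption made only so the example has no third-party dependencies: two equal mappings built in different orders serialize to identical bytes only once a canonical key order is forced.

```python
import json

# Two mappings with equal contents but different insertion order.
first = {"name": "basic", "arch": "v1model"}
second = {"arch": "v1model", "name": "basic"}


def canonical(obj: dict[str, str]) -> bytes:
    "Serialize with sorted keys so that equal contents give equal bytes."
    return json.dumps(obj, sort_keys=True).encode()


# Without a canonical order, the serialized forms differ.
plain_equal = json.dumps(first) == json.dumps(second)
# With a canonical order, equal contents compare equal byte-for-byte.
canonical_equal = canonical(first) == canonical(second)
print(plain_equal, canonical_equal)
```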

p4blob: bytes
    @property
    def p4blob(self) -> bytes:
        "P4Blob value a.k.a p4_device_config."
        return _blob_bytes(self._p4blob)

P4Blob value a.k.a p4_device_config.

p4cookie: int
    @property
    def p4cookie(self) -> int:
        """Cookie value for p4info and p4blob."""
        return self._p4cookie

Cookie value for p4info and p4blob.

def get_pipeline_config(self) -> p4.v1.p4runtime_pb2.ForwardingPipelineConfig:
    def get_pipeline_config(self) -> p4r.ForwardingPipelineConfig:
        """The forwarding pipeline configuration."""
        return p4r.ForwardingPipelineConfig(
            p4info=self.p4info,
            p4_device_config=self.p4blob,
            cookie=p4r.ForwardingPipelineConfig.Cookie(cookie=self.p4cookie),
        )

The forwarding pipeline configuration.

def get_pipeline_info(self) -> str:
    def get_pipeline_info(self) -> str:
        "Concise string description of the pipeline (suitable for logging)."
        if self.exists:
            pipeline = self.name
            version = self.version
            arch = self.arch
            return f"{pipeline=} {version=} {arch=}"

        return "<No pipeline exists>"

Concise string description of the pipeline (suitable for logging).
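The returned string is built with the `=` specifier in f-strings, which prints each variable's name followed by its `repr()`. A minimal stand-alone sketch with made-up values (the pipeline name, version and architecture below are placeholders, not taken from a real P4Info):

```python
# Placeholder values standing in for P4Schema.name, .version and .arch.
pipeline = "basic"
version = "1.0.0"
arch = "v1model"

# The "=" specifier expands to "<expression>=<repr of value>".
info = f"{pipeline=} {version=} {arch=}"
print(info)  # pipeline='basic' version='1.0.0' arch='v1model'
```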

name: str
    @property
    def name(self) -> str:
        "Name from pkg_info."
        if self._p4info is None:
            return ""
        return self._p4info.pkg_info.name

Name from pkg_info.

version: str
    @property
    def version(self) -> str:
        "Version from pkg_info."
        if self._p4info is None:
            return ""
        return self._p4info.pkg_info.version

Version from pkg_info.

arch: str
    @property
    def arch(self) -> str:
        "Arch from pkg_info."
        if self._p4info is None:
            return ""
        return self._p4info.pkg_info.arch

Arch from pkg_info.

pkg_info: p4.config.v1.p4info_pb2.PkgInfo
    @property
    def pkg_info(self) -> p4i.PkgInfo:
        """Protobuf message containing original `PkgInfo` header.

        Use this to access less frequently used fields like `contact`, `url`,
        and `platform_properties`.
        """
        if self._p4info is None:
            raise ValueError("P4Info: No pipeline configured")
        return self._p4info.pkg_info

Protobuf message containing original PkgInfo header.

Use this to access less frequently used fields like contact, url, and platform_properties.
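Note the difference in behavior when no pipeline is configured: `name`, `version` and `arch` fall back to an empty string, while `pkg_info` raises `ValueError`. The stand-in class below is hypothetical; it mirrors only the guard logic shown in the source listings above, not the real P4Schema, and shows how calling code might handle both cases.

```python
from types import SimpleNamespace


class SchemaStandIn:
    "Hypothetical stand-in that copies only the None-guards of P4Schema."

    def __init__(self, p4info=None):
        self._p4info = p4info

    @property
    def name(self) -> str:
        # Convenience accessor: empty string when nothing is configured.
        if self._p4info is None:
            return ""
        return self._p4info.pkg_info.name

    @property
    def pkg_info(self):
        # Full accessor: refuses to return anything without a pipeline.
        if self._p4info is None:
            raise ValueError("P4Info: No pipeline configured")
        return self._p4info.pkg_info


# No pipeline configured: name is empty, pkg_info raises.
empty = SchemaStandIn()
print(repr(empty.name))
try:
    empty.pkg_info
    raised = False
except ValueError:
    raised = True
print(raised)

# Pipeline configured (faked here with SimpleNamespace objects).
fake_info = SimpleNamespace(pkg_info=SimpleNamespace(name="basic", contact=""))
loaded = SchemaStandIn(fake_info)
print(loaded.pkg_info.name)
```

Code that only logs the pipeline name can use `name` without a guard; code that needs fields such as `contact` should be prepared to catch `ValueError` or check `exists` first.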

tables: finsy.p4schema.P4EntityMap[finsy.p4schema.P4Table]
    @property
    def tables(self) -> P4EntityMap["P4Table"]:
        "Collection of P4 tables."
        return self._p4defs.tables

Collection of P4 tables.

actions: finsy.p4schema.P4EntityMap[finsy.p4schema.P4Action]
    @property
    def actions(self) -> P4EntityMap["P4Action"]:
        "Collection of P4 actions."
        return self._p4defs.actions

Collection of P4 actions.

action_profiles: finsy.p4schema.P4EntityMap[finsy.p4schema.P4ActionProfile]
    @property
    def action_profiles(self) -> P4EntityMap["P4ActionProfile"]:
        "Collection of P4 action profiles."
        return self._p4defs.action_profiles

Collection of P4 action profiles.

controller_packet_metadata: finsy.p4schema.P4EntityMap[finsy.p4schema.P4ControllerPacketMetadata]
    @property
    def controller_packet_metadata(self) -> P4EntityMap["P4ControllerPacketMetadata"]:
        "Collection of P4 controller packet metadata."
        return self._p4defs.controller_packet_metadata

Collection of P4 controller packet metadata.

direct_counters: finsy.p4schema.P4EntityMap[finsy.p4schema.P4DirectCounter]
    @property
    def direct_counters(self) -> P4EntityMap["P4DirectCounter"]:
        "Collection of P4 direct counters."
        return self._p4defs.direct_counters

Collection of P4 direct counters.

direct_meters: finsy.p4schema.P4EntityMap[finsy.p4schema.P4DirectMeter]
    @property
    def direct_meters(self) -> P4EntityMap["P4DirectMeter"]:
        "Collection of P4 direct meters."
        return self._p4defs.direct_meters

Collection of P4 direct meters.

counters: finsy.p4schema.P4EntityMap[finsy.p4schema.P4Counter]
    @property
    def counters(self) -> P4EntityMap["P4Counter"]:
        "Collection of P4 counters."
        return self._p4defs.counters

Collection of P4 counters.

meters: finsy.p4schema.P4EntityMap[finsy.p4schema.P4Meter]
    @property
    def meters(self) -> P4EntityMap["P4Meter"]:
        "Collection of P4 meters."
        return self._p4defs.meters

Collection of P4 meters.

registers: finsy.p4schema.P4EntityMap[finsy.p4schema.P4Register]
    @property
    def registers(self) -> P4EntityMap["P4Register"]:
        "Collection of P4 registers."
        return self._p4defs.registers

Collection of P4 registers.

digests: finsy.p4schema.P4EntityMap[finsy.p4schema.P4Digest]
    @property
    def digests(self) -> P4EntityMap["P4Digest"]:
        "Collection of P4 digests."
        return self._p4defs.digests

Collection of P4 digests.

value_sets: finsy.p4schema.P4EntityMap[finsy.p4schema.P4ValueSet]
    @property
    def value_sets(self) -> P4EntityMap["P4ValueSet"]:
        "Collection of P4 value sets."
        return self._p4defs.value_sets

Collection of P4 value sets.

type_info: finsy.p4schema.P4TypeInfo
    @property
    def type_info(self) -> "P4TypeInfo":
        "Type Info object."
        return self._p4defs.type_info

Type Info object.

externs: finsy.p4schema.P4ExternMap
    @property
    def externs(self) -> "P4ExternMap":
        "Collection of P4 extern instances."
        return self._p4defs.externs

Collection of P4 extern instances.

@final
class Switch:
@final
class Switch:
    """Represents a P4Runtime Switch.

    A `Switch` is constructed with a `name`, `address` and an optional
    `SwitchOptions` configuration.

    The `name` is up to the user but should uniquely identify the switch.

    The `address` identifies the target endpoint of the gRPC channel. It should
    have the format `<ADDRESS>:<PORT>` where `<ADDRESS>` can be a domain name,
    IPv4 address, or IPv6 address in square brackets, and `<PORT>` is the TCP
    port number.

    The `options` is a `SwitchOptions` object that specifies how the `Switch`
    will behave.

    ```
    opts = SwitchOptions(p4info=..., p4blob=...)
    sw1 = Switch('sw1', '10.0.0.1:50000', opts)
    ```

    The `stash` is an optional dictionary for storing arbitrary per-switch
    information. The dictionary keys are strings. You can use this to store
    anything that you need to manage the switch.

    Each switch object has an event emitter `ee`. Use the EventEmitter to listen
    for port change events like PORT_UP and PORT_DOWN. See the `SwitchEvent`
    class for a list of other switch events.
    """

    _name: str
    _address: str
    _options: SwitchOptions
    _stash: dict[str, Any]
    _ee: "SwitchEmitter"
    _p4client: P4Client | None
    _p4schema: P4Schema
    _tasks: "SwitchTasks | None"
    _packet_queues: list[tuple[Callable[[bytes], bool], Queue[p4entity.P4PacketIn]]]
    _digest_queues: dict[str, Queue[p4entity.P4DigestList]]
    _timeout_queue: Queue[p4entity.P4IdleTimeoutNotification] | None
    _arbitrator: "Arbitrator"
    _gnmi_client: GNMIClient | None
    _ports: SwitchPortList
    _is_channel_up: bool = False
    _api_version: ApiVersion = ApiVersion(1, 0, 0, "")
    _control_task: asyncio.Task[Any] | None = None

    def __init__(
        self,
        name: str,
        address: str,
        options: SwitchOptions | None = None,
        *,
        stash: dict[str, Any] | None = None,
    ) -> None:
        """Initialize switch with name, address and options.

        Args:
            name: A human-readable name to uniquely identify the switch.
            address: The target address of the P4Runtime gRPC channel.
              Format is `<ADDRESS>:<PORT>` where `<ADDRESS>` is a DNS name or
              IP address, and `<PORT>` is the TCP port number.
            options: Configuration options for the switch.
            stash: Optional user-controlled dictionary.
              Used to store information that user code needs to access or share.
        """
        if options is None:
            options = SwitchOptions()

        self._name = name
        self._address = address
        self._options = options
        self._stash = stash or {}
        self._ee = SwitchEmitter(self)
        self._p4client = None
        self._p4schema = P4Schema(options.p4info, options.p4blob)
        self._tasks = None
        self._packet_queues = []
        self._digest_queues = {}
        self._timeout_queue = None
        self._arbitrator = Arbitrator(
            options.initial_election_id, options.role_name, options.role_config
        )
        self._gnmi_client = None
        self._ports = SwitchPortList()

    @property
    def name(self) -> str:
        "Name of the switch."
        return self._name

    @property
    def address(self) -> str:
        """Address of the switch formatted as a gRPC target.

        Format is `<ADDRESS>:<PORT>` where `<ADDRESS>` is a DNS name or IP
        address, and `<PORT>` is the TCP port number.
        """
        return self._address

    @property
    def options(self) -> SwitchOptions:
        "Switch options."
        return self._options

    @options.setter
    def options(self, opts: SwitchOptions) -> None:
        "Set switch options to a new value."
        if self._p4client is not None:
            raise RuntimeError("Cannot change switch options while client is open.")

        self._options = opts
        self._p4schema = P4Schema(opts.p4info, opts.p4blob)
        self._arbitrator = Arbitrator(
            opts.initial_election_id, opts.role_name, opts.role_config
        )

    @property
    def stash(self) -> dict[str, Any]:
        "Switch stash. Used to store per-switch data for any purpose."
        return self._stash

    @property
    def ee(self) -> "SwitchEmitter":
        "Switch event emitter. See `SwitchEvent` for more details on events."
        return self._ee

    @property
    def device_id(self) -> int:
        "Switch's device ID."
        return self._options.device_id

    @property
    def is_up(self) -> bool:
        "True if switch is UP."
        return self._is_channel_up

    @property
    def is_primary(self) -> bool:
        "True if this client is the switch's primary controller."
        return self._arbitrator.is_primary

    @property
    def primary_id(self) -> int:
        "Election ID of the client that is currently primary."
        return self._arbitrator.primary_id

    @property
    def election_id(self) -> int:
        "Switch's current election ID."
        return self._arbitrator.election_id

    @property
    def role_name(self) -> str:
        "Switch's current role name."
        return self._arbitrator.role_name

    @property
    def p4info(self) -> P4Schema:
        "Switch's P4 schema."
        return self._p4schema

    @property
    def gnmi_client(self) -> GNMIClient | None:
        "Switch's gNMI client."
        return self._gnmi_client

    @property
    def ports(self) -> SwitchPortList:
        "Switch's list of interfaces."
        return self._ports

    @property
    def api_version(self) -> ApiVersion:
        "P4Runtime protocol version."
        return self._api_version

    @overload
    async def read(
        self,
        entities: _ET,
    ) -> AsyncGenerator[_ET, None]:
        "Overload for read of a single P4Entity subtype."
        ...  # pragma: no cover

    @overload
    async def read(
        self,
        entities: Iterable[_ET],
    ) -> AsyncGenerator[_ET, None]:
        "Overload for read of an iterable of the same P4Entity subtype."
        ...  # pragma: no cover

    @overload
    async def read(
        self,
        entities: Iterable[p4entity.P4EntityList],
    ) -> AsyncGenerator[p4entity.P4Entity, None]:
        "Most general overload: we can't determine the return type exactly."
        ...  # pragma: no cover

    async def read(
        self,
        entities: Iterable[p4entity.P4EntityList] | p4entity.P4Entity,
    ) -> AsyncGenerator[p4entity.P4Entity, None]:
        "Async iterator that reads entities from the switch."
        assert self._p4client is not None

        if not entities:
            return

        if isinstance(entities, p4entity.P4Entity):
            entities = [entities]

        request = p4r.ReadRequest(
            device_id=self.device_id,
            entities=p4entity.encode_entities(entities, self.p4info),
        )

        async for reply in self._p4client.request_iter(request):
            for ent in reply.entities:
                yield p4entity.decode_entity(ent, self.p4info)

    async def read_packets(
        self,
        *,
        queue_size: int = _DEFAULT_QUEUE_SIZE,
        eth_types: Iterable[int] | None = None,
    ) -> AsyncIterator["p4entity.P4PacketIn"]:
        "Async iterator for incoming packets (P4PacketIn)."
        LOGGER.debug("read_packets: opening queue: eth_types=%r", eth_types)

        if eth_types is None:

            def _pkt_filter(_payload: bytes) -> bool:
                return True

        else:
            _filter = {eth.to_bytes(2, "big") for eth in eth_types}

            def _pkt_filter(_payload: bytes) -> bool:
                return _payload[12:14] in _filter

        queue = Queue[p4entity.P4PacketIn](queue_size)
        queue_filter = (_pkt_filter, queue)
        self._packet_queues.append(queue_filter)

        try:
            while True:
                yield await queue.get()
        finally:
            LOGGER.debug("read_packets: closing queue: eth_types=%r", eth_types)
            self._packet_queues.remove(queue_filter)

    async def read_digests(
        self,
        digest_id: str,
        *,
        queue_size: int = _DEFAULT_QUEUE_SIZE,
    ) -> AsyncIterator["p4entity.P4DigestList"]:
        "Async iterator for incoming digest lists (P4DigestList)."
        LOGGER.debug("read_digests: opening queue: digest_id=%r", digest_id)

        if digest_id in self._digest_queues:
            raise ValueError(f"queue for digest_id {digest_id!r} already open")

        queue = Queue[p4entity.P4DigestList](queue_size)
        self._digest_queues[digest_id] = queue
        try:
            while True:
                yield await queue.get()
        finally:
            LOGGER.debug("read_digests: closing queue: digest_id=%r", digest_id)
            del self._digest_queues[digest_id]

    async def read_idle_timeouts(
        self,
        *,
        queue_size: int = _DEFAULT_QUEUE_SIZE,
    ) -> AsyncIterator["p4entity.P4IdleTimeoutNotification"]:
        "Async iterator for incoming idle timeouts (P4IdleTimeoutNotification)."
        LOGGER.debug("read_idle_timeouts: opening queue")

        if self._timeout_queue is not None:
            raise ValueError("timeout queue already open")

        queue = Queue[p4entity.P4IdleTimeoutNotification](queue_size)
        self._timeout_queue = queue
        try:
            while True:
                yield await queue.get()
        finally:
            LOGGER.debug("read_idle_timeouts: closing queue")
            self._timeout_queue = None

    async def write(
        self,
        entities: Iterable[p4entity.P4UpdateList],
        *,
        strict: bool = True,
        warn_only: bool = False,
    ) -> None:
        """Write updates and stream messages to the switch.

        If `strict` is False, MODIFY and DELETE operations will NOT raise an
        error if the entity does not exist (NOT_FOUND).

        If `warn_only` is True, no operations will raise an error. Instead,
        the exception will be logged as a WARNING and the method will return
        normally.
        """
        assert self._p4client is not None

        if not entities:
            return

        msgs = p4entity.encode_updates(entities, self.p4info)

        updates: list[p4r.Update] = []
        for msg in msgs:
            if isinstance(msg, p4r.StreamMessageRequest):
                # StreamMessageRequests are transmitted immediately.
                # TODO: Understand what happens with backpressure?
                await self._p4client.send(msg)
            else:
                updates.append(msg)

        if updates:
            await self._write_request(updates, strict, warn_only)

    async def insert(
        self,
        entities: Iterable[p4entity.P4EntityList],
        *,
        warn_only: bool = False,
    ) -> None:
        """Insert the specified entities.

        If `warn_only` is True, errors will be logged as warnings instead of
        raising an exception.
        """
        if entities:
            await self._write_request(
                [
                    p4r.Update(type=p4r.Update.INSERT, entity=ent)
                    for ent in p4entity.encode_entities(entities, self.p4info)
                ],
                True,
                warn_only,
            )

    async def modify(
        self,
        entities: Iterable[p4entity.P4EntityList],
        *,
        strict: bool = True,
        warn_only: bool = False,
    ) -> None:
        """Modify the specified entities.

        If `strict` is False, NOT_FOUND errors will be ignored.

        If `warn_only` is True, errors will be logged as warnings instead of
        raising an exception.
        """
        if entities:
            await self._write_request(
                [
                    p4r.Update(type=p4r.Update.MODIFY, entity=ent)
                    for ent in p4entity.encode_entities(entities, self.p4info)
                ],
                strict,
                warn_only,
            )

    async def delete(
        self,
        entities: Iterable[p4entity.P4EntityList],
        *,
        strict: bool = True,
        warn_only: bool = False,
    ) -> None:
        """Delete the specified entities.

        If `strict` is False, NOT_FOUND errors will be ignored.

        If `warn_only` is True, errors will be logged as warnings instead of
        raising an exception.
        """
        if entities:
            await self._write_request(
                [
                    p4r.Update(type=p4r.Update.DELETE, entity=ent)
                    for ent in p4entity.encode_entities(entities, self.p4info)
                ],
                strict,
                warn_only,
            )

    async def delete_all(self) -> None:
        """Delete all entities.

        This method does not attempt to delete entries in const tables.

        TODO: This method does not affect indirect counters, meters or
        value_sets.
        """
        await self.delete_many(
            [
                p4entity.P4TableEntry(),
                p4entity.P4MulticastGroupEntry(),
                p4entity.P4CloneSessionEntry(),
            ]
        )

        # Reset all default table entries.
        default_entries = [
            p4entity.P4TableEntry(table.alias, is_default_action=True)
            for table in self.p4info.tables
            if table.const_default_action is None and table.action_profile is None
        ]
        if default_entries:
            await self.modify(default_entries)

        # Delete all P4ActionProfileGroup and P4ActionProfileMember entities.
        # We do this after deleting the P4TableEntry entities in case a client
        # is using "one-shot" references; these are incompatible with separate
        # action profiles.
        await self.delete_many(
            [
                p4entity.P4ActionProfileGroup(),
                p4entity.P4ActionProfileMember(),
            ]
        )

        # Delete DigestEntry separately. Wildcard reads are not supported.
        digest_entries = [
            p4entity.P4DigestEntry(digest.alias) for digest in self.p4info.digests
        ]
        if digest_entries:
            await self.delete(digest_entries, strict=False)

    async def delete_many(self, entities: Iterable[p4entity.P4EntityList]) -> None:
        """Delete entities that match a wildcard read.

        This method always skips over entries in const tables. It is an error
        to attempt to delete those.
        """
        assert self._p4client is not None

        request = p4r.ReadRequest(
            device_id=self.device_id,
            entities=p4entity.encode_entities(entities, self.p4info),
        )

        # Compute set of all const table IDs (may be empty).
        to_skip = {table.id for table in self.p4info.tables if table.is_const}

        async for reply in self._p4client.request_iter(request):
            if reply.entities:
                if to_skip:
                    # `ent` avoids shadowing the outer `reply` loop variable.
                    await self.delete(
                        ent
                        for ent in reply.entities
                        if ent.HasField("table_entry")
                        and ent.table_entry.table_id not in to_skip
                    )
                else:
                    await self.delete(reply.entities)

    async def run(self) -> None:
        "Run the switch's lifecycle repeatedly."
        assert self._p4client is None
        assert self._tasks is None

        self._tasks = SwitchTasks(self._options.fail_fast)
        self._p4client = P4Client(self._address, self._options.channel_credentials)
        self._switch_start()

        try:
            while True:
                # If the switch fails and restarts too quickly, slow it down.
                async with _throttle_failure():
                    self.create_task(self._run(), background=True)
                    await self._tasks.wait()
                    self._arbitrator.reset()

        finally:
            self._p4client = None
            self._tasks = None
            self._switch_stop()

    def create_task(
        self,
        coro: Coroutine[Any, Any, _T],
        *,
        background: bool = False,
        name: str | None = None,
    ) -> asyncio.Task[_T]:
        "Create an asyncio task tied to the Switch's lifecycle."
        assert self._tasks is not None

        return self._tasks.create_task(
            coro,
            switch=self,
            background=background,
            name=name,
        )

    async def _run(self):
        "Main Switch task runs the stream."
        assert not self._is_channel_up
        assert self._p4client is not None

        try:
            await self._p4client.open(
                schema=self.p4info,
                complete_request=self._arbitrator.complete_request,
            )
            await self._arbitrator.handshake(self)
            await self._fetch_capabilities()
            await self._start_gnmi()
            self._channel_up()

            # Receive messages from the stream until it closes.
            await self._receive_until_closed()

        finally:
            await self._stop_gnmi()
            await self._p4client.close()
            self._channel_down()

    async def _receive_until_closed(self):
        "Receive messages from stream until EOF."
        assert self._p4client is not None

        client = self._p4client

        while True:
            try:
                msg = await client.receive()
            except P4ClientError as ex:
                if not ex.is_election_id_used:
                    raise
                # Handle "election ID in use" error.
                await self._arbitrator.handshake(self, conflict=True)
            else:
                await self._handle_stream_message(msg)

    async def _handle_stream_message(self, msg: p4r.StreamMessageResponse):
        "Handle a P4Runtime StreamMessageResponse."
        match msg.WhichOneof("update"):
            case "packet":
                self._stream_packet_message(msg)
            case "digest":
                self._stream_digest_message(msg)
            case "idle_timeout_notification":
                self._stream_timeout_message(msg)
            case "arbitration":
                await self._arbitrator.update(self, msg.arbitration)
            case "error":
                self._stream_error_message(msg)
            case other:
                LOGGER.error("_handle_stream_message: unknown update %r", other)

    async def __aenter__(self) -> Self:
        "Similar to run() but provides a one-time context manager interface."
        assert self._p4client is None
        assert self._tasks is None

        self._tasks = SwitchTasks(self._options.fail_fast)
        self._p4client = P4Client(
            self._address,
            self._options.channel_credentials,
            wait_for_ready=False,
        )
        self._switch_start()

        try:
            # Start the switch's `_run` task in the background. Then, wait for
            # `_run` task to fire the CHANNEL_READY event. If the `_run` task
            # cannot connect or fails in some other way, it will finish before
            # the `ready` future. We need to handle the error in this case.

            run = self.create_task(self._run(), background=True)
            ready = self.ee.event_future(SwitchEvent.CHANNEL_READY)
            done, _ = await asyncio.wait(
                [run, ready], return_when=asyncio.FIRST_COMPLETED
            )
            if run in done:
                await run

        except BaseException:
            await self.__aexit__(None, None, None)
            raise

        return self

    async def __aexit__(
        self,
        _exc_type: type[BaseException] | None,
        _exc_val: BaseException | None,
        _exc_tb: TracebackType | None,
    ) -> bool | None:
        "Counterpart to __aenter__: cancel all tasks and stop the switch."
        assert self._p4client is not None
        assert self._tasks is not None

        self._tasks.cancel_all()
        await self._tasks.wait()
        self._arbitrator.reset()
        self._p4client = None
        self._tasks = None
        self._switch_stop()

    def _switch_start(self):
        "Called when switch starts its run() cycle."
        assert not self._is_channel_up

        LOGGER.info(
            "Switch start (name=%r, address=%r, device_id=%r, role_name=%r, initial_election_id=%r)",
            self.name,
            self.address,
            self.device_id,
            self.role_name,
            self.options.initial_election_id,
        )
        self.ee.emit(SwitchEvent.SWITCH_START)

    def _switch_stop(self):
        "Called when switch stops its run() cycle."
        assert not self._is_channel_up

        LOGGER.info(
            "Switch stop (name=%r, address=%r, device_id=%r, role_name=%r)",
            self.name,
            self.address,
            self.device_id,
            self.role_name,
        )
        self.ee.emit(SwitchEvent.SWITCH_STOP)

    def _channel_up(self):
        "Called when P4Runtime channel is UP."
        assert not self._is_channel_up

        ports = " ".join(f"({port.id}){port.name}" for port in self.ports)
        LOGGER.info(
            "Channel up (is_primary=%r, role_name=%r, p4r=%s): %s",
            self.is_primary,
            self.role_name,
            self.api_version,
            ports,
        )
        self._is_channel_up = True
        self.create_task(self._ready())

        self.ee.emit(SwitchEvent.CHANNEL_UP, self)

    def _channel_down(self):
        "Called when P4Runtime channel is DOWN."
        if not self._is_channel_up:
            return  # do nothing!

        LOGGER.info(
            "Channel down (is_primary=%r, role_name=%r)",
            self.is_primary,
            self.role_name,
        )
        self._is_channel_up = False

        self.ee.emit(SwitchEvent.CHANNEL_DOWN, self)

    def _become_primary(self):
        "Called when a P4Runtime backup channel becomes the primary."
        assert self._tasks is not None

        LOGGER.info(
            "Become primary (is_primary=%r, role_name=%r)",
            self.is_primary,
            self.role_name,
        )

        self._tasks.cancel_primary()
        self.create_task(self._ready())

        self.ee.emit(SwitchEvent.BECOME_PRIMARY, self)

    def _become_backup(self):
        "Called when a P4Runtime primary channel becomes a backup."
        assert self._tasks is not None

        LOGGER.info(
            "Become backup (is_primary=%r, role_name=%r)",
            self.is_primary,
            self.role_name,
        )

        self._tasks.cancel_primary()
        self.create_task(self._ready())

        self.ee.emit(SwitchEvent.BECOME_BACKUP, self)

    def _channel_ready(self):
        "Called when a P4Runtime channel is READY."
        LOGGER.info(
            "Channel ready (is_primary=%r, role_name=%r): %s",
            self.is_primary,
            self.role_name,
            self.p4info.get_pipeline_info(),
        )

        if self._options.ready_handler:
            self.create_task(self._options.ready_handler(self))

        self.ee.emit(SwitchEvent.CHANNEL_READY, self)

    def _stream_packet_message(self, msg: p4r.StreamMessageResponse):
        "Called when a P4Runtime packet-in response is received."
        packet = p4entity.decode_stream(msg, self.p4info)

        was_queued = False
        for pkt_filter, queue in self._packet_queues:
            if not queue.full() and pkt_filter(packet.payload):
                queue.put_nowait(packet)
                was_queued = True

        if not was_queued:
            LOGGER.warning("packet ignored: %r", packet)

    def _stream_digest_message(self, msg: p4r.StreamMessageResponse):
        "Called when a P4Runtime digest response is received."
        try:
            # Decode the digest list message.
            digest: p4entity.P4DigestList = p4entity.decode_stream(msg, self.p4info)
        except ValueError as ex:
            # It's possible to receive a digest for a different P4Info file, or
            # even before a P4Info is fetched from the switch.
            LOGGER.warning("digest decode failed: %s", ex)
        else:
            # Place the decoded digest list in a queue, if one is waiting.
            queue = self._digest_queues.get(digest.digest_id)
            if queue is not None and not queue.full():
                queue.put_nowait(digest)
            else:
                LOGGER.warning("digest ignored: %r", digest)

    def _stream_timeout_message(self, msg: p4r.StreamMessageResponse):
        "Called when a P4Runtime timeout notification is received."
        timeout: p4entity.P4IdleTimeoutNotification = p4entity.decode_stream(
            msg, self.p4info
        )
        queue = self._timeout_queue

        if queue is not None and not queue.full():
            queue.put_nowait(timeout)
        else:
            LOGGER.warning("timeout ignored: %r", timeout)

    def _stream_error_message(self, msg: p4r.StreamMessageResponse):
        "Called when a P4Runtime stream error response is received."
        assert self._p4client is not None

        # Log the message at the ERROR level.
        pbutil.log_msg(self._p4client.channel, msg, self.p4info, level=logging.ERROR)

        self.ee.emit(SwitchEvent.STREAM_ERROR, self, msg)

    async def _ready(self):
        "Prepare the pipeline."
        if self.p4info.is_authoritative and self.is_primary:
            await self._set_pipeline()
        else:
            await self._get_pipeline()

        self._channel_ready()

    async def _get_pipeline(self):
        "Get the switch's P4Info."
        has_pipeline = False

        try:
            reply = await self._get_pipeline_config_request(
                response_type=P4ConfigResponseType.P4INFO_AND_COOKIE
            )

            if reply.config.HasField("p4info"):
                has_pipeline = True
                p4info = reply.config.p4info
                if not self.p4info.exists:
                    # If we don't have P4Info yet, set it.
                    self.p4info.set_p4info(p4info)
                elif not self.p4info.has_p4info(p4info):
                    # If P4Info is not identical, log a warning message.
                    LOGGER.warning("Retrieved P4Info is different than expected!")

        except P4ClientError as ex:
            if not ex.is_pipeline_missing:
                raise

        if not has_pipeline and self.p4info.exists:
            LOGGER.warning("Forwarding pipeline is not configured")

    async def _set_pipeline(self):
        """Set up the pipeline.

        If `p4force` is false (the default), we first retrieve the cookie for
        the current pipeline and see if it matches the new pipeline's cookie.
        If the cookies match, we are done; there is no need to set the pipeline.

        If `p4force` is true, we always load the new pipeline.
        """
        cookie = -1
        try:
            if not self.options.p4force:
                reply = await self._get_pipeline_config_request()
                if reply.config.HasField("cookie"):
                    cookie = reply.config.cookie.cookie

        except P4ClientError as ex:
            if not ex.is_pipeline_missing:
                raise

        if cookie != self.p4info.p4cookie:
            LOGGER.debug(
                "cookie %#x does not match expected %#x", cookie, self.p4info.p4cookie
            )
            await self._set_pipeline_config_request(
                config=self.p4info.get_pipeline_config()
            )
            LOGGER.info("Pipeline installed: %s", self.p4info.get_pipeline_info())

    async def _get_pipeline_config_request(
        self,
        *,
        response_type: P4ConfigResponseType = P4ConfigResponseType.COOKIE_ONLY,
    ) -> p4r.GetForwardingPipelineConfigResponse:
        "Send a GetForwardingPipelineConfigRequest and await the response."
        assert self._p4client is not None

        return await self._p4client.request(
            p4r.GetForwardingPipelineConfigRequest(
                device_id=self.device_id,
                response_type=response_type.vt(),
            )
        )

    async def _set_pipeline_config_request(
        self,
        *,
        action: P4ConfigAction = P4ConfigAction.VERIFY_AND_COMMIT,
        config: p4r.ForwardingPipelineConfig,
    ) -> p4r.SetForwardingPipelineConfigResponse:
        "Send a SetForwardingPipelineConfigRequest and await the response."
        assert self._p4client is not None

        return await self._p4client.request(
            p4r.SetForwardingPipelineConfigRequest(
                device_id=self.device_id,
                action=action.vt(),
                config=config,
            )
        )

    async def _write_request(
        self,
        updates: list[p4r.Update],
        strict: bool,
        warn_only: bool,
    ):
        "Send a P4Runtime WriteRequest."
        assert self._p4client is not None

        try:
            await self._p4client.request(
                p4r.WriteRequest(
                    device_id=self.device_id,
                    updates=updates,
                )
            )
        except P4ClientError as ex:
            if strict or not ex.is_not_found_only:
                if warn_only:
                    LOGGER.warning(
                        "WriteRequest with `warn_only=True` failed",
                        exc_info=True,
                    )
                else:
                    raise

            assert (not strict and ex.is_not_found_only) or warn_only

    async def _fetch_capabilities(self):
        "Check the P4Runtime protocol version supported by the other end."
        assert self._p4client is not None

        try:
            reply = await self._p4client.request(p4r.CapabilitiesRequest())
            self._api_version = ApiVersion.parse(reply.p4runtime_api_version)

        except P4ClientError as ex:
            if ex.code != GRPCStatusCode.UNIMPLEMENTED:
                raise
            LOGGER.warning("CapabilitiesRequest is not implemented")

    async def _start_gnmi(self):
        "Start the associated gNMI client."
        assert self._gnmi_client is None
        assert self._p4client is not None

        self._gnmi_client = GNMIClient(self._address, self._options.channel_credentials)
        await self._gnmi_client.open(channel=self._p4client.channel)

        try:
            await self._ports.subscribe(self._gnmi_client)
            if self._ports:
                self.create_task(self._ports.listen(), background=True, name="_ports")

        except GNMIClientError as ex:
            if ex.code != GRPCStatusCode.UNIMPLEMENTED:
                raise
            LOGGER.warning("gNMI is not implemented")
            await self._gnmi_client.close()
            self._gnmi_client = None

    async def _stop_gnmi(self):
        "Stop the associated gNMI client."
        if self._gnmi_client is not None:
            self._ports.close()
            await self._gnmi_client.close()
            self._gnmi_client = None

    def __repr__(self) -> str:
        "Return string representation of switch."
        return f"Switch(name={self._name!r}, address={self._address!r})"

Represents a P4Runtime Switch.

A Switch is constructed with a name, address and an optional SwitchOptions configuration.

The name is up to the user but should uniquely identify the switch.

The address identifies the target endpoint of the GRPC channel. It should have the format <ADDRESS>:<PORT> where <ADDRESS> can be a domain name, IPv4 address, or IPv6 address in square brackets, and <PORT> is the TCP port number.
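
As a rough illustration of the <ADDRESS>:<PORT> format, the sketch below splits a target string into its address and port. The helper name split_target is hypothetical and is not part of Finsy; it only shows how the three address forms (domain name, IPv4, bracketed IPv6) are written.

```python
def split_target(target: str) -> tuple[str, int]:
    "Split '<ADDRESS>:<PORT>' into (address, port). Hypothetical helper."
    host, sep, port = target.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(f"expected <ADDRESS>:<PORT>, got {target!r}")
    if host.startswith("[") and host.endswith("]"):
        # IPv6 addresses are written in square brackets, e.g. "[::1]:50001".
        host = host[1:-1]
    elif ":" in host:
        raise ValueError(f"IPv6 address must be in square brackets: {target!r}")
    return host, int(port)


print(split_target("10.0.0.1:50000"))
print(split_target("[2001:db8::1]:50001"))
print(split_target("sw1.example.net:9559"))
```

An unbracketed IPv6 address such as 2001:db8::1 is rejected here because the final colon could not be told apart from the port separator.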

The options argument is a SwitchOptions object that specifies how the Switch will behave.

opts = SwitchOptions(p4info=..., p4blob=...)
sw1 = Switch('sw1', '10.0.0.1:50000', opts)

The stash is an optional dictionary for storing arbitrary per-switch information. The dictionary keys are strings. You can use this to store anything that you need to manage the switch.

Each switch object has an event emitter, ee. Use it to listen for port change events such as PORT_UP and PORT_DOWN. See the SwitchEvent class for a list of other switch events.

Switch( name: str, address: str, options: SwitchOptions | None = None, *, stash: dict[str, typing.Any] | None = None)
    def __init__(
        self,
        name: str,
        address: str,
        options: SwitchOptions | None = None,
        *,
        stash: dict[str, Any] | None = None,
    ) -> None:
        """Initialize switch with name, address and options.

        Args:
            name: A human-readable name to uniquely identify the switch.
            address: The target address of the P4Runtime GRPC channel.
              Format is `<ADDRESS>:<PORT>` where `<ADDRESS>` is a DNS name or
              IP address, and `<PORT>` is the TCP port number.
            options: Configuration options for the switch.
            stash: Optional user-controlled dictionary.
              Used to store information that user code needs to access or share.
        """
        if options is None:
            options = SwitchOptions()

        self._name = name
        self._address = address
        self._options = options
        self._stash = stash or {}
        self._ee = SwitchEmitter(self)
        self._p4client = None
        self._p4schema = P4Schema(options.p4info, options.p4blob)
        self._tasks = None
        self._packet_queues = []
        self._digest_queues = {}
        self._timeout_queue = None
        self._arbitrator = Arbitrator(
            options.initial_election_id, options.role_name, options.role_config
        )
        self._gnmi_client = None
        self._ports = SwitchPortList()

Initialize switch with name, address and options.

Arguments:
  • name: A human-readable name to uniquely identify the switch.
  • address: The target address of the P4Runtime GRPC channel. Format is <ADDRESS>:<PORT> where <ADDRESS> is a DNS name or IP address, and <PORT> is the TCP port number.
  • options: Configuration options for the switch.
  • stash: Optional user-controlled dictionary. Used to store information that user code needs to access or share.
name: str
    @property
    def name(self) -> str:
        "Name of the switch."
        return self._name

Name of the switch.

address: str
    @property
    def address(self) -> str:
        """Address of the switch formatted as a GRPC target.

        Format is `<ADDRESS>:<PORT>` where `<ADDRESS>` is a DNS name or IP
        address, and `<PORT>` is the TCP port number.
        """
        return self._address

Address of the switch formatted as a GRPC target.

Format is <ADDRESS>:<PORT> where <ADDRESS> is a DNS name or IP address, and <PORT> is the TCP port number.

options: SwitchOptions
    @property
    def options(self) -> SwitchOptions:
        "Switch options."
        return self._options

Switch options.

stash: dict[str, typing.Any]
    @property
    def stash(self) -> dict[str, Any]:
        "Switch stash. Used to store per-switch data for any purpose."
        return self._stash

Switch stash. Used to store per-switch data for any purpose.

ee: finsy.switch.SwitchEmitter
    @property
    def ee(self) -> "SwitchEmitter":
        "Switch event emitter. See `SwitchEvent` for more details on events."
        return self._ee

Switch event emitter. See SwitchEvent for more details on events.

device_id: int
    @property
    def device_id(self) -> int:
        "Switch's device ID."
        return self._options.device_id

Switch's device ID.

is_up: bool
    @property
    def is_up(self) -> bool:
        "True if switch is UP."
        return self._is_channel_up

True if switch is UP.

is_primary: bool
    @property
    def is_primary(self) -> bool:
        "True if switch is primary."
        return self._arbitrator.is_primary

True if switch is primary.

primary_id: int
    @property
    def primary_id(self) -> int:
        "Election ID of switch that is currently primary."
        return self._arbitrator.primary_id

Election ID of switch that is currently primary.

election_id: int
    @property
    def election_id(self) -> int:
        "Switch's current election ID."
        return self._arbitrator.election_id

Switch's current election ID.

role_name: str
    @property
    def role_name(self) -> str:
        "Switch's current role name."
        return self._arbitrator.role_name

Switch's current role name.

p4info: P4Schema
    @property
    def p4info(self) -> P4Schema:
        "Switch's P4 schema."
        return self._p4schema

Switch's P4 schema.

gnmi_client: GNMIClient | None
    @property
    def gnmi_client(self) -> GNMIClient | None:
        "Switch's gNMI client."
        return self._gnmi_client

Switch's gNMI client.

ports: SwitchPortList
    @property
    def ports(self) -> SwitchPortList:
        "Switch's list of interfaces."
        return self._ports

Switch's list of interfaces.

api_version: finsy.switch.ApiVersion
    @property
    def api_version(self) -> ApiVersion:
        "P4Runtime protocol version."
        return self._api_version

P4Runtime protocol version.

async def read( self, entities: Union[Iterable[Union[p4.v1.p4runtime_pb2.Entity, finsy.p4entity._SupportsEncodeEntity, Iterable[ForwardRef('P4EntityList')]]], finsy.p4entity.P4Entity]) -> AsyncGenerator[finsy.p4entity.P4Entity, NoneType]:
    async def read(
        self,
        entities: Iterable[p4entity.P4EntityList] | p4entity.P4Entity,
    ) -> AsyncGenerator[p4entity.P4Entity, None]:
        "Async iterator that reads entities from the switch."
        assert self._p4client is not None

        if not entities:
            return

        if isinstance(entities, p4entity.P4Entity):
            entities = [entities]

        request = p4r.ReadRequest(
            device_id=self.device_id,
            entities=p4entity.encode_entities(entities, self.p4info),
        )

        async for reply in self._p4client.request_iter(request):
            for ent in reply.entities:
                yield p4entity.decode_entity(ent, self.p4info)

Async iterator that reads entities from the switch.

async def read_packets( self, *, queue_size: int = 50, eth_types: Optional[Iterable[int]] = None) -> AsyncIterator[P4PacketIn]:
    async def read_packets(
        self,
        *,
        queue_size: int = _DEFAULT_QUEUE_SIZE,
        eth_types: Iterable[int] | None = None,
    ) -> AsyncIterator["p4entity.P4PacketIn"]:
        "Async iterator for incoming packets (P4PacketIn)."
        LOGGER.debug("read_packets: opening queue: eth_types=%r", eth_types)

        if eth_types is None:

            def _pkt_filter(_payload: bytes) -> bool:
                return True

        else:
            _filter = {eth.to_bytes(2, "big") for eth in eth_types}

            def _pkt_filter(_payload: bytes) -> bool:
                return _payload[12:14] in _filter

        queue = Queue[p4entity.P4PacketIn](queue_size)
        queue_filter = (_pkt_filter, queue)
        self._packet_queues.append(queue_filter)

        try:
            while True:
                yield await queue.get()
        finally:
            LOGGER.debug("read_packets: closing queue: eth_types=%r", eth_types)
            self._packet_queues.remove(queue_filter)

Async iterator for incoming packets (P4PacketIn).
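
The eth_types filter in the source above compares bytes 12 and 13 of the packet payload against the requested EtherTypes. The standalone sketch below reproduces that comparison without Finsy; make_eth_filter and ethernet_frame are hypothetical helper names used only for illustration.

```python
from __future__ import annotations

from typing import Callable, Iterable


def make_eth_filter(eth_types: Iterable[int] | None) -> Callable[[bytes], bool]:
    "Return a predicate that accepts payloads with one of the given EtherTypes."
    if eth_types is None:
        # No filter requested: accept every payload.
        return lambda _payload: True
    # EtherType occupies bytes 12-13 of an untagged Ethernet frame (big-endian).
    wanted = {eth.to_bytes(2, "big") for eth in eth_types}
    return lambda payload: payload[12:14] in wanted


def ethernet_frame(eth_type: int, body: bytes = b"") -> bytes:
    "Build a minimal untagged Ethernet frame with dummy MAC addresses."
    dst = bytes.fromhex("ffffffffffff")
    src = bytes.fromhex("020000000001")
    return dst + src + eth_type.to_bytes(2, "big") + body


arp_only = make_eth_filter([0x0806])
print(arp_only(ethernet_frame(0x0806)))  # ARP frame
print(arp_only(ethernet_frame(0x0800)))  # IPv4 frame
```

Because the check looks at a fixed offset, a frame carrying an 802.1Q VLAN tag has 0x8100 at that position and would only match a filter that lists 0x8100.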

async def read_digests( self, digest_id: str, *, queue_size: int = 50) -> AsyncIterator[P4DigestList]:
    async def read_digests(
        self,
        digest_id: str,
        *,
        queue_size: int = _DEFAULT_QUEUE_SIZE,
    ) -> AsyncIterator["p4entity.P4DigestList"]:
        "Async iterator for incoming digest lists (P4DigestList)."
        LOGGER.debug("read_digests: opening queue: digest_id=%r", digest_id)

        if digest_id in self._digest_queues:
            raise ValueError(f"queue for digest_id {digest_id!r} already open")

        queue = Queue[p4entity.P4DigestList](queue_size)
        self._digest_queues[digest_id] = queue
        try:
            while True:
                yield await queue.get()
        finally:
            LOGGER.debug("read_digests: closing queue: digest_id=%r", digest_id)
            del self._digest_queues[digest_id]

Async iterator for incoming digest lists (P4DigestList).

async def read_idle_timeouts( self, *, queue_size: int = 50) -> AsyncIterator[finsy.p4entity.P4IdleTimeoutNotification]:
    async def read_idle_timeouts(
        self,
        *,
        queue_size: int = _DEFAULT_QUEUE_SIZE,
    ) -> AsyncIterator["p4entity.P4IdleTimeoutNotification"]:
        "Async iterator for incoming idle timeouts (P4IdleTimeoutNotification)."
        LOGGER.debug("read_idle_timeouts: opening queue")

        if self._timeout_queue is not None:
            raise ValueError("timeout queue already open")

        queue = Queue[p4entity.P4IdleTimeoutNotification](queue_size)
        self._timeout_queue = queue
        try:
            while True:
                yield await queue.get()
        finally:
            LOGGER.debug("read_idle_timeouts: closing queue")
            self._timeout_queue = None

Async iterator for incoming idle timeouts (P4IdleTimeoutNotification).

async def write( self, entities: Iterable[Union[p4.v1.p4runtime_pb2.Update, p4.v1.p4runtime_pb2.StreamMessageRequest, finsy.p4entity._SupportsEncodeUpdate, Iterable[ForwardRef('P4UpdateList')]]], *, strict: bool = True, warn_only: bool = False) -> None:
    async def write(
        self,
        entities: Iterable[p4entity.P4UpdateList],
        *,
        strict: bool = True,
        warn_only: bool = False,
    ) -> None:
        """Write updates and stream messages to the switch.

        If `strict` is False, MODIFY and DELETE operations will NOT raise an
        error if the entity does not exist (NOT_FOUND).

        If `warn_only` is True, no operations will raise an error. Instead,
        the exception will be logged as a WARNING and the method will return
        normally.
        """
        assert self._p4client is not None

        if not entities:
            return

        msgs = p4entity.encode_updates(entities, self.p4info)

        updates: list[p4r.Update] = []
        for msg in msgs:
            if isinstance(msg, p4r.StreamMessageRequest):
                # StreamMessageRequests are transmitted immediately.
                # TODO: Understand what happens with backpressure?
                await self._p4client.send(msg)
            else:
                updates.append(msg)

        if updates:
            await self._write_request(updates, strict, warn_only)

Write updates and stream messages to the switch.

If strict is False, MODIFY and DELETE operations will NOT raise an error if the entity does not exist (NOT_FOUND).

If warn_only is True, no operations will raise an error. Instead, the exception will be logged as a WARNING and the method will return normally.
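
The interaction between strict and warn_only is easiest to see as a decision table. The sketch below mirrors the branch in the _write_request source listed earlier on this page. The function name write_failure_outcome is hypothetical, and not_found_only stands for the error's is_not_found_only flag.

```python
from itertools import product


def write_failure_outcome(*, strict: bool, warn_only: bool, not_found_only: bool) -> str:
    "Return what happens to a failed WriteRequest: 'raise', 'warn' or 'ignore'."
    if strict or not not_found_only:
        # The error counts: either log it as a warning or re-raise it.
        return "warn" if warn_only else "raise"
    # Non-strict mode silently ignores errors that are only NOT_FOUND.
    return "ignore"


for strict, warn_only, not_found_only in product([True, False], repeat=3):
    outcome = write_failure_outcome(
        strict=strict, warn_only=warn_only, not_found_only=not_found_only
    )
    print(f"strict={strict!s:5} warn_only={warn_only!s:5} "
          f"not_found_only={not_found_only!s:5} -> {outcome}")
```

In this model, strict=False only affects failures that consist solely of NOT_FOUND errors; any other failure is still raised unless warn_only is set.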

async def insert( self, entities: Iterable[Union[p4.v1.p4runtime_pb2.Entity, finsy.p4entity._SupportsEncodeEntity, Iterable[ForwardRef('P4EntityList')]]], *, warn_only: bool = False) -> None:
    async def insert(
        self,
        entities: Iterable[p4entity.P4EntityList],
        *,
        warn_only: bool = False,
    ) -> None:
        """Insert the specified entities.

        If `warn_only` is True, errors will be logged as warnings instead of
        raising an exception.
        """
        if entities:
            await self._write_request(
                [
                    p4r.Update(type=p4r.Update.INSERT, entity=ent)
                    for ent in p4entity.encode_entities(entities, self.p4info)
                ],
                True,
                warn_only,
            )

Insert the specified entities.

If warn_only is True, errors will be logged as warnings instead of raising an exception.

async def modify( self, entities: Iterable[Union[p4.v1.p4runtime_pb2.Entity, finsy.p4entity._SupportsEncodeEntity, Iterable[ForwardRef('P4EntityList')]]], *, strict: bool = True, warn_only: bool = False) -> None:
    async def modify(
        self,
        entities: Iterable[p4entity.P4EntityList],
        *,
        strict: bool = True,
        warn_only: bool = False,
    ) -> None:
        """Modify the specified entities.

        If `strict` is False, NOT_FOUND errors will be ignored.

        If `warn_only` is True, errors will be logged as warnings instead of
        raising an exception.
        """
        if entities:
            await self._write_request(
                [
                    p4r.Update(type=p4r.Update.MODIFY, entity=ent)
                    for ent in p4entity.encode_entities(entities, self.p4info)
                ],
                strict,
                warn_only,
            )

Modify the specified entities.

If strict is False, NOT_FOUND errors will be ignored.

If warn_only is True, errors will be logged as warnings instead of raising an exception.

async def delete( self, entities: Iterable[Union[p4.v1.p4runtime_pb2.Entity, finsy.p4entity._SupportsEncodeEntity, Iterable[ForwardRef('P4EntityList')]]], *, strict: bool = True, warn_only: bool = False) -> None:
    async def delete(
        self,
        entities: Iterable[p4entity.P4EntityList],
        *,
        strict: bool = True,
        warn_only: bool = False,
    ) -> None:
        """Delete the specified entities.

        If `strict` is False, NOT_FOUND errors will be ignored.

        If `warn_only` is True, errors will be logged as warnings instead of
        raising an exception.
        """
        if entities:
            await self._write_request(
                [
                    p4r.Update(type=p4r.Update.DELETE, entity=ent)
                    for ent in p4entity.encode_entities(entities, self.p4info)
                ],
                strict,
                warn_only,
            )

Delete the specified entities.

If strict is False, NOT_FOUND errors will be ignored.

If warn_only is True, errors will be logged as warnings instead of raising an exception.

async def delete_all(self) -> None:
    async def delete_all(self) -> None:
        """Delete all table entries, multicast groups, clone sessions, action
        profile groups and members, and digest entries. Default table entries
        are reset.

        This method does not attempt to delete entries in const tables.

        TODO: This method does not affect indirect counters, meters or
        value_sets.
        """
        await self.delete_many(
            [
                p4entity.P4TableEntry(),
                p4entity.P4MulticastGroupEntry(),
                p4entity.P4CloneSessionEntry(),
            ]
        )

        # Reset all default table entries.
        default_entries = [
            p4entity.P4TableEntry(table.alias, is_default_action=True)
            for table in self.p4info.tables
            if table.const_default_action is None and table.action_profile is None
        ]
        if default_entries:
            await self.modify(default_entries)

        # Delete all P4ActionProfileGroup's and P4ActionProfileMember's.
        # We do this after deleting the P4TableEntry's in case a client is using
        # "one-shot" references; these are incompatible with separate
        # action profiles.
        await self.delete_many(
            [
                p4entity.P4ActionProfileGroup(),
                p4entity.P4ActionProfileMember(),
            ]
        )

        # Delete DigestEntry separately. Wildcard reads are not supported.
        digest_entries = [
            p4entity.P4DigestEntry(digest.alias) for digest in self.p4info.digests
        ]
        if digest_entries:
            await self.delete(digest_entries, strict=False)

Delete all table entries, multicast groups, clone sessions, action profile groups and members, and digest entries. Default table entries are reset.

This method does not attempt to delete entries in const tables.

TODO: This method does not affect indirect counters, meters or value_sets.

async def delete_many( self, entities: Iterable[Union[p4.v1.p4runtime_pb2.Entity, finsy.p4entity._SupportsEncodeEntity, Iterable[ForwardRef('P4EntityList')]]]) -> None:
    async def delete_many(self, entities: Iterable[p4entity.P4EntityList]) -> None:
        """Delete entities that match a wildcard read.

        This method always skips over entries in const tables. It is an error
        to attempt to delete those.
        """
        assert self._p4client is not None

        request = p4r.ReadRequest(
            device_id=self.device_id,
            entities=p4entity.encode_entities(entities, self.p4info),
        )

        # Compute set of all const table ID's (may be empty).
        to_skip = {table.id for table in self.p4info.tables if table.is_const}

        async for reply in self._p4client.request_iter(request):
            if reply.entities:
                if to_skip:
                    await self.delete(
                        reply
                        for reply in reply.entities
                        if reply.HasField("table_entry")
                        and reply.table_entry.table_id not in to_skip
                    )
                else:
                    await self.delete(reply.entities)

Delete entities that match a wildcard read.

This method always skips over entries in const tables. It is an error to attempt to delete those.

async def run(self) -> None:
    async def run(self) -> None:
        "Run the switch's lifecycle repeatedly."
        assert self._p4client is None
        assert self._tasks is None

        self._tasks = SwitchTasks(self._options.fail_fast)
        self._p4client = P4Client(self._address, self._options.channel_credentials)
        self._switch_start()

        try:
            while True:
                # If the switch fails and restarts too quickly, slow it down.
                async with _throttle_failure():
                    self.create_task(self._run(), background=True)
                    await self._tasks.wait()
                    self._arbitrator.reset()

        finally:
            self._p4client = None
            self._tasks = None
            self._switch_stop()

Run the switch's lifecycle repeatedly.

def create_task( self, coro: Coroutine[Any, Any, ~_T], *, background: bool = False, name: str | None = None) -> _asyncio.Task[~_T]:
    def create_task(
        self,
        coro: Coroutine[Any, Any, _T],
        *,
        background: bool = False,
        name: str | None = None,
    ) -> asyncio.Task[_T]:
        "Create an asyncio task tied to the Switch's lifecycle."
        assert self._tasks is not None

        return self._tasks.create_task(
            coro,
            switch=self,
            background=background,
            name=name,
        )

Create an asyncio task tied to the Switch's lifecycle.

async def __aenter__(self) -> typing_extensions.Self:
    async def __aenter__(self) -> Self:
        "Similar to run() but provides a one-time context manager interface."
        assert self._p4client is None
        assert self._tasks is None

        self._tasks = SwitchTasks(self._options.fail_fast)
        self._p4client = P4Client(
            self._address,
            self._options.channel_credentials,
            wait_for_ready=False,
        )
        self._switch_start()

        try:
            # Start the switch's `_run` task in the background. Then, wait for
            # `_run` task to fire the CHANNEL_READY event. If the `_run` task
            # cannot connect or fails in some other way, it will finish before
            # the `ready` future. We need to handle the error in this case.

            run = self.create_task(self._run(), background=True)
            ready = self.ee.event_future(SwitchEvent.CHANNEL_READY)
            done, _ = await asyncio.wait(
                [run, ready], return_when=asyncio.FIRST_COMPLETED
            )
            if run in done:
                await run

        except BaseException:
            await self.__aexit__(None, None, None)
            raise

        return self

Similar to run() but provides a one-time context manager interface.
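
The "wait for whichever finishes first" step in the listing can be sketched with plain asyncio. This is an illustration only: `connect` and its inner `run` coroutine are hypothetical and stand in for the switch's background task and its `CHANNEL_READY` future.

```python
import asyncio


async def connect(fail: bool) -> str:
    "Wait for either the background task to finish or the ready signal."
    loop = asyncio.get_running_loop()
    ready = loop.create_future()

    async def run() -> None:
        # Hypothetical background task standing in for the switch's `_run`.
        await asyncio.sleep(0)
        if fail:
            raise ConnectionError("cannot connect")
        ready.set_result(None)
        await asyncio.sleep(3600)  # keep running after signalling readiness

    task = asyncio.create_task(run())
    try:
        done, _ = await asyncio.wait(
            [task, ready], return_when=asyncio.FIRST_COMPLETED
        )
        if task in done:
            # The task finished before `ready`; awaiting it re-raises its error.
            await task
        return "ready"
    finally:
        task.cancel()
```

If the background task fails before signalling readiness, awaiting it surfaces the exception to the caller, which is the case the comment in the listing describes.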

class SwitchEvent(builtins.str, enum.Enum):
class SwitchEvent(str, enum.Enum):
    "Events for Switch class."

    CONTROLLER_ENTER = "controller_enter"  # (switch)
    CONTROLLER_LEAVE = "controller_leave"  # (switch)
    SWITCH_START = "switch_start"  # (switch)
    SWITCH_STOP = "switch_stop"  # (switch)
    CHANNEL_UP = "channel_up"  # (switch)
    CHANNEL_DOWN = "channel_down"  # (switch)
    CHANNEL_READY = "channel_ready"  # (switch)
    BECOME_PRIMARY = "become_primary"  # (switch)
    BECOME_BACKUP = "become_backup"  # (switch)
    PORT_UP = "port_up"  # (switch, port)
    PORT_DOWN = "port_down"  # (switch, port)
    STREAM_ERROR = "stream_error"  # (switch, p4r.StreamMessageResponse)

Events for Switch class.

CONTROLLER_ENTER = <SwitchEvent.CONTROLLER_ENTER: 'controller_enter'>
CONTROLLER_LEAVE = <SwitchEvent.CONTROLLER_LEAVE: 'controller_leave'>
SWITCH_START = <SwitchEvent.SWITCH_START: 'switch_start'>
SWITCH_STOP = <SwitchEvent.SWITCH_STOP: 'switch_stop'>
CHANNEL_UP = <SwitchEvent.CHANNEL_UP: 'channel_up'>
CHANNEL_DOWN = <SwitchEvent.CHANNEL_DOWN: 'channel_down'>
CHANNEL_READY = <SwitchEvent.CHANNEL_READY: 'channel_ready'>
BECOME_PRIMARY = <SwitchEvent.BECOME_PRIMARY: 'become_primary'>
BECOME_BACKUP = <SwitchEvent.BECOME_BACKUP: 'become_backup'>
PORT_UP = <SwitchEvent.PORT_UP: 'port_up'>
PORT_DOWN = <SwitchEvent.PORT_DOWN: 'port_down'>
STREAM_ERROR = <SwitchEvent.STREAM_ERROR: 'stream_error'>
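
Because `SwitchEvent` derives from both `str` and `enum.Enum`, its members behave like plain strings. A small sketch of that behavior, using a hypothetical `DemoEvent` enum built the same way rather than Finsy's own class:

```python
import enum


class DemoEvent(str, enum.Enum):
    "Hypothetical event enum built the same way as SwitchEvent."

    PORT_UP = "port_up"
    PORT_DOWN = "port_down"


# Members are real `str` instances, so they compare equal to their values.
is_equal = DemoEvent.PORT_UP == "port_up"

# A raw string can be converted back to a member by value lookup.
member = DemoEvent("port_down")
```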
@final
@dataclasses.dataclass(frozen=True)
class SwitchOptions:
@final
@dataclasses.dataclass(frozen=True)
class SwitchOptions:
    """Represents the configuration options for a `Switch`.

    ```
    opts = SwitchOptions(
        p4info=Path("basic.p4info.txtpb"),
        p4blob=Path("basic.json"),
        ready_handler=on_ready,
    )
    ```

    Each `SwitchOptions` object is immutable and may be shared by multiple
    switches. You should treat all values as read-only.

    You can use function call syntax to return a copy of a `SwitchOptions` with
    one or more properties altered.

    ```
    new_opts = opts(device_id=6)
    ```
    """

    p4info: Path | None = None
    "Path to P4Info protobuf text file."

    p4blob: Path | SupportsBytes | None = None
    "Path to P4Blob file, or an object that can provide the bytes value."

    p4force: bool = False
    "If true, always load the P4 program after initial handshake."

    device_id: int = 1
    "Default P4Runtime device ID."

    initial_election_id: int = 10
    "Initial P4Runtime election ID."

    channel_credentials: GRPCCredentialsTLS | None = None
    "P4Runtime channel credentials. Used for TLS support."

    role_name: str = ""
    "P4Runtime role name."

    role_config: pbutil.PBMessage | None = None
    "P4Runtime role configuration."

    ready_handler: Callable[["Switch"], Coroutine[Any, Any, None]] | None = None
    "Ready handler async function callback."

    fail_fast: bool = False
    """If true, log switch errors as CRITICAL and immediately abort when the
    switch is running in a Controller."""

    def __call__(self, **kwds: Any) -> Self:
        return dataclasses.replace(self, **kwds)

Represents the configuration options for a Switch.

opts = SwitchOptions(
    p4info=Path("basic.p4info.txtpb"),
    p4blob=Path("basic.json"),
    ready_handler=on_ready,
)

Each SwitchOptions object is immutable and may be shared by multiple switches. You should treat all values as read-only.

You can use function call syntax to return a copy of a SwitchOptions with one or more properties altered.

new_opts = opts(device_id=6)
SwitchOptions( p4info: pathlib.Path | None = None, p4blob: pathlib.Path | typing.SupportsBytes | None = None, p4force: bool = False, device_id: int = 1, initial_election_id: int = 10, channel_credentials: GRPCCredentialsTLS | None = None, role_name: str = '', role_config: google.protobuf.message.Message | None = None, ready_handler: Optional[Callable[[Switch], Coroutine[Any, Any, NoneType]]] = None, fail_fast: bool = False)
p4info: pathlib.Path | None = None

Path to P4Info protobuf text file.

p4blob: pathlib.Path | typing.SupportsBytes | None = None

Path to P4Blob file, or an object that can provide the bytes value.

p4force: bool = False

If true, always load the P4 program after initial handshake.

device_id: int = 1

Default P4Runtime device ID.

initial_election_id: int = 10

Initial P4Runtime election ID.

channel_credentials: GRPCCredentialsTLS | None = None

P4Runtime channel credentials. Used for TLS support.

role_name: str = ''

P4Runtime role name.

role_config: google.protobuf.message.Message | None = None

P4Runtime role configuration.

ready_handler: Optional[Callable[[Switch], Coroutine[Any, Any, NoneType]]] = None

Ready handler async function callback.

fail_fast: bool = False

If true, log switch errors as CRITICAL and immediately abort when the switch is running in a Controller.

def __call__(self, **kwds: Any) -> typing_extensions.Self:
    def __call__(self, **kwds: Any) -> Self:
        return dataclasses.replace(self, **kwds)

Call self as a function.
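
In practice, calling a `SwitchOptions` object returns a modified copy and leaves the original untouched. The pattern can be sketched with a hypothetical `DemoOptions` class that carries only two of the fields listed above:

```python
import dataclasses
from typing import Any


@dataclasses.dataclass(frozen=True)
class DemoOptions:
    "Hypothetical stand-in with two of the fields from SwitchOptions."

    device_id: int = 1
    fail_fast: bool = False

    def __call__(self, **kwds: Any) -> "DemoOptions":
        # dataclasses.replace() builds a new instance; `self` is untouched.
        return dataclasses.replace(self, **kwds)


opts = DemoOptions()
new_opts = opts(device_id=6)
```

Since the dataclass is frozen, assigning to `opts.device_id` raises `dataclasses.FrozenInstanceError`, which is why the copy-on-call form is the way to derive per-switch variants from a shared base.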

@dataclass
class SwitchPort:
@dataclass
class SwitchPort:
    "Represents a switch port."

    id: int
    name: str
    oper_status: OperStatus = OperStatus.UNKNOWN

    @property
    def up(self) -> bool:
        "Return true if the port's operational status is UP."
        return self.oper_status == OperStatus.UP

Represents a switch port.

SwitchPort( id: int, name: str, oper_status: finsy.ports.OperStatus = <OperStatus.UNKNOWN: 'UNKNOWN'>)
id: int
name: str
oper_status: finsy.ports.OperStatus = <OperStatus.UNKNOWN: 'UNKNOWN'>
up: bool
    @property
    def up(self) -> bool:
        "Return true if the port's operational status is UP."
        return self.oper_status == OperStatus.UP

Return true if the port's operational status is UP.

class SwitchPortList:
class SwitchPortList:
    "Represents a list of switch ports."

    _ports: dict[str, SwitchPort]
    _subscription: GNMISubscription | None = None

    def __init__(self):
        self._ports = {}

    def __getitem__(self, key: str) -> SwitchPort:
        "Retrieve interface by name."
        return self._ports[key]

    def __len__(self) -> int:
        "Return number of switch ports."
        return len(self._ports)

    def __iter__(self) -> Iterator[SwitchPort]:
        "Iterate over switch ports."
        return iter(self._ports.values())

    async def subscribe(self, client: GNMIClient) -> None:
        """Obtain the initial list of ports and subscribe to switch port status
        updates using gNMI."""
        assert self._subscription is None

        self._ports = await self._get_ports(client)
        if self._ports:
            self._subscription = await self._get_subscription(client)
        else:
            LOGGER.warning("No switch ports exist")

    async def listen(self, switch: "_sw.Switch | None" = None) -> None:
        "Listen for switch port updates."
        assert self._subscription is not None

        async for update in self._subscription.updates():
            self._update(update, switch)

    def close(self) -> None:
        "Close the switch port subscription."
        if self._subscription is not None:
            self._subscription.cancel()
            self._subscription = None
            self._ports = {}

    async def _get_ports(self, client: GNMIClient) -> dict[str, SwitchPort]:
        "Retrieve ID and name of each port."
        ports: dict[str, SwitchPort] = {}

        result = await client.get(_ifIndex)
        for update in result:
            path = update.path
            assert path.last == _ifIndex.last

            port = SwitchPort(update.value, path["name"])
            ports[port.name] = port

        return ports

    async def _get_subscription(self, client: GNMIClient) -> GNMISubscription:
        sub = client.subscribe()

        # Subscribe to change notifications.
        for port in self._ports.values():
            sub.on_change(_ifOperStatus.set(name=port.name))

        # Synchronize initial settings for ports.
        async for update in sub.synchronize():
            self._update(update, None)

        return sub

    def _update(self, update: GNMIUpdate, switch: "_sw.Switch | None"):
        path = update.path
        if path.last == _ifOperStatus.last:
            status = OperStatus(update.value)
            self._update_port(path["name"], status, switch)
        else:
            LOGGER.warning(f"PortList: unknown gNMI path: {path}")

    def _update_port(self, name: str, status: OperStatus, switch: "_sw.Switch | None"):
        port = self._ports[name]

        prev_up = port.up
        port.oper_status = status
        curr_up = port.up

        if switch is not None and curr_up != prev_up:
            if curr_up:
                switch.ee.emit(_sw.SwitchEvent.PORT_UP, switch, port)
            else:
                switch.ee.emit(_sw.SwitchEvent.PORT_DOWN, switch, port)

Represents a list of switch ports.

def __getitem__(self, key: str) -> SwitchPort:
    def __getitem__(self, key: str) -> SwitchPort:
        "Retrieve interface by name."
        return self._ports[key]

Retrieve interface by name.

def __len__(self) -> int:
    def __len__(self) -> int:
        "Return number of switch ports."
        return len(self._ports)

Return number of switch ports.

def __iter__(self) -> Iterator[SwitchPort]:
    def __iter__(self) -> Iterator[SwitchPort]:
        "Iterate over switch ports."
        return iter(self._ports.values())

Iterate over switch ports.

async def subscribe(self, client: GNMIClient) -> None:
    async def subscribe(self, client: GNMIClient) -> None:
        """Obtain the initial list of ports and subscribe to switch port status
        updates using gNMI."""
        assert self._subscription is None

        self._ports = await self._get_ports(client)
        if self._ports:
            self._subscription = await self._get_subscription(client)
        else:
            LOGGER.warning("No switch ports exist")

Obtain the initial list of ports and subscribe to switch port status updates using gNMI.

async def listen(self, switch: Switch | None = None) -> None:
    async def listen(self, switch: "_sw.Switch | None" = None) -> None:
        "Listen for switch port updates."
        assert self._subscription is not None

        async for update in self._subscription.updates():
            self._update(update, switch)

Listen for switch port updates.

def close(self) -> None:
    def close(self) -> None:
        "Close the switch port subscription."
        if self._subscription is not None:
            self._subscription.cancel()
            self._subscription = None
            self._ports = {}

Close the switch port subscription.
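
As the `_update_port` source in the class listing shows, `PORT_UP` and `PORT_DOWN` are emitted only when a port's `up` state actually flips, not on every status update. A minimal sketch of that edge-triggered logic, using hypothetical `Status`, `Port` and `update_port` names in place of Finsy's classes and event emitter:

```python
from dataclasses import dataclass
from enum import Enum


class Status(str, Enum):
    "Hypothetical subset of operational states."

    UNKNOWN = "UNKNOWN"
    UP = "UP"
    DOWN = "DOWN"


@dataclass
class Port:
    name: str
    status: Status = Status.UNKNOWN

    @property
    def up(self) -> bool:
        return self.status == Status.UP


def update_port(port: Port, status: Status, events: list) -> None:
    "Record an event only when the up/down state actually flips."
    prev_up = port.up
    port.status = status
    if port.up != prev_up:
        events.append("port_up" if port.up else "port_down")


events = []
port = Port("eth1")
for status in (Status.DOWN, Status.UP, Status.UP, Status.DOWN):
    update_port(port, status, events)
```

The first `DOWN` produces no event because a port in the `UNKNOWN` state already counts as not up, and the repeated `UP` is ignored, so `events` ends as `["port_up", "port_down"]`.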

class GNMIClient:
class GNMIClient:
    """Async gNMI client.

    This client implements `get`, `set`, `subscribe` and `capabilities`.

    The API depends on the protobuf definition of `gnmi.TypedValue`.

    Get usage:
    ```
    client = GNMIClient('127.0.0.1:9339')
    await client.open()

    path = GNMIPath("interfaces/interface")
    for update in await client.get(path):
        print(update)
    ```

    Subscribe usage:
    ```
    path = GNMIPath("interfaces/interface[name=eth1]/state/oper-status")
    sub = client.subscribe()
    sub.on_change(path)

    async for initial_state in sub.synchronize():
        print(initial_state)

    async for update in sub.updates():
        print(update)
    ```

    Set usage:
    ```
    enabled = GNMIPath("interfaces/interface[name=eth1]/config/enabled")

    await client.set(update=[
        (enabled, gnmi.TypedValue(bool_val=True)),
    ])
    ```
    """

    _address: str
    _credentials: GRPCCredentialsTLS | None
    _channel: grpc.aio.Channel | None = None
    _stub: gnmi_grpc.gNMIStub | None = None
    _channel_reused: bool = False

    def __init__(
        self,
        address: str,
        credentials: GRPCCredentialsTLS | None = None,
    ):
        self._address = address
        self._credentials = credentials

    async def __aenter__(self) -> Self:
        await self.open()
        return self

    async def __aexit__(self, *_args: Any) -> bool | None:
        await self.close()

    async def open(
        self,
        *,
        channel: grpc.aio.Channel | None = None,
    ) -> None:
        """Open the client channel.

        Note: This method is `async` for forward compatibility.
        """
        if self._channel is not None:
            raise RuntimeError("GNMIClient: client is already open")

        assert self._stub is None

        if channel is not None:
            self._channel = channel
            self._channel_reused = True
        else:
            self._channel = grpc_channel(
                self._address,
                credentials=self._credentials,
                client_type="GNMIClient",
            )

        self._stub = gnmi_grpc.gNMIStub(self._channel)

    async def close(self) -> None:
        "Close the client channel."
        if self._channel is not None:
            if not self._channel_reused:
                LOGGER.debug("GNMIClient: close channel %r", self._address)
                await self._channel.close()

            self._channel = None
            self._stub = None
            self._channel_reused = False

    async def get(
        self,
        *path: GNMIPath,
        prefix: GNMIPath | None = None,
        config: bool = False,
    ) -> Sequence[GNMIUpdate]:
        "Retrieve value(s) using a GetRequest."
        if self._stub is None:
            raise RuntimeError("GNMIClient: client is not open")

        request = gnmi.GetRequest(
            path=(i.path for i in path),
            encoding=gnmi.Encoding.PROTO,
        )

        if prefix is not None:
            request.prefix.CopyFrom(prefix.path)

        if config:
            request.type = gnmi.GetRequest.CONFIG

        self._log_msg(request)
        try:
            reply = cast(
                gnmi.GetResponse,
                await self._stub.Get(request),
            )
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

        self._log_msg(reply)

        result: list[GNMIUpdate] = []
        for notification in reply.notification:
            for update in _read_updates(notification):
                result.append(update)

        return result

    def subscribe(
        self,
        *,
        prefix: GNMIPath | None = None,
    ) -> "GNMISubscription":
        """Subscribe to gNMI change notifications.

        Usage:
        ```
        sub = client.subscribe()
        sub.on_change(path1, ...)
        sub.sample(path3, path4, sample_interval=1000000000)

        async for update in sub.synchronize():
            # do something with initial state

        async for update in sub.updates():
            # do something with updates
        ```

        You can also subscribe in "ONCE" mode:
        ```
        sub = client.subscribe()
        sub.once(path1, path2, ...)

        async for info in sub.synchronize():
            # do something with info
        ```

        The subscription object is not re-entrant, but a fully consumed
        subscription may be reused.
        """
        if self._stub is None:
            raise RuntimeError("GNMIClient: client is not open")

        return GNMISubscription(self, prefix)

    async def capabilities(self) -> gnmi.CapabilityResponse:
        "Issue a CapabilitiesRequest."
        if self._stub is None:
            raise RuntimeError("GNMIClient: client is not open")

        request = gnmi.CapabilityRequest()

        self._log_msg(request)
        try:
            reply = cast(
                gnmi.CapabilityResponse,
                await self._stub.Capabilities(request),
            )
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

        self._log_msg(reply)
        return reply

    async def set(
        self,
        *,
        update: Sequence[tuple[GNMIPath, GNMISetValueType]] | None = None,
        replace: Sequence[tuple[GNMIPath, GNMISetValueType]] | None = None,
        delete: Sequence[GNMIPath] | None = None,
        prefix: GNMIPath | None = None,
    ) -> int:
        """Set value(s) using SetRequest.

        Returns the timestamp from the successful `SetResponse`.
        """
        if self._stub is None:
            raise RuntimeError("GNMIClient: client is not open")

        if update is not None:
            updates = [gnmi_update(path, value) for path, value in update]
        else:
            updates = None

        if replace is not None:
            replaces = [gnmi_update(path, value) for path, value in replace]
        else:
            replaces = None

        if delete is not None:
            deletes = [path.path for path in delete]
        else:
            deletes = None

        request = gnmi.SetRequest(
            update=updates,
            replace=replaces,
            delete=deletes,
        )

        if prefix is not None:
            request.prefix.CopyFrom(prefix.path)

        self._log_msg(request)
        try:
            reply = cast(
                gnmi.SetResponse,
                await self._stub.Set(request),
            )
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

        self._log_msg(reply)

        # According to the comments in the current protobuf, I expect all error
        # results to be raised as an exception. The only useful value in a
        # successful response is the timestamp.

        if reply.HasField("message"):
            raise NotImplementedError("SetResponse error not supported")

        for result in reply.response:
            if result.HasField("message"):
                raise NotImplementedError("SetResponse suberror not supported")

        return reply.timestamp

    def _log_msg(self, msg: "pbutil.PBMessage"):
        "Log a gNMI message."
        pbutil.log_msg(self._channel, msg, None)

Async gNMI client.

This client implements get, set, subscribe and capabilities.

The API depends on the protobuf definition of gnmi.TypedValue.

Get usage:

client = GNMIClient('127.0.0.1:9339')
await client.open()

path = GNMIPath("interfaces/interface")
for update in await client.get(path):
    print(update)

Subscribe usage:

path = GNMIPath("interfaces/interface[name=eth1]/state/oper-status")
sub = client.subscribe()
sub.on_change(path)

async for initial_state in sub.synchronize():
    print(initial_state)

async for update in sub.updates():
    print(update)

Set usage:

enabled = GNMIPath("interfaces/interface[name=eth1]/config/enabled")

await client.set(update=[
    (enabled, gnmi.TypedValue(bool_val=True)),
])
GNMIClient( address: str, credentials: GRPCCredentialsTLS | None = None)
    def __init__(
        self,
        address: str,
        credentials: GRPCCredentialsTLS | None = None,
    ):
        self._address = address
        self._credentials = credentials
async def __aenter__(self) -> typing_extensions.Self:
    async def __aenter__(self) -> Self:
        await self.open()
        return self
async def open(self, *, channel: grpc.aio._base_channel.Channel | None = None) -> None:
    async def open(
        self,
        *,
        channel: grpc.aio.Channel | None = None,
    ) -> None:
        """Open the client channel.

        Note: This method is `async` for forward compatibility.
        """
        if self._channel is not None:
            raise RuntimeError("GNMIClient: client is already open")

        assert self._stub is None

        if channel is not None:
            self._channel = channel
            self._channel_reused = True
        else:
            self._channel = grpc_channel(
                self._address,
                credentials=self._credentials,
                client_type="GNMIClient",
            )

        self._stub = gnmi_grpc.gNMIStub(self._channel)

Open the client channel.

Note: This method is async for forward compatibility.

async def close(self) -> None:
    async def close(self) -> None:
        "Close the client channel."
        if self._channel is not None:
            if not self._channel_reused:
                LOGGER.debug("GNMIClient: close channel %r", self._address)
                await self._channel.close()

            self._channel = None
            self._stub = None
            self._channel_reused = False

Close the client channel.
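
Note that `close` only closes a channel the client created itself; a channel passed in through `open(channel=...)` is left open for its owner. The ownership rule can be sketched as follows, with hypothetical `FakeChannel` and `DemoClient` classes standing in for the gRPC channel and `GNMIClient`:

```python
import asyncio


class FakeChannel:
    "Hypothetical stand-in for a gRPC channel."

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class DemoClient:
    "Closes its channel only if it created the channel itself."

    def __init__(self) -> None:
        self._channel = None
        self._reused = False

    async def open(self, *, channel=None) -> None:
        if self._channel is not None:
            raise RuntimeError("client is already open")
        self._reused = channel is not None
        self._channel = channel if channel is not None else FakeChannel()

    async def close(self) -> None:
        if self._channel is not None:
            if not self._reused:
                await self._channel.close()
            self._channel = None
            self._reused = False


async def demo():
    shared = FakeChannel()
    borrower = DemoClient()
    await borrower.open(channel=shared)
    await borrower.close()          # must not close the shared channel

    owner = DemoClient()
    await owner.open()
    owned = owner._channel
    await owner.close()             # closes the channel it created
    return shared.closed, owned.closed
```

Running `asyncio.run(demo())` returns `(False, True)`: the borrowed channel stays open and the owned one is closed.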

async def get( self, *path: GNMIPath, prefix: GNMIPath | None = None, config: bool = False) -> Sequence[GNMIUpdate]:
    async def get(
        self,
        *path: GNMIPath,
        prefix: GNMIPath | None = None,
        config: bool = False,
    ) -> Sequence[GNMIUpdate]:
        "Retrieve value(s) using a GetRequest."
        if self._stub is None:
            raise RuntimeError("GNMIClient: client is not open")

        request = gnmi.GetRequest(
            path=(i.path for i in path),
            encoding=gnmi.Encoding.PROTO,
        )

        if prefix is not None:
            request.prefix.CopyFrom(prefix.path)

        if config:
            request.type = gnmi.GetRequest.CONFIG

        self._log_msg(request)
        try:
            reply = cast(
                gnmi.GetResponse,
                await self._stub.Get(request),
            )
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

        self._log_msg(reply)

        result: list[GNMIUpdate] = []
        for notification in reply.notification:
            for update in _read_updates(notification):
                result.append(update)

        return result

Retrieve value(s) using a GetRequest.

def subscribe( self, *, prefix: GNMIPath | None = None) -> GNMISubscription:
    def subscribe(
        self,
        *,
        prefix: GNMIPath | None = None,
    ) -> "GNMISubscription":
        """Subscribe to gNMI change notifications.

        Usage:
        ```
        sub = client.subscribe()
        sub.on_change(path1, ...)
        sub.sample(path3, path4, sample_interval=1000000000)

        async for update in sub.synchronize():
            # do something with initial state

        async for update in sub.updates():
            # do something with updates
        ```

        You can also subscribe in "ONCE" mode:
        ```
        sub = client.subscribe()
        sub.once(path1, path2, ...)

        async for info in sub.synchronize():
            # do something with info
        ```

        The subscription object is not re-entrant, but a fully consumed
        subscription may be reused.
        """
        if self._stub is None:
            raise RuntimeError("GNMIClient: client is not open")

        return GNMISubscription(self, prefix)

Subscribe to gNMI change notifications.

Usage:

sub = client.subscribe()
sub.on_change(path1, ...)
sub.sample(path3, path4, sample_interval=1000000000)

async for update in sub.synchronize():
    # do something with initial state

async for update in sub.updates():
    # do something with updates

You can also subscribe in "ONCE" mode:

sub = client.subscribe()
sub.once(path1, path2, ...)

async for info in sub.synchronize():
    # do something with info

The subscription object is not re-entrant, but a fully consumed subscription may be reused.
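
The two-phase usage above (initial state via `synchronize`, then live changes via `updates`) can be pictured with a toy model. This sketch does not use Finsy's `GNMISubscription`: `DemoSubscription` and the `SYNC` marker are hypothetical, with `SYNC` standing in for the point where the server signals that the initial state has been fully sent.

```python
import asyncio

SYNC = object()  # hypothetical marker for "initial synchronization complete"


class DemoSubscription:
    "Toy model of a subscription with a sync phase and an update phase."

    def __init__(self, messages) -> None:
        # Both phases consume the same underlying stream.
        self._stream = iter(messages)

    async def synchronize(self):
        "Yield initial state until the sync marker arrives."
        for msg in self._stream:
            if msg is SYNC:
                return
            yield msg

    async def updates(self):
        "Yield whatever arrives after synchronization."
        for msg in self._stream:
            yield msg


async def demo():
    sub = DemoSubscription(["eth1=UP", "eth2=DOWN", SYNC, "eth2=UP"])
    initial = [msg async for msg in sub.synchronize()]
    later = [msg async for msg in sub.updates()]
    return initial, later
```

Because both generators share one stream, they have to be consumed in order, one at a time, which is one way to read the remark that the subscription object is not re-entrant.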

async def capabilities(self) -> gnmi1.gnmi_pb2.CapabilityResponse:
    async def capabilities(self) -> gnmi.CapabilityResponse:
        "Issue a CapabilitiesRequest."
        if self._stub is None:
            raise RuntimeError("GNMIClient: client is not open")

        request = gnmi.CapabilityRequest()

        self._log_msg(request)
        try:
            reply = cast(
                gnmi.CapabilityResponse,
                await self._stub.Capabilities(request),
            )
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

        self._log_msg(reply)
        return reply

Issue a CapabilitiesRequest.

async def set( self, *, update: Optional[Sequence[tuple[GNMIPath, bool | int | float | str | bytes | gnmi1.gnmi_pb2.TypedValue]]] = None, replace: Optional[Sequence[tuple[GNMIPath, bool | int | float | str | bytes | gnmi1.gnmi_pb2.TypedValue]]] = None, delete: Optional[Sequence[GNMIPath]] = None, prefix: GNMIPath | None = None) -> int:
    async def set(
        self,
        *,
        update: Sequence[tuple[GNMIPath, GNMISetValueType]] | None = None,
        replace: Sequence[tuple[GNMIPath, GNMISetValueType]] | None = None,
        delete: Sequence[GNMIPath] | None = None,
        prefix: GNMIPath | None = None,
    ) -> int:
        """Set value(s) using SetRequest.

        Returns the timestamp from the successful `SetResponse`.
        """
        if self._stub is None:
            raise RuntimeError("GNMIClient: client is not open")

        if update is not None:
            updates = [gnmi_update(path, value) for path, value in update]
        else:
            updates = None

        if replace is not None:
            replaces = [gnmi_update(path, value) for path, value in replace]
        else:
            replaces = None

        if delete is not None:
            deletes = [path.path for path in delete]
        else:
            deletes = None

        request = gnmi.SetRequest(
            update=updates,
            replace=replaces,
            delete=deletes,
        )

        if prefix is not None:
            request.prefix.CopyFrom(prefix.path)

        self._log_msg(request)
        try:
            reply = cast(
                gnmi.SetResponse,
                await self._stub.Set(request),
            )
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

        self._log_msg(reply)

        # According to the comments in the current protobuf, I expect all error
        # results to be raised as an exception. The only useful value in a
        # successful response is the timestamp.

        if reply.HasField("message"):
            raise NotImplementedError("SetResponse error not supported")

        for result in reply.response:
            if result.HasField("message"):
                raise NotImplementedError("SetResponse suberror not supported")

        return reply.timestamp

Set value(s) using SetRequest.

Returns the timestamp from the successful SetResponse.

class GNMIPath:
class GNMIPath:
    """Concrete class for working with `gnmi.Path` objects.

    A `GNMIPath` should be treated as an immutable object. You can access
    the wrapped `gnmi.Path` protobuf class using the `.path` property.
    `GNMIPath` objects are hashable, but you must be careful that you do not
    mutate them via the underlying `gnmi.Path`.

    You can construct a GNMIPath from a string:
    ```
    path = GNMIPath("interfaces/interface[name=eth1]/state")
    ```

    You can construct a GNMIPath object from a `gnmi.Path` directly.
    ```
    path = GNMIPath(update.path)
    ```

    You can create paths by using an existing path as a template, without
    modifying the original path. Use the `set` method:
    ```
    operStatus = GNMIPath("interfaces/interface/state/oper-status")
    path = operStatus.set("interface", name="eth1")
    ```

    Use [] to access the name/key of path elements:
    ```
    path[1] == "interface"
    path["interface", "name"] == "eth1"
    path["name"] == "eth1"
    ```
    """

    path: gnmi.Path
    "The underlying `gnmi.Path` protobuf representation."

    def __init__(
        self,
        path: "gnmi.Path | str | GNMIPath" = "",
        *,
        origin: str = "",
        target: str = "",
    ):
        "Construct a `GNMIPath` from a string or `gnmi.Path`."
        if isinstance(path, str):
            path = gnmistring.parse(path)
        elif isinstance(path, GNMIPath):
            path = _copy_path(path.path)

        assert isinstance(path, gnmi.Path)

        if origin:
            path.origin = origin

        if target:
            path.target = target

        self.path = path

    @property
    def first(self) -> str:
        "Return the first element of the path."
        return self.path.elem[0].name

    @property
    def last(self) -> str:
        "Return the last element of the path."
        return self.path.elem[-1].name

    @property
    def origin(self) -> str:
        "Return the path's origin."
        return self.path.origin

    @property
    def target(self) -> str:
        "Return the path's target."
        return self.path.target

    def set(self, __elem: str | int | None = None, **kwds: Any) -> "GNMIPath":
        "Construct a new GNMIPath with keys set for the given elem."
        if __elem is None:
            return self._rekey(kwds)

        if isinstance(__elem, str):
            elem = _find_index(__elem, self.path)
        else:
            elem = __elem

        result = self.copy()
        keys = result.path.elem[elem].key
        keys.clear()
        keys.update({key: str(val) for key, val in kwds.items()})
        return result

    def copy(self) -> "GNMIPath":
        "Return a copy of the path."
        return GNMIPath(_copy_path(self.path))

    @overload
    def __getitem__(
        self, key: int | str | tuple[int | str, str]
    ) -> str: ...  # pragma: no cover

    @overload
    def __getitem__(self, key: slice) -> "GNMIPath": ...  # pragma: no cover

    def __getitem__(
        self,
        key: int | str | tuple[int | str, str] | slice,
    ) -> "str | GNMIPath":
        "Return the specified element or key value."
        match key:
            case int(idx):
                return self.path.elem[idx].name
            case str(name):
                return _retrieve_key(name, self.path)
            case (int(idx), str(k)):
                result = self.path.elem[idx].key.get(k)
                if result is None:
                    raise KeyError(k)
                return result
            case (str(name), str(k)):
                result = self.path.elem[_find_index(name, self.path)].key.get(k)
                if result is None:
                    raise KeyError(k)
                return result
            case slice() as s:
                return self._slice(s.start, s.stop, s.step)
            case other:  # pyright: ignore[reportUnnecessaryComparison]
                raise TypeError(f"invalid key type: {other!r}")

    def __eq__(self, rhs: Any) -> bool:
        "Return True if paths are equal."
        if not isinstance(rhs, GNMIPath):
            return False
        return self.path == rhs.path  # pyright: ignore[reportUnknownVariableType]

    def __hash__(self) -> int:
        "Return hash of path."
        return hash(tuple(_to_tuple(elem) for elem in self.path.elem))

    def __contains__(self, name: str) -> bool:
        "Return True if element name is in path."
        for elem in self.path.elem:
            if elem.name == name:
                return True
        return False

    def __repr__(self) -> str:
        "Return string representation of path."
        path = gnmistring.to_str(self.path)
        if self.target or self.origin:
            return f"GNMIPath({path!r}, origin={self.origin!r}, target={self.target!r})"
        return f"GNMIPath({path!r})"

    def __str__(self) -> str:
        "Return path as string."
        return gnmistring.to_str(self.path)

    def __len__(self) -> int:
        "Return the length of the path."
        return len(self.path.elem)

    def __truediv__(self, rhs: "GNMIPath | str") -> "GNMIPath":
        "Append values to end of path."
        if not isinstance(rhs, GNMIPath):
            rhs = GNMIPath(rhs)

        result = gnmi.Path(
            elem=itertools.chain(self.path.elem, rhs.path.elem),
            origin=self.origin,
            target=self.target,
        )
        return GNMIPath(result)

    def __rtruediv__(self, lhs: str) -> "GNMIPath":
        "Prepend values to the beginning of the path."
        path = GNMIPath(lhs)

        result = gnmi.Path(elem=itertools.chain(path.path.elem, self.path.elem))
        return GNMIPath(result)

    def _slice(
        self, start: int | None, stop: int | None, step: int | None
    ) -> "GNMIPath":
        "Return specified slice of GNMIPath."
        if start is None:
            start = 0

        if stop is None:
            stop = len(self)
        elif stop < 0:
            stop = len(self) + stop

        if step is None:
            step = 1

        path = gnmi.Path()
        for i in range(start, stop, step):
            elem = self.path.elem[i]
            path.elem.append(gnmi.PathElem(name=elem.name, key=elem.key))

        return GNMIPath(path)

    def _rekey(self, keys: dict[str, Any]) -> "GNMIPath":
        """Construct a new path with specified keys replaced."""
        if not keys:
            raise ValueError("empty keys")

        result = self.copy()

        found = False
        for elem in result.path.elem:
            for key, value in keys.items():
                if key in elem.key:
                    elem.key[key] = str(value)
                    found = True

        if not found:
            raise ValueError(f"no keys found in path: {keys!r}")

        return result

Concrete class for working with gnmi.Path objects.

A GNMIPath should be treated as an immutable object. You can access the wrapped gnmi.Path protobuf class using the .path property. GNMIPath objects are hashable, but you must be careful that you do not mutate them via the underlying gnmi.Path.

You can construct a GNMIPath from a string:

path = GNMIPath("interfaces/interface[name=eth1]/state")

You can construct a GNMIPath object from a gnmi.Path directly.

path = GNMIPath(update.path)

You can create paths by using an existing path as a template, without modifying the original path. Use the set method:

operStatus = GNMIPath("interfaces/interface/state/oper-status")
path = operStatus.set("interface", name="eth1")

Use [] to access the name/key of path elements:

path[1] == "interface"
path["interface", "name"] == "eth1"
path["name"] == "eth1"
GNMIPath(path: gnmi1.gnmi_pb2.Path | str | GNMIPath = '', *, origin: str = '', target: str = '')
    def __init__(
        self,
        path: "gnmi.Path | str | GNMIPath" = "",
        *,
        origin: str = "",
        target: str = "",
    ):
        "Construct a `GNMIPath` from a string or `gnmi.Path`."
        if isinstance(path, str):
            path = gnmistring.parse(path)
        elif isinstance(path, GNMIPath):
            path = _copy_path(path.path)

        assert isinstance(path, gnmi.Path)

        if origin:
            path.origin = origin

        if target:
            path.target = target

        self.path = path

Construct a GNMIPath from a string or gnmi.Path.

path: gnmi1.gnmi_pb2.Path

The underlying gnmi.Path protobuf representation.

first: str
    @property
    def first(self) -> str:
        "Return the first element of the path."
        return self.path.elem[0].name

Return the first element of the path.

last: str
    @property
    def last(self) -> str:
        "Return the last element of the path."
        return self.path.elem[-1].name

Return the last element of the path.

origin: str
    @property
    def origin(self) -> str:
        "Return the path's origin."
        return self.path.origin

Return the path's origin.

target: str
    @property
    def target(self) -> str:
        "Return the path's target."
        return self.path.target

Return the path's target.

def set(self, __elem: str | int | None = None, **kwds: Any) -> GNMIPath:
    def set(self, __elem: str | int | None = None, **kwds: Any) -> "GNMIPath":
        "Construct a new GNMIPath with keys set for the given elem."
        if __elem is None:
            return self._rekey(kwds)

        if isinstance(__elem, str):
            elem = _find_index(__elem, self.path)
        else:
            elem = __elem

        result = self.copy()
        keys = result.path.elem[elem].key
        keys.clear()
        keys.update({key: str(val) for key, val in kwds.items()})
        return result

Construct a new GNMIPath with keys set for the given elem.
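
When `set` is called without an element name, the source delegates to `_rekey`, which replaces keys that already exist anywhere in the path and raises `ValueError` if none match. The sketch below mirrors that logic on plain tuples so it runs without Finsy; the `rekey` function and the `(name, keys)` tuple representation are illustrative assumptions, not part of the library.

```python
from __future__ import annotations

import copy

# Each element is modelled as (name, keys); a stand-in for gnmi.PathElem.
Elems = list[tuple[str, dict[str, str]]]


def rekey(elems: Elems, **keys: object) -> Elems:
    """Return a copy of `elems` with matching, already-present keys replaced."""
    if not keys:
        raise ValueError("empty keys")

    result = copy.deepcopy(elems)
    found = False
    for _name, elem_keys in result:
        for key, value in keys.items():
            if key in elem_keys:
                elem_keys[key] = str(value)
                found = True

    if not found:
        raise ValueError(f"no keys found in path: {keys!r}")
    return result


path = [("interfaces", {}), ("interface", {"name": "eth1"}), ("state", {})]

print(rekey(path, name="eth2")[1])  # ('interface', {'name': 'eth2'})
print(path[1])                      # ('interface', {'name': 'eth1'})
```

Note the difference from the element form: naming an element clears and replaces all of its keys, whereas the keyword-only form updates only keys that are already present.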

def copy(self) -> GNMIPath:
    def copy(self) -> "GNMIPath":
        "Return a copy of the path."
        return GNMIPath(_copy_path(self.path))

Return a copy of the path.

def __getitem__(self, key: int | str | tuple[int | str, str] | slice) -> str | GNMIPath:
    def __getitem__(
        self,
        key: int | str | tuple[int | str, str] | slice,
    ) -> "str | GNMIPath":
        "Return the specified element or key value."
        match key:
            case int(idx):
                return self.path.elem[idx].name
            case str(name):
                return _retrieve_key(name, self.path)
            case (int(idx), str(k)):
                result = self.path.elem[idx].key.get(k)
                if result is None:
                    raise KeyError(k)
                return result
            case (str(name), str(k)):
                result = self.path.elem[_find_index(name, self.path)].key.get(k)
                if result is None:
                    raise KeyError(k)
                return result
            case slice() as s:
                return self._slice(s.start, s.stop, s.step)
            case other:  # pyright: ignore[reportUnnecessaryComparison]
                raise TypeError(f"invalid key type: {other!r}")

Return the specified element or key value.

def __contains__(self, name: str) -> bool:
    def __contains__(self, name: str) -> bool:
        "Return True if element name is in path."
        for elem in self.path.elem:
            if elem.name == name:
                return True
        return False

Return True if element name is in path.

def __len__(self) -> int:
    def __len__(self) -> int:
        "Return the length of the path."
        return len(self.path.elem)

Return the length of the path.

class GNMISubscription:
class GNMISubscription:
    """Represents a gNMI subscription stream.

    Returned from `GNMIClient.subscribe()`.
    """

    _client: GNMIClient
    _stream: _StreamTypeAlias | None

    def __init__(self, client: GNMIClient, prefix: GNMIPath | None = None):
        "Initialize a GNMISubscription."
        self._client = client
        self._stream = None
        self._sublist = gnmi.SubscriptionList(
            mode=gnmi.SubscriptionList.Mode.STREAM,
        )
        if prefix is not None:
            self._sublist.prefix.CopyFrom(prefix.path)

    def once(self, *paths: GNMIPath) -> None:
        "Subscribe in `ONCE` mode to given paths."
        self._sublist.mode = gnmi.SubscriptionList.Mode.ONCE

        for path in paths:
            sub = gnmi.Subscription(path=path.path)
            self._sublist.subscription.append(sub)

    def on_change(self, *paths: GNMIPath) -> None:
        "Subscribe in `ON_CHANGE` mode to given paths."
        assert self._sublist.mode == gnmi.SubscriptionList.Mode.STREAM

        for path in paths:
            sub = gnmi.Subscription(
                path=path.path,
                mode=gnmi.SubscriptionMode.ON_CHANGE,
            )
            self._sublist.subscription.append(sub)

    def sample(
        self,
        *paths: GNMIPath,
        sample_interval: int,
        suppress_redundant: bool = False,
        heartbeat_interval: int = 0,
    ) -> None:
        "Subscribe in `SAMPLE` mode to given paths."
        assert self._sublist.mode == gnmi.SubscriptionList.Mode.STREAM

        for path in paths:
            sub = gnmi.Subscription(
                path=path.path,
                mode=gnmi.SubscriptionMode.SAMPLE,
                sample_interval=sample_interval,
                suppress_redundant=suppress_redundant,
                heartbeat_interval=heartbeat_interval,
            )
            self._sublist.subscription.append(sub)

    async def synchronize(self) -> AsyncIterator[GNMIUpdate]:
        """Async iterator for initial subscription updates.

        Retrieve all updates up to `sync_response` message.
        """
        if self._stream is None:
            await self._subscribe()

        try:
            async for result in self._read(True):
                yield result
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

    async def updates(self) -> AsyncIterator[GNMIUpdate]:
        "Async iterator for all remaining subscription updates."
        if self._stream is None:
            await self._subscribe()

        try:
            async for result in self._read(False):
                yield result
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

    def cancel(self) -> None:
        "Cancel the subscription."
        if self._stream is not None:
            self._stream.cancel()
            self._stream = None

    async def _subscribe(self):
        assert self._stream is None
        assert self._client._stub is not None

        self._stream = cast(
            _StreamTypeAlias,
            self._client._stub.Subscribe(wait_for_ready=True),  # type: ignore
        )

        request = gnmi.SubscribeRequest(subscribe=self._sublist)
        self._client._log_msg(request)
        try:
            await self._stream.write(request)
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

    async def _read(self, stop_at_sync: bool) -> AsyncIterator[GNMIUpdate]:
        assert self._stream is not None

        while True:
            msg = cast(
                gnmi.SubscribeResponse,
                await self._stream.read(),  # type: ignore
            )
            if msg is GRPC_EOF:
                LOGGER.warning("gNMI _read: unexpected EOF")
                return

            self._client._log_msg(msg)

            match msg.WhichOneof("response"):
                case "update":
                    for update in _read_updates(msg.update):
                        yield update
                case "sync_response":
                    if stop_at_sync:
                        if self._is_once():
                            # FIXME? I expected Stratum to issue an EOF after
                            # the sync_response in "ONCE" mode, but it doesn't.
                            # For now, cancel the stream explicitly.
                            self.cancel()
                            # Let grpc react to the cancellation immediately.
                            await asyncio.sleep(0)
                        return  # All done!
                    LOGGER.warning("gNMI _read: ignored sync_response")
                case other:
                    LOGGER.warning("gNMI _read: unexpected oneof: %s", other)

    def _is_once(self) -> bool:
        "Return true if the subscription is in ONCE mode."
        return self._sublist.mode == gnmi.SubscriptionList.Mode.ONCE

Represents a gNMI subscription stream.

Returned from GNMIClient.subscribe().

GNMISubscription(client: GNMIClient, prefix: GNMIPath | None = None)
    def __init__(self, client: GNMIClient, prefix: GNMIPath | None = None):
        "Initialize a GNMISubscription."
        self._client = client
        self._stream = None
        self._sublist = gnmi.SubscriptionList(
            mode=gnmi.SubscriptionList.Mode.STREAM,
        )
        if prefix is not None:
            self._sublist.prefix.CopyFrom(prefix.path)

Initialize a GNMISubscription.

def once(self, *paths: GNMIPath) -> None:
    def once(self, *paths: GNMIPath) -> None:
        "Subscribe in `ONCE` mode to given paths."
        self._sublist.mode = gnmi.SubscriptionList.Mode.ONCE

        for path in paths:
            sub = gnmi.Subscription(path=path.path)
            self._sublist.subscription.append(sub)

Subscribe in ONCE mode to given paths.

def on_change(self, *paths: GNMIPath) -> None:
    def on_change(self, *paths: GNMIPath) -> None:
        "Subscribe in `ON_CHANGE` mode to given paths."
        assert self._sublist.mode == gnmi.SubscriptionList.Mode.STREAM

        for path in paths:
            sub = gnmi.Subscription(
                path=path.path,
                mode=gnmi.SubscriptionMode.ON_CHANGE,
            )
            self._sublist.subscription.append(sub)

Subscribe in ON_CHANGE mode to given paths.

def sample(self, *paths: GNMIPath, sample_interval: int, suppress_redundant: bool = False, heartbeat_interval: int = 0) -> None:
    def sample(
        self,
        *paths: GNMIPath,
        sample_interval: int,
        suppress_redundant: bool = False,
        heartbeat_interval: int = 0,
    ) -> None:
        "Subscribe in `SAMPLE` mode to given paths."
        assert self._sublist.mode == gnmi.SubscriptionList.Mode.STREAM

        for path in paths:
            sub = gnmi.Subscription(
                path=path.path,
                mode=gnmi.SubscriptionMode.SAMPLE,
                sample_interval=sample_interval,
                suppress_redundant=suppress_redundant,
                heartbeat_interval=heartbeat_interval,
            )
            self._sublist.subscription.append(sub)

Subscribe in SAMPLE mode to given paths.
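
The source passes `sample_interval` and `heartbeat_interval` straight into `gnmi.Subscription`, and the gNMI protobuf definition expresses both fields in nanoseconds. Assuming callers are therefore expected to supply nanoseconds, a small conversion helper avoids off-by-a-billion mistakes. `seconds_to_ns` is a hypothetical helper, not part of Finsy.

```python
NANOSECONDS_PER_SECOND = 1_000_000_000


def seconds_to_ns(seconds: float) -> int:
    """Convert seconds to a whole number of nanoseconds."""
    if seconds < 0:
        raise ValueError("interval must be non-negative")
    return round(seconds * NANOSECONDS_PER_SECOND)


print(seconds_to_ns(10))   # 10000000000
print(seconds_to_ns(0.5))  # 500000000
```

With such a helper, a ten-second sampling period would be passed as `sample_interval=seconds_to_ns(10)`.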

async def synchronize(self) -> AsyncIterator[GNMIUpdate]:
    async def synchronize(self) -> AsyncIterator[GNMIUpdate]:
        """Async iterator for initial subscription updates.

        Retrieve all updates up to `sync_response` message.
        """
        if self._stream is None:
            await self._subscribe()

        try:
            async for result in self._read(True):
                yield result
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

Async iterator for initial subscription updates.

Retrieve all updates up to sync_response message.

async def updates(self) -> AsyncIterator[GNMIUpdate]:
    async def updates(self) -> AsyncIterator[GNMIUpdate]:
        "Async iterator for all remaining subscription updates."
        if self._stream is None:
            await self._subscribe()

        try:
            async for result in self._read(False):
                yield result
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

Async iterator for all remaining subscription updates.
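
`synchronize()` and `updates()` share one underlying stream: the first stops at the `sync_response` marker, and the second continues from wherever the stream was left. The sketch below models only that two-phase read with an in-memory queue. `FakeSubscription` and `SYNC` are hypothetical stand-ins, and unlike a real gRPC stream the fake one simply ends when its queue is empty.

```python
import asyncio
from collections import deque
from typing import AsyncIterator

SYNC = object()  # stand-in for a `sync_response` message


class FakeSubscription:
    """Hypothetical stand-in for GNMISubscription, fed from a local queue."""

    def __init__(self, messages: list[object]):
        self._pending = deque(messages)

    async def _read(self, stop_at_sync: bool) -> AsyncIterator[object]:
        while self._pending:
            await asyncio.sleep(0)  # yield to the event loop, like a stream read
            msg = self._pending.popleft()
            if msg is SYNC:
                if stop_at_sync:
                    return  # initial state has been fully delivered
                continue  # a sync marker seen later is ignored
            yield msg

    async def synchronize(self) -> AsyncIterator[object]:
        async for msg in self._read(True):
            yield msg

    async def updates(self) -> AsyncIterator[object]:
        async for msg in self._read(False):
            yield msg


async def main() -> tuple[list[object], list[object]]:
    sub = FakeSubscription(["eth1=UP", "eth2=DOWN", SYNC, "eth2=UP"])
    initial = [msg async for msg in sub.synchronize()]
    later = [msg async for msg in sub.updates()]
    return initial, later


initial, later = asyncio.run(main())
print(initial)  # ['eth1=UP', 'eth2=DOWN']
print(later)    # ['eth2=UP']
```

The pattern suggests consuming `synchronize()` first to build the initial state, then looping over `updates()` for changes.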

def cancel(self) -> None:
    def cancel(self) -> None:
        "Cancel the subscription."
        if self._stream is not None:
            self._stream.cancel()
            self._stream = None

Cancel the subscription.

@dataclass
class GNMIUpdate:
@dataclass
class GNMIUpdate:
    "Represents a gNMI update returned from GNMIClient."

    timestamp: int
    path: GNMIPath
    typed_value: gnmi.TypedValue | None

    @property
    def value(self) -> Any:
        "Return the value as a Python value."
        if self.typed_value is None:
            return None

        attr = self.typed_value.WhichOneof("value")
        if attr is None:
            raise ValueError("typed_value is not set")
        return getattr(self.typed_value, attr)

    def __repr__(self) -> str:
        "Override repr to strip newline from end of `TypedValue`."
        value = repr(self.typed_value).rstrip()
        return f"GNMIUpdate(timestamp={self.timestamp!r}, path={self.path!r}, typed_value=`{value}`)"

Represents a gNMI update returned from GNMIClient.

GNMIUpdate(timestamp: int, path: GNMIPath, typed_value: gnmi1.gnmi_pb2.TypedValue | None)
timestamp: int
path: GNMIPath
typed_value: gnmi1.gnmi_pb2.TypedValue | None
value: Any
    @property
    def value(self) -> Any:
        "Return the value as a Python value."
        if self.typed_value is None:
            return None

        attr = self.typed_value.WhichOneof("value")
        if attr is None:
            raise ValueError("typed_value is not set")
        return getattr(self.typed_value, attr)

Return the value as a Python value.
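
The `value` property has three outcomes: `None` when there is no typed value, `ValueError` when the `value` oneof is unset, and otherwise whichever field of the oneof is populated. The sketch below reproduces that logic on a hypothetical `FakeTypedValue` class that imitates protobuf's `WhichOneof`; it covers only three of the fields a real `gnmi.TypedValue` offers.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class FakeTypedValue:
    """Hypothetical stand-in for gnmi.TypedValue with a `value` oneof."""

    string_val: str | None = None
    int_val: int | None = None
    bool_val: bool | None = None

    def WhichOneof(self, group: str) -> str | None:
        # Return the name of the field that is set, like protobuf's WhichOneof.
        assert group == "value"
        for name in ("string_val", "int_val", "bool_val"):
            if getattr(self, name) is not None:
                return name
        return None


def python_value(typed_value: FakeTypedValue | None) -> Any:
    """Mirror the logic of GNMIUpdate.value on the stand-in type."""
    if typed_value is None:
        return None
    attr = typed_value.WhichOneof("value")
    if attr is None:
        raise ValueError("typed_value is not set")
    return getattr(typed_value, attr)


print(python_value(FakeTypedValue(string_val="UP")))  # UP
print(python_value(FakeTypedValue(int_val=42)))       # 42
print(python_value(None))                             # None
```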

@dataclass(kw_only=True)
class GRPCCredentialsTLS:
@dataclass(kw_only=True)
class GRPCCredentialsTLS:
    "GRPC channel credentials for Transport Layer Security (TLS)."

    cacert: Path | bytes | None
    """Certificate authority used to authenticate the certificate at the other
    end of the connection."""

    cert: Path | bytes | None
    "Certificate that identifies this side of the connection."

    private_key: Path | bytes | None
    "Private key associated with this side's certificate identity."

    target_name_override: str = ""
    "Override the target name used for TLS host name checking (useful for testing)."

    call_credentials: grpc.AuthMetadataPlugin | None = None
    """Optional GRPC call credentials for the client channel. Be aware that the
    auth plugin's callback takes place in a different thread."""

    def to_client_credentials(self) -> grpc.ChannelCredentials:
        "Create native SSL client credentials object."
        root_certificates = _coerce_tls_path(self.cacert)
        certificate_chain = _coerce_tls_path(self.cert)
        private_key = _coerce_tls_path(self.private_key)

        return self._compose_credentials(
            grpc.ssl_channel_credentials(  # pyright: ignore[reportUnknownMemberType]
                root_certificates=root_certificates,
                private_key=private_key,
                certificate_chain=certificate_chain,
            )
        )

    def to_server_credentials(self) -> grpc.ServerCredentials:
        """Create native SSL server credentials object.

        On the server side, we ignore the `call_credentials`.
        """
        root_certificates = _coerce_tls_path(self.cacert)
        certificate_chain = _coerce_tls_path(self.cert)
        private_key = _coerce_tls_path(self.private_key)

        return grpc.ssl_server_credentials(  # pyright: ignore[reportUnknownMemberType]
            private_key_certificate_chain_pairs=[(private_key, certificate_chain)],
            root_certificates=root_certificates,
            require_client_auth=True,
        )

    def _compose_credentials(
        self, channel_cred: grpc.ChannelCredentials
    ) -> grpc.ChannelCredentials:
        "Compose call credentials with channel credentials."
        if not self.call_credentials:
            return channel_cred

        call_cred = (
            grpc.metadata_call_credentials(  # pyright: ignore[reportUnknownMemberType]
                self.call_credentials
            )
        )
        return grpc.composite_channel_credentials(  # pyright: ignore[reportUnknownMemberType]
            channel_cred,
            call_cred,
        )

GRPC channel credentials for Transport Layer Security (TLS).

GRPCCredentialsTLS(*, cacert: pathlib.Path | bytes | None, cert: pathlib.Path | bytes | None, private_key: pathlib.Path | bytes | None, target_name_override: str = '', call_credentials: grpc.AuthMetadataPlugin | None = None)
cacert: pathlib.Path | bytes | None

Certificate authority used to authenticate the certificate at the other end of the connection.

cert: pathlib.Path | bytes | None

Certificate that identifies this side of the connection.

private_key: pathlib.Path | bytes | None

Private key associated with this side's certificate identity.

target_name_override: str = ''

Override the target name used for TLS host name checking (useful for testing).

call_credentials: grpc.AuthMetadataPlugin | None = None

Optional GRPC call credentials for the client channel. Be aware that the auth plugin's callback takes place in a different thread.

def to_client_credentials(self) -> grpc.ChannelCredentials:
    def to_client_credentials(self) -> grpc.ChannelCredentials:
        "Create native SSL client credentials object."
        root_certificates = _coerce_tls_path(self.cacert)
        certificate_chain = _coerce_tls_path(self.cert)
        private_key = _coerce_tls_path(self.private_key)

        return self._compose_credentials(
            grpc.ssl_channel_credentials(  # pyright: ignore[reportUnknownMemberType]
                root_certificates=root_certificates,
                private_key=private_key,
                certificate_chain=certificate_chain,
            )
        )

Create native SSL client credentials object.
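
Each credential field accepts `Path | bytes | None`, and the source runs every field through the private `_coerce_tls_path` helper, whose body is not shown here. The sketch below is a guess at what such a helper plausibly does: read the file for a `Path`, pass `bytes` and `None` through. `coerce_tls_bytes` is a hypothetical name and its behavior is an assumption, not the Finsy implementation.

```python
from __future__ import annotations

import tempfile
from pathlib import Path


def coerce_tls_bytes(value: Path | bytes | None) -> bytes | None:
    """Hypothetical helper: return PEM bytes for a Path, bytes, or None."""
    if value is None:
        return None
    if isinstance(value, Path):
        return value.read_bytes()  # load the PEM data from disk
    return value  # already bytes


with tempfile.TemporaryDirectory() as tmp:
    pem = Path(tmp) / "ca.crt"
    pem.write_bytes(b"-----BEGIN CERTIFICATE-----\n")
    from_path = coerce_tls_bytes(pem)

from_bytes = coerce_tls_bytes(b"-----BEGIN CERTIFICATE-----\n")
print(from_path == from_bytes)  # True
print(coerce_tls_bytes(None))   # None
```

Accepting both forms lets a caller point at certificate files on disk or supply PEM data that was loaded some other way.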

def to_server_credentials(self) -> grpc.ServerCredentials:
    def to_server_credentials(self) -> grpc.ServerCredentials:
        """Create native SSL server credentials object.

        On the server side, we ignore the `call_credentials`.
        """
        root_certificates = _coerce_tls_path(self.cacert)
        certificate_chain = _coerce_tls_path(self.cert)
        private_key = _coerce_tls_path(self.private_key)

        return grpc.ssl_server_credentials(  # pyright: ignore[reportUnknownMemberType]
            private_key_certificate_chain_pairs=[(private_key, certificate_chain)],
            root_certificates=root_certificates,
            require_client_auth=True,
        )

Create native SSL server credentials object.

On the server side, we ignore the call_credentials.

class GRPCStatusCode(finsy.grpcutil._EnumBase):
class GRPCStatusCode(_EnumBase):
    "IntEnum equivalent to `grpc.StatusCode`."

    OK = rpc_code.OK
    CANCELLED = rpc_code.CANCELLED
    UNKNOWN = rpc_code.UNKNOWN
    FAILED_PRECONDITION = rpc_code.FAILED_PRECONDITION
    INVALID_ARGUMENT = rpc_code.INVALID_ARGUMENT
    DEADLINE_EXCEEDED = rpc_code.DEADLINE_EXCEEDED
    NOT_FOUND = rpc_code.NOT_FOUND
    ALREADY_EXISTS = rpc_code.ALREADY_EXISTS
    PERMISSION_DENIED = rpc_code.PERMISSION_DENIED
    UNAUTHENTICATED = rpc_code.UNAUTHENTICATED
    RESOURCE_EXHAUSTED = rpc_code.RESOURCE_EXHAUSTED
    ABORTED = rpc_code.ABORTED
    OUT_OF_RANGE = rpc_code.OUT_OF_RANGE
    UNIMPLEMENTED = rpc_code.UNIMPLEMENTED
    INTERNAL = rpc_code.INTERNAL
    UNAVAILABLE = rpc_code.UNAVAILABLE
    DATA_LOSS = rpc_code.DATA_LOSS

    @classmethod
    def from_status_code(cls, val: grpc.StatusCode) -> "GRPCStatusCode":
        "Create corresponding GRPCStatusCode from a grpc.StatusCode object."
        n: Any = val.value[0]  # pyright: ignore[reportUnknownMemberType]
        assert isinstance(n, int)
        return GRPCStatusCode(n)

    @staticmethod
    def _validate_enum() -> None:
        "Verify that GRPCStatusCode covers every possible grpc.StatusCode."
        for value in grpc.StatusCode:
            n: Any = value.value[0]  # pyright: ignore[reportUnknownMemberType]
            assert isinstance(n, int)
            assert GRPCStatusCode[value.name].value == n, value.name

IntEnum equivalent to grpc.StatusCode.

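OK = GRPCStatusCode.OK
CANCELLED = GRPCStatusCode.CANCELLED
UNKNOWN = GRPCStatusCode.UNKNOWN
FAILED_PRECONDITION = GRPCStatusCode.FAILED_PRECONDITION
INVALID_ARGUMENT = GRPCStatusCode.INVALID_ARGUMENT
DEADLINE_EXCEEDED = GRPCStatusCode.DEADLINE_EXCEEDED
NOT_FOUND = GRPCStatusCode.NOT_FOUND
ALREADY_EXISTS = GRPCStatusCode.ALREADY_EXISTS
PERMISSION_DENIED = GRPCStatusCode.PERMISSION_DENIED
UNAUTHENTICATED = GRPCStatusCode.UNAUTHENTICATED
RESOURCE_EXHAUSTED = GRPCStatusCode.RESOURCE_EXHAUSTED
ABORTED = GRPCStatusCode.ABORTED
OUT_OF_RANGE = GRPCStatusCode.OUT_OF_RANGE
UNIMPLEMENTED = GRPCStatusCode.UNIMPLEMENTED
INTERNAL = GRPCStatusCode.INTERNAL
UNAVAILABLE = GRPCStatusCode.UNAVAILABLE
DATA_LOSS = GRPCStatusCode.DATA_LOSS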
@classmethod
def from_status_code(cls, val: grpc.StatusCode) -> GRPCStatusCode:
    @classmethod
    def from_status_code(cls, val: grpc.StatusCode) -> "GRPCStatusCode":
        "Create corresponding GRPCStatusCode from a grpc.StatusCode object."
        n: Any = val.value[0]  # pyright: ignore[reportUnknownMemberType]
        assert isinstance(n, int)
        return GRPCStatusCode(n)

Create corresponding GRPCStatusCode from a grpc.StatusCode object.
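
The conversion relies on each `grpc.StatusCode` member having a tuple value whose first item is the integer code, which is why the source reads `val.value[0]`. The sketch below shows the same mapping with stand-in enums so it runs without grpc installed. `FakeStatusCode` and `MiniStatusCode` are hypothetical and cover only three codes.

```python
import enum


class FakeStatusCode(enum.Enum):
    """Stand-in for grpc.StatusCode, whose values are (int, str) tuples."""

    OK = (0, "ok")
    CANCELLED = (1, "cancelled")
    NOT_FOUND = (5, "not found")


class MiniStatusCode(enum.IntEnum):
    """Hypothetical IntEnum counterpart, in the spirit of GRPCStatusCode."""

    OK = 0
    CANCELLED = 1
    NOT_FOUND = 5

    @classmethod
    def from_status_code(cls, val: FakeStatusCode) -> "MiniStatusCode":
        n = val.value[0]  # first tuple item is the integer code
        assert isinstance(n, int)
        return cls(n)


# Same idea as `_validate_enum`: every source member must map by name and value.
for member in FakeStatusCode:
    assert MiniStatusCode[member.name].value == member.value[0], member.name

code = MiniStatusCode.from_status_code(FakeStatusCode.NOT_FOUND)
print(code.name, int(code))  # NOT_FOUND 5
print(code == 5)             # True
```

An `IntEnum` compares equal to plain integers, which is convenient when status codes arrive as numbers in protobuf messages.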