finsy
Finsy P4Runtime Controller Library
Finsy is a P4Runtime controller library written in Python using asyncio. Finsy includes support for gNMI.
Check out the examples directory for some demonstration programs.
Installation
Finsy requires Python 3.10 or later. To install the latest version, type pip install finsy.
P4Runtime Scripts
With Finsy, you can write a Python script that reads/writes P4Runtime entities for a single switch.
Here is a complete example that retrieves the P4Info from a switch:
import finsy as fy
async def main():
async with fy.Switch("sw1", "127.0.0.1:50001") as sw1:
# Print out a description of the switch's P4Info, if one is configured.
print(sw1.p4info)
fy.run(main())
Here is another example that prints out all non-default table entries.
import finsy as fy
async def main():
async with fy.Switch("sw1", "127.0.0.1:50001") as sw1:
# Do a wildcard read for table entries.
async for entry in sw1.read(fy.P4TableEntry()):
print(entry)
fy.run(main())
P4Runtime Controller
You can also write a P4Runtime controller that manages multiple switches independently. Your controller can react to events from the Switch by changing the contents of P4 tables.
Each switch is managed by an async ready_handler function. Your ready_handler function can read or
update various P4Runtime entities in the switch. It can also create tasks to listen for
packets or digests.
When you write P4Runtime updates to the switch, you use a unary operator (+, -, \~) to specify the operation: INSERT (+), DELETE (-) or MODIFY (\~).
import finsy as fy
async def ready_handler(sw: fy.Switch):
await sw.delete_all()
await sw.write(
[
# Insert (+) multicast group with ports 1, 2, 3 and CONTROLLER.
+fy.P4MulticastGroupEntry(1, replicas=[1, 2, 3, 255]),
# Modify (~) default table entry to flood all unmatched packets.
~fy.P4TableEntry(
"ipv4",
action=fy.Action("flood"),
is_default_action=True,
),
]
)
async for packet in sw.read_packets():
print(f"{sw.name}: {packet}")
Use the SwitchOptions class to specify each switch's settings, including the p4info/p4blob and
ready_handler. Use the Controller class to drive multiple switch connections. Each switch will call back
into your ready_handler function after the P4Runtime connection is established.
from pathlib import Path
options = fy.SwitchOptions(
p4info=Path("hello.p4info.txt"),
p4blob=Path("hello.json"),
ready_handler=ready_handler,
)
controller = fy.Controller([
fy.Switch("sw1", "127.0.0.1:50001", options),
fy.Switch("sw2", "127.0.0.1:50002", options),
fy.Switch("sw3", "127.0.0.1:50003", options),
])
fy.run(controller.run())
Your ready_handler can spawn concurrent tasks with the Switch.create_task method. Tasks
created this way will have their lifetimes managed by the switch object.
If the switch disconnects or its role changes to backup, the task running your ready_handler
(and any tasks it spawned) will be cancelled and the ready_handler will begin again.
For more examples, see the examples directory.
Switch Read/Write API
The Switch class provides the API for interacting with P4Runtime switches. You will control
a Switch object with a "ready handler" function. The ready handler is an
async function that is called when the switch is ready to accept commands.
Your ready handler will typically write some control entities to the switch, then
listen for incoming events and react to them with more writes. You may occasionally read entities
from the switch.
When your ready handler is invoked, there is already a P4Runtime channel established, with client
arbitration completed, and pipeline configured as specified in SwitchOptions.
Here is an example skeleton program. The ready handler is named ready().
async def ready(switch: fy.Switch):
# Check if switch is the primary. If not, we may want to proceed
# in read-only mode. In this example, ignore switch if it's a backup.
if not switch.is_primary:
return
# If we're reconnecting to a switch, it will already have runtime state.
# In this example, we just delete all entities and start over.
await switch.delete_all()
# Provision the pipeline with one or more `write` transactions. Each
# `write` is a single WriteRequest which may contain multiple updates.
await switch.write(
# [Next section will cover what goes here.]
)
# Listen for events and respond to them. This "infinite" loop will
# continue until the Switch disconnects, changes primary/backup status,
# or the controller is stopped.
async for packet in switch.read_packets():
await handle_packet(switch, packet)
The Switch class provides a switch.create_task method to start a managed task.
Tasks allow you to perform concurrent operations on the same switch. We could have
written the last stanza above that reads packets in an infinite loop as a separate
task. It's okay for the ready handler function to return early; any tasks it
created will still run.
Writes
Use the write() method to write one or more P4Runtime updates and packets.
A P4Runtime update supports one of three operations: INSERT, MODIFY or DELETE. Some entities support all three operations. Other entities only support MODIFY.
| Entity | Operations Permitted | Related Classes |
|---|---|---|
P4TableEntry |
INSERT, MODIFY, DELETE | Match, Action, IndirectAction, P4MeterConfig, P4CounterData, P4MeterCounterData |
P4ActionProfileMember |
INSERT, MODIFY, DELETE | |
P4ActionProfileGroup |
INSERT, MODIFY, DELETE | P4Member |
P4MulticastGroupEntry |
INSERT, MODIFY, DELETE | |
P4CloneSessionEntry |
INSERT, MODIFY, DELETE | |
P4DigestEntry |
INSERT, MODIFY, DELETE | |
P4ExternEntry |
INSERT, MODIFY, DELETE | |
P4RegisterEntry |
MODIFY | |
P4CounterEntry |
MODIFY | P4CounterData |
P4DirectCounterEntry |
MODIFY | P4CounterData |
P4MeterEntry |
MODIFY | P4MeterConfig, P4MeterCounterData |
P4DirectMeterEntry |
MODIFY | |
P4ValueSetEntry |
MODIFY | P4ValueSetMember |
Insert/Modify/Delete Updates
To specify the operation, use a unary + (INSERT), ~ (MODIFY), or - (DELETE). If you
do not specify the operation, write will raise a ValueError exception.
Here is an example showing how to insert and delete two different entities in the same WriteRequest.
await switch.write([
+fy.P4TableEntry( # unary + means INSERT
"ipv4",
match=fy.Match(dest="192.168.1.0/24"),
action=fy.Action("forward", port=1),
),
-fy.P4TableEntry( # unary - means DELETE
"ipv4",
match=fy.Match(dest="192.168.2.0/24"),
action=fy.Action("forward", port=2),
),
])
You should not insert, modify or delete the same entry in the same WriteRequest.
If you are performing the same operation on all entities, you can use the Switch
insert, delete, or modify methods.
await switch.insert([
fy.P4MulticastGroupEntry(1, replicas=[1, 2, 3]),
fy.P4MulticastGroupEntry(2, replicas=[4, 5, 6]),
])
Modify-Only Updates
For entities that only support the modify operation, you do not need to specify the operation. (You can
optionally use ~.)
await switch.write([
fy.P4RegisterEntry("reg1", index=0, data=0),
fy.P4RegisterEntry("reg1", index=1, data=1),
fy.P4RegisterEntry("reg1", index=2, data=2),
])
You can also use the modify method:
await switch.modify([
fy.P4RegisterEntry("reg1", index=0, data=0),
fy.P4RegisterEntry("reg1", index=1, data=1),
fy.P4RegisterEntry("reg1", index=2, data=2),
])
If you pass a modify-only entity to the insert or delete methods, the P4Runtime server will
return an error.
Sending Packets
Use the write method to send a packet.
await switch.write([fy.P4PacketOut(b"a payload.....", port=3)])
You can include other entities in the same call. Any non-update objects (e.g. P4PacketOut, P4DigestListAck) will be sent before the WriteRequest.
Listening for Packets
To receive packets, use the async iterator Switch.read_packets().
In this example, pkt is a P4PacketIn object.
read_packets can filter for a specific eth_type.
# Read packets filtering only for ARP (eth_type == 0x0806).
async for pkt in switch.read_packets(eth_types={0x0806}):
# You can access the packet payload `pkt.payload` or any metadata value,
# e.g. `pkt['ingress_port']`
print(pkt.payload)
print(pkt['ingress_port'])
Listening for Digests
To receive digests, use the async iterator Switch.read_digests. You must specify
the name of the digest from your P4 program.
async for digest in switch.read_digests("digest_t"):
# You can access the digest metadata e.g. `digest['ingress_port']`
# Your code may need to update table entries based on the digest data.
# To ack the digest, write `digest.ack()`.
await switch.write([entry, ...])
await switch.write([digest.ack()])
To acknowledge the digest entry, you can write digest.ack().
Listening for Idle Timeouts
To receive idle timeout notifications, use the async iterator Switch.read_idle_timeouts.
You will receive a P4IdleTimeoutNotification which contains multiple table
entries -- one for each entry that timed out.
async for timeout in switch.read_idle_timeouts():
for entry in timeout.table_entry:
print(timeout.timestamp, entry)
Other Events
A P4 switch may report other events using the EventEmitter API. See
the SwitchEvent class for the event types. Each switch has a switch.ee
attribute that lets your code register for event callbacks.
Development and Testing
Perform these steps to set up your local environment for Finsy development, or try the codespace. Finsy requires Python 3.10 or later. If poetry is not installed, follow these directions to install it.
Clone and Prepare a Virtual Environment
The poetry install command installs all development dependencies into the
virtual environment (venv).
$ git clone https://github.com/byllyfish/finsy.git
$ cd finsy
$ python3 -m venv .venv
$ poetry install
Run Unit Tests
When you run pytest from the top level of the repository, you will run the unit tests.
$ poetry run pytest
Run Integration Tests
When you run pytest from within the examples directory, you will run the integration
tests instead of the unit tests. The integration tests run the example programs against a
Mininet network. Docker or podman are required.
$ cd examples
$ poetry run pytest
API Documentation
1""" 2.. include:: ../README.md 3 4# API Documentation 5""" 6 7# Copyright (c) 2022-2023 Bill Fisher 8# 9# Licensed under the Apache License, Version 2.0 (the "License"); 10# you may not use this file except in compliance with the License. 11# You may obtain a copy of the License at 12# 13# http://www.apache.org/licenses/LICENSE-2.0 14# 15# Unless required by applicable law or agreed to in writing, software 16# distributed under the License is distributed on an "AS IS" BASIS, 17# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 18# See the License for the specific language governing permissions and 19# limitations under the License. 20 21__version__ = "0.28.0" 22 23import sys 24 25if sys.version_info < (3, 10): # pragma: no cover 26 raise RuntimeError("Requires Python 3.10 or later.") 27 28from .controller import Controller 29from .gnmiclient import GNMIClient, GNMISubscription, GNMIUpdate 30from .gnmipath import GNMIPath 31from .grpcutil import GRPCCredentialsTLS, GRPCStatusCode 32from .log import LoggerAdapter 33from .macaddr import MACAddress 34from .p4client import P4Client, P4ClientError, P4Error 35from .p4entity import ( 36 P4ActionProfileGroup, 37 P4ActionProfileMember, 38 P4CloneSessionEntry, 39 P4CounterData, 40 P4CounterEntry, 41 P4DigestEntry, 42 P4DigestList, 43 P4DigestListAck, 44 P4DirectCounterEntry, 45 P4DirectMeterEntry, 46 P4ExternEntry, 47 P4IndirectAction, 48 P4Member, 49 P4MeterConfig, 50 P4MeterCounterData, 51 P4MeterEntry, 52 P4MulticastGroupEntry, 53 P4PacketIn, 54 P4PacketOut, 55 P4RegisterEntry, 56 P4TableAction, 57 P4TableEntry, 58 P4TableMatch, 59 P4ValueSetEntry, 60) 61from .p4schema import ( 62 P4ActionSelectionMode, 63 P4ActionSizeSemantics, 64 P4ConfigAction, 65 P4CounterUnit, 66 P4RuntimeVersion, 67 P4Schema, 68) 69from .ports import SwitchPort, SwitchPortList 70from .runner import run 71from .switch import Switch, SwitchEvent, SwitchOptions 72 73Match = P4TableMatch 74"`Match` is an alias for P4TableMatch." 75 76Action = P4TableAction 77"`Action` is an alias for P4TableAction." 78 79IndirectAction = P4IndirectAction 80"`IndirectAction` is an alias for P4IndirectAction." 81 82__all__ = [ 83 "run", 84 "Controller", 85 "LoggerAdapter", 86 "MACAddress", 87 "P4ActionProfileGroup", 88 "P4ActionProfileMember", 89 "P4Client", 90 "P4ClientError", 91 "P4CloneSessionEntry", 92 "P4CounterData", 93 "P4CounterEntry", 94 "P4CounterUnit", 95 "P4DigestEntry", 96 "P4DigestList", 97 "P4DigestListAck", 98 "P4DirectCounterEntry", 99 "P4DirectMeterEntry", 100 "P4Error", 101 "P4ExternEntry", 102 "IndirectAction", # alias for P4IndirectAction 103 "P4IndirectAction", 104 "P4Member", 105 "P4MeterConfig", 106 "P4MeterCounterData", 107 "P4MeterEntry", 108 "P4MulticastGroupEntry", 109 "P4PacketIn", 110 "P4PacketOut", 111 "P4RegisterEntry", 112 "Action", # alias for P4TableAction 113 "P4TableAction", 114 "P4TableEntry", 115 "Match", # alias for P4TableMatch 116 "P4TableMatch", 117 "P4ValueSetEntry", 118 "P4ActionSelectionMode", 119 "P4ActionSizeSemantics", 120 "P4ConfigAction", 121 "P4RuntimeVersion", 122 "P4Schema", 123 "Switch", 124 "SwitchEvent", 125 "SwitchOptions", 126 "SwitchPort", 127 "SwitchPortList", 128 "GNMIClient", 129 "GNMIPath", 130 "GNMISubscription", 131 "GNMIUpdate", 132 "GRPCCredentialsTLS", 133 "GRPCStatusCode", 134]
53def run(coro: Coroutine[Any, Any, None]) -> None: 54 """`finsy.run` provides a useful wrapper around `asyncio.run`. 55 56 This function implements common boilerplate for running a Finsy application. 57 58 - Set up basic logging to stderr at the INFO log level. 59 - Set up a signal handler for SIGTERM that shuts down gracefully. 60 - Set up caching of P4Info data so common definitions are re-used. 61 62 Example: 63 64 ```python 65 import finsy as fy 66 67 async def main(): 68 async with fy.Switch("sw", "127.0.0.1:50001") as sw: 69 print(sw.p4info) 70 71 if __name__ == "__main__": 72 fy.run(main()) 73 ``` 74 75 If you choose to use `asyncio.run` instead, your P4Schema/P4Info objects 76 will not be eligible for sharing. You can create your own `P4SchemaCache` 77 context manager to implement this. 78 """ 79 try: 80 asyncio.run(_finsy_boilerplate(coro)) 81 except (KeyboardInterrupt, asyncio.CancelledError): 82 pass
finsy.run provides a useful wrapper around asyncio.run.
This function implements common boilerplate for running a Finsy application.
- Set up basic logging to stderr at the INFO log level.
- Set up a signal handler for SIGTERM that shuts down gracefully.
- Set up caching of P4Info data so common definitions are re-used.
Example:
import finsy as fy
async def main():
async with fy.Switch("sw", "127.0.0.1:50001") as sw:
print(sw.p4info)
if __name__ == "__main__":
fy.run(main())
If you choose to use asyncio.run instead, your P4Schema/P4Info objects
will not be eligible for sharing. You can create your own P4SchemaCache
context manager to implement this.
30@final 31class Controller: 32 """Represents a collection of P4Runtime switches. 33 34 Each `Switch` in the Controller is identified by its name. Each name must 35 be unique. 36 37 ``` 38 switches = [ 39 fy.Switch("sw1", "10.0.0.1:50000"), 40 fy.Switch("sw2", "10.0.0.2:50000"), 41 ] 42 controller = fy.Controller(switches) 43 await controller.run() 44 ``` 45 46 A Controller can be running or stopped. There are two ways to run a 47 Controller. You can use the `Controller.run()` method, or you can use 48 a Controller as a context manager. 49 50 ``` 51 controller = fy.Controller(switches) 52 async with controller: 53 # Let controller run for 60 seconds. 54 await asyncio.sleep(60) 55 ``` 56 57 You can `add` or `remove` switches regardless of whether a Controller is 58 running or not. If the Controller is not running, adding or removing a Switch is 59 instantaneous. If the Controller is running, adding a Switch will start 60 it running asynchronously. Removing a Switch will schedule the Switch to stop, 61 but defer actual removal until the Switch has stopped asynchronously. 62 63 When a switch starts inside a Controller, it fires the `CONTROLLER_ENTER` 64 event. When a switch stops inside a Controller, it fires the `CONTROLLER_LEAVE` 65 event. 66 67 A Controller supports these methods to access its contents: 68 69 - len(controller): Return number of switches in the controller. 70 - controller[name]: Return the switch with the given name. 71 - controller.get(name): Return the switch with the given name, or None if not found. 72 73 You can iterate over the switches in a Controller using a for loop: 74 75 ``` 76 for switch in controller: 77 print(switch.name) 78 ``` 79 80 Any task or sub-task running inside a controller can retrieve its 81 Controller object using the `Controller.current()` method. 82 """ 83 84 _switches: dict[str, Switch] 85 _pending_removal: set[Switch] 86 _task_count: CountdownFuture 87 88 _control_task: asyncio.Task[Any] | None = None 89 "Keep track of controller's main task." 90 91 def __init__(self, switches: Iterable[Switch] = ()): 92 """Initialize Controller object with an initial list of switches. 93 94 Args: 95 switches: a collection or other iterable of Switch objects. 96 """ 97 self._switches = {} 98 self._pending_removal = set() 99 self._task_count = CountdownFuture() 100 101 for switch in switches: 102 if switch.name in self._switches: 103 raise ValueError(f"Switch named {switch.name!r} already exists") 104 self._switches[switch.name] = switch 105 106 @property 107 def running(self) -> bool: 108 "True if Controller is running." 109 return self._control_task is not None 110 111 async def run(self) -> None: 112 "Run the controller." 113 async with self: 114 await wait_for_cancel() 115 116 def stop(self) -> None: 117 "Stop the controller if it is running." 118 if self._control_task is not None: 119 self._control_task.cancel() 120 121 async def __aenter__(self) -> Self: 122 "Run the controller as a context manager (see also run())." 123 assert not self.running, "Controller.__aenter__ is not re-entrant" 124 assert self._task_count.value() == 0 125 assert not self._pending_removal 126 127 self._control_task = asyncio.current_task() 128 _CONTROLLER.set(self) 129 130 try: 131 # Start each switch running. 132 for switch in self: 133 self._start_switch(switch) 134 except Exception: 135 self._control_task = None 136 _CONTROLLER.set(None) 137 raise 138 139 return self 140 141 async def __aexit__( 142 self, 143 _exc_type: type[BaseException] | None, 144 _exc_val: BaseException | None, 145 _exc_tb: TracebackType | None, 146 ) -> bool | None: 147 "Run the controller as a context manager (see also run())." 148 assert self.running 149 150 try: 151 # Stop all the switches. 152 for switch in self: 153 self._stop_switch(switch) 154 155 # Wait for switch tasks to finish. 156 await self._task_count.wait() 157 158 finally: 159 self._control_task = None 160 _CONTROLLER.set(None) 161 162 def add(self, switch: Switch) -> None: 163 """Add a switch to the controller. 164 165 If the controller is running, tell the switch to start. 166 167 Args: 168 switch: the Switch object. 169 """ 170 if switch.name in self._switches: 171 raise ValueError(f"Switch named {switch.name!r} already exists") 172 173 self._switches[switch.name] = switch 174 if self.running: 175 self._start_switch(switch) 176 177 def remove(self, switch: Switch) -> asyncio.Event: 178 """Remove a switch from the controller. 179 180 If the controller is running, tell the switch to stop and schedule it 181 for removal when it fully stops. 182 183 Args: 184 switch: the Switch object. 185 """ 186 name = switch.name 187 if self._switches.get(name, None) is not switch: 188 raise ValueError(f"Switch named {name!r} not found") 189 190 del self._switches[name] 191 192 event = asyncio.Event() 193 if self.running: 194 # When controller is running, event will complete when switch 195 # is actually stopped. 196 self._stop_switch(switch) 197 self._pending_removal.add(switch) 198 199 def _controller_leave(sw: Switch): 200 self._pending_removal.discard(sw) 201 event.set() 202 203 switch.ee.once(SwitchEvent.CONTROLLER_LEAVE, _controller_leave) # type: ignore 204 else: 205 # When controller is not running, event completes immediately. 206 event.set() 207 208 return event 209 210 def _start_switch(self, switch: Switch): 211 "Start the switch's control task." 212 LOGGER.debug("Controller._start_switch: %r", switch) 213 assert switch._control_task is None # pyright: ignore[reportPrivateUsage] 214 215 switch.ee.emit(SwitchEvent.CONTROLLER_ENTER, switch) 216 217 task = asyncio.create_task(switch.run(), name=f"fy:{switch.name}") 218 switch._control_task = task # pyright: ignore[reportPrivateUsage] 219 self._task_count.increment() 220 221 def _switch_done(done: asyncio.Task[Any]): 222 switch._control_task = None # pyright: ignore[reportPrivateUsage] 223 switch.ee.emit(SwitchEvent.CONTROLLER_LEAVE, switch) 224 self._task_count.decrement() 225 226 if not done.cancelled(): 227 ex = done.exception() 228 if ex is not None: 229 if not isinstance(ex, SwitchFailFastError): 230 # The `fail_fast` error has already been logged. If 231 # it's any other error, log it. (There shouldn't be 232 # any other error.) 233 LOGGER.critical( 234 "Controller task %r failed", 235 done.get_name(), 236 exc_info=ex, 237 ) 238 # Shutdown the program cleanly due to switch failure. 239 raise SystemExit(99) 240 241 task.add_done_callback(_switch_done) 242 243 def _stop_switch(self, switch: Switch): 244 "Stop the switch's control task." 245 LOGGER.debug("Controller._stop_switch: %r", switch) 246 247 if switch._control_task is not None: # pyright: ignore[reportPrivateUsage] 248 switch._control_task.cancel() # pyright: ignore[reportPrivateUsage] 249 250 def __len__(self) -> int: 251 "Return the number of switches." 252 return len(self._switches) 253 254 def __iter__(self) -> Iterator[Switch]: 255 "Iterate over the switches." 256 return iter(self._switches.values()) 257 258 def __getitem__(self, name: str) -> Switch: 259 "Retrieve a switch by name." 260 return self._switches[name] 261 262 def get(self, name: str) -> Switch | None: 263 "Retrieve a switch by name, or return None if not found." 264 return self._switches.get(name) 265 266 @staticmethod 267 def current() -> "Controller": 268 "Return the current Controller object." 269 result = _CONTROLLER.get() 270 if result is None: 271 raise RuntimeError("controller does not exist") 272 return result
Represents a collection of P4Runtime switches.
Each Switch in the Controller is identified by its name. Each name must
be unique.
switches = [
fy.Switch("sw1", "10.0.0.1:50000"),
fy.Switch("sw2", "10.0.0.2:50000"),
]
controller = fy.Controller(switches)
await controller.run()
A Controller can be running or stopped. There are two ways to run a
Controller. You can use the Controller.run() method, or you can use
a Controller as a context manager.
controller = fy.Controller(switches)
async with controller:
# Let controller run for 60 seconds.
await asyncio.sleep(60)
You can add or remove switches regardless of whether a Controller is
running or not. If the Controller is not running, adding or removing a Switch is
instantaneous. If the Controller is running, adding a Switch will start
it running asynchronously. Removing a Switch will schedule the Switch to stop,
but defer actual removal until the Switch has stopped asynchronously.
When a switch starts inside a Controller, it fires the CONTROLLER_ENTER
event. When a switch stops inside a Controller, it fires the CONTROLLER_LEAVE
event.
A Controller supports these methods to access its contents:
- len(controller): Return number of switches in the controller.
- controller[name]: Return the switch with the given name.
- controller.get(name): Return the switch with the given name, or None if not found.
You can iterate over the switches in a Controller using a for loop:
for switch in controller:
print(switch.name)
Any task or sub-task running inside a controller can retrieve its
Controller object using the Controller.current() method.
91 def __init__(self, switches: Iterable[Switch] = ()): 92 """Initialize Controller object with an initial list of switches. 93 94 Args: 95 switches: a collection or other iterable of Switch objects. 96 """ 97 self._switches = {} 98 self._pending_removal = set() 99 self._task_count = CountdownFuture() 100 101 for switch in switches: 102 if switch.name in self._switches: 103 raise ValueError(f"Switch named {switch.name!r} already exists") 104 self._switches[switch.name] = switch
Initialize Controller object with an initial list of switches.
Arguments:
- switches: a collection or other iterable of Switch objects.
106 @property 107 def running(self) -> bool: 108 "True if Controller is running." 109 return self._control_task is not None
True if Controller is running.
111 async def run(self) -> None: 112 "Run the controller." 113 async with self: 114 await wait_for_cancel()
Run the controller.
116 def stop(self) -> None: 117 "Stop the controller if it is running." 118 if self._control_task is not None: 119 self._control_task.cancel()
Stop the controller if it is running.
121 async def __aenter__(self) -> Self: 122 "Run the controller as a context manager (see also run())." 123 assert not self.running, "Controller.__aenter__ is not re-entrant" 124 assert self._task_count.value() == 0 125 assert not self._pending_removal 126 127 self._control_task = asyncio.current_task() 128 _CONTROLLER.set(self) 129 130 try: 131 # Start each switch running. 132 for switch in self: 133 self._start_switch(switch) 134 except Exception: 135 self._control_task = None 136 _CONTROLLER.set(None) 137 raise 138 139 return self
Run the controller as a context manager (see also run()).
162 def add(self, switch: Switch) -> None: 163 """Add a switch to the controller. 164 165 If the controller is running, tell the switch to start. 166 167 Args: 168 switch: the Switch object. 169 """ 170 if switch.name in self._switches: 171 raise ValueError(f"Switch named {switch.name!r} already exists") 172 173 self._switches[switch.name] = switch 174 if self.running: 175 self._start_switch(switch)
Add a switch to the controller.
If the controller is running, tell the switch to start.
Arguments:
- switch: the Switch object.
177 def remove(self, switch: Switch) -> asyncio.Event: 178 """Remove a switch from the controller. 179 180 If the controller is running, tell the switch to stop and schedule it 181 for removal when it fully stops. 182 183 Args: 184 switch: the Switch object. 185 """ 186 name = switch.name 187 if self._switches.get(name, None) is not switch: 188 raise ValueError(f"Switch named {name!r} not found") 189 190 del self._switches[name] 191 192 event = asyncio.Event() 193 if self.running: 194 # When controller is running, event will complete when switch 195 # is actually stopped. 196 self._stop_switch(switch) 197 self._pending_removal.add(switch) 198 199 def _controller_leave(sw: Switch): 200 self._pending_removal.discard(sw) 201 event.set() 202 203 switch.ee.once(SwitchEvent.CONTROLLER_LEAVE, _controller_leave) # type: ignore 204 else: 205 # When controller is not running, event completes immediately. 206 event.set() 207 208 return event
Remove a switch from the controller.
If the controller is running, tell the switch to stop and schedule it for removal when it fully stops.
Arguments:
- switch: the Switch object.
254 def __iter__(self) -> Iterator[Switch]: 255 "Iterate over the switches." 256 return iter(self._switches.values())
Iterate over the switches.
258 def __getitem__(self, name: str) -> Switch: 259 "Retrieve a switch by name." 260 return self._switches[name]
Retrieve a switch by name.
262 def get(self, name: str) -> Switch | None: 263 "Retrieve a switch by name, or return None if not found." 264 return self._switches.get(name)
Retrieve a switch by name, or return None if not found.
68class LoggerAdapter(_BaseLoggerAdapter): 69 """Custom log adapter to include the name of the current task.""" 70 71 def process( 72 self, 73 msg: Any, 74 kwargs: MutableMapping[str, Any], 75 ) -> tuple[Any, MutableMapping[str, Any]]: 76 """Process the logging message and keyword arguments passed in to a 77 logging call to insert contextual information. 78 """ 79 task_name = _get_current_task_name() 80 return f"[{task_name}] {msg}", kwargs 81 82 def info(self, msg: Any, *args: Any, **kwargs: Any) -> None: 83 """INFO level uses a concise task name representation for readability.""" 84 if self.logger.isEnabledFor(logging.INFO): 85 task_name = _get_current_task_name(True) 86 self.logger.info(f"[{task_name}] {msg}", *args, **kwargs)
Custom log adapter to include the name of the current task.
71 def process( 72 self, 73 msg: Any, 74 kwargs: MutableMapping[str, Any], 75 ) -> tuple[Any, MutableMapping[str, Any]]: 76 """Process the logging message and keyword arguments passed in to a 77 logging call to insert contextual information. 78 """ 79 task_name = _get_current_task_name() 80 return f"[{task_name}] {msg}", kwargs
Process the logging message and keyword arguments passed in to a logging call to insert contextual information.
82 def info(self, msg: Any, *args: Any, **kwargs: Any) -> None: 83 """INFO level uses a concise task name representation for readability.""" 84 if self.logger.isEnabledFor(logging.INFO): 85 task_name = _get_current_task_name(True) 86 self.logger.info(f"[{task_name}] {msg}", *args, **kwargs)
INFO level uses a concise task name representation for readability.
24@functools.total_ordering 25class MACAddress: 26 """Concrete class for a MAC address.""" 27 28 __slots__ = ("__weakref__", "_mac") 29 _mac: int 30 31 def __init__(self, value: object) -> None: 32 "Construct a MAC address from a string, integer or bytes object." 33 match value: 34 case int(): 35 self._mac = _from_int(value) 36 case bytes(): 37 self._mac = _from_bytes(value) 38 case MACAddress(): 39 self._mac = value._mac 40 case _: 41 # Input argument is a MAC string, or an object that is 42 # formatted as MAC string (behave like ipaddress module). 43 self._mac = _from_string(str(value)) 44 45 @property 46 def packed(self) -> bytes: 47 "Return the MAC address a byte string." 48 return _to_bytes(self._mac) 49 50 @property 51 def max_prefixlen(self) -> int: 52 "Return the maximum prefix length (48 bits)." 53 return _BIT_WIDTH 54 55 @property 56 def is_multicast(self) -> bool: 57 "Return true if MAC address has the multicast bit set." 58 return self._mac & (1 << 40) != 0 59 60 @property 61 def is_private(self) -> bool: 62 "Return true if MAC address has the locally administered bit set." 63 return self._mac & (1 << 41) != 0 64 65 @property 66 def is_global(self) -> bool: 67 "Return true if the locally administered bit is not set." 68 return not self.is_private 69 70 @property 71 def is_unspecified(self) -> bool: 72 "Return true if MAC address is all zeros." 73 return self._mac == 0 74 75 @property 76 def is_broadcast(self) -> bool: 77 "Return true if MAC address is the broadcast address." 78 return self._mac == (1 << _BIT_WIDTH) - 1 79 80 def __int__(self) -> int: 81 return self._mac 82 83 def __eq__(self, rhs: object) -> bool: 84 if not isinstance(rhs, MACAddress): 85 return NotImplemented 86 return self._mac == rhs._mac 87 88 def __lt__(self, rhs: object) -> bool: 89 if not isinstance(rhs, MACAddress): 90 return NotImplemented 91 return self._mac < rhs._mac 92 93 def __repr__(self) -> str: 94 return f"MACAddress({_to_string(self._mac)!r})" 95 96 def __str__(self) -> str: 97 return _to_string(self._mac) 98 99 def __hash__(self) -> int: 100 return hash(hex(self._mac))
Concrete class for a MAC address.
31 def __init__(self, value: object) -> None: 32 "Construct a MAC address from a string, integer or bytes object." 33 match value: 34 case int(): 35 self._mac = _from_int(value) 36 case bytes(): 37 self._mac = _from_bytes(value) 38 case MACAddress(): 39 self._mac = value._mac 40 case _: 41 # Input argument is a MAC string, or an object that is 42 # formatted as MAC string (behave like ipaddress module). 43 self._mac = _from_string(str(value))
Construct a MAC address from a string, integer or bytes object.
45 @property 46 def packed(self) -> bytes: 47 "Return the MAC address a byte string." 48 return _to_bytes(self._mac)
Return the MAC address a byte string.
50 @property 51 def max_prefixlen(self) -> int: 52 "Return the maximum prefix length (48 bits)." 53 return _BIT_WIDTH
Return the maximum prefix length (48 bits).
55 @property 56 def is_multicast(self) -> bool: 57 "Return true if MAC address has the multicast bit set." 58 return self._mac & (1 << 40) != 0
Return true if MAC address has the multicast bit set.
60 @property 61 def is_private(self) -> bool: 62 "Return true if MAC address has the locally administered bit set." 63 return self._mac & (1 << 41) != 0
Return true if MAC address has the locally administered bit set.
65 @property 66 def is_global(self) -> bool: 67 "Return true if the locally administered bit is not set." 68 return not self.is_private
Return true if the locally administered bit is not set.
1598@decodable("action_profile_group") 1599@dataclass(slots=True) 1600class P4ActionProfileGroup(_P4Writable): 1601 "Represents a P4Runtime ActionProfileGroup." 1602 1603 action_profile_id: str = "" 1604 _: KW_ONLY 1605 group_id: int = 0 1606 max_size: int = 0 1607 members: Sequence[P4Member] | None = None 1608 1609 def encode(self, schema: P4Schema) -> p4r.Entity: 1610 "Encode P4ActionProfileGroup as protobuf." 1611 if not self.action_profile_id: 1612 return p4r.Entity(action_profile_group=p4r.ActionProfileGroup()) 1613 1614 profile = schema.action_profiles[self.action_profile_id] 1615 1616 if self.members is not None: 1617 members = [member.encode() for member in self.members] 1618 else: 1619 members = None 1620 1621 entry = p4r.ActionProfileGroup( 1622 action_profile_id=profile.id, 1623 group_id=self.group_id, 1624 members=members, 1625 max_size=self.max_size, 1626 ) 1627 return p4r.Entity(action_profile_group=entry) 1628 1629 @classmethod 1630 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1631 "Decode protobuf to ActionProfileGroup data." 1632 entry = msg.action_profile_group 1633 if entry.action_profile_id == 0: 1634 return cls() 1635 1636 profile = schema.action_profiles[entry.action_profile_id] 1637 1638 if entry.members: 1639 members = [P4Member.decode(member) for member in entry.members] 1640 else: 1641 members = None 1642 1643 return cls( 1644 action_profile_id=profile.alias, 1645 group_id=entry.group_id, 1646 max_size=entry.max_size, 1647 members=members, 1648 ) 1649 1650 def action_str(self, _schema: P4Schema) -> str: 1651 "Return string representation of the weighted members." 1652 if not self.members: 1653 return "" 1654 1655 return " ".join( 1656 [f"{member.weight}*{member.member_id:#x}" for member in self.members] 1657 )
Represents a P4Runtime ActionProfileGroup.
1609 def encode(self, schema: P4Schema) -> p4r.Entity: 1610 "Encode P4ActionProfileGroup as protobuf." 1611 if not self.action_profile_id: 1612 return p4r.Entity(action_profile_group=p4r.ActionProfileGroup()) 1613 1614 profile = schema.action_profiles[self.action_profile_id] 1615 1616 if self.members is not None: 1617 members = [member.encode() for member in self.members] 1618 else: 1619 members = None 1620 1621 entry = p4r.ActionProfileGroup( 1622 action_profile_id=profile.id, 1623 group_id=self.group_id, 1624 members=members, 1625 max_size=self.max_size, 1626 ) 1627 return p4r.Entity(action_profile_group=entry)
Encode P4ActionProfileGroup as protobuf.
1629 @classmethod 1630 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1631 "Decode protobuf to ActionProfileGroup data." 1632 entry = msg.action_profile_group 1633 if entry.action_profile_id == 0: 1634 return cls() 1635 1636 profile = schema.action_profiles[entry.action_profile_id] 1637 1638 if entry.members: 1639 members = [P4Member.decode(member) for member in entry.members] 1640 else: 1641 members = None 1642 1643 return cls( 1644 action_profile_id=profile.alias, 1645 group_id=entry.group_id, 1646 max_size=entry.max_size, 1647 members=members, 1648 )
Decode protobuf to ActionProfileGroup data.
1650 def action_str(self, _schema: P4Schema) -> str: 1651 "Return string representation of the weighted members." 1652 if not self.members: 1653 return "" 1654 1655 return " ".join( 1656 [f"{member.weight}*{member.member_id:#x}" for member in self.members] 1657 )
Return string representation of the weighted members.
1496@decodable("action_profile_member") 1497@dataclass(slots=True) 1498class P4ActionProfileMember(_P4Writable): 1499 "Represents a P4Runtime ActionProfileMember." 1500 1501 action_profile_id: str = "" 1502 _: KW_ONLY 1503 member_id: int = 0 1504 action: P4TableAction | None = None 1505 1506 def encode(self, schema: P4Schema) -> p4r.Entity: 1507 "Encode P4ActionProfileMember as protobuf." 1508 if not self.action_profile_id: 1509 return p4r.Entity(action_profile_member=p4r.ActionProfileMember()) 1510 1511 profile = schema.action_profiles[self.action_profile_id] 1512 1513 if self.action: 1514 action = self.action.encode_action(schema) 1515 else: 1516 action = None 1517 1518 entry = p4r.ActionProfileMember( 1519 action_profile_id=profile.id, 1520 member_id=self.member_id, 1521 action=action, 1522 ) 1523 return p4r.Entity(action_profile_member=entry) 1524 1525 @classmethod 1526 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1527 "Decode protobuf to ActionProfileMember data." 1528 entry = msg.action_profile_member 1529 if entry.action_profile_id == 0: 1530 return cls() 1531 1532 profile = schema.action_profiles[entry.action_profile_id] 1533 1534 if entry.HasField("action"): 1535 action = P4TableAction.decode_action(entry.action, schema) 1536 else: 1537 action = None 1538 1539 return cls( 1540 action_profile_id=profile.alias, 1541 member_id=entry.member_id, 1542 action=action, 1543 ) 1544 1545 def action_str(self, schema: P4Schema) -> str: 1546 "Format the action as a human-readable, canonical string." 1547 if self.action is None: 1548 return NOACTION_STR 1549 return self.action.format_str(schema)
Represents a P4Runtime ActionProfileMember.
1506 def encode(self, schema: P4Schema) -> p4r.Entity: 1507 "Encode P4ActionProfileMember as protobuf." 1508 if not self.action_profile_id: 1509 return p4r.Entity(action_profile_member=p4r.ActionProfileMember()) 1510 1511 profile = schema.action_profiles[self.action_profile_id] 1512 1513 if self.action: 1514 action = self.action.encode_action(schema) 1515 else: 1516 action = None 1517 1518 entry = p4r.ActionProfileMember( 1519 action_profile_id=profile.id, 1520 member_id=self.member_id, 1521 action=action, 1522 ) 1523 return p4r.Entity(action_profile_member=entry)
Encode P4ActionProfileMember as protobuf.
1525 @classmethod 1526 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1527 "Decode protobuf to ActionProfileMember data." 1528 entry = msg.action_profile_member 1529 if entry.action_profile_id == 0: 1530 return cls() 1531 1532 profile = schema.action_profiles[entry.action_profile_id] 1533 1534 if entry.HasField("action"): 1535 action = P4TableAction.decode_action(entry.action, schema) 1536 else: 1537 action = None 1538 1539 return cls( 1540 action_profile_id=profile.alias, 1541 member_id=entry.member_id, 1542 action=action, 1543 )
Decode protobuf to ActionProfileMember data.
241class P4Client: 242 "Implements a P4Runtime client." 243 244 _address: str 245 _credentials: GRPCCredentialsTLS | None 246 _wait_for_ready: bool 247 _channel: grpc.aio.Channel | None = None 248 _stub: p4r_grpc.P4RuntimeStub | None = None 249 _stream: _P4StreamTypeAlias | None = None 250 _complete_request: Callable[[pbutil.PBMessage], None] | None = None 251 252 _schema: P4Schema | None = None 253 "Annotate log messages using this optional P4Info schema." 254 255 def __init__( 256 self, 257 address: str, 258 credentials: GRPCCredentialsTLS | None = None, 259 *, 260 wait_for_ready: bool = True, 261 ) -> None: 262 self._address = address 263 self._credentials = credentials 264 self._wait_for_ready = wait_for_ready 265 266 @property 267 def channel(self) -> grpc.aio.Channel | None: 268 "Return the GRPC channel object, or None if the channel is not open." 269 return self._channel 270 271 async def __aenter__(self) -> Self: 272 await self.open() 273 return self 274 275 async def __aexit__(self, *args: Any) -> bool | None: 276 await self.close() 277 278 async def open( 279 self, 280 *, 281 schema: P4Schema | None = None, 282 complete_request: Callable[[pbutil.PBMessage], None] | None = None, 283 ) -> None: 284 """Open the client channel. 285 286 Note: This method is `async` for forward-compatible reasons. 287 """ 288 assert self._channel is None 289 assert self._stub is None 290 291 # Increase max_metadata_size from 8 KB to 64 KB. 292 options = GRPCOptions( 293 max_metadata_size=64 * 1024, # 64 kilobytes 294 max_reconnect_backoff_ms=15000, # 15.0 seconds 295 ) 296 297 self._channel = grpc_channel( 298 self._address, 299 credentials=self._credentials, 300 options=options, 301 client_type="P4Client", 302 ) 303 304 self._stub = p4r_grpc.P4RuntimeStub(self._channel) 305 self._schema = schema 306 self._complete_request = complete_request 307 308 async def close(self) -> None: 309 "Close the client channel." 310 if self._channel is not None: 311 LOGGER.debug("P4Client: close channel %r", self._address) 312 313 if self._stream is not None: 314 self._stream.cancel() 315 self._stream = None 316 317 await self._channel.close() 318 self._channel = None 319 self._stub = None 320 self._schema = None 321 self._complete_request = None 322 323 async def send(self, msg: p4r.StreamMessageRequest) -> None: 324 """Send a message to the stream.""" 325 assert self._stub is not None 326 327 if not self._stream or self._stream.done(): 328 self._stream = cast( 329 _P4StreamTypeAlias, 330 self._stub.StreamChannel(wait_for_ready=self._wait_for_ready), # type: ignore 331 ) 332 333 self._log_msg(msg) 334 335 try: 336 await self._stream.write(msg) 337 except grpc.RpcError as ex: 338 raise P4ClientError(ex, "send") from None 339 340 async def receive(self) -> p4r.StreamMessageResponse: 341 """Read a message from the stream.""" 342 assert self._stream is not None 343 344 try: 345 msg = cast( 346 p4r.StreamMessageResponse, 347 await self._stream.read(), 348 ) 349 if msg is GRPC_EOF: 350 # Treat EOF as a protocol violation. 351 raise RuntimeError("P4Client.receive got EOF!") 352 353 except grpc.RpcError as ex: 354 raise P4ClientError(ex, "receive") from None 355 356 self._log_msg(msg) 357 return msg 358 359 @overload 360 async def request( 361 self, msg: p4r.WriteRequest 362 ) -> p4r.WriteResponse: ... # pragma: no cover 363 364 @overload 365 async def request( 366 self, msg: p4r.GetForwardingPipelineConfigRequest 367 ) -> p4r.GetForwardingPipelineConfigResponse: ... # pragma: no cover 368 369 @overload 370 async def request( 371 self, msg: p4r.SetForwardingPipelineConfigRequest 372 ) -> p4r.SetForwardingPipelineConfigResponse: ... # pragma: no cover 373 374 @overload 375 async def request( 376 self, msg: p4r.CapabilitiesRequest 377 ) -> p4r.CapabilitiesResponse: ... # pragma: no cover 378 379 async def request(self, msg: pbutil.PBMessage) -> pbutil.PBMessage: 380 "Send a unary-unary P4Runtime request and wait for the response." 381 assert self._stub is not None 382 383 if self._complete_request: 384 self._complete_request(msg) 385 386 msg_type = type(msg).__name__ 387 assert msg_type.endswith("Request") 388 rpc_method = getattr(self._stub, msg_type[:-7]) 389 390 self._log_msg(msg) 391 try: 392 reply = await rpc_method( 393 msg, 394 timeout=_DEFAULT_RPC_TIMEOUT, 395 ) 396 except grpc.RpcError as ex: 397 raise P4ClientError(ex, msg_type, msg=msg, schema=self._schema) from None 398 399 self._log_msg(reply) 400 return reply 401 402 async def request_iter( 403 self, msg: p4r.ReadRequest 404 ) -> AsyncIterator[p4r.ReadResponse]: 405 "Send a unary-stream P4Runtime read request and wait for the responses." 406 assert self._stub is not None 407 408 if self._complete_request: 409 self._complete_request(msg) 410 411 msg_type = type(msg).__name__ 412 assert msg_type.endswith("Request") 413 rpc_method = getattr(self._stub, msg_type[:-7]) 414 415 self._log_msg(msg) 416 try: 417 async for reply in rpc_method( 418 msg, 419 timeout=_DEFAULT_RPC_TIMEOUT, 420 ): 421 self._log_msg(reply) 422 yield reply 423 except grpc.RpcError as ex: 424 raise P4ClientError(ex, msg_type) from None 425 426 def _log_msg(self, msg: pbutil.PBMessage) -> None: 427 "Log a P4Runtime request or response." 428 pbutil.log_msg(self._channel, msg, self._schema)
Implements a P4Runtime client.
266 @property 267 def channel(self) -> grpc.aio.Channel | None: 268 "Return the GRPC channel object, or None if the channel is not open." 269 return self._channel
Return the GRPC channel object, or None if the channel is not open.
278 async def open( 279 self, 280 *, 281 schema: P4Schema | None = None, 282 complete_request: Callable[[pbutil.PBMessage], None] | None = None, 283 ) -> None: 284 """Open the client channel. 285 286 Note: This method is `async` for forward-compatible reasons. 287 """ 288 assert self._channel is None 289 assert self._stub is None 290 291 # Increase max_metadata_size from 8 KB to 64 KB. 292 options = GRPCOptions( 293 max_metadata_size=64 * 1024, # 64 kilobytes 294 max_reconnect_backoff_ms=15000, # 15.0 seconds 295 ) 296 297 self._channel = grpc_channel( 298 self._address, 299 credentials=self._credentials, 300 options=options, 301 client_type="P4Client", 302 ) 303 304 self._stub = p4r_grpc.P4RuntimeStub(self._channel) 305 self._schema = schema 306 self._complete_request = complete_request
Open the client channel.
Note: This method is async for forward-compatible reasons.
308 async def close(self) -> None: 309 "Close the client channel." 310 if self._channel is not None: 311 LOGGER.debug("P4Client: close channel %r", self._address) 312 313 if self._stream is not None: 314 self._stream.cancel() 315 self._stream = None 316 317 await self._channel.close() 318 self._channel = None 319 self._stub = None 320 self._schema = None 321 self._complete_request = None
Close the client channel.
323 async def send(self, msg: p4r.StreamMessageRequest) -> None: 324 """Send a message to the stream.""" 325 assert self._stub is not None 326 327 if not self._stream or self._stream.done(): 328 self._stream = cast( 329 _P4StreamTypeAlias, 330 self._stub.StreamChannel(wait_for_ready=self._wait_for_ready), # type: ignore 331 ) 332 333 self._log_msg(msg) 334 335 try: 336 await self._stream.write(msg) 337 except grpc.RpcError as ex: 338 raise P4ClientError(ex, "send") from None
Send a message to the stream.
340 async def receive(self) -> p4r.StreamMessageResponse: 341 """Read a message from the stream.""" 342 assert self._stream is not None 343 344 try: 345 msg = cast( 346 p4r.StreamMessageResponse, 347 await self._stream.read(), 348 ) 349 if msg is GRPC_EOF: 350 # Treat EOF as a protocol violation. 351 raise RuntimeError("P4Client.receive got EOF!") 352 353 except grpc.RpcError as ex: 354 raise P4ClientError(ex, "receive") from None 355 356 self._log_msg(msg) 357 return msg
Read a message from the stream.
379 async def request(self, msg: pbutil.PBMessage) -> pbutil.PBMessage: 380 "Send a unary-unary P4Runtime request and wait for the response." 381 assert self._stub is not None 382 383 if self._complete_request: 384 self._complete_request(msg) 385 386 msg_type = type(msg).__name__ 387 assert msg_type.endswith("Request") 388 rpc_method = getattr(self._stub, msg_type[:-7]) 389 390 self._log_msg(msg) 391 try: 392 reply = await rpc_method( 393 msg, 394 timeout=_DEFAULT_RPC_TIMEOUT, 395 ) 396 except grpc.RpcError as ex: 397 raise P4ClientError(ex, msg_type, msg=msg, schema=self._schema) from None 398 399 self._log_msg(reply) 400 return reply
Send a unary-unary P4Runtime request and wait for the response.
402 async def request_iter( 403 self, msg: p4r.ReadRequest 404 ) -> AsyncIterator[p4r.ReadResponse]: 405 "Send a unary-stream P4Runtime read request and wait for the responses." 406 assert self._stub is not None 407 408 if self._complete_request: 409 self._complete_request(msg) 410 411 msg_type = type(msg).__name__ 412 assert msg_type.endswith("Request") 413 rpc_method = getattr(self._stub, msg_type[:-7]) 414 415 self._log_msg(msg) 416 try: 417 async for reply in rpc_method( 418 msg, 419 timeout=_DEFAULT_RPC_TIMEOUT, 420 ): 421 self._log_msg(reply) 422 yield reply 423 except grpc.RpcError as ex: 424 raise P4ClientError(ex, msg_type) from None
Send a unary-stream P4Runtime read request and wait for the responses.
127class P4ClientError(Exception): 128 "Wrap `grpc.RpcError`." 129 130 _operation: str 131 _status: P4RpcStatus 132 _outer_code: GRPCStatusCode 133 _outer_message: str 134 _schema: P4Schema | None = None # for annotating sub-value details 135 136 def __init__( 137 self, 138 error: grpc.RpcError, 139 operation: str, 140 *, 141 msg: pbutil.PBMessage | None = None, 142 schema: P4Schema | None = None, 143 ): 144 super().__init__() 145 assert isinstance(error, grpc.aio.AioRpcError) 146 147 self._operation = operation 148 self._status = P4RpcStatus.from_rpc_error(error) 149 self._outer_code = GRPCStatusCode.from_status_code(error.code()) 150 self._outer_message = error.details() or "" 151 152 if msg is not None and self.details: 153 self._attach_details(msg) 154 self._schema = schema 155 156 LOGGER.debug("%s failed: %s", operation, self) 157 158 @property 159 def code(self) -> GRPCStatusCode: 160 "GRPC status code." 161 return self._status.code 162 163 @property 164 def message(self) -> str: 165 "GRPC status message." 166 return self._status.message 167 168 @property 169 def details(self) -> dict[int, P4Error]: 170 "Optional details about P4Runtime Write updates that failed." 171 return self._status.details 172 173 @property 174 def is_not_found_only(self) -> bool: 175 """Return True if the only sub-errors are NOT_FOUND.""" 176 if self.code != GRPCStatusCode.UNKNOWN: 177 return False 178 179 for err in self.details.values(): 180 if err.canonical_code != GRPCStatusCode.NOT_FOUND: 181 return False 182 return True 183 184 @property 185 def is_election_id_used(self) -> bool: 186 """Return true if error is that election ID is in use.""" 187 return ( 188 self.code == GRPCStatusCode.INVALID_ARGUMENT 189 and _ELECTION_ID_EXISTS.search(self.message) is not None 190 ) 191 192 @property 193 def is_pipeline_missing(self) -> bool: 194 "Return true if error is that no pipeline config is set." 195 return ( 196 self.code == GRPCStatusCode.FAILED_PRECONDITION 197 and _NO_PIPELINE_CONFIG.search(self.message) is not None 198 ) 199 200 def _attach_details(self, msg: pbutil.PBMessage): 201 "Attach the subvalue(s) from the message that caused the error." 202 if isinstance(msg, p4r.WriteRequest): 203 for key, value in self.details.items(): 204 value.subvalue = msg.updates[key] 205 206 def __str__(self) -> str: 207 "Return string representation of P4ClientError object." 208 if self.details: 209 210 def _show(value: P4Error): 211 s = repr(value) 212 if self._schema: 213 s = pbutil.log_annotate(s, self._schema) 214 s = s.replace("\n}\n)", "\n})") # tidy multiline repr 215 return s.replace("\n", "\n" + " " * 6) # indent 6 spaces 216 217 items = [""] + [ 218 f" [details.{key}] {_show(val)}" for key, val in self.details.items() 219 ] 220 details = "\n".join(items) 221 else: 222 details = "" 223 224 if self.code == self._outer_code and self.message == self._outer_message: 225 return ( 226 f"operation={self._operation} code={self.code!r} " 227 f"message={self.message!r} {details}" 228 ) 229 return ( 230 f"code={self.code!r} message={self.message!r} " 231 f"details={self.details!r} operation={self._operation} " 232 f"_outer_message={self._outer_message!r} _outer_code={self._outer_code!r}" 233 )
Wrap grpc.RpcError.
136 def __init__( 137 self, 138 error: grpc.RpcError, 139 operation: str, 140 *, 141 msg: pbutil.PBMessage | None = None, 142 schema: P4Schema | None = None, 143 ): 144 super().__init__() 145 assert isinstance(error, grpc.aio.AioRpcError) 146 147 self._operation = operation 148 self._status = P4RpcStatus.from_rpc_error(error) 149 self._outer_code = GRPCStatusCode.from_status_code(error.code()) 150 self._outer_message = error.details() or "" 151 152 if msg is not None and self.details: 153 self._attach_details(msg) 154 self._schema = schema 155 156 LOGGER.debug("%s failed: %s", operation, self)
158 @property 159 def code(self) -> GRPCStatusCode: 160 "GRPC status code." 161 return self._status.code
GRPC status code.
163 @property 164 def message(self) -> str: 165 "GRPC status message." 166 return self._status.message
GRPC status message.
168 @property 169 def details(self) -> dict[int, P4Error]: 170 "Optional details about P4Runtime Write updates that failed." 171 return self._status.details
Optional details about P4Runtime Write updates that failed.
173 @property 174 def is_not_found_only(self) -> bool: 175 """Return True if the only sub-errors are NOT_FOUND.""" 176 if self.code != GRPCStatusCode.UNKNOWN: 177 return False 178 179 for err in self.details.values(): 180 if err.canonical_code != GRPCStatusCode.NOT_FOUND: 181 return False 182 return True
Return True if the only sub-errors are NOT_FOUND.
184 @property 185 def is_election_id_used(self) -> bool: 186 """Return true if error is that election ID is in use.""" 187 return ( 188 self.code == GRPCStatusCode.INVALID_ARGUMENT 189 and _ELECTION_ID_EXISTS.search(self.message) is not None 190 )
Return true if error is that election ID is in use.
192 @property 193 def is_pipeline_missing(self) -> bool: 194 "Return true if error is that no pipeline config is set." 195 return ( 196 self.code == GRPCStatusCode.FAILED_PRECONDITION 197 and _NO_PIPELINE_CONFIG.search(self.message) is not None 198 )
Return true if error is that no pipeline config is set.
1404@decodable("clone_session_entry") 1405@dataclass(slots=True) 1406class P4CloneSessionEntry(_P4Writable): 1407 "Represents a P4Runtime CloneSessionEntry." 1408 1409 session_id: int = 0 1410 _: KW_ONLY 1411 class_of_service: int = 0 1412 packet_length_bytes: int = 0 1413 replicas: Sequence[_ReplicaType] = () 1414 1415 def encode(self, schema: P4Schema) -> p4r.Entity: 1416 "Encode CloneSessionEntry data as protobuf." 1417 p4r_version = schema.p4runtime_version 1418 entry = p4r.CloneSessionEntry( 1419 session_id=self.session_id, 1420 class_of_service=self.class_of_service, 1421 packet_length_bytes=self.packet_length_bytes, 1422 replicas=[ 1423 encode_replica(replica, p4r_version) for replica in self.replicas 1424 ], 1425 ) 1426 return p4r.Entity( 1427 packet_replication_engine_entry=p4r.PacketReplicationEngineEntry( 1428 clone_session_entry=entry 1429 ) 1430 ) 1431 1432 @classmethod 1433 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1434 "Decode protobuf to CloneSessionEntry data." 1435 entry = msg.packet_replication_engine_entry.clone_session_entry 1436 return cls( 1437 session_id=entry.session_id, 1438 class_of_service=entry.class_of_service, 1439 packet_length_bytes=entry.packet_length_bytes, 1440 replicas=tuple(decode_replica(replica) for replica in entry.replicas), 1441 ) 1442 1443 def replicas_str(self) -> str: 1444 "Format the replicas as a human-readable, canonical string." 1445 return " ".join(format_replica(rep) for rep in self.replicas)
Represents a P4Runtime CloneSessionEntry.
1415 def encode(self, schema: P4Schema) -> p4r.Entity: 1416 "Encode CloneSessionEntry data as protobuf." 1417 p4r_version = schema.p4runtime_version 1418 entry = p4r.CloneSessionEntry( 1419 session_id=self.session_id, 1420 class_of_service=self.class_of_service, 1421 packet_length_bytes=self.packet_length_bytes, 1422 replicas=[ 1423 encode_replica(replica, p4r_version) for replica in self.replicas 1424 ], 1425 ) 1426 return p4r.Entity( 1427 packet_replication_engine_entry=p4r.PacketReplicationEngineEntry( 1428 clone_session_entry=entry 1429 ) 1430 )
Encode CloneSessionEntry data as protobuf.
1432 @classmethod 1433 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1434 "Decode protobuf to CloneSessionEntry data." 1435 entry = msg.packet_replication_engine_entry.clone_session_entry 1436 return cls( 1437 session_id=entry.session_id, 1438 class_of_service=entry.class_of_service, 1439 packet_length_bytes=entry.packet_length_bytes, 1440 replicas=tuple(decode_replica(replica) for replica in entry.replicas), 1441 )
Decode protobuf to CloneSessionEntry data.
960@dataclass(kw_only=True, slots=True) 961class P4CounterData: 962 """Represents a P4Runtime object that keeps statistics of bytes and packets. 963 964 Attributes: 965 byte_count (int): the number of octets 966 packet_count (int): the number of packets 967 968 See Also: 969 - P4TableEntry 970 - P4MeterCounterData 971 - P4CounterEntry 972 - P4DirectCounterEntry 973 """ 974 975 byte_count: int = 0 976 "The number of octets." 977 packet_count: int = 0 978 "The number of packets." 979 980 def encode(self) -> p4r.CounterData: 981 "Encode object as CounterData." 982 return p4r.CounterData( 983 byte_count=self.byte_count, packet_count=self.packet_count 984 ) 985 986 @classmethod 987 def decode(cls, msg: p4r.CounterData) -> Self: 988 "Decode CounterData." 989 return cls(byte_count=msg.byte_count, packet_count=msg.packet_count)
Represents a P4Runtime object that keeps statistics of bytes and packets.
Attributes:
- byte_count (int): the number of octets
- packet_count (int): the number of packets
See Also:
- P4TableEntry
- P4MeterCounterData
- P4CounterEntry
- P4DirectCounterEntry
1793@decodable("counter_entry") 1794@dataclass(slots=True) 1795class P4CounterEntry(_P4ModifyOnly): 1796 "Represents a P4Runtime CounterEntry." 1797 1798 counter_id: str = "" 1799 _: KW_ONLY 1800 index: int | None = None 1801 data: P4CounterData | None = None 1802 1803 @property 1804 def packet_count(self) -> int: 1805 "Packet count from counter data (or 0 if there is no data)." 1806 if self.data is not None: 1807 return self.data.packet_count 1808 return 0 1809 1810 @property 1811 def byte_count(self) -> int: 1812 "Byte count from counter data (or 0 if there is no data)." 1813 if self.data is not None: 1814 return self.data.byte_count 1815 return 0 1816 1817 def encode(self, schema: P4Schema) -> p4r.Entity: 1818 "Encode P4CounterEntry as protobuf." 1819 if not self.counter_id: 1820 return p4r.Entity(counter_entry=p4r.CounterEntry()) 1821 1822 counter = schema.counters[self.counter_id] 1823 1824 if self.index is not None: 1825 index = p4r.Index(index=self.index) 1826 else: 1827 index = None 1828 1829 if self.data is not None: 1830 data = self.data.encode() 1831 else: 1832 data = None 1833 1834 entry = p4r.CounterEntry( 1835 counter_id=counter.id, 1836 index=index, 1837 data=data, 1838 ) 1839 return p4r.Entity(counter_entry=entry) 1840 1841 @classmethod 1842 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1843 "Decode protobuf to P4CounterEntry." 1844 entry = msg.counter_entry 1845 if not entry.counter_id: 1846 return cls() 1847 1848 counter = schema.counters[entry.counter_id] 1849 1850 if entry.HasField("index"): 1851 index = entry.index.index 1852 else: 1853 index = None 1854 1855 if entry.HasField("data"): 1856 data = P4CounterData.decode(entry.data) 1857 else: 1858 data = None 1859 1860 return cls(counter_id=counter.alias, index=index, data=data)
Represents a P4Runtime CounterEntry.
1803 @property 1804 def packet_count(self) -> int: 1805 "Packet count from counter data (or 0 if there is no data)." 1806 if self.data is not None: 1807 return self.data.packet_count 1808 return 0
Packet count from counter data (or 0 if there is no data).
1810 @property 1811 def byte_count(self) -> int: 1812 "Byte count from counter data (or 0 if there is no data)." 1813 if self.data is not None: 1814 return self.data.byte_count 1815 return 0
Byte count from counter data (or 0 if there is no data).
1817 def encode(self, schema: P4Schema) -> p4r.Entity: 1818 "Encode P4CounterEntry as protobuf." 1819 if not self.counter_id: 1820 return p4r.Entity(counter_entry=p4r.CounterEntry()) 1821 1822 counter = schema.counters[self.counter_id] 1823 1824 if self.index is not None: 1825 index = p4r.Index(index=self.index) 1826 else: 1827 index = None 1828 1829 if self.data is not None: 1830 data = self.data.encode() 1831 else: 1832 data = None 1833 1834 entry = p4r.CounterEntry( 1835 counter_id=counter.id, 1836 index=index, 1837 data=data, 1838 ) 1839 return p4r.Entity(counter_entry=entry)
Encode P4CounterEntry as protobuf.
1841 @classmethod 1842 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1843 "Decode protobuf to P4CounterEntry." 1844 entry = msg.counter_entry 1845 if not entry.counter_id: 1846 return cls() 1847 1848 counter = schema.counters[entry.counter_id] 1849 1850 if entry.HasField("index"): 1851 index = entry.index.index 1852 else: 1853 index = None 1854 1855 if entry.HasField("data"): 1856 data = P4CounterData.decode(entry.data) 1857 else: 1858 data = None 1859 1860 return cls(counter_id=counter.alias, index=index, data=data)
Decode protobuf to P4CounterEntry.
84class P4CounterUnit(_EnumBase): 85 "IntEnum equivalent to `p4i.CounterSpec.Unit`." 86 87 UNSPECIFIED = p4i.CounterSpec.Unit.UNSPECIFIED 88 BYTES = p4i.CounterSpec.Unit.BYTES 89 PACKETS = p4i.CounterSpec.Unit.PACKETS 90 BOTH = p4i.CounterSpec.Unit.BOTH
IntEnum equivalent to p4i.CounterSpec.Unit.
1448@decodable("digest_entry") 1449@dataclass(slots=True) 1450class P4DigestEntry(_P4Writable): 1451 "Represents a P4Runtime DigestEntry." 1452 1453 digest_id: str = "" 1454 _: KW_ONLY 1455 max_list_size: int = 0 1456 max_timeout_ns: int = 0 1457 ack_timeout_ns: int = 0 1458 1459 def encode(self, schema: P4Schema) -> p4r.Entity: 1460 "Encode DigestEntry data as protobuf." 1461 if not self.digest_id: 1462 return p4r.Entity(digest_entry=p4r.DigestEntry()) 1463 1464 digest = schema.digests[self.digest_id] 1465 1466 if self.max_list_size == self.max_timeout_ns == self.ack_timeout_ns == 0: 1467 config = None 1468 else: 1469 config = p4r.DigestEntry.Config( 1470 max_timeout_ns=self.max_timeout_ns, 1471 max_list_size=self.max_list_size, 1472 ack_timeout_ns=self.ack_timeout_ns, 1473 ) 1474 1475 entry = p4r.DigestEntry(digest_id=digest.id, config=config) 1476 return p4r.Entity(digest_entry=entry) 1477 1478 @classmethod 1479 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1480 "Decode protobuf to DigestEntry data." 1481 entry = msg.digest_entry 1482 if entry.digest_id == 0: 1483 return cls() 1484 1485 digest = schema.digests[entry.digest_id] 1486 1487 config = entry.config 1488 return cls( 1489 digest.alias, 1490 max_list_size=config.max_list_size, 1491 max_timeout_ns=config.max_timeout_ns, 1492 ack_timeout_ns=config.ack_timeout_ns, 1493 )
Represents a P4Runtime DigestEntry.
1459 def encode(self, schema: P4Schema) -> p4r.Entity: 1460 "Encode DigestEntry data as protobuf." 1461 if not self.digest_id: 1462 return p4r.Entity(digest_entry=p4r.DigestEntry()) 1463 1464 digest = schema.digests[self.digest_id] 1465 1466 if self.max_list_size == self.max_timeout_ns == self.ack_timeout_ns == 0: 1467 config = None 1468 else: 1469 config = p4r.DigestEntry.Config( 1470 max_timeout_ns=self.max_timeout_ns, 1471 max_list_size=self.max_list_size, 1472 ack_timeout_ns=self.ack_timeout_ns, 1473 ) 1474 1475 entry = p4r.DigestEntry(digest_id=digest.id, config=config) 1476 return p4r.Entity(digest_entry=entry)
Encode DigestEntry data as protobuf.
1478 @classmethod 1479 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1480 "Decode protobuf to DigestEntry data." 1481 entry = msg.digest_entry 1482 if entry.digest_id == 0: 1483 return cls() 1484 1485 digest = schema.digests[entry.digest_id] 1486 1487 config = entry.config 1488 return cls( 1489 digest.alias, 1490 max_list_size=config.max_list_size, 1491 max_timeout_ns=config.max_timeout_ns, 1492 ack_timeout_ns=config.ack_timeout_ns, 1493 )
Decode protobuf to DigestEntry data.
2107@decodable("digest") 2108@dataclass(slots=True) 2109class P4DigestList: 2110 "Represents a P4Runtime DigestList." 2111 2112 digest_id: str 2113 _: KW_ONLY 2114 list_id: int 2115 timestamp: int 2116 data: list[_DataValueType] 2117 2118 @classmethod 2119 def decode(cls, msg: p4r.StreamMessageResponse, schema: P4Schema) -> Self: 2120 "Decode protobuf to DigestList data." 2121 digest_list = msg.digest 2122 digest = schema.digests[digest_list.digest_id] 2123 2124 type_spec = digest.type_spec 2125 return cls( 2126 digest_id=digest.alias, 2127 list_id=digest_list.list_id, 2128 timestamp=digest_list.timestamp, 2129 data=[type_spec.decode_data(item) for item in digest_list.data], 2130 ) 2131 2132 def __len__(self) -> int: 2133 "Return number of values in digest list." 2134 return len(self.data) 2135 2136 def __getitem__(self, key: int) -> _DataValueType: 2137 "Retrieve value at given index from digest list." 2138 return self.data[key] 2139 2140 def __iter__(self) -> Iterator[_DataValueType]: 2141 "Iterate over values in digest list." 2142 return iter(self.data) 2143 2144 def ack(self) -> "P4DigestListAck": 2145 "Return the corresponding DigestListAck message." 2146 return P4DigestListAck(self.digest_id, self.list_id)
Represents a P4Runtime DigestList.
2118 @classmethod 2119 def decode(cls, msg: p4r.StreamMessageResponse, schema: P4Schema) -> Self: 2120 "Decode protobuf to DigestList data." 2121 digest_list = msg.digest 2122 digest = schema.digests[digest_list.digest_id] 2123 2124 type_spec = digest.type_spec 2125 return cls( 2126 digest_id=digest.alias, 2127 list_id=digest_list.list_id, 2128 timestamp=digest_list.timestamp, 2129 data=[type_spec.decode_data(item) for item in digest_list.data], 2130 )
Decode protobuf to DigestList data.
2132 def __len__(self) -> int: 2133 "Return number of values in digest list." 2134 return len(self.data)
Return number of values in digest list.
2136 def __getitem__(self, key: int) -> _DataValueType: 2137 "Retrieve value at given index from digest list." 2138 return self.data[key]
Retrieve value at given index from digest list.
2140 def __iter__(self) -> Iterator[_DataValueType]: 2141 "Iterate over values in digest list." 2142 return iter(self.data)
Iterate over values in digest list.
2149@dataclass(slots=True) 2150class P4DigestListAck: 2151 "Represents a P4Runtime DigestListAck." 2152 2153 digest_id: str 2154 list_id: int 2155 2156 def encode_update(self, schema: P4Schema) -> p4r.StreamMessageRequest: 2157 "Encode DigestListAck data as protobuf." 2158 digest = schema.digests[self.digest_id] 2159 2160 return p4r.StreamMessageRequest( 2161 digest_ack=p4r.DigestListAck( 2162 digest_id=digest.id, 2163 list_id=self.list_id, 2164 ) 2165 )
Represents a P4Runtime DigestListAck.
2156 def encode_update(self, schema: P4Schema) -> p4r.StreamMessageRequest: 2157 "Encode DigestListAck data as protobuf." 2158 digest = schema.digests[self.digest_id] 2159 2160 return p4r.StreamMessageRequest( 2161 digest_ack=p4r.DigestListAck( 2162 digest_id=digest.id, 2163 list_id=self.list_id, 2164 ) 2165 )
Encode DigestListAck data as protobuf.
1863@decodable("direct_counter_entry") 1864@dataclass(slots=True) 1865class P4DirectCounterEntry(_P4ModifyOnly): 1866 "Represents a P4Runtime DirectCounterEntry." 1867 1868 counter_id: str = "" 1869 _: KW_ONLY 1870 table_entry: P4TableEntry | None = None 1871 data: P4CounterData | None = None 1872 1873 @property 1874 def table_id(self) -> str: 1875 "Return table_id of related table." 1876 if self.table_entry is None: 1877 return "" 1878 return self.table_entry.table_id 1879 1880 @property 1881 def packet_count(self) -> int: 1882 "Packet count from counter data (or 0 if there is no data)." 1883 if self.data is not None: 1884 return self.data.packet_count 1885 return 0 1886 1887 @property 1888 def byte_count(self) -> int: 1889 "Byte count from counter data (or 0 if there is no data)." 1890 if self.data is not None: 1891 return self.data.byte_count 1892 return 0 1893 1894 def encode(self, schema: P4Schema) -> p4r.Entity: 1895 "Encode P4DirectCounterEntry as protobuf." 1896 if self.table_entry is None: 1897 # Use `counter_id` to construct a `P4TableEntry` with the proper 1898 # table name. 1899 if self.counter_id: 1900 tb_name = schema.direct_counters[self.counter_id].direct_table_name 1901 table_entry = P4TableEntry(tb_name) 1902 else: 1903 table_entry = P4TableEntry() 1904 else: 1905 table_entry = self.table_entry 1906 1907 if self.data is not None: 1908 data = self.data.encode() 1909 else: 1910 data = None 1911 1912 entry = p4r.DirectCounterEntry( 1913 table_entry=table_entry.encode_entry(schema), 1914 data=data, 1915 ) 1916 return p4r.Entity(direct_counter_entry=entry) 1917 1918 @classmethod 1919 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1920 "Decode protobuf to P4DirectCounterEntry." 1921 entry = msg.direct_counter_entry 1922 1923 if entry.HasField("table_entry"): 1924 table_entry = P4TableEntry.decode_entry(entry.table_entry, schema) 1925 else: 1926 table_entry = None 1927 1928 if entry.HasField("data"): 1929 data = P4CounterData.decode(entry.data) 1930 else: 1931 data = None 1932 1933 # Determine `counter_id` from table_entry. 1934 counter_id = "" 1935 if table_entry is not None and table_entry.table_id: 1936 direct_counter = schema.tables[table_entry.table_id].direct_counter 1937 assert direct_counter is not None 1938 counter_id = direct_counter.alias 1939 1940 return cls(counter_id, table_entry=table_entry, data=data)
Represents a P4Runtime DirectCounterEntry.
1873 @property 1874 def table_id(self) -> str: 1875 "Return table_id of related table." 1876 if self.table_entry is None: 1877 return "" 1878 return self.table_entry.table_id
Return table_id of related table.
1880 @property 1881 def packet_count(self) -> int: 1882 "Packet count from counter data (or 0 if there is no data)." 1883 if self.data is not None: 1884 return self.data.packet_count 1885 return 0
Packet count from counter data (or 0 if there is no data).
1887 @property 1888 def byte_count(self) -> int: 1889 "Byte count from counter data (or 0 if there is no data)." 1890 if self.data is not None: 1891 return self.data.byte_count 1892 return 0
Byte count from counter data (or 0 if there is no data).
1894 def encode(self, schema: P4Schema) -> p4r.Entity: 1895 "Encode P4DirectCounterEntry as protobuf." 1896 if self.table_entry is None: 1897 # Use `counter_id` to construct a `P4TableEntry` with the proper 1898 # table name. 1899 if self.counter_id: 1900 tb_name = schema.direct_counters[self.counter_id].direct_table_name 1901 table_entry = P4TableEntry(tb_name) 1902 else: 1903 table_entry = P4TableEntry() 1904 else: 1905 table_entry = self.table_entry 1906 1907 if self.data is not None: 1908 data = self.data.encode() 1909 else: 1910 data = None 1911 1912 entry = p4r.DirectCounterEntry( 1913 table_entry=table_entry.encode_entry(schema), 1914 data=data, 1915 ) 1916 return p4r.Entity(direct_counter_entry=entry)
Encode P4DirectCounterEntry as protobuf.
1918 @classmethod 1919 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1920 "Decode protobuf to P4DirectCounterEntry." 1921 entry = msg.direct_counter_entry 1922 1923 if entry.HasField("table_entry"): 1924 table_entry = P4TableEntry.decode_entry(entry.table_entry, schema) 1925 else: 1926 table_entry = None 1927 1928 if entry.HasField("data"): 1929 data = P4CounterData.decode(entry.data) 1930 else: 1931 data = None 1932 1933 # Determine `counter_id` from table_entry. 1934 counter_id = "" 1935 if table_entry is not None and table_entry.table_id: 1936 direct_counter = schema.tables[table_entry.table_id].direct_counter 1937 assert direct_counter is not None 1938 counter_id = direct_counter.alias 1939 1940 return cls(counter_id, table_entry=table_entry, data=data)
Decode protobuf to P4DirectCounterEntry.
1733@decodable("direct_meter_entry") 1734@dataclass(kw_only=True, slots=True) 1735class P4DirectMeterEntry(_P4ModifyOnly): 1736 "Represents a P4Runtime DirectMeterEntry." 1737 1738 table_entry: P4TableEntry | None = None 1739 config: P4MeterConfig | None = None 1740 counter_data: P4MeterCounterData | None = None 1741 1742 def encode(self, schema: P4Schema) -> p4r.Entity: 1743 "Encode P4DirectMeterEntry as protobuf." 1744 if self.table_entry is not None: 1745 table_entry = self.table_entry.encode_entry(schema) 1746 else: 1747 table_entry = None 1748 1749 if self.config is not None: 1750 config = self.config.encode() 1751 else: 1752 config = None 1753 1754 if self.counter_data is not None: 1755 counter_data = self.counter_data.encode() 1756 else: 1757 counter_data = None 1758 1759 entry = p4r.DirectMeterEntry( 1760 table_entry=table_entry, 1761 config=config, 1762 counter_data=counter_data, 1763 ) 1764 return p4r.Entity(direct_meter_entry=entry) 1765 1766 @classmethod 1767 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1768 "Decode protobuf to P4DirectMeterEntry." 1769 entry = msg.direct_meter_entry 1770 1771 if entry.HasField("table_entry"): 1772 table_entry = P4TableEntry.decode_entry(entry.table_entry, schema) 1773 else: 1774 table_entry = None 1775 1776 if entry.HasField("config"): 1777 config = P4MeterConfig.decode(entry.config) 1778 else: 1779 config = None 1780 1781 if entry.HasField("counter_data"): 1782 counter_data = P4MeterCounterData.decode(entry.counter_data) 1783 else: 1784 counter_data = None 1785 1786 return cls( 1787 table_entry=table_entry, 1788 config=config, 1789 counter_data=counter_data, 1790 )
Represents a P4Runtime DirectMeterEntry.
1742 def encode(self, schema: P4Schema) -> p4r.Entity: 1743 "Encode P4DirectMeterEntry as protobuf." 1744 if self.table_entry is not None: 1745 table_entry = self.table_entry.encode_entry(schema) 1746 else: 1747 table_entry = None 1748 1749 if self.config is not None: 1750 config = self.config.encode() 1751 else: 1752 config = None 1753 1754 if self.counter_data is not None: 1755 counter_data = self.counter_data.encode() 1756 else: 1757 counter_data = None 1758 1759 entry = p4r.DirectMeterEntry( 1760 table_entry=table_entry, 1761 config=config, 1762 counter_data=counter_data, 1763 ) 1764 return p4r.Entity(direct_meter_entry=entry)
Encode P4DirectMeterEntry as protobuf.
1766 @classmethod 1767 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1768 "Decode protobuf to P4DirectMeterEntry." 1769 entry = msg.direct_meter_entry 1770 1771 if entry.HasField("table_entry"): 1772 table_entry = P4TableEntry.decode_entry(entry.table_entry, schema) 1773 else: 1774 table_entry = None 1775 1776 if entry.HasField("config"): 1777 config = P4MeterConfig.decode(entry.config) 1778 else: 1779 config = None 1780 1781 if entry.HasField("counter_data"): 1782 counter_data = P4MeterCounterData.decode(entry.counter_data) 1783 else: 1784 counter_data = None 1785 1786 return cls( 1787 table_entry=table_entry, 1788 config=config, 1789 counter_data=counter_data, 1790 )
Decode protobuf to P4DirectMeterEntry.
58@dataclass 59class P4Error: 60 "P4Runtime Error message used to report a single P4-entity error." 61 62 canonical_code: GRPCStatusCode 63 message: str 64 space: str 65 code: int 66 subvalue: pbutil.PBMessage | None = None
P4Runtime Error message used to report a single P4-entity error.
2200@decodable("extern_entry") 2201@dataclass(kw_only=True, slots=True) 2202class P4ExternEntry(_P4Writable): 2203 "Represents a P4Runtime ExternEntry." 2204 2205 extern_type_id: str 2206 extern_id: str 2207 entry: pbutil.PBAny 2208 2209 def encode(self, schema: P4Schema) -> p4r.Entity: 2210 "Encode ExternEntry data as protobuf." 2211 extern = schema.externs[self.extern_type_id, self.extern_id] 2212 entry = p4r.ExternEntry( 2213 extern_type_id=extern.extern_type_id, 2214 extern_id=extern.id, 2215 entry=self.entry, 2216 ) 2217 return p4r.Entity(extern_entry=entry) 2218 2219 @classmethod 2220 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 2221 "Decode protobuf to ExternEntry data." 2222 entry = msg.extern_entry 2223 extern = schema.externs[entry.extern_type_id, entry.extern_id] 2224 return cls( 2225 extern_type_id=extern.extern_type_name, 2226 extern_id=extern.name, 2227 entry=entry.entry, 2228 )
Represents a P4Runtime ExternEntry.
2209 def encode(self, schema: P4Schema) -> p4r.Entity: 2210 "Encode ExternEntry data as protobuf." 2211 extern = schema.externs[self.extern_type_id, self.extern_id] 2212 entry = p4r.ExternEntry( 2213 extern_type_id=extern.extern_type_id, 2214 extern_id=extern.id, 2215 entry=self.entry, 2216 ) 2217 return p4r.Entity(extern_entry=entry)
Encode ExternEntry data as protobuf.
2219 @classmethod 2220 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 2221 "Decode protobuf to ExternEntry data." 2222 entry = msg.extern_entry 2223 extern = schema.externs[entry.extern_type_id, entry.extern_id] 2224 return cls( 2225 extern_type_id=extern.extern_type_name, 2226 extern_id=extern.name, 2227 entry=entry.entry, 2228 )
Decode protobuf to ExternEntry data.
IndirectAction is an alias for P4IndirectAction.
702@dataclass(slots=True) 703class P4IndirectAction: 704 """Represents a P4Runtime Action reference for an indirect table. 705 706 An indirect action can be either: 707 708 1. a "one-shot" action (action_set) 709 2. a reference to an action profile member (member_id) 710 3. a reference to an action profile group (group_id) 711 712 Only one of action_set, member_id or group_id may be configured. The other 713 values must be None. 714 715 Attributes: 716 action_set: sequence of weighted actions for one-shot 717 member_id: ID of action profile member 718 group_id: ID of action profile group 719 720 Examples: 721 722 ```python 723 # Construct a one-shot action profile. 724 one_shot = P4IndirectAction( 725 2 * P4TableAction("forward", port=1), 726 1 * P4TableAction("forward", port=2), 727 ) 728 729 # Refer to an action profile member by ID. 730 member_action = P4IndirectAction(member_id=1) 731 732 # Refer to an action profile group by ID. 733 group_action = P4IndirectAction(group_id=2) 734 ``` 735 736 References: 737 - "9.1.2. Action Specification", 738 - "9.2.3. One Shot Action Selector Programming" 739 740 See Also: 741 - P4TableEntry 742 """ 743 744 action_set: Sequence[P4WeightedAction] | None = None 745 "Sequence of weighted actions defining one-shot action profile." 746 _: KW_ONLY 747 member_id: int | None = None 748 "ID of action profile member." 749 group_id: int | None = None 750 "ID of action profile group." 751 selection_mode: P4ActionSelectionMode = ( 752 P4ActionSelectionMode.DEFAULT_MODE_DETERMINED_BY_ACTION_SELECTOR 753 ) 754 "Action selection mode for one-shot action profile (1.5.0)." 755 size_semantics: P4ActionSizeSemantics = ( 756 P4ActionSizeSemantics.DEFAULT_SIZE_DETERMINED_BY_ACTION_SELECTOR 757 ) 758 "Action size semantics for one-shot action profile (1.5.0)." 759 760 def __post_init__(self) -> None: 761 if not self._check_invariant(): 762 raise ValueError( 763 "exactly one of action_set, member_id, or group_id must be set" 764 ) 765 766 def _check_invariant(self) -> bool: 767 "Return true if instance satisfies class invariant." 768 if self.action_set is not None: 769 return self.member_id is None and self.group_id is None 770 if self.member_id is not None: 771 return self.group_id is None 772 return self.group_id is not None 773 774 def encode_table_action(self, table: P4Table) -> p4r.TableAction: 775 "Encode object as a TableAction." 776 if self.action_set is not None: 777 return p4r.TableAction( 778 action_profile_action_set=self.encode_action_set(table) 779 ) 780 781 if self.member_id is not None: 782 return p4r.TableAction(action_profile_member_id=self.member_id) 783 784 assert self.group_id is not None 785 return p4r.TableAction(action_profile_group_id=self.group_id) 786 787 def encode_action_set(self, table: P4Table) -> p4r.ActionProfileActionSet: 788 "Encode object as an ActionProfileActionSet." 789 assert self.action_set is not None 790 791 profile_actions: list[p4r.ActionProfileAction] = [] 792 for weight, table_action in self.action_set: 793 action = table_action.encode_action(table) 794 795 match weight: 796 case int(weight_value): 797 watch_port = None 798 case (weight_value, int(watch)): 799 watch_port = encode_port(watch) 800 case _: 801 raise ValueError(f"unexpected action weight: {weight!r}") 802 803 profile = p4r.ActionProfileAction(action=action, weight=weight_value) 804 if watch_port is not None: 805 profile.watch_port = watch_port 806 profile_actions.append(profile) 807 808 return p4r.ActionProfileActionSet( 809 action_profile_actions=profile_actions, 810 action_selection_mode=self.selection_mode.vt(), 811 size_semantics=self.size_semantics.vt(), 812 ) 813 814 @classmethod 815 def decode_action_set(cls, msg: p4r.ActionProfileActionSet, table: P4Table) -> Self: 816 "Decode ActionProfileActionSet." 817 action_set = list[P4WeightedAction]() 818 819 for action in msg.action_profile_actions: 820 match action.WhichOneof("watch_kind"): 821 case "watch_port": 822 weight = (action.weight, decode_port(action.watch_port)) 823 case None: 824 weight = action.weight 825 case other: 826 # "watch" (deprecated) is not supported 827 raise ValueError(f"unexpected oneof: {other!r}") 828 829 table_action = P4TableAction.decode_action(action.action, table) 830 action_set.append((weight, table_action)) 831 832 return cls( 833 action_set, 834 selection_mode=P4ActionSelectionMode(msg.action_selection_mode), 835 size_semantics=P4ActionSizeSemantics(msg.size_semantics), 836 ) 837 838 def format_str(self, table: P4Table) -> str: 839 """Format the indirect table action as a human-readable string.""" 840 if self.action_set is not None: 841 weighted_actions = [ 842 f"{weight}*{action.format_str(table)}" 843 for weight, action in self.action_set 844 ] 845 props = self._format_properties() 846 if props: 847 props = f" <{props}>" 848 return " ".join(weighted_actions) + props 849 850 # Use the name of the action_profile, if we can get it. If not, just 851 # use the placeholder "__indirect". 852 if table.action_profile is not None: 853 profile_name = f"@{table.action_profile.alias}" 854 else: 855 profile_name = "__indirect" 856 857 if self.member_id is not None: 858 return f"{profile_name}[[{self.member_id:#x}]]" 859 860 return f"{profile_name}[{self.group_id:#x}]" 861 862 def __repr__(self) -> str: 863 "Customize representation to make it more concise." 864 if self.action_set is not None: 865 props = self._repr_properties() 866 if props: 867 return f"P4IndirectAction(action_set={self.action_set!r}, {props})" 868 else: 869 return f"P4IndirectAction(action_set={self.action_set!r})" 870 if self.member_id is not None: 871 return f"P4IndirectAction(member_id={self.member_id!r})" 872 return f"P4IndirectAction(group_id={self.group_id!r})" 873 874 def _repr_properties(self) -> str: 875 "Return string description of properties used in __repr__." 876 result: list[str] = [] 877 if ( 878 self.selection_mode 879 != P4ActionSelectionMode.DEFAULT_MODE_DETERMINED_BY_ACTION_SELECTOR 880 ): 881 result.append(f"selection_mode={self.selection_mode!r}") 882 if ( 883 self.size_semantics 884 != P4ActionSizeSemantics.DEFAULT_SIZE_DETERMINED_BY_ACTION_SELECTOR 885 ): 886 result.append(f"size_semantics={self.size_semantics!r}") 887 return ", ".join(result) 888 889 def _format_properties(self) -> str: 890 "Return string description of properties used in format_str." 891 result: list[str] = [] 892 if ( 893 self.selection_mode 894 != P4ActionSelectionMode.DEFAULT_MODE_DETERMINED_BY_ACTION_SELECTOR 895 ): 896 result.append(f"{self.selection_mode.name}") 897 if ( 898 self.size_semantics 899 != P4ActionSizeSemantics.DEFAULT_SIZE_DETERMINED_BY_ACTION_SELECTOR 900 ): 901 result.append(f"{self.size_semantics.name}") 902 return ", ".join(result)
Represents a P4Runtime Action reference for an indirect table.
An indirect action can be either:
- a "one-shot" action (action_set)
- a reference to an action profile member (member_id)
- a reference to an action profile group (group_id)
Only one of action_set, member_id or group_id may be configured. The other values must be None.
Attributes:
- action_set: sequence of weighted actions for one-shot
- member_id: ID of action profile member
- group_id: ID of action profile group
Examples:
# Construct a one-shot action profile.
one_shot = P4IndirectAction(
2 * P4TableAction("forward", port=1),
1 * P4TableAction("forward", port=2),
)
# Refer to an action profile member by ID.
member_action = P4IndirectAction(member_id=1)
# Refer to an action profile group by ID.
group_action = P4IndirectAction(group_id=2)
References:
- "9.1.2. Action Specification",
- "9.2.3. One Shot Action Selector Programming"
See Also:
- P4TableEntry
Sequence of weighted actions defining one-shot action profile.
774 def encode_table_action(self, table: P4Table) -> p4r.TableAction: 775 "Encode object as a TableAction." 776 if self.action_set is not None: 777 return p4r.TableAction( 778 action_profile_action_set=self.encode_action_set(table) 779 ) 780 781 if self.member_id is not None: 782 return p4r.TableAction(action_profile_member_id=self.member_id) 783 784 assert self.group_id is not None 785 return p4r.TableAction(action_profile_group_id=self.group_id)
Encode object as a TableAction.
787 def encode_action_set(self, table: P4Table) -> p4r.ActionProfileActionSet: 788 "Encode object as an ActionProfileActionSet." 789 assert self.action_set is not None 790 791 profile_actions: list[p4r.ActionProfileAction] = [] 792 for weight, table_action in self.action_set: 793 action = table_action.encode_action(table) 794 795 match weight: 796 case int(weight_value): 797 watch_port = None 798 case (weight_value, int(watch)): 799 watch_port = encode_port(watch) 800 case _: 801 raise ValueError(f"unexpected action weight: {weight!r}") 802 803 profile = p4r.ActionProfileAction(action=action, weight=weight_value) 804 if watch_port is not None: 805 profile.watch_port = watch_port 806 profile_actions.append(profile) 807 808 return p4r.ActionProfileActionSet( 809 action_profile_actions=profile_actions, 810 action_selection_mode=self.selection_mode.vt(), 811 size_semantics=self.size_semantics.vt(), 812 )
Encode object as an ActionProfileActionSet.
814 @classmethod 815 def decode_action_set(cls, msg: p4r.ActionProfileActionSet, table: P4Table) -> Self: 816 "Decode ActionProfileActionSet." 817 action_set = list[P4WeightedAction]() 818 819 for action in msg.action_profile_actions: 820 match action.WhichOneof("watch_kind"): 821 case "watch_port": 822 weight = (action.weight, decode_port(action.watch_port)) 823 case None: 824 weight = action.weight 825 case other: 826 # "watch" (deprecated) is not supported 827 raise ValueError(f"unexpected oneof: {other!r}") 828 829 table_action = P4TableAction.decode_action(action.action, table) 830 action_set.append((weight, table_action)) 831 832 return cls( 833 action_set, 834 selection_mode=P4ActionSelectionMode(msg.action_selection_mode), 835 size_semantics=P4ActionSizeSemantics(msg.size_semantics), 836 )
Decode ActionProfileActionSet.
838 def format_str(self, table: P4Table) -> str: 839 """Format the indirect table action as a human-readable string.""" 840 if self.action_set is not None: 841 weighted_actions = [ 842 f"{weight}*{action.format_str(table)}" 843 for weight, action in self.action_set 844 ] 845 props = self._format_properties() 846 if props: 847 props = f" <{props}>" 848 return " ".join(weighted_actions) + props 849 850 # Use the name of the action_profile, if we can get it. If not, just 851 # use the placeholder "__indirect". 852 if table.action_profile is not None: 853 profile_name = f"@{table.action_profile.alias}" 854 else: 855 profile_name = "__indirect" 856 857 if self.member_id is not None: 858 return f"{profile_name}[[{self.member_id:#x}]]" 859 860 return f"{profile_name}[{self.group_id:#x}]"
Format the indirect table action as a human-readable string.
1552@dataclass(slots=True) 1553class P4Member: 1554 """Represents an ActionProfileGroup Member. 1555 1556 See Also: 1557 - P4ActionProfileGroup 1558 """ 1559 1560 member_id: int 1561 _: KW_ONLY 1562 weight: P4Weight 1563 1564 def encode(self) -> p4r.ActionProfileGroup.Member: 1565 "Encode P4Member as protobuf." 1566 match self.weight: 1567 case int(weight): 1568 watch_port = None 1569 case (int(weight), int(watch)): 1570 watch_port = encode_port(watch) 1571 case other: # pyright: ignore[reportUnnecessaryComparison] 1572 raise ValueError(f"unexpected weight: {other!r}") 1573 1574 member = p4r.ActionProfileGroup.Member( 1575 member_id=self.member_id, 1576 weight=weight, 1577 ) 1578 1579 if watch_port is not None: 1580 member.watch_port = watch_port 1581 return member 1582 1583 @classmethod 1584 def decode(cls, msg: p4r.ActionProfileGroup.Member) -> Self: 1585 "Decode protobuf to P4Member." 1586 match msg.WhichOneof("watch_kind"): 1587 case "watch_port": 1588 weight = (msg.weight, decode_port(msg.watch_port)) 1589 case None: 1590 weight = msg.weight 1591 case other: 1592 # "watch" (deprecated) is not supported 1593 raise ValueError(f"unknown oneof: {other!r}") 1594 1595 return cls(member_id=msg.member_id, weight=weight)
Represents an ActionProfileGroup Member.
See Also:
- P4ActionProfileGroup
1564 def encode(self) -> p4r.ActionProfileGroup.Member: 1565 "Encode P4Member as protobuf." 1566 match self.weight: 1567 case int(weight): 1568 watch_port = None 1569 case (int(weight), int(watch)): 1570 watch_port = encode_port(watch) 1571 case other: # pyright: ignore[reportUnnecessaryComparison] 1572 raise ValueError(f"unexpected weight: {other!r}") 1573 1574 member = p4r.ActionProfileGroup.Member( 1575 member_id=self.member_id, 1576 weight=weight, 1577 ) 1578 1579 if watch_port is not None: 1580 member.watch_port = watch_port 1581 return member
Encode P4Member as protobuf.
1583 @classmethod 1584 def decode(cls, msg: p4r.ActionProfileGroup.Member) -> Self: 1585 "Decode protobuf to P4Member." 1586 match msg.WhichOneof("watch_kind"): 1587 case "watch_port": 1588 weight = (msg.weight, decode_port(msg.watch_port)) 1589 case None: 1590 weight = msg.weight 1591 case other: 1592 # "watch" (deprecated) is not supported 1593 raise ValueError(f"unknown oneof: {other!r}") 1594 1595 return cls(member_id=msg.member_id, weight=weight)
Decode protobuf to P4Member.
905@dataclass(kw_only=True, slots=True) 906class P4MeterConfig: 907 """Represents a P4Runtime MeterConfig. 908 909 Attributes: 910 cir (int): Committed information rate (units/sec). 911 cburst (int): Committed burst size. 912 pir (int): Peak information rate (units/sec). 913 pburst (int): Peak burst size. 914 eburst (int): Excess burst size (only used by srTCM). [default=0] 915 916 Example: 917 ``` 918 config = P4MeterConfig(cir=10, cburst=20, pir=10, pburst=20) 919 ``` 920 921 See Also: 922 - P4TableEntry 923 - P4MeterEntry 924 - P4DirectMeterEntry 925 """ 926 927 cir: int 928 "Committed information rate (units/sec)." 929 cburst: int 930 "Committed burst size." 931 pir: int 932 "Peak information rate (units/sec)." 933 pburst: int 934 "Peak burst size." 935 eburst: int = 0 936 "Excess burst size (only used by srTCM)." 937 938 def encode(self) -> p4r.MeterConfig: 939 "Encode object as MeterConfig." 940 return p4r.MeterConfig( 941 cir=self.cir, 942 cburst=self.cburst, 943 pir=self.pir, 944 pburst=self.pburst, 945 eburst=self.eburst, 946 ) 947 948 @classmethod 949 def decode(cls, msg: p4r.MeterConfig) -> Self: 950 "Decode MeterConfig." 951 return cls( 952 cir=msg.cir, 953 cburst=msg.cburst, 954 pir=msg.pir, 955 pburst=msg.pburst, 956 eburst=msg.eburst, 957 )
Represents a P4Runtime MeterConfig.
Attributes:
- cir (int): Committed information rate (units/sec).
- cburst (int): Committed burst size.
- pir (int): Peak information rate (units/sec).
- pburst (int): Peak burst size.
- eburst (int): Excess burst size (only used by srTCM). [default=0]
Example:
config = P4MeterConfig(cir=10, cburst=20, pir=10, pburst=20)
See Also:
- P4TableEntry
- P4MeterEntry
- P4DirectMeterEntry
938 def encode(self) -> p4r.MeterConfig: 939 "Encode object as MeterConfig." 940 return p4r.MeterConfig( 941 cir=self.cir, 942 cburst=self.cburst, 943 pir=self.pir, 944 pburst=self.pburst, 945 eburst=self.eburst, 946 )
Encode object as MeterConfig.
948 @classmethod 949 def decode(cls, msg: p4r.MeterConfig) -> Self: 950 "Decode MeterConfig." 951 return cls( 952 cir=msg.cir, 953 cburst=msg.cburst, 954 pir=msg.pir, 955 pburst=msg.pburst, 956 eburst=msg.eburst, 957 )
Decode MeterConfig.
992@dataclass(kw_only=True, slots=True) 993class P4MeterCounterData: 994 """Represents a P4Runtime MeterCounterData that stores per-color counters. 995 996 Attributes: 997 green (CounterData): counter data for packets marked GREEN. 998 yellow (CounterData): counter data for packets marked YELLOW. 999 red (CounterData): counter data for packets marked RED. 1000 1001 See Also: 1002 - P4TableEntry 1003 - P4MeterEntry 1004 - P4DirectMeterEntry 1005 """ 1006 1007 green: P4CounterData 1008 "Counter of packets marked GREEN." 1009 yellow: P4CounterData 1010 "Counter of packets marked YELLOW." 1011 red: P4CounterData 1012 "Counter of packets marked RED." 1013 1014 def encode(self) -> p4r.MeterCounterData: 1015 "Encode object as MeterCounterData." 1016 return p4r.MeterCounterData( 1017 green=self.green.encode(), 1018 yellow=self.yellow.encode(), 1019 red=self.red.encode(), 1020 ) 1021 1022 @classmethod 1023 def decode(cls, msg: p4r.MeterCounterData) -> Self: 1024 "Decode MeterCounterData." 1025 return cls( 1026 green=P4CounterData.decode(msg.green), 1027 yellow=P4CounterData.decode(msg.yellow), 1028 red=P4CounterData.decode(msg.red), 1029 )
Represents a P4Runtime MeterCounterData that stores per-color counters.
Attributes:
- green (CounterData): counter data for packets marked GREEN.
- yellow (CounterData): counter data for packets marked YELLOW.
- red (CounterData): counter data for packets marked RED.
See Also:
- P4TableEntry
- P4MeterEntry
- P4DirectMeterEntry
1014 def encode(self) -> p4r.MeterCounterData: 1015 "Encode object as MeterCounterData." 1016 return p4r.MeterCounterData( 1017 green=self.green.encode(), 1018 yellow=self.yellow.encode(), 1019 red=self.red.encode(), 1020 )
Encode object as MeterCounterData.
1022 @classmethod 1023 def decode(cls, msg: p4r.MeterCounterData) -> Self: 1024 "Decode MeterCounterData." 1025 return cls( 1026 green=P4CounterData.decode(msg.green), 1027 yellow=P4CounterData.decode(msg.yellow), 1028 red=P4CounterData.decode(msg.red), 1029 )
Decode MeterCounterData.
1660@decodable("meter_entry") 1661@dataclass(slots=True) 1662class P4MeterEntry(_P4ModifyOnly): 1663 "Represents a P4Runtime MeterEntry." 1664 1665 meter_id: str = "" 1666 _: KW_ONLY 1667 index: int | None = None 1668 config: P4MeterConfig | None = None 1669 counter_data: P4MeterCounterData | None = None 1670 1671 def encode(self, schema: P4Schema) -> p4r.Entity: 1672 "Encode P4MeterEntry to protobuf." 1673 if not self.meter_id: 1674 return p4r.Entity(meter_entry=p4r.MeterEntry()) 1675 1676 meter = schema.meters[self.meter_id] 1677 1678 if self.index is not None: 1679 index = p4r.Index(index=self.index) 1680 else: 1681 index = None 1682 1683 if self.config is not None: 1684 config = self.config.encode() 1685 else: 1686 config = None 1687 1688 if self.counter_data is not None: 1689 counter_data = self.counter_data.encode() 1690 else: 1691 counter_data = None 1692 1693 entry = p4r.MeterEntry( 1694 meter_id=meter.id, 1695 index=index, 1696 config=config, 1697 counter_data=counter_data, 1698 ) 1699 return p4r.Entity(meter_entry=entry) 1700 1701 @classmethod 1702 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1703 "Decode protobuf to P4MeterEntry." 1704 entry = msg.meter_entry 1705 if not entry.meter_id: 1706 return cls() 1707 1708 meter = schema.meters[entry.meter_id] 1709 1710 if entry.HasField("index"): 1711 index = entry.index.index 1712 else: 1713 index = None 1714 1715 if entry.HasField("config"): 1716 config = P4MeterConfig.decode(entry.config) 1717 else: 1718 config = None 1719 1720 if entry.HasField("counter_data"): 1721 counter_data = P4MeterCounterData.decode(entry.counter_data) 1722 else: 1723 counter_data = None 1724 1725 return cls( 1726 meter_id=meter.alias, 1727 index=index, 1728 config=config, 1729 counter_data=counter_data, 1730 )
Represents a P4Runtime MeterEntry.
1671 def encode(self, schema: P4Schema) -> p4r.Entity: 1672 "Encode P4MeterEntry to protobuf." 1673 if not self.meter_id: 1674 return p4r.Entity(meter_entry=p4r.MeterEntry()) 1675 1676 meter = schema.meters[self.meter_id] 1677 1678 if self.index is not None: 1679 index = p4r.Index(index=self.index) 1680 else: 1681 index = None 1682 1683 if self.config is not None: 1684 config = self.config.encode() 1685 else: 1686 config = None 1687 1688 if self.counter_data is not None: 1689 counter_data = self.counter_data.encode() 1690 else: 1691 counter_data = None 1692 1693 entry = p4r.MeterEntry( 1694 meter_id=meter.id, 1695 index=index, 1696 config=config, 1697 counter_data=counter_data, 1698 ) 1699 return p4r.Entity(meter_entry=entry)
Encode P4MeterEntry to protobuf.
1701 @classmethod 1702 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1703 "Decode protobuf to P4MeterEntry." 1704 entry = msg.meter_entry 1705 if not entry.meter_id: 1706 return cls() 1707 1708 meter = schema.meters[entry.meter_id] 1709 1710 if entry.HasField("index"): 1711 index = entry.index.index 1712 else: 1713 index = None 1714 1715 if entry.HasField("config"): 1716 config = P4MeterConfig.decode(entry.config) 1717 else: 1718 config = None 1719 1720 if entry.HasField("counter_data"): 1721 counter_data = P4MeterCounterData.decode(entry.counter_data) 1722 else: 1723 counter_data = None 1724 1725 return cls( 1726 meter_id=meter.alias, 1727 index=index, 1728 config=config, 1729 counter_data=counter_data, 1730 )
Decode protobuf to P4MeterEntry.
1363@decodable("multicast_group_entry") 1364@dataclass(slots=True) 1365class P4MulticastGroupEntry(_P4Writable): 1366 "Represents a P4Runtime MulticastGroupEntry." 1367 1368 multicast_group_id: int = 0 1369 _: KW_ONLY 1370 replicas: Sequence[_ReplicaType] = () 1371 metadata: bytes = b"" 1372 1373 def encode(self, schema: P4Schema) -> p4r.Entity: 1374 "Encode MulticastGroupEntry data as protobuf." 1375 p4r_version = schema.p4runtime_version 1376 entry = p4r.MulticastGroupEntry( 1377 multicast_group_id=self.multicast_group_id, 1378 replicas=[ 1379 encode_replica(replica, p4r_version) for replica in self.replicas 1380 ], 1381 metadata=self.metadata, 1382 ) 1383 return p4r.Entity( 1384 packet_replication_engine_entry=p4r.PacketReplicationEngineEntry( 1385 multicast_group_entry=entry 1386 ) 1387 ) 1388 1389 @classmethod 1390 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1391 "Decode protobuf to MulticastGroupEntry data." 1392 entry = msg.packet_replication_engine_entry.multicast_group_entry 1393 return cls( 1394 multicast_group_id=entry.multicast_group_id, 1395 replicas=tuple(decode_replica(replica) for replica in entry.replicas), 1396 metadata=entry.metadata, 1397 ) 1398 1399 def replicas_str(self) -> str: 1400 "Format the replicas as a human-readable, canonical string." 1401 return " ".join(format_replica(rep) for rep in self.replicas)
Represents a P4Runtime MulticastGroupEntry.
1373 def encode(self, schema: P4Schema) -> p4r.Entity: 1374 "Encode MulticastGroupEntry data as protobuf." 1375 p4r_version = schema.p4runtime_version 1376 entry = p4r.MulticastGroupEntry( 1377 multicast_group_id=self.multicast_group_id, 1378 replicas=[ 1379 encode_replica(replica, p4r_version) for replica in self.replicas 1380 ], 1381 metadata=self.metadata, 1382 ) 1383 return p4r.Entity( 1384 packet_replication_engine_entry=p4r.PacketReplicationEngineEntry( 1385 multicast_group_entry=entry 1386 ) 1387 )
Encode MulticastGroupEntry data as protobuf.
1389 @classmethod 1390 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1391 "Decode protobuf to MulticastGroupEntry data." 1392 entry = msg.packet_replication_engine_entry.multicast_group_entry 1393 return cls( 1394 multicast_group_id=entry.multicast_group_id, 1395 replicas=tuple(decode_replica(replica) for replica in entry.replicas), 1396 metadata=entry.metadata, 1397 )
Decode protobuf to MulticastGroupEntry data.
2037@decodable("packet") 2038@dataclass(slots=True) 2039class P4PacketIn: 2040 "Represents a P4Runtime PacketIn." 2041 2042 payload: bytes 2043 _: KW_ONLY 2044 metadata: _MetadataDictType 2045 2046 @classmethod 2047 def decode(cls, msg: p4r.StreamMessageResponse, schema: P4Schema) -> Self: 2048 "Decode protobuf to PacketIn data." 2049 packet = msg.packet 2050 cpm = schema.controller_packet_metadata.get("packet_in") 2051 if cpm is None: 2052 # There is no controller metadata. Warn if message has any. 2053 pkt_meta = packet.metadata 2054 if pkt_meta: 2055 LOGGER.warning("P4PacketIn unexpected metadata: %r", pkt_meta) 2056 return cls(packet.payload, metadata={}) 2057 2058 return cls( 2059 packet.payload, 2060 metadata=cpm.decode(packet.metadata), 2061 ) 2062 2063 def __getitem__(self, key: str) -> Any: 2064 "Retrieve metadata value." 2065 return self.metadata[key] 2066 2067 def __repr__(self) -> str: 2068 "Return friendlier hexadecimal description of packet." 2069 if self.metadata: 2070 return f"P4PacketIn(metadata={self.metadata!r}, payload=h'{self.payload.hex()}')" 2071 return f"P4PacketIn(payload=h'{self.payload.hex()}')"
Represents a P4Runtime PacketIn.
2046 @classmethod 2047 def decode(cls, msg: p4r.StreamMessageResponse, schema: P4Schema) -> Self: 2048 "Decode protobuf to PacketIn data." 2049 packet = msg.packet 2050 cpm = schema.controller_packet_metadata.get("packet_in") 2051 if cpm is None: 2052 # There is no controller metadata. Warn if message has any. 2053 pkt_meta = packet.metadata 2054 if pkt_meta: 2055 LOGGER.warning("P4PacketIn unexpected metadata: %r", pkt_meta) 2056 return cls(packet.payload, metadata={}) 2057 2058 return cls( 2059 packet.payload, 2060 metadata=cpm.decode(packet.metadata), 2061 )
Decode protobuf to PacketIn data.
2074@dataclass(slots=True) 2075class P4PacketOut: 2076 "Represents a P4Runtime PacketOut." 2077 2078 payload: bytes 2079 _: KW_ONLY 2080 metadata: _MetadataDictType 2081 2082 def __init__(self, __payload: bytes, /, **metadata: Any): 2083 self.payload = __payload 2084 self.metadata = metadata 2085 2086 def encode_update(self, schema: P4Schema) -> p4r.StreamMessageRequest: 2087 "Encode PacketOut data as protobuf." 2088 cpm = schema.controller_packet_metadata["packet_out"] 2089 return p4r.StreamMessageRequest( 2090 packet=p4r.PacketOut( 2091 payload=self.payload, 2092 metadata=cpm.encode(self.metadata), 2093 ) 2094 ) 2095 2096 def __getitem__(self, key: str) -> Any: 2097 "Retrieve metadata value." 2098 return self.metadata[key] 2099 2100 def __repr__(self) -> str: 2101 "Return friendlier hexadecimal description of packet." 2102 if self.metadata: 2103 return f"P4PacketOut(metadata={self.metadata!r}, payload=h'{self.payload.hex()}')" 2104 return f"P4PacketOut(payload=h'{self.payload.hex()}')"
Represents a P4Runtime PacketOut.
2086 def encode_update(self, schema: P4Schema) -> p4r.StreamMessageRequest: 2087 "Encode PacketOut data as protobuf." 2088 cpm = schema.controller_packet_metadata["packet_out"] 2089 return p4r.StreamMessageRequest( 2090 packet=p4r.PacketOut( 2091 payload=self.payload, 2092 metadata=cpm.encode(self.metadata), 2093 ) 2094 )
Encode PacketOut data as protobuf.
1303@decodable("register_entry") 1304@dataclass(slots=True) 1305class P4RegisterEntry(_P4ModifyOnly): 1306 "Represents a P4Runtime RegisterEntry." 1307 1308 register_id: str = "" 1309 _: KW_ONLY 1310 index: int | None = None 1311 data: _DataValueType | None = None 1312 1313 def encode(self, schema: P4Schema) -> p4r.Entity: 1314 "Encode RegisterEntry data as protobuf." 1315 if not self.register_id: 1316 return p4r.Entity(register_entry=p4r.RegisterEntry()) 1317 1318 register = schema.registers[self.register_id] 1319 1320 if self.index is not None: 1321 index = p4r.Index(index=self.index) 1322 else: 1323 index = None 1324 1325 if self.data is not None: 1326 data = register.type_spec.encode_data(self.data) 1327 else: 1328 data = None 1329 1330 entry = p4r.RegisterEntry( 1331 register_id=register.id, 1332 index=index, 1333 data=data, 1334 ) 1335 return p4r.Entity(register_entry=entry) 1336 1337 @classmethod 1338 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1339 "Decode protobuf to RegisterEntry data." 1340 entry = msg.register_entry 1341 if entry.register_id == 0: 1342 return cls() 1343 1344 register = schema.registers[entry.register_id] 1345 1346 if entry.HasField("index"): 1347 index = entry.index.index 1348 else: 1349 index = None 1350 1351 if entry.HasField("data"): 1352 data = register.type_spec.decode_data(entry.data) 1353 else: 1354 data = None 1355 1356 return cls( 1357 register.alias, 1358 index=index, 1359 data=data, 1360 )
Represents a P4Runtime RegisterEntry.
1313 def encode(self, schema: P4Schema) -> p4r.Entity: 1314 "Encode RegisterEntry data as protobuf." 1315 if not self.register_id: 1316 return p4r.Entity(register_entry=p4r.RegisterEntry()) 1317 1318 register = schema.registers[self.register_id] 1319 1320 if self.index is not None: 1321 index = p4r.Index(index=self.index) 1322 else: 1323 index = None 1324 1325 if self.data is not None: 1326 data = register.type_spec.encode_data(self.data) 1327 else: 1328 data = None 1329 1330 entry = p4r.RegisterEntry( 1331 register_id=register.id, 1332 index=index, 1333 data=data, 1334 ) 1335 return p4r.Entity(register_entry=entry)
Encode RegisterEntry data as protobuf.
1337 @classmethod 1338 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1339 "Decode protobuf to RegisterEntry data." 1340 entry = msg.register_entry 1341 if entry.register_id == 0: 1342 return cls() 1343 1344 register = schema.registers[entry.register_id] 1345 1346 if entry.HasField("index"): 1347 index = entry.index.index 1348 else: 1349 index = None 1350 1351 if entry.HasField("data"): 1352 data = register.type_spec.decode_data(entry.data) 1353 else: 1354 data = None 1355 1356 return cls( 1357 register.alias, 1358 index=index, 1359 data=data, 1360 )
Decode protobuf to RegisterEntry data.
Action is an alias for P4TableAction.
523@dataclass(init=False, slots=True) 524class P4TableAction: 525 """Represents a P4Runtime Action reference for a direct table. 526 527 Attributes: 528 name (str): the name of the action. 529 args (dict[str, Any]): the action's arguments as a dictionary. 530 531 Example: 532 If the name of the action is "ipv4_forward" and it takes a single 533 "port" parameter, you can construct the action as: 534 535 ``` 536 action = P4TableAction("ipv4_forward", port=1) 537 ``` 538 539 Reference "9.1.2 Action Specification": 540 The Action Protobuf has fields: (action_id, params). Finsy translates 541 `name` to the appropriate `action_id` as determined by P4Info. It also 542 translates each named argument in `args` to the appropriate `param_id`. 543 544 See Also: 545 To specify an action for an indirect table, use `P4IndirectAction`. 546 Note that P4TableAction will automatically be promoted to an "indirect" 547 action if needed. 548 549 Operators: 550 A `P4TableAction` supports the multiplication operator (*) for 551 constructing "weighted actions". A weighted action is used in specifying 552 indirect actions. Here is an action with a weight of 3: 553 554 ``` 555 weighted_action = 3 * P4TableAction("ipv4_forward", port=1) 556 ``` 557 558 To specify a weight with a `watch_port`, use a tuple `(weight, port)`. 559 The weight is always a positive integer. 560 561 See Also: 562 - P4TableEntry 563 """ 564 565 name: str 566 "The name of the action." 567 args: dict[str, Any] 568 "The action's arguments as a dictionary." 569 570 def __init__(self, __name: str, /, **args: Any): 571 self.name = __name 572 self.args = args 573 574 def encode_table_action(self, table: P4Table) -> p4r.TableAction: 575 """Encode TableAction data as protobuf. 576 577 If the table is indirect, promote the action to a "one-shot" indirect 578 action. 579 """ 580 try: 581 action = table.actions[self.name] 582 except Exception as ex: 583 raise ValueError(f"{table.name!r}: {ex}") from ex 584 585 action_p4 = self._encode_action(action) 586 587 if table.action_profile is not None: 588 # Promote action to ActionProfileActionSet entry with weight=1. 589 return p4r.TableAction( 590 action_profile_action_set=p4r.ActionProfileActionSet( 591 action_profile_actions=[ 592 p4r.ActionProfileAction(action=action_p4, weight=1) 593 ] 594 ) 595 ) 596 597 return p4r.TableAction(action=action_p4) 598 599 def _fail_missing_params(self, action: P4ActionRef | P4Action) -> NoReturn: 600 "Report missing parameters." 601 seen = {param.name for param in action.params} 602 for name in self.args: 603 param = action.params[name] 604 seen.remove(param.name) 605 606 raise ValueError(f"Action {action.alias!r}: missing parameters {seen}") 607 608 def encode_action(self, schema: P4Schema | P4Table) -> p4r.Action: 609 "Encode Action data as protobuf." 610 action = schema.actions[self.name] 611 return self._encode_action(action) 612 613 def _encode_action(self, action: P4ActionRef | P4Action) -> p4r.Action: 614 "Helper to encode an action." 615 aps = action.params 616 try: 617 params = [ 618 aps[name].encode_param(value) for name, value in self.args.items() 619 ] 620 except ValueError as ex: 621 raise ValueError(f"{action.alias!r}: {ex}") from ex 622 623 # Check for missing action parameters. We always accept an action with 624 # no parameters (for wildcard ReadRequests). 625 param_count = len(params) 626 if param_count > 0 and param_count != len(aps): 627 self._fail_missing_params(action) 628 629 return p4r.Action(action_id=action.id, params=params) 630 631 @classmethod 632 def decode_table_action( 633 cls, msg: p4r.TableAction, table: P4Table 634 ) -> Self | "P4IndirectAction": 635 "Decode protobuf to TableAction data." 636 match msg.WhichOneof("type"): 637 case "action": 638 return cls.decode_action(msg.action, table) 639 case "action_profile_member_id": 640 return P4IndirectAction(member_id=msg.action_profile_member_id) 641 case "action_profile_group_id": 642 return P4IndirectAction(group_id=msg.action_profile_group_id) 643 case "action_profile_action_set": 644 return P4IndirectAction.decode_action_set( 645 msg.action_profile_action_set, table 646 ) 647 case other: 648 raise ValueError(f"unknown oneof: {other!r}") 649 650 @classmethod 651 def decode_action(cls, msg: p4r.Action, parent: P4Schema | P4Table) -> Self: 652 "Decode protobuf to Action data." 653 action = parent.actions[msg.action_id] 654 args = {} 655 for param in msg.params: 656 action_param = action.params[param.param_id] 657 value = action_param.decode_param(param) 658 args[action_param.name] = value 659 660 return cls(action.alias, **args) 661 662 def format_str(self, schema: P4Schema | P4Table) -> str: 663 """Format the table action as a human-readable string. 664 665 The result is formatted to look like a function call: 666 667 ``` 668 name(param1=value1, ...) 669 ``` 670 671 Where `name` is the action name, and `(param<N>, value<N>)` are the 672 action parameters. The format of `value<N>` is schema-dependent. 673 """ 674 aps = schema.actions[self.name].params 675 args = [ 676 f"{key}={aps[key].format_param(value)}" for key, value in self.args.items() 677 ] 678 679 return f"{self.name}({', '.join(args)})" 680 681 def __mul__(self, weight: P4Weight) -> P4WeightedAction: 682 "Make a weighted action." 683 if not isinstance( 684 weight, (int, tuple) 685 ): # pyright: ignore[reportUnnecessaryIsInstance] 686 raise NotImplementedError("expected P4Weight") 687 return (weight, self) 688 689 def __rmul__(self, weight: P4Weight) -> P4WeightedAction: 690 "Make a weighted action." 691 if not isinstance( 692 weight, (int, tuple) 693 ): # pyright: ignore[reportUnnecessaryIsInstance] 694 raise NotImplementedError("expected P4Weight") 695 return (weight, self) 696 697 def __call__(self, **params: Any) -> Self: 698 "Return a new action with the updated parameters." 699 return self.__class__(self.name, **(self.args | params))
Represents a P4Runtime Action reference for a direct table.
Attributes:
- name (str): the name of the action.
- args (dict[str, Any]): the action's arguments as a dictionary.
Example:
If the name of the action is "ipv4_forward" and it takes a single "port" parameter, you can construct the action as:
action = P4TableAction("ipv4_forward", port=1)
Reference "9.1.2 Action Specification":
The Action Protobuf has fields: (action_id, params). Finsy translates
name to the appropriate action_id as determined by P4Info. It also
translates each named argument in args to the appropriate param_id.
See Also:
To specify an action for an indirect table, use
P4IndirectAction. Note that P4TableAction will automatically be promoted to an "indirect" action if needed.
Operators:
A
P4TableActionsupports the multiplication operator (*) for constructing "weighted actions". A weighted action is used in specifying indirect actions. Here is an action with a weight of 3:weighted_action = 3 * P4TableAction("ipv4_forward", port=1)To specify a weight with a
watch_port, use a tuple(weight, port). The weight is always a positive integer.
See Also:
- P4TableEntry
574 def encode_table_action(self, table: P4Table) -> p4r.TableAction: 575 """Encode TableAction data as protobuf. 576 577 If the table is indirect, promote the action to a "one-shot" indirect 578 action. 579 """ 580 try: 581 action = table.actions[self.name] 582 except Exception as ex: 583 raise ValueError(f"{table.name!r}: {ex}") from ex 584 585 action_p4 = self._encode_action(action) 586 587 if table.action_profile is not None: 588 # Promote action to ActionProfileActionSet entry with weight=1. 589 return p4r.TableAction( 590 action_profile_action_set=p4r.ActionProfileActionSet( 591 action_profile_actions=[ 592 p4r.ActionProfileAction(action=action_p4, weight=1) 593 ] 594 ) 595 ) 596 597 return p4r.TableAction(action=action_p4)
Encode TableAction data as protobuf.
If the table is indirect, promote the action to a "one-shot" indirect action.
608 def encode_action(self, schema: P4Schema | P4Table) -> p4r.Action: 609 "Encode Action data as protobuf." 610 action = schema.actions[self.name] 611 return self._encode_action(action)
Encode Action data as protobuf.
631 @classmethod 632 def decode_table_action( 633 cls, msg: p4r.TableAction, table: P4Table 634 ) -> Self | "P4IndirectAction": 635 "Decode protobuf to TableAction data." 636 match msg.WhichOneof("type"): 637 case "action": 638 return cls.decode_action(msg.action, table) 639 case "action_profile_member_id": 640 return P4IndirectAction(member_id=msg.action_profile_member_id) 641 case "action_profile_group_id": 642 return P4IndirectAction(group_id=msg.action_profile_group_id) 643 case "action_profile_action_set": 644 return P4IndirectAction.decode_action_set( 645 msg.action_profile_action_set, table 646 ) 647 case other: 648 raise ValueError(f"unknown oneof: {other!r}")
Decode protobuf to TableAction data.
650 @classmethod 651 def decode_action(cls, msg: p4r.Action, parent: P4Schema | P4Table) -> Self: 652 "Decode protobuf to Action data." 653 action = parent.actions[msg.action_id] 654 args = {} 655 for param in msg.params: 656 action_param = action.params[param.param_id] 657 value = action_param.decode_param(param) 658 args[action_param.name] = value 659 660 return cls(action.alias, **args)
Decode protobuf to Action data.
662 def format_str(self, schema: P4Schema | P4Table) -> str: 663 """Format the table action as a human-readable string. 664 665 The result is formatted to look like a function call: 666 667 ``` 668 name(param1=value1, ...) 669 ``` 670 671 Where `name` is the action name, and `(param<N>, value<N>)` are the 672 action parameters. The format of `value<N>` is schema-dependent. 673 """ 674 aps = schema.actions[self.name].params 675 args = [ 676 f"{key}={aps[key].format_param(value)}" for key, value in self.args.items() 677 ] 678 679 return f"{self.name}({', '.join(args)})"
Format the table action as a human-readable string.
The result is formatted to look like a function call:
name(param1=value1, ...)
Where name is the action name, and (param<N>, value<N>) are the
action parameters. The format of value<N> is schema-dependent.
1032@decodable("table_entry") 1033@dataclass(slots=True) 1034class P4TableEntry(_P4Writable): 1035 """Represents a P4Runtime table entry. 1036 1037 Attributes: 1038 table_id (str): Name of the table. 1039 match (P4TableMatch | None): Entry's match fields. 1040 action (P4TableAction | P4IndirectAction | None): Entry's action. 1041 is_default_action (bool): True if entry is the default table entry. 1042 priority (int): Priority of a table entry when match implies TCAM lookup. 1043 metadata (bytes): Arbitrary controller cookie (1.2.0). 1044 controller_metadata (int): Deprecated controller cookie (< 1.2.0). 1045 meter_config (P4MeterConfig | None): Meter configuration. 1046 counter_data (P4CounterData | None): Counter data for table entry. 1047 meter_counter_data (P4MeterCounterData | None): Meter counter data (1.4.0). 1048 idle_timeout_ns (int): Idle timeout in nanoseconds. 1049 time_since_last_hit (int | None): Nanoseconds since entry last matched. 1050 is_const (bool): True if entry is constant (1.4.0). 1051 1052 The most commonly used fields are table_id, match, action, is_default_action, 1053 and priority. See the P4Runtime Spec for usage examples regarding the other 1054 attributes. 1055 1056 When writing a P4TableEntry, you can specify the type of update using '+', 1057 '-', and '~'. 1058 1059 Examples: 1060 ``` 1061 # Specify all tables when using "read". 1062 entry = fy.P4TableEntry() 1063 1064 # Specify the table named "ipv4" when using "read". 1065 entry = fy.P4TableEntry("ipv4") 1066 1067 # Specify the default entry in the "ipv4" table when using "read". 1068 entry = fy.P4TableEntry("ipv4", is_default_action=True) 1069 1070 # Insert an entry into the "ipv4" table. 1071 update = +fy.P4TableEntry( 1072 "ipv4", 1073 match=fy.Match(ipv4_dst="10.0.0.0/8"), 1074 action=fy.Action("forward", port=1), 1075 ) 1076 1077 # Modify the default action in the "ipv4" table. 1078 update = ~fy.P4TableEntry( 1079 "ipv4", 1080 action=fy.Action("forward", port=5), 1081 is_default_action=True 1082 ) 1083 ``` 1084 1085 Operators: 1086 You can retrieve a match field from a table entry using `[]`. For 1087 example, `entry["ipv4_dst"]` is the same as `entry.match["ipv4_dst"]`. 1088 1089 Formatting Helpers: 1090 The `match_str` and `action_str` methods provide P4Info-aware formatting 1091 of the match and action attributes. 1092 """ 1093 1094 table_id: str = "" 1095 "Name of the table." 1096 _: KW_ONLY 1097 match: P4TableMatch | None = None 1098 "Entry's match fields." 1099 action: P4TableAction | P4IndirectAction | None = None 1100 "Entry's action." 1101 is_default_action: bool = False 1102 "True if entry is the default table entry." 1103 priority: int = 0 1104 "Priority of a table entry when match implies TCAM lookup." 1105 metadata: bytes = b"" 1106 "Arbitrary controller cookie. (1.2.0)." 1107 controller_metadata: int = 0 1108 "Deprecated controller cookie (< 1.2.0)." 1109 meter_config: P4MeterConfig | None = None 1110 "Meter configuration." 1111 counter_data: P4CounterData | None = None 1112 "Counter data for table entry." 1113 meter_counter_data: P4MeterCounterData | None = None 1114 "Meter counter data (1.4.0)." 1115 idle_timeout_ns: int = 0 1116 "Idle timeout in nanoseconds." 1117 time_since_last_hit: int | None = None 1118 "Nanoseconds since entry last matched." 1119 is_const: bool = False 1120 "True if entry is constant (1.4.0)." 1121 1122 def __getitem__(self, key: str) -> Any: 1123 "Convenience accessor to retrieve a value from the `match` property." 1124 if self.match is not None: 1125 return self.match[key] 1126 raise KeyError(key) 1127 1128 def encode(self, schema: P4Schema) -> p4r.Entity: 1129 "Encode TableEntry data as protobuf." 1130 return p4r.Entity(table_entry=self.encode_entry(schema)) 1131 1132 def encode_entry(self, schema: P4Schema) -> p4r.TableEntry: 1133 "Encode TableEntry data as protobuf." 1134 if not self.table_id: 1135 return self._encode_empty() 1136 1137 table = schema.tables[self.table_id] 1138 1139 if self.match: 1140 match = self.match.encode(table) 1141 else: 1142 match = None 1143 1144 if self.action: 1145 action = self.action.encode_table_action(table) 1146 else: 1147 action = None 1148 1149 if self.meter_config: 1150 meter_config = self.meter_config.encode() 1151 else: 1152 meter_config = None 1153 1154 if self.counter_data: 1155 counter_data = self.counter_data.encode() 1156 else: 1157 counter_data = None 1158 1159 if self.meter_counter_data: 1160 meter_counter_data = self.meter_counter_data.encode() 1161 else: 1162 meter_counter_data = None 1163 1164 if self.time_since_last_hit is not None: 1165 time_since_last_hit = p4r.TableEntry.IdleTimeout( 1166 elapsed_ns=self.time_since_last_hit 1167 ) 1168 else: 1169 time_since_last_hit = None 1170 1171 return p4r.TableEntry( 1172 table_id=table.id, 1173 match=match, 1174 action=action, 1175 priority=self.priority, 1176 controller_metadata=self.controller_metadata, 1177 meter_config=meter_config, 1178 counter_data=counter_data, 1179 meter_counter_data=meter_counter_data, 1180 is_default_action=self.is_default_action, 1181 idle_timeout_ns=self.idle_timeout_ns, 1182 time_since_last_hit=time_since_last_hit, 1183 metadata=self.metadata, 1184 is_const=self.is_const, 1185 ) 1186 1187 def _encode_empty(self) -> p4r.TableEntry: 1188 "Encode an empty wildcard request." 1189 if self.counter_data is not None: 1190 counter_data = self.counter_data.encode() 1191 else: 1192 counter_data = None 1193 1194 # FIXME: time_since_last_hit not supported for wildcard reads? 1195 if self.time_since_last_hit is not None: 1196 time_since_last_hit = p4r.TableEntry.IdleTimeout( 1197 elapsed_ns=self.time_since_last_hit 1198 ) 1199 else: 1200 time_since_last_hit = None 1201 1202 return p4r.TableEntry( 1203 counter_data=counter_data, 1204 time_since_last_hit=time_since_last_hit, 1205 ) 1206 1207 @classmethod 1208 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1209 "Decode protobuf to TableEntry data." 1210 return cls.decode_entry(msg.table_entry, schema) 1211 1212 @classmethod 1213 def decode_entry(cls, entry: p4r.TableEntry, schema: P4Schema) -> Self: 1214 "Decode protobuf to TableEntry data." 1215 if entry.table_id == 0: 1216 return cls("") 1217 1218 table = schema.tables[entry.table_id] 1219 1220 if entry.match: 1221 match = P4TableMatch.decode(entry.match, table) 1222 else: 1223 match = None 1224 1225 if entry.HasField("action"): 1226 action = P4TableAction.decode_table_action(entry.action, table) 1227 else: 1228 action = None 1229 1230 if entry.HasField("time_since_last_hit"): 1231 last_hit = entry.time_since_last_hit.elapsed_ns 1232 else: 1233 last_hit = None 1234 1235 if entry.HasField("meter_config"): 1236 meter_config = P4MeterConfig.decode(entry.meter_config) 1237 else: 1238 meter_config = None 1239 1240 if entry.HasField("counter_data"): 1241 counter_data = P4CounterData.decode(entry.counter_data) 1242 else: 1243 counter_data = None 1244 1245 if entry.HasField("meter_counter_data"): 1246 meter_counter_data = P4MeterCounterData.decode(entry.meter_counter_data) 1247 else: 1248 meter_counter_data = None 1249 1250 return cls( 1251 table_id=table.alias, 1252 match=match, 1253 action=action, 1254 priority=entry.priority, 1255 controller_metadata=entry.controller_metadata, 1256 meter_config=meter_config, 1257 counter_data=counter_data, 1258 meter_counter_data=meter_counter_data, 1259 is_default_action=entry.is_default_action, 1260 idle_timeout_ns=entry.idle_timeout_ns, 1261 time_since_last_hit=last_hit, 1262 metadata=entry.metadata, 1263 is_const=entry.is_const, 1264 ) 1265 1266 def match_dict( 1267 self, 1268 schema: P4Schema, 1269 *, 1270 wildcard: str | None = None, 1271 ) -> dict[str, str]: 1272 """Format the match fields as a dictionary of strings. 1273 1274 If `wildcard` is None, only include match fields that have values. If 1275 `wildcard` is set, include all field names but replace unset values with 1276 given wildcard value (e.g. "*") 1277 """ 1278 table = schema.tables[self.table_id] 1279 if self.match is not None: 1280 return self.match.format_dict(table, wildcard=wildcard) 1281 return P4TableMatch().format_dict(table, wildcard=wildcard) 1282 1283 def match_str( 1284 self, 1285 schema: P4Schema, 1286 *, 1287 wildcard: str | None = None, 1288 ) -> str: 1289 "Format the match fields as a human-readable, canonical string." 1290 table = schema.tables[self.table_id] 1291 if self.match is not None: 1292 return self.match.format_str(table, wildcard=wildcard) 1293 return P4TableMatch().format_str(table, wildcard=wildcard) 1294 1295 def action_str(self, schema: P4Schema) -> str: 1296 "Format the actions as a human-readable, canonical string." 1297 table = schema.tables[self.table_id] 1298 if self.action is None: 1299 return NOACTION_STR 1300 return self.action.format_str(table)
Represents a P4Runtime table entry.
Attributes:
- table_id (str): Name of the table.
- match (P4TableMatch | None): Entry's match fields.
- action (P4TableAction | P4IndirectAction | None): Entry's action.
- is_default_action (bool): True if entry is the default table entry.
- priority (int): Priority of a table entry when match implies TCAM lookup.
- metadata (bytes): Arbitrary controller cookie (1.2.0).
- controller_metadata (int): Deprecated controller cookie (< 1.2.0).
- meter_config (P4MeterConfig | None): Meter configuration.
- counter_data (P4CounterData | None): Counter data for table entry.
- meter_counter_data (P4MeterCounterData | None): Meter counter data (1.4.0).
- idle_timeout_ns (int): Idle timeout in nanoseconds.
- time_since_last_hit (int | None): Nanoseconds since entry last matched.
- is_const (bool): True if entry is constant (1.4.0).
The most commonly used fields are table_id, match, action, is_default_action, and priority. See the P4Runtime Spec for usage examples regarding the other attributes.
When writing a P4TableEntry, you can specify the type of update using '+', '-', and '~'.
Examples:
# Specify all tables when using "read".
entry = fy.P4TableEntry()
# Specify the table named "ipv4" when using "read".
entry = fy.P4TableEntry("ipv4")
# Specify the default entry in the "ipv4" table when using "read".
entry = fy.P4TableEntry("ipv4", is_default_action=True)
# Insert an entry into the "ipv4" table.
update = +fy.P4TableEntry(
"ipv4",
match=fy.Match(ipv4_dst="10.0.0.0/8"),
action=fy.Action("forward", port=1),
)
# Modify the default action in the "ipv4" table.
update = ~fy.P4TableEntry(
"ipv4",
action=fy.Action("forward", port=5),
is_default_action=True
)
Operators:
You can retrieve a match field from a table entry using
[]. For example,entry["ipv4_dst"]is the same asentry.match["ipv4_dst"].
Formatting Helpers:
The
match_strandaction_strmethods provide P4Info-aware formatting of the match and action attributes.
1122 def __getitem__(self, key: str) -> Any: 1123 "Convenience accessor to retrieve a value from the `match` property." 1124 if self.match is not None: 1125 return self.match[key] 1126 raise KeyError(key)
Convenience accessor to retrieve a value from the match property.
1128 def encode(self, schema: P4Schema) -> p4r.Entity: 1129 "Encode TableEntry data as protobuf." 1130 return p4r.Entity(table_entry=self.encode_entry(schema))
Encode TableEntry data as protobuf.
1132 def encode_entry(self, schema: P4Schema) -> p4r.TableEntry: 1133 "Encode TableEntry data as protobuf." 1134 if not self.table_id: 1135 return self._encode_empty() 1136 1137 table = schema.tables[self.table_id] 1138 1139 if self.match: 1140 match = self.match.encode(table) 1141 else: 1142 match = None 1143 1144 if self.action: 1145 action = self.action.encode_table_action(table) 1146 else: 1147 action = None 1148 1149 if self.meter_config: 1150 meter_config = self.meter_config.encode() 1151 else: 1152 meter_config = None 1153 1154 if self.counter_data: 1155 counter_data = self.counter_data.encode() 1156 else: 1157 counter_data = None 1158 1159 if self.meter_counter_data: 1160 meter_counter_data = self.meter_counter_data.encode() 1161 else: 1162 meter_counter_data = None 1163 1164 if self.time_since_last_hit is not None: 1165 time_since_last_hit = p4r.TableEntry.IdleTimeout( 1166 elapsed_ns=self.time_since_last_hit 1167 ) 1168 else: 1169 time_since_last_hit = None 1170 1171 return p4r.TableEntry( 1172 table_id=table.id, 1173 match=match, 1174 action=action, 1175 priority=self.priority, 1176 controller_metadata=self.controller_metadata, 1177 meter_config=meter_config, 1178 counter_data=counter_data, 1179 meter_counter_data=meter_counter_data, 1180 is_default_action=self.is_default_action, 1181 idle_timeout_ns=self.idle_timeout_ns, 1182 time_since_last_hit=time_since_last_hit, 1183 metadata=self.metadata, 1184 is_const=self.is_const, 1185 )
Encode TableEntry data as protobuf.
1207 @classmethod 1208 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1209 "Decode protobuf to TableEntry data." 1210 return cls.decode_entry(msg.table_entry, schema)
Decode protobuf to TableEntry data.
1212 @classmethod 1213 def decode_entry(cls, entry: p4r.TableEntry, schema: P4Schema) -> Self: 1214 "Decode protobuf to TableEntry data." 1215 if entry.table_id == 0: 1216 return cls("") 1217 1218 table = schema.tables[entry.table_id] 1219 1220 if entry.match: 1221 match = P4TableMatch.decode(entry.match, table) 1222 else: 1223 match = None 1224 1225 if entry.HasField("action"): 1226 action = P4TableAction.decode_table_action(entry.action, table) 1227 else: 1228 action = None 1229 1230 if entry.HasField("time_since_last_hit"): 1231 last_hit = entry.time_since_last_hit.elapsed_ns 1232 else: 1233 last_hit = None 1234 1235 if entry.HasField("meter_config"): 1236 meter_config = P4MeterConfig.decode(entry.meter_config) 1237 else: 1238 meter_config = None 1239 1240 if entry.HasField("counter_data"): 1241 counter_data = P4CounterData.decode(entry.counter_data) 1242 else: 1243 counter_data = None 1244 1245 if entry.HasField("meter_counter_data"): 1246 meter_counter_data = P4MeterCounterData.decode(entry.meter_counter_data) 1247 else: 1248 meter_counter_data = None 1249 1250 return cls( 1251 table_id=table.alias, 1252 match=match, 1253 action=action, 1254 priority=entry.priority, 1255 controller_metadata=entry.controller_metadata, 1256 meter_config=meter_config, 1257 counter_data=counter_data, 1258 meter_counter_data=meter_counter_data, 1259 is_default_action=entry.is_default_action, 1260 idle_timeout_ns=entry.idle_timeout_ns, 1261 time_since_last_hit=last_hit, 1262 metadata=entry.metadata, 1263 is_const=entry.is_const, 1264 )
Decode protobuf to TableEntry data.
1266 def match_dict( 1267 self, 1268 schema: P4Schema, 1269 *, 1270 wildcard: str | None = None, 1271 ) -> dict[str, str]: 1272 """Format the match fields as a dictionary of strings. 1273 1274 If `wildcard` is None, only include match fields that have values. If 1275 `wildcard` is set, include all field names but replace unset values with 1276 given wildcard value (e.g. "*") 1277 """ 1278 table = schema.tables[self.table_id] 1279 if self.match is not None: 1280 return self.match.format_dict(table, wildcard=wildcard) 1281 return P4TableMatch().format_dict(table, wildcard=wildcard)
Format the match fields as a dictionary of strings.
If wildcard is None, only include match fields that have values. If
wildcard is set, include all field names but replace unset values with
given wildcard value (e.g. "*")
1283 def match_str( 1284 self, 1285 schema: P4Schema, 1286 *, 1287 wildcard: str | None = None, 1288 ) -> str: 1289 "Format the match fields as a human-readable, canonical string." 1290 table = schema.tables[self.table_id] 1291 if self.match is not None: 1292 return self.match.format_str(table, wildcard=wildcard) 1293 return P4TableMatch().format_str(table, wildcard=wildcard)
Format the match fields as a human-readable, canonical string.
1295 def action_str(self, schema: P4Schema) -> str: 1296 "Format the actions as a human-readable, canonical string." 1297 table = schema.tables[self.table_id] 1298 if self.action is None: 1299 return NOACTION_STR 1300 return self.action.format_str(table)
Format the actions as a human-readable, canonical string.
Match is an alias for P4TableMatch.
376class P4TableMatch(dict[str, Any]): 377 """Represents a set of P4Runtime field matches. 378 379 Each match field is stored as a dictionary key, where the key is the name 380 of the match field. The field's value should be appropriate for the type 381 of match (EXACT, LPM, TERNARY, etc.) 382 383 Construct a match similar to a dictionary. 384 385 Example: 386 ``` 387 # Keyword arguments: 388 match = P4TableMatch(ipv4_dst="10.0.0.1") 389 390 # Dictionary argument: 391 match = P4TableMatch({"ipv4_dst": "10.0.0.1"}) 392 393 # List of 2-tuples: 394 match = P4TableMatch([("ipv4_dst", "10.0.0.1")]) 395 ``` 396 397 P4TableMatch is implemented as a subclass of `dict`. It supports all of the 398 standard dictionary methods: 399 ``` 400 match = P4TableMatch() 401 match["ipv4_dst"] = "10.0.0.1" 402 assert len(match) == 1 403 ``` 404 405 Reference "9.1.1 Match Format": 406 Each match field is translated to a FieldMatch Protobuf message by 407 translating the entry key to a `field_id`. The type of match (EXACT, 408 LPM, TERNARY, OPTIONAL, or RANGE) is determined by the P4Info, and the 409 value is converted to the Protobuf representation. 410 411 Supported String Values: 412 ``` 413 EXACT: "255", "0xFF", "10.0.0.1", "2000::1", "00:00:00:00:00:01" 414 415 LPM: "255/8", "0xFF/8", "10.0.0.1/32", "2000::1/128", 416 "00:00:00:00:00:01/48" 417 (+ all exact formats are promoted to all-1 masks) 418 419 TERNARY: "255/&255", "0xFF/&0xFF", "10.0.0.1/&255.255.255.255", 420 "2000::1/&128", "00:00:00:00:00:01/&48" 421 (+ all exact formats are promoted to all-1 masks) 422 (+ all lpm formats are promoted to the specified contiguous mask) 423 424 RANGE: "0...255", "0x00...0xFF", "10.0.0.1...10.0.0.9", 425 "2000::1...2001::9", "00:00:00:00:00:01...00:00:00:00:00:09" 426 (+ all exact formats are promoted to single-value ranges) 427 428 OPTIONAL: Same as exact format. 429 ``` 430 431 See the `p4values.py` module for all supported value classes. 432 433 TODO: 434 - Change range delimiter to '-' (and drop '-' delimited MAC's). 435 - Consider supporting ternary values with just '/' (and drop support 436 for decimal masks; mask must be hexadecimal number). 437 438 See Also: 439 - P4TableEntry 440 """ 441 442 def encode(self, table: P4Table) -> list[p4r.FieldMatch]: 443 "Encode TableMatch data as a list of Protobuf fields." 444 result: list[p4r.FieldMatch] = [] 445 match_fields = table.match_fields 446 447 for key, value in self.items(): 448 try: 449 field = match_fields[key].encode_field(value) 450 if field is not None: 451 result.append(field) 452 except Exception as ex: 453 raise ValueError(f"{table.name!r}: Match field {key!r}: {ex}") from ex 454 455 return result 456 457 @classmethod 458 def decode(cls, msgs: Iterable[p4r.FieldMatch], table: P4Table) -> Self: 459 "Decode Protobuf fields as TableMatch data." 460 result: dict[str, Any] = {} 461 match_fields = table.match_fields 462 463 for field in msgs: 464 fld = match_fields[field.field_id] 465 result[fld.alias] = fld.decode_field(field) 466 467 return cls(result) 468 469 def format_dict( 470 self, 471 table: P4Table, 472 *, 473 wildcard: str | None = None, 474 ) -> dict[str, str]: 475 """Format the table match fields as a human-readable dictionary. 476 477 The result is a dictionary showing the TableMatch data for fields 478 included in the match. If `wildcard` is specified, all fields defined 479 in P4Info will be included with their value set to the wildcard string. 480 481 Values are formatted using the format/type specified in P4Info. 482 """ 483 result: dict[str, str] = {} 484 485 for fld in table.match_fields: 486 value = self.get(fld.alias, None) 487 if value is not None: 488 result[fld.alias] = fld.format_field(value) 489 elif wildcard is not None: 490 result[fld.alias] = wildcard 491 492 return result 493 494 def format_str( 495 self, 496 table: P4Table, 497 *, 498 wildcard: str | None = None, 499 ) -> str: 500 """Format the table match fields as a human-readable string. 501 502 The result is a string showing the TableMatch data for fields included 503 in the match. If `wildcard` is specified, all fields defined in P4Info 504 will be included with their value set to the wildcard string. 505 506 All fields are formatted as "name=value" and they are delimited by 507 spaces. 508 509 Values are formatted using the format/type specified in P4Info. 510 """ 511 result: list[str] = [] 512 513 for fld in table.match_fields: 514 value = self.get(fld.alias, None) 515 if value is not None: 516 result.append(f"{fld.alias}={fld.format_field(value)}") 517 elif wildcard is not None: 518 result.append(f"{fld.alias}={wildcard}") 519 520 return " ".join(result)
Represents a set of P4Runtime field matches.
Each match field is stored as a dictionary key, where the key is the name of the match field. The field's value should be appropriate for the type of match (EXACT, LPM, TERNARY, etc.)
Construct a match similar to a dictionary.
Example:
# Keyword arguments:
match = P4TableMatch(ipv4_dst="10.0.0.1")
# Dictionary argument:
match = P4TableMatch({"ipv4_dst": "10.0.0.1"})
# List of 2-tuples:
match = P4TableMatch([("ipv4_dst", "10.0.0.1")])
P4TableMatch is implemented as a subclass of dict. It supports all of the
standard dictionary methods:
match = P4TableMatch()
match["ipv4_dst"] = "10.0.0.1"
assert len(match) == 1
Reference "9.1.1 Match Format":
Each match field is translated to a FieldMatch Protobuf message by
translating the entry key to a field_id. The type of match (EXACT,
LPM, TERNARY, OPTIONAL, or RANGE) is determined by the P4Info, and the
value is converted to the Protobuf representation.
Supported String Values:
EXACT: "255", "0xFF", "10.0.0.1", "2000::1", "00:00:00:00:00:01"
LPM: "255/8", "0xFF/8", "10.0.0.1/32", "2000::1/128",
"00:00:00:00:00:01/48"
(+ all exact formats are promoted to all-1 masks)
TERNARY: "255/&255", "0xFF/&0xFF", "10.0.0.1/&255.255.255.255",
"2000::1/&128", "00:00:00:00:00:01/&48"
(+ all exact formats are promoted to all-1 masks)
(+ all lpm formats are promoted to the specified contiguous mask)
RANGE: "0...255", "0x00...0xFF", "10.0.0.1...10.0.0.9",
"2000::1...2001::9", "00:00:00:00:00:01...00:00:00:00:00:09"
(+ all exact formats are promoted to single-value ranges)
OPTIONAL: Same as exact format.
See the p4values.py module for all supported value classes.
TODO:
- Change range delimiter to '-' (and drop '-' delimited MAC's).
- Consider supporting ternary values with just '/' (and drop support for decimal masks; mask must be hexadecimal number).
See Also:
- P4TableEntry
442 def encode(self, table: P4Table) -> list[p4r.FieldMatch]: 443 "Encode TableMatch data as a list of Protobuf fields." 444 result: list[p4r.FieldMatch] = [] 445 match_fields = table.match_fields 446 447 for key, value in self.items(): 448 try: 449 field = match_fields[key].encode_field(value) 450 if field is not None: 451 result.append(field) 452 except Exception as ex: 453 raise ValueError(f"{table.name!r}: Match field {key!r}: {ex}") from ex 454 455 return result
Encode TableMatch data as a list of Protobuf fields.
457 @classmethod 458 def decode(cls, msgs: Iterable[p4r.FieldMatch], table: P4Table) -> Self: 459 "Decode Protobuf fields as TableMatch data." 460 result: dict[str, Any] = {} 461 match_fields = table.match_fields 462 463 for field in msgs: 464 fld = match_fields[field.field_id] 465 result[fld.alias] = fld.decode_field(field) 466 467 return cls(result)
Decode Protobuf fields as TableMatch data.
469 def format_dict( 470 self, 471 table: P4Table, 472 *, 473 wildcard: str | None = None, 474 ) -> dict[str, str]: 475 """Format the table match fields as a human-readable dictionary. 476 477 The result is a dictionary showing the TableMatch data for fields 478 included in the match. If `wildcard` is specified, all fields defined 479 in P4Info will be included with their value set to the wildcard string. 480 481 Values are formatted using the format/type specified in P4Info. 482 """ 483 result: dict[str, str] = {} 484 485 for fld in table.match_fields: 486 value = self.get(fld.alias, None) 487 if value is not None: 488 result[fld.alias] = fld.format_field(value) 489 elif wildcard is not None: 490 result[fld.alias] = wildcard 491 492 return result
Format the table match fields as a human-readable dictionary.
The result is a dictionary showing the TableMatch data for fields
included in the match. If wildcard is specified, all fields defined
in P4Info will be included with their value set to the wildcard string.
Values are formatted using the format/type specified in P4Info.
494 def format_str( 495 self, 496 table: P4Table, 497 *, 498 wildcard: str | None = None, 499 ) -> str: 500 """Format the table match fields as a human-readable string. 501 502 The result is a string showing the TableMatch data for fields included 503 in the match. If `wildcard` is specified, all fields defined in P4Info 504 will be included with their value set to the wildcard string. 505 506 All fields are formatted as "name=value" and they are delimited by 507 spaces. 508 509 Values are formatted using the format/type specified in P4Info. 510 """ 511 result: list[str] = [] 512 513 for fld in table.match_fields: 514 value = self.get(fld.alias, None) 515 if value is not None: 516 result.append(f"{fld.alias}={fld.format_field(value)}") 517 elif wildcard is not None: 518 result.append(f"{fld.alias}={wildcard}") 519 520 return " ".join(result)
Format the table match fields as a human-readable string.
The result is a string showing the TableMatch data for fields included
in the match. If wildcard is specified, all fields defined in P4Info
will be included with their value set to the wildcard string.
All fields are formatted as "name=value" and they are delimited by spaces.
Values are formatted using the format/type specified in P4Info.
2001@decodable("value_set_entry") 2002@dataclass(slots=True) 2003class P4ValueSetEntry(_P4ModifyOnly): 2004 "Represents a P4Runtime ValueSetEntry." 2005 2006 value_set_id: str 2007 _: KW_ONLY 2008 members: list[P4ValueSetMember] 2009 2010 def encode(self, schema: P4Schema) -> p4r.Entity: 2011 "Encode P4ValueSetEntry as protobuf." 2012 value_set = schema.value_sets[self.value_set_id] 2013 members = [ 2014 p4r.ValueSetMember(match=member.encode(value_set)) 2015 for member in self.members 2016 ] 2017 2018 return p4r.Entity( 2019 value_set_entry=p4r.ValueSetEntry( 2020 value_set_id=value_set.id, members=members 2021 ) 2022 ) 2023 2024 @classmethod 2025 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 2026 "Decode protobuf to P4ValueSetEntry." 2027 entry = msg.value_set_entry 2028 value_set = schema.value_sets[entry.value_set_id] 2029 2030 members = [ 2031 P4ValueSetMember.decode(member.match, value_set) for member in entry.members 2032 ] 2033 2034 return cls(value_set.alias, members=members)
Represents a P4Runtime ValueSetEntry.
2010 def encode(self, schema: P4Schema) -> p4r.Entity: 2011 "Encode P4ValueSetEntry as protobuf." 2012 value_set = schema.value_sets[self.value_set_id] 2013 members = [ 2014 p4r.ValueSetMember(match=member.encode(value_set)) 2015 for member in self.members 2016 ] 2017 2018 return p4r.Entity( 2019 value_set_entry=p4r.ValueSetEntry( 2020 value_set_id=value_set.id, members=members 2021 ) 2022 )
Encode P4ValueSetEntry as protobuf.
2024 @classmethod 2025 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 2026 "Decode protobuf to P4ValueSetEntry." 2027 entry = msg.value_set_entry 2028 value_set = schema.value_sets[entry.value_set_id] 2029 2030 members = [ 2031 P4ValueSetMember.decode(member.match, value_set) for member in entry.members 2032 ] 2033 2034 return cls(value_set.alias, members=members)
Decode protobuf to P4ValueSetEntry.
164class P4ActionSelectionMode(_EnumBase): 165 "IntEnum equivalent to `p4r.ActionProfileActionSet.ActionSelectionMode` (1.5.0)." 166 167 DEFAULT_MODE_DETERMINED_BY_ACTION_SELECTOR = ( 168 p4r.ActionProfileActionSet.ActionSelectionMode.DEFAULT_MODE_DETERMINED_BY_ACTION_SELECTOR 169 ) 170 HASH = p4r.ActionProfileActionSet.ActionSelectionMode.HASH 171 RANDOM = p4r.ActionProfileActionSet.ActionSelectionMode.RANDOM 172 173 def vt(self) -> p4r.ActionProfileActionSet.ActionSelectionMode.ValueType: 174 "Cast `self` to `ValueType`." 175 return cast(p4r.ActionProfileActionSet.ActionSelectionMode.ValueType, self)
IntEnum equivalent to p4r.ActionProfileActionSet.ActionSelectionMode (1.5.0).
178class P4ActionSizeSemantics(_EnumBase): 179 "IntEnum equivalent to `p4r.ActionProfileActionSet.SizeSemantics` (1.5.0)." 180 181 DEFAULT_SIZE_DETERMINED_BY_ACTION_SELECTOR = ( 182 p4r.ActionProfileActionSet.SizeSemantics.DEFAULT_SIZE_DETERMINED_BY_ACTION_SELECTOR 183 ) 184 SUM_OF_WEIGHTS = p4r.ActionProfileActionSet.SizeSemantics.SUM_OF_WEIGHTS 185 SUM_OF_MEMBERS = p4r.ActionProfileActionSet.SizeSemantics.SUM_OF_MEMBERS 186 187 def vt(self) -> p4r.ActionProfileActionSet.SizeSemantics.ValueType: 188 "Cast `self` to `ValueType`." 189 return cast(p4r.ActionProfileActionSet.SizeSemantics.ValueType, self)
IntEnum equivalent to p4r.ActionProfileActionSet.SizeSemantics (1.5.0).
126class P4ConfigAction(_EnumBase): 127 "IntEnum equivalent to `p4r.SetForwardingPipelineConfigRequest.Action`." 128 129 UNSPECIFIED = p4r.SetForwardingPipelineConfigRequest.Action.UNSPECIFIED 130 VERIFY = p4r.SetForwardingPipelineConfigRequest.Action.VERIFY 131 VERIFY_AND_SAVE = p4r.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_SAVE 132 VERIFY_AND_COMMIT = p4r.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT 133 COMMIT = p4r.SetForwardingPipelineConfigRequest.Action.COMMIT 134 RECONCILE_AND_COMMIT = ( 135 p4r.SetForwardingPipelineConfigRequest.Action.RECONCILE_AND_COMMIT 136 ) 137 138 def vt(self) -> p4r.SetForwardingPipelineConfigRequest.Action.ValueType: 139 "Cast `self` to `ValueType`." 140 return cast(p4r.SetForwardingPipelineConfigRequest.Action.ValueType, self)
IntEnum equivalent to p4r.SetForwardingPipelineConfigRequest.Action.
532class P4RuntimeVersion(NamedTuple): 533 "Represents the semantic version of the P4Runtime protocol." 534 535 major: int 536 minor: int 537 patch: int 538 extra: str # optional pre-release/build info 539 540 @classmethod 541 def parse(cls, version_str: str) -> Self: 542 "Parse the P4Runtime version string." 543 m = _P4RUNTIME_SEMVER_REGEX.match(version_str.strip()) 544 if not m: 545 raise ValueError(f"unexpected P4Runtime version: {version_str}") 546 return cls(int(m[1]), int(m[2]), int(m[3] or "0"), m[4]) 547 548 def __str__(self) -> str: 549 "Return the version string." 550 vers = self[:3] # pylint: disable=unsubscriptable-object 551 return ".".join(map(str, vers)) + self.extra
Represents the semantic version of the P4Runtime protocol.
Create new instance of P4RuntimeVersion(major, minor, patch, extra)
540 @classmethod 541 def parse(cls, version_str: str) -> Self: 542 "Parse the P4Runtime version string." 543 m = _P4RUNTIME_SEMVER_REGEX.match(version_str.strip()) 544 if not m: 545 raise ValueError(f"unexpected P4Runtime version: {version_str}") 546 return cls(int(m[1]), int(m[2]), int(m[3] or "0"), m[4])
Parse the P4Runtime version string.
554class P4Schema(_ReprMixin): 555 """Represents a P4Info file and its associated P4 blob (optional). 556 557 ``` 558 p4 = P4Schema(Path("basic.p4info.txtpb")) 559 ``` 560 561 This class parses the P4Info contents to produce an in-memory representation 562 of the tables, actions, types, etc. inside. This in-memory graph of the 563 contents of the P4Info file may be shared when we parse identical 564 P4Info files. The sharing of P4Info data is controlled by the 565 `P4SchemaCache` class. 566 """ 567 568 _p4info: p4i.P4Info | None 569 _p4blob: Path | bytes | SupportsBytes | None 570 _p4defs: _P4Defs # possibly shared in-memory representation 571 _p4cookie: int = 0 572 _p4runtime_version: P4RuntimeVersion = P4RuntimeVersion(1, 3, 0, "") 573 574 def __init__( 575 self, 576 p4info: p4i.P4Info | Path | None = None, 577 p4blob: Path | bytes | SupportsBytes | None = None, 578 ): 579 self._p4blob = p4blob 580 self._p4info, self._p4defs, self._p4cookie = P4SchemaCache.load_p4info( 581 p4info, 582 self._p4blob, 583 ) 584 585 @property 586 def exists(self) -> bool: 587 "True if p4info is configured." 588 return self._p4info is not None 589 590 @property 591 def is_authoritative(self) -> bool: 592 "True if both p4info and p4blob are configured." 593 return self._p4info is not None and self._p4blob is not None 594 595 @property 596 def p4info(self) -> p4i.P4Info: 597 "P4Info value." 598 if self._p4info is None: 599 raise ValueError("No P4Info configured.") 600 return self._p4info 601 602 def set_p4info(self, p4info: p4i.P4Info) -> None: 603 "Set P4Info using value returned from switch." 604 self._p4info, self._p4defs, self._p4cookie = P4SchemaCache.load_p4info( 605 p4info, 606 self._p4blob, 607 ) 608 609 def has_p4info(self, p4info: p4i.P4Info) -> bool: 610 "Return true if the current P4Info equals the given P4Info." 611 if self._p4info is None: 612 return False 613 return self._p4info.SerializeToString( 614 deterministic=True 615 ) == p4info.SerializeToString(deterministic=True) 616 617 @property 618 def p4blob(self) -> bytes: 619 "P4Blob value a.k.a p4_device_config." 620 return _blob_bytes(self._p4blob) 621 622 @property 623 def p4cookie(self) -> int: 624 """Cookie value for p4info and p4blob.""" 625 return self._p4cookie 626 627 @property 628 def p4runtime_version(self) -> P4RuntimeVersion: 629 "Current P4Runtime protocol version to assume." 630 return self._p4runtime_version 631 632 def set_p4runtime_version(self, version: P4RuntimeVersion): 633 "Set P4Runtime protocol version using value returned from switch." 634 self._p4runtime_version = version 635 636 def get_pipeline_config(self) -> p4r.ForwardingPipelineConfig: 637 """The forwarding pipeline configuration.""" 638 return p4r.ForwardingPipelineConfig( 639 p4info=self.p4info, 640 p4_device_config=self.p4blob, 641 cookie=p4r.ForwardingPipelineConfig.Cookie(cookie=self.p4cookie), 642 ) 643 644 def get_pipeline_info(self) -> str: 645 "Concise string description of the pipeline (suitable for logging)." 646 if self.exists: 647 pipeline = self.name 648 version = self.version 649 arch = self.arch 650 return f"{pipeline=} {version=} {arch=}" 651 652 return "<No pipeline exists>" 653 654 @property 655 def name(self) -> str: 656 "Name from pkg_info." 657 if self._p4info is None: 658 return "" 659 return self._p4info.pkg_info.name 660 661 @property 662 def version(self) -> str: 663 "Version from pkg_info." 664 if self._p4info is None: 665 return "" 666 return self._p4info.pkg_info.version 667 668 @property 669 def arch(self) -> str: 670 "Arch from pkg_info." 671 if self._p4info is None: 672 return "" 673 return self._p4info.pkg_info.arch 674 675 @property 676 def pkg_info(self) -> p4i.PkgInfo: 677 """Protobuf message containing original `PkgInfo` header. 678 679 Use this to access less frequently used fields like `contact`, `url`, 680 and `platform_properties`. 681 """ 682 if self._p4info is None: 683 raise ValueError("P4Info: No pipeline configured") 684 return self._p4info.pkg_info 685 686 @property 687 def tables(self) -> P4EntityMap["P4Table"]: 688 "Collection of P4 tables." 689 return self._p4defs.tables 690 691 @property 692 def actions(self) -> P4EntityMap["P4Action"]: 693 "Collection of P4 actions." 694 return self._p4defs.actions 695 696 @property 697 def action_profiles(self) -> P4EntityMap["P4ActionProfile"]: 698 "Collection of P4 action profiles." 699 return self._p4defs.action_profiles 700 701 @property 702 def controller_packet_metadata(self) -> P4EntityMap["P4ControllerPacketMetadata"]: 703 "Collection of P4 controller packet metadata." 704 return self._p4defs.controller_packet_metadata 705 706 @property 707 def direct_counters(self) -> P4EntityMap["P4DirectCounter"]: 708 "Collection of P4 direct counters." 709 return self._p4defs.direct_counters 710 711 @property 712 def direct_meters(self) -> P4EntityMap["P4DirectMeter"]: 713 "Collection of P4 direct meters." 714 return self._p4defs.direct_meters 715 716 @property 717 def counters(self) -> P4EntityMap["P4Counter"]: 718 "Collection of P4 counters." 719 return self._p4defs.counters 720 721 @property 722 def meters(self) -> P4EntityMap["P4Meter"]: 723 "Collection of P4 meters." 724 return self._p4defs.meters 725 726 @property 727 def registers(self) -> P4EntityMap["P4Register"]: 728 "Collection of P4 registers." 729 return self._p4defs.registers 730 731 @property 732 def digests(self) -> P4EntityMap["P4Digest"]: 733 "Collection of P4 digests." 734 return self._p4defs.digests 735 736 @property 737 def value_sets(self) -> P4EntityMap["P4ValueSet"]: 738 "Collection of P4 value sets." 739 return self._p4defs.value_sets 740 741 @property 742 def type_info(self) -> "P4TypeInfo": 743 "Type Info object." 744 return self._p4defs.type_info 745 746 @property 747 def externs(self) -> "P4ExternMap": 748 "Collection of P4 extern instances." 749 return self._p4defs.externs 750 751 def __str__(self) -> str: 752 if self._p4info is None: 753 return "<P4Info: No pipeline configured>" 754 return str(P4SchemaDescription(self))
Represents a P4Info file and its associated P4 blob (optional).
p4 = P4Schema(Path("basic.p4info.txtpb"))
This class parses the P4Info contents to produce an in-memory representation
of the tables, actions, types, etc. inside. This in-memory graph of the
contents of the P4Info file may be shared when we parse identical
P4Info files. The sharing of P4Info data is controlled by the
P4SchemaCache class.
585 @property 586 def exists(self) -> bool: 587 "True if p4info is configured." 588 return self._p4info is not None
True if p4info is configured.
595 @property 596 def p4info(self) -> p4i.P4Info: 597 "P4Info value." 598 if self._p4info is None: 599 raise ValueError("No P4Info configured.") 600 return self._p4info
P4Info value.
602 def set_p4info(self, p4info: p4i.P4Info) -> None: 603 "Set P4Info using value returned from switch." 604 self._p4info, self._p4defs, self._p4cookie = P4SchemaCache.load_p4info( 605 p4info, 606 self._p4blob, 607 )
Set P4Info using value returned from switch.
609 def has_p4info(self, p4info: p4i.P4Info) -> bool: 610 "Return true if the current P4Info equals the given P4Info." 611 if self._p4info is None: 612 return False 613 return self._p4info.SerializeToString( 614 deterministic=True 615 ) == p4info.SerializeToString(deterministic=True)
Return true if the current P4Info equals the given P4Info.
617 @property 618 def p4blob(self) -> bytes: 619 "P4Blob value a.k.a p4_device_config." 620 return _blob_bytes(self._p4blob)
P4Blob value a.k.a p4_device_config.
627 @property 628 def p4runtime_version(self) -> P4RuntimeVersion: 629 "Current P4Runtime protocol version to assume." 630 return self._p4runtime_version
Current P4Runtime protocol version to assume.
632 def set_p4runtime_version(self, version: P4RuntimeVersion): 633 "Set P4Runtime protocol version using value returned from switch." 634 self._p4runtime_version = version
Set P4Runtime protocol version using value returned from switch.
636 def get_pipeline_config(self) -> p4r.ForwardingPipelineConfig: 637 """The forwarding pipeline configuration.""" 638 return p4r.ForwardingPipelineConfig( 639 p4info=self.p4info, 640 p4_device_config=self.p4blob, 641 cookie=p4r.ForwardingPipelineConfig.Cookie(cookie=self.p4cookie), 642 )
The forwarding pipeline configuration.
644 def get_pipeline_info(self) -> str: 645 "Concise string description of the pipeline (suitable for logging)." 646 if self.exists: 647 pipeline = self.name 648 version = self.version 649 arch = self.arch 650 return f"{pipeline=} {version=} {arch=}" 651 652 return "<No pipeline exists>"
Concise string description of the pipeline (suitable for logging).
654 @property 655 def name(self) -> str: 656 "Name from pkg_info." 657 if self._p4info is None: 658 return "" 659 return self._p4info.pkg_info.name
Name from pkg_info.
661 @property 662 def version(self) -> str: 663 "Version from pkg_info." 664 if self._p4info is None: 665 return "" 666 return self._p4info.pkg_info.version
Version from pkg_info.
668 @property 669 def arch(self) -> str: 670 "Arch from pkg_info." 671 if self._p4info is None: 672 return "" 673 return self._p4info.pkg_info.arch
Arch from pkg_info.
675 @property 676 def pkg_info(self) -> p4i.PkgInfo: 677 """Protobuf message containing original `PkgInfo` header. 678 679 Use this to access less frequently used fields like `contact`, `url`, 680 and `platform_properties`. 681 """ 682 if self._p4info is None: 683 raise ValueError("P4Info: No pipeline configured") 684 return self._p4info.pkg_info
Protobuf message containing original PkgInfo header.
Use this to access less frequently used fields like contact, url,
and platform_properties.
686 @property 687 def tables(self) -> P4EntityMap["P4Table"]: 688 "Collection of P4 tables." 689 return self._p4defs.tables
Collection of P4 tables.
691 @property 692 def actions(self) -> P4EntityMap["P4Action"]: 693 "Collection of P4 actions." 694 return self._p4defs.actions
Collection of P4 actions.
696 @property 697 def action_profiles(self) -> P4EntityMap["P4ActionProfile"]: 698 "Collection of P4 action profiles." 699 return self._p4defs.action_profiles
Collection of P4 action profiles.
701 @property 702 def controller_packet_metadata(self) -> P4EntityMap["P4ControllerPacketMetadata"]: 703 "Collection of P4 controller packet metadata." 704 return self._p4defs.controller_packet_metadata
Collection of P4 controller packet metadata.
706 @property 707 def direct_counters(self) -> P4EntityMap["P4DirectCounter"]: 708 "Collection of P4 direct counters." 709 return self._p4defs.direct_counters
Collection of P4 direct counters.
711 @property 712 def direct_meters(self) -> P4EntityMap["P4DirectMeter"]: 713 "Collection of P4 direct meters." 714 return self._p4defs.direct_meters
Collection of P4 direct meters.
716 @property 717 def counters(self) -> P4EntityMap["P4Counter"]: 718 "Collection of P4 counters." 719 return self._p4defs.counters
Collection of P4 counters.
721 @property 722 def meters(self) -> P4EntityMap["P4Meter"]: 723 "Collection of P4 meters." 724 return self._p4defs.meters
Collection of P4 meters.
726 @property 727 def registers(self) -> P4EntityMap["P4Register"]: 728 "Collection of P4 registers." 729 return self._p4defs.registers
Collection of P4 registers.
731 @property 732 def digests(self) -> P4EntityMap["P4Digest"]: 733 "Collection of P4 digests." 734 return self._p4defs.digests
Collection of P4 digests.
736 @property 737 def value_sets(self) -> P4EntityMap["P4ValueSet"]: 738 "Collection of P4 value sets." 739 return self._p4defs.value_sets
Collection of P4 value sets.
134@final 135class Switch: 136 """Represents a P4Runtime Switch. 137 138 A `Switch` is constructed with a `name`, `address` and an optional 139 `SwitchOptions` configuration. 140 141 The `name` is up to the user but should uniquely identify the switch. 142 143 The `address` identifies the target endpoint of the GRPC channel. It should 144 have the format `<ADDRESS>:<PORT>` where `<ADDRESS>` can be a domain name, 145 IPv4 address, or IPv6 address in square brackets, and `<PORT>` is the TCP 146 port number. 147 148 The `options` is a `SwitchOptions` object that specifies how the `Switch` 149 will behave. 150 151 ``` 152 opts = SwitchOptions(p4info=..., p4blob=...) 153 sw1 = Switch('sw1', '10.0.0.1:50000', opts) 154 ``` 155 156 The `stash` is an optional dictionary for storing arbitrary per-switch 157 information. The dictionary keys are strings. You can use this to store 158 anything that you need to manage the switch. 159 160 Each switch object has an event emitter `ee`. Use the EventEmitter to listen 161 for port change events like PORT_UP and PORT_DOWN. See the `SwitchEvent` 162 class for a list of other switch events. 163 """ 164 165 _name: str 166 _address: str 167 _options: SwitchOptions 168 _stash: dict[str, Any] 169 _ee: "SwitchEmitter" 170 _p4client: P4Client | None 171 _p4schema: P4Schema 172 _tasks: "SwitchTasks | None" 173 _packet_queues: list[tuple[Callable[[bytes], bool], Queue[p4entity.P4PacketIn]]] 174 _digest_queues: dict[str, Queue[p4entity.P4DigestList]] 175 _timeout_queue: Queue[p4entity.P4IdleTimeoutNotification] | None 176 _arbitrator: "Arbitrator" 177 _gnmi_client: GNMIClient | None 178 _ports: SwitchPortList 179 _is_channel_up: bool = False 180 _control_task: asyncio.Task[Any] | None = None 181 182 def __init__( 183 self, 184 name: str, 185 address: str, 186 options: SwitchOptions | None = None, 187 *, 188 stash: dict[str, Any] | None = None, 189 ) -> None: 190 """Initialize switch with name, address and options. 191 192 Args: 193 name: A human-readable name to uniquely identify the switch. 194 address: The target address of the P4Runtime GRPC channel. 195 Format is `<ADDRESS>:<PORT>` where `<ADDRESS>` is a DNS name or 196 IP address, and `<PORT>` is the TCP port number. 197 options: Configuration options for the switch. 198 stash: Optional user-controlled dictionary. 199 Used to store information that user code needs to access or share. 200 """ 201 if options is None: 202 options = SwitchOptions() 203 204 self._name = name 205 self._address = address 206 self._options = options 207 self._stash = stash or {} 208 self._ee = SwitchEmitter(self) 209 self._p4client = None 210 self._p4schema = P4Schema(options.p4info, options.p4blob) 211 self._tasks = None 212 self._packet_queues = [] 213 self._digest_queues = {} 214 self._timeout_queue = None 215 self._arbitrator = Arbitrator( 216 options.initial_election_id, options.role_name, options.role_config 217 ) 218 self._gnmi_client = None 219 self._ports = SwitchPortList() 220 221 @property 222 def name(self) -> str: 223 "Name of the switch." 224 return self._name 225 226 @property 227 def address(self) -> str: 228 """Address of the switch formatted as a GRPC target. 229 230 Format is `<ADDRESS>:<PORT>` where `<ADDRESS>` is a DNS name or IP 231 address, and `<PORT>` is the TCP port number. 232 """ 233 return self._address 234 235 @property 236 def options(self) -> SwitchOptions: 237 "Switch options." 238 return self._options 239 240 @options.setter 241 def options(self, opts: SwitchOptions) -> None: 242 "Set switch options to a new value." 243 if self._p4client is not None: 244 raise RuntimeError("Cannot change switch options while client is open.") 245 246 self._options = opts 247 self._p4schema = P4Schema(opts.p4info, opts.p4blob) 248 self._arbitrator = Arbitrator( 249 opts.initial_election_id, opts.role_name, opts.role_config 250 ) 251 252 @property 253 def stash(self) -> dict[str, Any]: 254 "Switch stash. Used to store per-switch data for any purpose." 255 return self._stash 256 257 @property 258 def ee(self) -> "SwitchEmitter": 259 "Switch event emitter. See `SwitchEvent` for more details on events." 260 return self._ee 261 262 @property 263 def device_id(self) -> int: 264 "Switch's device ID." 265 return self._options.device_id 266 267 @property 268 def is_up(self) -> bool: 269 "True if switch is UP." 270 return self._is_channel_up 271 272 @property 273 def is_primary(self) -> bool: 274 "True if switch is primary." 275 return self._arbitrator.is_primary 276 277 @property 278 def primary_id(self) -> int: 279 "Election ID of switch that is currently primary." 280 return self._arbitrator.primary_id 281 282 @property 283 def election_id(self) -> int: 284 "Switch's current election ID." 285 return self._arbitrator.election_id 286 287 @property 288 def role_name(self) -> str: 289 "Switch's current role name." 290 return self._arbitrator.role_name 291 292 @property 293 def p4info(self) -> P4Schema: 294 "Switch's P4 schema." 295 return self._p4schema 296 297 @property 298 def gnmi_client(self) -> GNMIClient | None: 299 "Switch's gNMI client." 300 return self._gnmi_client 301 302 @property 303 def ports(self) -> SwitchPortList: 304 "Switch's list of interfaces." 305 return self._ports 306 307 @property 308 def p4runtime_version(self) -> P4RuntimeVersion: 309 "P4Runtime protocol version." 310 return self._p4schema.p4runtime_version 311 312 @overload 313 async def read( 314 self, 315 entities: _ET, 316 ) -> AsyncGenerator[_ET, None]: 317 "Overload for read of a single P4Entity subtype." 318 ... # pragma: no cover 319 320 @overload 321 async def read( 322 self, 323 entities: Iterable[_ET], 324 ) -> AsyncGenerator[_ET, None]: 325 "Overload for read of an iterable of the same P4Entity subtype." 326 ... # pragma: no cover 327 328 @overload 329 async def read( 330 self, 331 entities: Iterable[p4entity.P4EntityList], 332 ) -> AsyncGenerator[p4entity.P4Entity, None]: 333 "Most general overload: we can't determine the return type exactly." 334 ... # pragma: no cover 335 336 async def read( 337 self, 338 entities: Iterable[p4entity.P4EntityList] | p4entity.P4Entity, 339 ) -> AsyncGenerator[p4entity.P4Entity, None]: 340 "Async iterator that reads entities from the switch." 341 assert self._p4client is not None 342 343 if not entities: 344 return 345 346 if isinstance(entities, p4entity.P4Entity): 347 entities = [entities] 348 349 request = p4r.ReadRequest( 350 device_id=self.device_id, 351 entities=p4entity.encode_entities(entities, self.p4info), 352 ) 353 354 async for reply in self._p4client.request_iter(request): 355 for ent in reply.entities: 356 yield p4entity.decode_entity(ent, self.p4info) 357 358 async def read_packets( 359 self, 360 *, 361 queue_size: int = _DEFAULT_QUEUE_SIZE, 362 eth_types: Iterable[int] | None = None, 363 ) -> AsyncIterator["p4entity.P4PacketIn"]: 364 "Async iterator for incoming packets (P4PacketIn)." 365 LOGGER.debug("read_packets: opening queue: eth_types=%r", eth_types) 366 367 if eth_types is None: 368 369 def _pkt_filter(_payload: bytes) -> bool: 370 return True 371 372 else: 373 _filter = {eth.to_bytes(2, "big") for eth in eth_types} 374 375 def _pkt_filter(_payload: bytes) -> bool: 376 return _payload[12:14] in _filter 377 378 queue = Queue[p4entity.P4PacketIn](queue_size) 379 queue_filter = (_pkt_filter, queue) 380 self._packet_queues.append(queue_filter) 381 382 try: 383 while True: 384 yield await queue.get() 385 finally: 386 LOGGER.debug("read_packets: closing queue: eth_types=%r", eth_types) 387 self._packet_queues.remove(queue_filter) 388 389 async def read_digests( 390 self, 391 digest_id: str, 392 *, 393 queue_size: int = _DEFAULT_QUEUE_SIZE, 394 ) -> AsyncIterator["p4entity.P4DigestList"]: 395 "Async iterator for incoming digest lists (P4DigestList)." 396 LOGGER.debug("read_digests: opening queue: digest_id=%r", digest_id) 397 398 if digest_id in self._digest_queues: 399 raise ValueError(f"queue for digest_id {digest_id!r} already open") 400 401 queue = Queue[p4entity.P4DigestList](queue_size) 402 self._digest_queues[digest_id] = queue 403 try: 404 while True: 405 yield await queue.get() 406 finally: 407 LOGGER.debug("read_digests: closing queue: digest_id=%r", digest_id) 408 del self._digest_queues[digest_id] 409 410 async def read_idle_timeouts( 411 self, 412 *, 413 queue_size: int = _DEFAULT_QUEUE_SIZE, 414 ) -> AsyncIterator["p4entity.P4IdleTimeoutNotification"]: 415 "Async iterator for incoming idle timeouts (P4IdleTimeoutNotification)." 416 LOGGER.debug("read_idle_timeouts: opening queue") 417 418 if self._timeout_queue is not None: 419 raise ValueError("timeout queue already open") 420 421 queue = Queue[p4entity.P4IdleTimeoutNotification](queue_size) 422 self._timeout_queue = queue 423 try: 424 while True: 425 yield await queue.get() 426 finally: 427 LOGGER.debug("read_idle_timeouts: closing queue") 428 self._timeout_queue = None 429 430 async def write( 431 self, 432 entities: Iterable[p4entity.P4UpdateList], 433 *, 434 strict: bool = True, 435 warn_only: bool = False, 436 ) -> None: 437 """Write updates and stream messages to the switch. 438 439 If `strict` is False, MODIFY and DELETE operations will NOT raise an 440 error if the entity does not exist (NOT_FOUND). 441 442 If `warn_only` is True, no operations will raise an error. Instead, 443 the exception will be logged as a WARNING and the method will return 444 normally. 445 """ 446 assert self._p4client is not None 447 448 if not entities: 449 return 450 451 msgs = p4entity.encode_updates(entities, self.p4info) 452 453 updates: list[p4r.Update] = [] 454 for msg in msgs: 455 if isinstance(msg, p4r.StreamMessageRequest): 456 # StreamMessageRequests are transmitted immediately. 457 # TODO: Understand what happens with backpressure? 458 await self._p4client.send(msg) 459 else: 460 updates.append(msg) 461 462 if updates: 463 await self._write_request(updates, strict, warn_only) 464 465 async def insert( 466 self, 467 entities: Iterable[p4entity.P4EntityList], 468 *, 469 warn_only: bool = False, 470 ) -> None: 471 """Insert the specified entities. 472 473 If `warn_only` is True, errors will be logged as warnings instead of 474 raising an exception. 475 """ 476 if entities: 477 await self._write_request( 478 [ 479 p4r.Update(type=p4r.Update.INSERT, entity=ent) 480 for ent in p4entity.encode_entities(entities, self.p4info) 481 ], 482 True, 483 warn_only, 484 ) 485 486 async def modify( 487 self, 488 entities: Iterable[p4entity.P4EntityList], 489 *, 490 strict: bool = True, 491 warn_only: bool = False, 492 ) -> None: 493 """Modify the specified entities. 494 495 If `strict` is False, NOT_FOUND errors will be ignored. 496 497 If `warn_only` is True, errors will be logged as warnings instead of 498 raising an exception. 499 """ 500 if entities: 501 await self._write_request( 502 [ 503 p4r.Update(type=p4r.Update.MODIFY, entity=ent) 504 for ent in p4entity.encode_entities(entities, self.p4info) 505 ], 506 strict, 507 warn_only, 508 ) 509 510 async def delete( 511 self, 512 entities: Iterable[p4entity.P4EntityList], 513 *, 514 strict: bool = True, 515 warn_only: bool = False, 516 ) -> None: 517 """Delete the specified entities. 518 519 If `strict` is False, NOT_FOUND errors will be ignored. 520 521 If `warn_only` is True, errors will be logged as warnings instead of 522 raising an exception. 523 """ 524 if entities: 525 await self._write_request( 526 [ 527 p4r.Update(type=p4r.Update.DELETE, entity=ent) 528 for ent in p4entity.encode_entities(entities, self.p4info) 529 ], 530 strict, 531 warn_only, 532 ) 533 534 async def delete_all(self) -> None: 535 """Delete all entities if no parameter is passed. Otherwise, delete 536 items that match `entities`. 537 538 This method does not attempt to delete entries in const tables. 539 540 TODO: This method does not affect indirect counters, meters or 541 value_sets. 542 """ 543 await self.delete_many( 544 [ 545 p4entity.P4TableEntry(), 546 p4entity.P4MulticastGroupEntry(), 547 p4entity.P4CloneSessionEntry(), 548 ] 549 ) 550 551 # Reset all default table entries. 552 default_entries = [ 553 p4entity.P4TableEntry(table.alias, is_default_action=True) 554 for table in self.p4info.tables 555 if table.const_default_action is None and table.action_profile is None 556 ] 557 if default_entries: 558 await self.modify(default_entries) 559 560 # Delete all P4ActionProfileGroup's and P4ActionProfileMember's. 561 # We do this after deleting the P4TableEntry's in case a client is using 562 # "one-shot" references; these are incompatible with separate 563 # action profiles. 564 await self.delete_many( 565 [ 566 p4entity.P4ActionProfileGroup(), 567 p4entity.P4ActionProfileMember(), 568 ] 569 ) 570 571 # Delete DigestEntry separately. Wildcard reads are not supported. 572 digest_entries = [ 573 p4entity.P4DigestEntry(digest.alias) for digest in self.p4info.digests 574 ] 575 if digest_entries: 576 await self.delete(digest_entries, strict=False) 577 578 async def delete_many(self, entities: Iterable[p4entity.P4EntityList]) -> None: 579 """Delete entities that match a wildcard read. 580 581 This method always skips over entries in const tables. It is an error 582 to attempt to delete those. 583 """ 584 assert self._p4client is not None 585 586 request = p4r.ReadRequest( 587 device_id=self.device_id, 588 entities=p4entity.encode_entities(entities, self.p4info), 589 ) 590 591 # Compute set of all const table ID's (may be empty). 592 to_skip = {table.id for table in self.p4info.tables if table.is_const} 593 594 async for reply in self._p4client.request_iter(request): 595 if reply.entities: 596 if to_skip: 597 await self.delete( 598 reply 599 for reply in reply.entities 600 if reply.HasField("table_entry") 601 and reply.table_entry.table_id not in to_skip 602 ) 603 else: 604 await self.delete(reply.entities) 605 606 async def run(self) -> None: 607 "Run the switch's lifecycle repeatedly." 608 assert self._p4client is None 609 assert self._tasks is None 610 611 self._tasks = SwitchTasks(self._options.fail_fast) 612 self._p4client = P4Client(self._address, self._options.channel_credentials) 613 self._switch_start() 614 615 try: 616 while True: 617 # If the switch fails and restarts too quickly, slow it down. 618 async with _throttle_failure(): 619 self.create_task(self._run(), background=True) 620 await self._tasks.wait() 621 self._arbitrator.reset() 622 623 finally: 624 self._p4client = None 625 self._tasks = None 626 self._switch_stop() 627 628 def create_task( 629 self, 630 coro: Coroutine[Any, Any, _T], 631 *, 632 background: bool = False, 633 name: str | None = None, 634 ) -> asyncio.Task[_T]: 635 "Create an asyncio task tied to the Switch's lifecycle." 636 assert self._tasks is not None 637 638 return self._tasks.create_task( 639 coro, 640 switch=self, 641 background=background, 642 name=name, 643 ) 644 645 async def _run(self): 646 "Main Switch task runs the stream." 647 assert not self._is_channel_up 648 assert self._p4client is not None 649 650 try: 651 await self._p4client.open( 652 schema=self.p4info, 653 complete_request=self._arbitrator.complete_request, 654 ) 655 await self._arbitrator.handshake(self) 656 await self._fetch_capabilities() 657 await self._start_gnmi() 658 self._channel_up() 659 660 # Receive messages from the stream until it closes. 661 await self._receive_until_closed() 662 663 finally: 664 await self._stop_gnmi() 665 await self._p4client.close() 666 self._channel_down() 667 668 async def _receive_until_closed(self): 669 "Receive messages from stream until EOF." 670 assert self._p4client is not None 671 672 client = self._p4client 673 674 while True: 675 try: 676 msg = await client.receive() 677 except P4ClientError as ex: 678 if not ex.is_election_id_used: 679 raise 680 # Handle "election ID in use" error. 681 await self._arbitrator.handshake(self, conflict=True) 682 else: 683 await self._handle_stream_message(msg) 684 685 async def _handle_stream_message(self, msg: p4r.StreamMessageResponse): 686 "Handle a P4Runtime StreamMessageResponse." 687 match msg.WhichOneof("update"): 688 case "packet": 689 self._stream_packet_message(msg) 690 case "digest": 691 self._stream_digest_message(msg) 692 case "idle_timeout_notification": 693 self._stream_timeout_message(msg) 694 case "arbitration": 695 await self._arbitrator.update(self, msg.arbitration) 696 case "error": 697 self._stream_error_message(msg) 698 case other: 699 LOGGER.error("_handle_stream_message: unknown update %r", other) 700 701 async def __aenter__(self) -> Self: 702 "Similar to run() but provides a one-time context manager interface." 703 assert self._p4client is None 704 assert self._tasks is None 705 706 self._tasks = SwitchTasks(self._options.fail_fast) 707 self._p4client = P4Client( 708 self._address, 709 self._options.channel_credentials, 710 wait_for_ready=False, 711 ) 712 self._switch_start() 713 714 try: 715 # Start the switch's `_run` task in the background. Then, wait for 716 # `_run` task to fire the CHANNEL_READY event. If the `_run` task 717 # cannot connect or fails in some other way, it will finish before 718 # the `ready` future. We need to handle the error in this case. 719 720 run = self.create_task(self._run(), background=True) 721 ready = self.ee.event_future(SwitchEvent.CHANNEL_READY) 722 done, _ = await asyncio.wait( 723 [run, ready], return_when=asyncio.FIRST_COMPLETED 724 ) 725 if run in done: 726 await run 727 728 except BaseException: 729 await self.__aexit__(None, None, None) 730 raise 731 732 return self 733 734 async def __aexit__( 735 self, 736 _exc_type: type[BaseException] | None, 737 _exc_val: BaseException | None, 738 _exc_tb: TracebackType | None, 739 ) -> bool | None: 740 "Similar to run() but provides a one-time context manager interface." 741 assert self._p4client is not None 742 assert self._tasks is not None 743 744 self._tasks.cancel_all() 745 await self._tasks.wait() 746 self._arbitrator.reset() 747 self._p4client = None 748 self._tasks = None 749 self._switch_stop() 750 751 def _switch_start(self): 752 "Called when switch starts its run() cycle." 753 assert not self._is_channel_up 754 755 LOGGER.info( 756 "Switch start (name=%r, address=%r, device_id=%r, role_name=%r, initial_election_id=%r)", 757 self.name, 758 self.address, 759 self.device_id, 760 self.role_name, 761 self.options.initial_election_id, 762 ) 763 self.ee.emit(SwitchEvent.SWITCH_START) 764 765 def _switch_stop(self): 766 "Called when switch stops its run() cycle." 767 assert not self._is_channel_up 768 769 LOGGER.info( 770 "Switch stop (name=%r, address=%r, device_id=%r, role_name=%r)", 771 self.name, 772 self.address, 773 self.device_id, 774 self.role_name, 775 ) 776 self.ee.emit(SwitchEvent.SWITCH_STOP) 777 778 def _channel_up(self): 779 "Called when P4Runtime channel is UP." 780 assert not self._is_channel_up 781 782 ports = " ".join(f"({port.id}){port.name}" for port in self.ports) 783 LOGGER.info( 784 "Channel up (is_primary=%r, role_name=%r, p4r=%s): %s", 785 self.is_primary, 786 self.role_name, 787 self.p4runtime_version, 788 ports, 789 ) 790 self._is_channel_up = True 791 self.create_task(self._ready()) 792 793 self.ee.emit(SwitchEvent.CHANNEL_UP, self) 794 795 def _channel_down(self): 796 "Called when P4Runtime channel is DOWN." 797 if not self._is_channel_up: 798 return # do nothing! 799 800 LOGGER.info( 801 "Channel down (is_primary=%r, role_name=%r)", 802 self.is_primary, 803 self.role_name, 804 ) 805 self._is_channel_up = False 806 807 self.ee.emit(SwitchEvent.CHANNEL_DOWN, self) 808 809 def _become_primary(self): 810 "Called when a P4Runtime backup channel becomes the primary." 811 assert self._tasks is not None 812 813 LOGGER.info( 814 "Become primary (is_primary=%r, role_name=%r)", 815 self.is_primary, 816 self.role_name, 817 ) 818 819 self._tasks.cancel_primary() 820 self.create_task(self._ready()) 821 822 self.ee.emit(SwitchEvent.BECOME_PRIMARY, self) 823 824 def _become_backup(self): 825 "Called when a P4Runtime primary channel becomes a backup." 826 assert self._tasks is not None 827 828 LOGGER.info( 829 "Become backup (is_primary=%r, role_name=%r)", 830 self.is_primary, 831 self.role_name, 832 ) 833 834 self._tasks.cancel_primary() 835 self.create_task(self._ready()) 836 837 self.ee.emit(SwitchEvent.BECOME_BACKUP, self) 838 839 def _channel_ready(self): 840 "Called when a P4Runtime channel is READY." 841 LOGGER.info( 842 "Channel ready (is_primary=%r, role_name=%r): %s", 843 self.is_primary, 844 self.role_name, 845 self.p4info.get_pipeline_info(), 846 ) 847 848 if self._options.ready_handler: 849 self.create_task(self._options.ready_handler(self)) 850 851 self.ee.emit(SwitchEvent.CHANNEL_READY, self) 852 853 def _stream_packet_message(self, msg: p4r.StreamMessageResponse): 854 "Called when a P4Runtime packet-in response is received." 855 packet = p4entity.decode_stream(msg, self.p4info) 856 857 was_queued = False 858 for pkt_filter, queue in self._packet_queues: 859 if not queue.full() and pkt_filter(packet.payload): 860 queue.put_nowait(packet) 861 was_queued = True 862 863 if not was_queued: 864 LOGGER.warning("packet ignored: %r", packet) 865 866 def _stream_digest_message(self, msg: p4r.StreamMessageResponse): 867 "Called when a P4Runtime digest response is received." 868 try: 869 # Decode the digest list message. 870 digest: p4entity.P4DigestList = p4entity.decode_stream(msg, self.p4info) 871 except ValueError as ex: 872 # It's possible to receive a digest for a different P4Info file, or 873 # even before a P4Info is fetched from the switch. 874 LOGGER.warning("digest decode failed: %s", ex) 875 else: 876 # Place the decoded digest list in a queue, if one is waiting. 877 queue = self._digest_queues.get(digest.digest_id) 878 if queue is not None and not queue.full(): 879 queue.put_nowait(digest) 880 else: 881 LOGGER.warning("digest ignored: %r", digest) 882 883 def _stream_timeout_message(self, msg: p4r.StreamMessageResponse): 884 "Called when a P4Runtime timeout notification is received." 885 timeout: p4entity.P4IdleTimeoutNotification = p4entity.decode_stream( 886 msg, self.p4info 887 ) 888 queue = self._timeout_queue 889 890 if queue is not None and not queue.full(): 891 queue.put_nowait(timeout) 892 else: 893 LOGGER.warning("timeout ignored: %r", timeout) 894 895 def _stream_error_message(self, msg: p4r.StreamMessageResponse): 896 "Called when a P4Runtime stream error response is received." 897 assert self._p4client is not None 898 899 # Log the message at the ERROR level. 900 pbutil.log_msg(self._p4client.channel, msg, self.p4info, level=logging.ERROR) 901 902 self.ee.emit(SwitchEvent.STREAM_ERROR, self, msg) 903 904 async def _ready(self): 905 "Prepare the pipeline." 906 if self.p4info.is_authoritative and self.is_primary: 907 await self._set_pipeline() 908 else: 909 await self._get_pipeline() 910 911 self._channel_ready() 912 913 async def _get_pipeline(self): 914 "Get the switch's P4Info." 915 has_pipeline = False 916 917 try: 918 reply = await self._get_pipeline_config_request( 919 response_type=P4ConfigResponseType.P4INFO_AND_COOKIE 920 ) 921 922 if reply.config.HasField("p4info"): 923 has_pipeline = True 924 p4info = reply.config.p4info 925 if not self.p4info.exists: 926 # If we don't have P4Info yet, set it. 927 self.p4info.set_p4info(p4info) 928 elif not self.p4info.has_p4info(p4info): 929 # If P4Info is not identical, log a warning message. 930 LOGGER.warning("Retrieved P4Info is different than expected!") 931 932 except P4ClientError as ex: 933 if not ex.is_pipeline_missing: 934 raise 935 936 if not has_pipeline and self.p4info.exists: 937 LOGGER.warning("Forwarding pipeline is not configured") 938 939 async def _set_pipeline(self): 940 """Set up the pipeline. 941 942 If `p4force` is false (the default), we first retrieve the cookie for 943 the current pipeline and see if it matches the new pipeline's cookie. 944 If the cookies match, we are done; there is no need to set the pipeline. 945 946 If `p4force` is true, we always load the new pipeline. 947 """ 948 cookie = -1 949 try: 950 if not self.options.p4force: 951 reply = await self._get_pipeline_config_request() 952 if reply.config.HasField("cookie"): 953 cookie = reply.config.cookie.cookie 954 955 except P4ClientError as ex: 956 if not ex.is_pipeline_missing: 957 raise 958 959 if cookie != self.p4info.p4cookie: 960 LOGGER.debug( 961 "cookie %#x does not match expected %#x", cookie, self.p4info.p4cookie 962 ) 963 await self._set_pipeline_config_request( 964 config=self.p4info.get_pipeline_config() 965 ) 966 LOGGER.info("Pipeline installed: %s", self.p4info.get_pipeline_info()) 967 968 async def _get_pipeline_config_request( 969 self, 970 *, 971 response_type: P4ConfigResponseType = P4ConfigResponseType.COOKIE_ONLY, 972 ) -> p4r.GetForwardingPipelineConfigResponse: 973 "Send a GetForwardingPipelineConfigRequest and await the response." 974 assert self._p4client is not None 975 976 return await self._p4client.request( 977 p4r.GetForwardingPipelineConfigRequest( 978 device_id=self.device_id, 979 response_type=response_type.vt(), 980 ) 981 ) 982 983 async def _set_pipeline_config_request( 984 self, 985 *, 986 action: P4ConfigAction = P4ConfigAction.VERIFY_AND_COMMIT, 987 config: p4r.ForwardingPipelineConfig, 988 ) -> p4r.SetForwardingPipelineConfigResponse: 989 "Send a SetForwardingPipelineConfigRequest and await the response." 990 assert self._p4client is not None 991 992 return await self._p4client.request( 993 p4r.SetForwardingPipelineConfigRequest( 994 device_id=self.device_id, 995 action=action.vt(), 996 config=config, 997 ) 998 ) 999 1000 async def _write_request( 1001 self, 1002 updates: list[p4r.Update], 1003 strict: bool, 1004 warn_only: bool, 1005 ): 1006 "Send a P4Runtime WriteRequest." 1007 assert self._p4client is not None 1008 1009 try: 1010 await self._p4client.request( 1011 p4r.WriteRequest( 1012 device_id=self.device_id, 1013 updates=updates, 1014 ) 1015 ) 1016 except P4ClientError as ex: 1017 if strict or not ex.is_not_found_only: 1018 if warn_only: 1019 LOGGER.warning( 1020 "WriteRequest with `warn_only=True` failed", 1021 exc_info=True, 1022 ) 1023 else: 1024 raise 1025 1026 assert (not strict and ex.is_not_found_only) or warn_only 1027 1028 async def _fetch_capabilities(self): 1029 "Check the P4Runtime protocol version supported by the other end." 1030 assert self._p4client is not None 1031 1032 try: 1033 reply = await self._p4client.request(p4r.CapabilitiesRequest()) 1034 version = P4RuntimeVersion.parse(reply.p4runtime_api_version) 1035 self._p4schema.set_p4runtime_version(version) 1036 1037 # TODO: Do something with the `experimental` option added to 1038 # CapabilitiesResponse in P4Runtime 1.5.0. 1039 1040 except P4ClientError as ex: 1041 if ex.code != GRPCStatusCode.UNIMPLEMENTED: 1042 raise 1043 LOGGER.warning("CapabilitiesRequest is not implemented") 1044 1045 async def _start_gnmi(self): 1046 "Start the associated gNMI client." 1047 assert self._gnmi_client is None 1048 assert self._p4client is not None 1049 1050 self._gnmi_client = GNMIClient(self._address, self._options.channel_credentials) 1051 await self._gnmi_client.open(channel=self._p4client.channel) 1052 1053 try: 1054 await self._ports.subscribe(self._gnmi_client) 1055 if self._ports: 1056 self.create_task(self._ports.listen(), background=True, name="_ports") 1057 1058 except GNMIClientError as ex: 1059 if ex.code != GRPCStatusCode.UNIMPLEMENTED: 1060 raise 1061 LOGGER.warning("gNMI is not implemented") 1062 await self._gnmi_client.close() 1063 self._gnmi_client = None 1064 1065 async def _stop_gnmi(self): 1066 "Stop the associated gNMI client." 1067 if self._gnmi_client is not None: 1068 self._ports.close() 1069 await self._gnmi_client.close() 1070 self._gnmi_client = None 1071 1072 def __repr__(self) -> str: 1073 "Return string representation of switch." 1074 return f"Switch(name={self._name!r}, address={self._address!r})"
Represents a P4Runtime Switch.
A Switch is constructed with a name, address and an optional
SwitchOptions configuration.
The name is up to the user but should uniquely identify the switch.
The address identifies the target endpoint of the GRPC channel. It should
have the format <ADDRESS>:<PORT> where <ADDRESS> can be a domain name,
IPv4 address, or IPv6 address in square brackets, and <PORT> is the TCP
port number.
The options is a SwitchOptions object that specifies how the Switch
will behave.
opts = SwitchOptions(p4info=..., p4blob=...)
sw1 = Switch('sw1', '10.0.0.1:50000', opts)
The stash is an optional dictionary for storing arbitrary per-switch
information. The dictionary keys are strings. You can use this to store
anything that you need to manage the switch.
Each switch object has an event emitter ee. Use the EventEmitter to listen
for port change events like PORT_UP and PORT_DOWN. See the SwitchEvent
class for a list of other switch events.
182 def __init__( 183 self, 184 name: str, 185 address: str, 186 options: SwitchOptions | None = None, 187 *, 188 stash: dict[str, Any] | None = None, 189 ) -> None: 190 """Initialize switch with name, address and options. 191 192 Args: 193 name: A human-readable name to uniquely identify the switch. 194 address: The target address of the P4Runtime GRPC channel. 195 Format is `<ADDRESS>:<PORT>` where `<ADDRESS>` is a DNS name or 196 IP address, and `<PORT>` is the TCP port number. 197 options: Configuration options for the switch. 198 stash: Optional user-controlled dictionary. 199 Used to store information that user code needs to access or share. 200 """ 201 if options is None: 202 options = SwitchOptions() 203 204 self._name = name 205 self._address = address 206 self._options = options 207 self._stash = stash or {} 208 self._ee = SwitchEmitter(self) 209 self._p4client = None 210 self._p4schema = P4Schema(options.p4info, options.p4blob) 211 self._tasks = None 212 self._packet_queues = [] 213 self._digest_queues = {} 214 self._timeout_queue = None 215 self._arbitrator = Arbitrator( 216 options.initial_election_id, options.role_name, options.role_config 217 ) 218 self._gnmi_client = None 219 self._ports = SwitchPortList()
Initialize switch with name, address and options.
Arguments:
- name: A human-readable name to uniquely identify the switch.
- address: The target address of the P4Runtime GRPC channel.
Format is
<ADDRESS>:<PORT>where<ADDRESS>is a DNS name or IP address, and<PORT>is the TCP port number. - options: Configuration options for the switch.
- stash: Optional user-controlled dictionary. Used to store information that user code needs to access or share.
226 @property 227 def address(self) -> str: 228 """Address of the switch formatted as a GRPC target. 229 230 Format is `<ADDRESS>:<PORT>` where `<ADDRESS>` is a DNS name or IP 231 address, and `<PORT>` is the TCP port number. 232 """ 233 return self._address
Address of the switch formatted as a GRPC target.
Format is <ADDRESS>:<PORT> where <ADDRESS> is a DNS name or IP
address, and <PORT> is the TCP port number.
235 @property 236 def options(self) -> SwitchOptions: 237 "Switch options." 238 return self._options
Switch options.
252 @property 253 def stash(self) -> dict[str, Any]: 254 "Switch stash. Used to store per-switch data for any purpose." 255 return self._stash
Switch stash. Used to store per-switch data for any purpose.
257 @property 258 def ee(self) -> "SwitchEmitter": 259 "Switch event emitter. See `SwitchEvent` for more details on events." 260 return self._ee
Switch event emitter. See SwitchEvent for more details on events.
262 @property 263 def device_id(self) -> int: 264 "Switch's device ID." 265 return self._options.device_id
Switch's device ID.
267 @property 268 def is_up(self) -> bool: 269 "True if switch is UP." 270 return self._is_channel_up
True if switch is UP.
272 @property 273 def is_primary(self) -> bool: 274 "True if switch is primary." 275 return self._arbitrator.is_primary
True if switch is primary.
277 @property 278 def primary_id(self) -> int: 279 "Election ID of switch that is currently primary." 280 return self._arbitrator.primary_id
Election ID of switch that is currently primary.
282 @property 283 def election_id(self) -> int: 284 "Switch's current election ID." 285 return self._arbitrator.election_id
Switch's current election ID.
287 @property 288 def role_name(self) -> str: 289 "Switch's current role name." 290 return self._arbitrator.role_name
Switch's current role name.
297 @property 298 def gnmi_client(self) -> GNMIClient | None: 299 "Switch's gNMI client." 300 return self._gnmi_client
Switch's gNMI client.
302 @property 303 def ports(self) -> SwitchPortList: 304 "Switch's list of interfaces." 305 return self._ports
Switch's list of interfaces.
307 @property 308 def p4runtime_version(self) -> P4RuntimeVersion: 309 "P4Runtime protocol version." 310 return self._p4schema.p4runtime_version
P4Runtime protocol version.
336 async def read( 337 self, 338 entities: Iterable[p4entity.P4EntityList] | p4entity.P4Entity, 339 ) -> AsyncGenerator[p4entity.P4Entity, None]: 340 "Async iterator that reads entities from the switch." 341 assert self._p4client is not None 342 343 if not entities: 344 return 345 346 if isinstance(entities, p4entity.P4Entity): 347 entities = [entities] 348 349 request = p4r.ReadRequest( 350 device_id=self.device_id, 351 entities=p4entity.encode_entities(entities, self.p4info), 352 ) 353 354 async for reply in self._p4client.request_iter(request): 355 for ent in reply.entities: 356 yield p4entity.decode_entity(ent, self.p4info)
Async iterator that reads entities from the switch.
358 async def read_packets( 359 self, 360 *, 361 queue_size: int = _DEFAULT_QUEUE_SIZE, 362 eth_types: Iterable[int] | None = None, 363 ) -> AsyncIterator["p4entity.P4PacketIn"]: 364 "Async iterator for incoming packets (P4PacketIn)." 365 LOGGER.debug("read_packets: opening queue: eth_types=%r", eth_types) 366 367 if eth_types is None: 368 369 def _pkt_filter(_payload: bytes) -> bool: 370 return True 371 372 else: 373 _filter = {eth.to_bytes(2, "big") for eth in eth_types} 374 375 def _pkt_filter(_payload: bytes) -> bool: 376 return _payload[12:14] in _filter 377 378 queue = Queue[p4entity.P4PacketIn](queue_size) 379 queue_filter = (_pkt_filter, queue) 380 self._packet_queues.append(queue_filter) 381 382 try: 383 while True: 384 yield await queue.get() 385 finally: 386 LOGGER.debug("read_packets: closing queue: eth_types=%r", eth_types) 387 self._packet_queues.remove(queue_filter)
Async iterator for incoming packets (P4PacketIn).
389 async def read_digests( 390 self, 391 digest_id: str, 392 *, 393 queue_size: int = _DEFAULT_QUEUE_SIZE, 394 ) -> AsyncIterator["p4entity.P4DigestList"]: 395 "Async iterator for incoming digest lists (P4DigestList)." 396 LOGGER.debug("read_digests: opening queue: digest_id=%r", digest_id) 397 398 if digest_id in self._digest_queues: 399 raise ValueError(f"queue for digest_id {digest_id!r} already open") 400 401 queue = Queue[p4entity.P4DigestList](queue_size) 402 self._digest_queues[digest_id] = queue 403 try: 404 while True: 405 yield await queue.get() 406 finally: 407 LOGGER.debug("read_digests: closing queue: digest_id=%r", digest_id) 408 del self._digest_queues[digest_id]
Async iterator for incoming digest lists (P4DigestList).
410 async def read_idle_timeouts( 411 self, 412 *, 413 queue_size: int = _DEFAULT_QUEUE_SIZE, 414 ) -> AsyncIterator["p4entity.P4IdleTimeoutNotification"]: 415 "Async iterator for incoming idle timeouts (P4IdleTimeoutNotification)." 416 LOGGER.debug("read_idle_timeouts: opening queue") 417 418 if self._timeout_queue is not None: 419 raise ValueError("timeout queue already open") 420 421 queue = Queue[p4entity.P4IdleTimeoutNotification](queue_size) 422 self._timeout_queue = queue 423 try: 424 while True: 425 yield await queue.get() 426 finally: 427 LOGGER.debug("read_idle_timeouts: closing queue") 428 self._timeout_queue = None
Async iterator for incoming idle timeouts (P4IdleTimeoutNotification).
430 async def write( 431 self, 432 entities: Iterable[p4entity.P4UpdateList], 433 *, 434 strict: bool = True, 435 warn_only: bool = False, 436 ) -> None: 437 """Write updates and stream messages to the switch. 438 439 If `strict` is False, MODIFY and DELETE operations will NOT raise an 440 error if the entity does not exist (NOT_FOUND). 441 442 If `warn_only` is True, no operations will raise an error. Instead, 443 the exception will be logged as a WARNING and the method will return 444 normally. 445 """ 446 assert self._p4client is not None 447 448 if not entities: 449 return 450 451 msgs = p4entity.encode_updates(entities, self.p4info) 452 453 updates: list[p4r.Update] = [] 454 for msg in msgs: 455 if isinstance(msg, p4r.StreamMessageRequest): 456 # StreamMessageRequests are transmitted immediately. 457 # TODO: Understand what happens with backpressure? 458 await self._p4client.send(msg) 459 else: 460 updates.append(msg) 461 462 if updates: 463 await self._write_request(updates, strict, warn_only)
Write updates and stream messages to the switch.
If strict is False, MODIFY and DELETE operations will NOT raise an
error if the entity does not exist (NOT_FOUND).
If warn_only is True, no operations will raise an error. Instead,
the exception will be logged as a WARNING and the method will return
normally.
465 async def insert( 466 self, 467 entities: Iterable[p4entity.P4EntityList], 468 *, 469 warn_only: bool = False, 470 ) -> None: 471 """Insert the specified entities. 472 473 If `warn_only` is True, errors will be logged as warnings instead of 474 raising an exception. 475 """ 476 if entities: 477 await self._write_request( 478 [ 479 p4r.Update(type=p4r.Update.INSERT, entity=ent) 480 for ent in p4entity.encode_entities(entities, self.p4info) 481 ], 482 True, 483 warn_only, 484 )
Insert the specified entities.
If warn_only is True, errors will be logged as warnings instead of
raising an exception.
486 async def modify( 487 self, 488 entities: Iterable[p4entity.P4EntityList], 489 *, 490 strict: bool = True, 491 warn_only: bool = False, 492 ) -> None: 493 """Modify the specified entities. 494 495 If `strict` is False, NOT_FOUND errors will be ignored. 496 497 If `warn_only` is True, errors will be logged as warnings instead of 498 raising an exception. 499 """ 500 if entities: 501 await self._write_request( 502 [ 503 p4r.Update(type=p4r.Update.MODIFY, entity=ent) 504 for ent in p4entity.encode_entities(entities, self.p4info) 505 ], 506 strict, 507 warn_only, 508 )
Modify the specified entities.
If strict is False, NOT_FOUND errors will be ignored.
If warn_only is True, errors will be logged as warnings instead of
raising an exception.
510 async def delete( 511 self, 512 entities: Iterable[p4entity.P4EntityList], 513 *, 514 strict: bool = True, 515 warn_only: bool = False, 516 ) -> None: 517 """Delete the specified entities. 518 519 If `strict` is False, NOT_FOUND errors will be ignored. 520 521 If `warn_only` is True, errors will be logged as warnings instead of 522 raising an exception. 523 """ 524 if entities: 525 await self._write_request( 526 [ 527 p4r.Update(type=p4r.Update.DELETE, entity=ent) 528 for ent in p4entity.encode_entities(entities, self.p4info) 529 ], 530 strict, 531 warn_only, 532 )
Delete the specified entities.
If strict is False, NOT_FOUND errors will be ignored.
If warn_only is True, errors will be logged as warnings instead of
raising an exception.
534 async def delete_all(self) -> None: 535 """Delete all entities if no parameter is passed. Otherwise, delete 536 items that match `entities`. 537 538 This method does not attempt to delete entries in const tables. 539 540 TODO: This method does not affect indirect counters, meters or 541 value_sets. 542 """ 543 await self.delete_many( 544 [ 545 p4entity.P4TableEntry(), 546 p4entity.P4MulticastGroupEntry(), 547 p4entity.P4CloneSessionEntry(), 548 ] 549 ) 550 551 # Reset all default table entries. 552 default_entries = [ 553 p4entity.P4TableEntry(table.alias, is_default_action=True) 554 for table in self.p4info.tables 555 if table.const_default_action is None and table.action_profile is None 556 ] 557 if default_entries: 558 await self.modify(default_entries) 559 560 # Delete all P4ActionProfileGroup's and P4ActionProfileMember's. 561 # We do this after deleting the P4TableEntry's in case a client is using 562 # "one-shot" references; these are incompatible with separate 563 # action profiles. 564 await self.delete_many( 565 [ 566 p4entity.P4ActionProfileGroup(), 567 p4entity.P4ActionProfileMember(), 568 ] 569 ) 570 571 # Delete DigestEntry separately. Wildcard reads are not supported. 572 digest_entries = [ 573 p4entity.P4DigestEntry(digest.alias) for digest in self.p4info.digests 574 ] 575 if digest_entries: 576 await self.delete(digest_entries, strict=False)
Delete all entities if no parameter is passed. Otherwise, delete
items that match entities.
This method does not attempt to delete entries in const tables.
TODO: This method does not affect indirect counters, meters or value_sets.
578 async def delete_many(self, entities: Iterable[p4entity.P4EntityList]) -> None: 579 """Delete entities that match a wildcard read. 580 581 This method always skips over entries in const tables. It is an error 582 to attempt to delete those. 583 """ 584 assert self._p4client is not None 585 586 request = p4r.ReadRequest( 587 device_id=self.device_id, 588 entities=p4entity.encode_entities(entities, self.p4info), 589 ) 590 591 # Compute set of all const table ID's (may be empty). 592 to_skip = {table.id for table in self.p4info.tables if table.is_const} 593 594 async for reply in self._p4client.request_iter(request): 595 if reply.entities: 596 if to_skip: 597 await self.delete( 598 reply 599 for reply in reply.entities 600 if reply.HasField("table_entry") 601 and reply.table_entry.table_id not in to_skip 602 ) 603 else: 604 await self.delete(reply.entities)
Delete entities that match a wildcard read.
This method always skips over entries in const tables. It is an error to attempt to delete those.
606 async def run(self) -> None: 607 "Run the switch's lifecycle repeatedly." 608 assert self._p4client is None 609 assert self._tasks is None 610 611 self._tasks = SwitchTasks(self._options.fail_fast) 612 self._p4client = P4Client(self._address, self._options.channel_credentials) 613 self._switch_start() 614 615 try: 616 while True: 617 # If the switch fails and restarts too quickly, slow it down. 618 async with _throttle_failure(): 619 self.create_task(self._run(), background=True) 620 await self._tasks.wait() 621 self._arbitrator.reset() 622 623 finally: 624 self._p4client = None 625 self._tasks = None 626 self._switch_stop()
Run the switch's lifecycle repeatedly.
628 def create_task( 629 self, 630 coro: Coroutine[Any, Any, _T], 631 *, 632 background: bool = False, 633 name: str | None = None, 634 ) -> asyncio.Task[_T]: 635 "Create an asyncio task tied to the Switch's lifecycle." 636 assert self._tasks is not None 637 638 return self._tasks.create_task( 639 coro, 640 switch=self, 641 background=background, 642 name=name, 643 )
Create an asyncio task tied to the Switch's lifecycle.
701 async def __aenter__(self) -> Self: 702 "Similar to run() but provides a one-time context manager interface." 703 assert self._p4client is None 704 assert self._tasks is None 705 706 self._tasks = SwitchTasks(self._options.fail_fast) 707 self._p4client = P4Client( 708 self._address, 709 self._options.channel_credentials, 710 wait_for_ready=False, 711 ) 712 self._switch_start() 713 714 try: 715 # Start the switch's `_run` task in the background. Then, wait for 716 # `_run` task to fire the CHANNEL_READY event. If the `_run` task 717 # cannot connect or fails in some other way, it will finish before 718 # the `ready` future. We need to handle the error in this case. 719 720 run = self.create_task(self._run(), background=True) 721 ready = self.ee.event_future(SwitchEvent.CHANNEL_READY) 722 done, _ = await asyncio.wait( 723 [run, ready], return_when=asyncio.FIRST_COMPLETED 724 ) 725 if run in done: 726 await run 727 728 except BaseException: 729 await self.__aexit__(None, None, None) 730 raise 731 732 return self
Similar to run() but provides a one-time context manager interface.
1077class SwitchEvent(str, enum.Enum): 1078 "Events for Switch class." 1079 1080 CONTROLLER_ENTER = "controller_enter" # (switch) 1081 CONTROLLER_LEAVE = "controller_leave" # (switch) 1082 SWITCH_START = "switch_start" # (switch) 1083 SWITCH_STOP = "switch_stop" # (switch) 1084 CHANNEL_UP = "channel_up" # (switch) 1085 CHANNEL_DOWN = "channel_down" # (switch) 1086 CHANNEL_READY = "channel_ready" # (switch) 1087 BECOME_PRIMARY = "become_primary" # (switch) 1088 BECOME_BACKUP = "become_backup" # (switch) 1089 PORT_UP = "port_up" # (switch, port) 1090 PORT_DOWN = "port_down" # (switch, port) 1091 STREAM_ERROR = "stream_error" # (switch, p4r.StreamMessageResponse)
Events for Switch class.
75@final 76@dataclasses.dataclass(frozen=True) 77class SwitchOptions: 78 """Represents the configuration options for a `Switch`. 79 80 ``` 81 opts = SwitchOptions( 82 p4info=Path("basic.p4info.txtpb"), 83 p4blob=Path("basic.json"), 84 ready_handler=on_ready, 85 ) 86 ``` 87 88 Each `SwitchOptions` object is immutable and may be shared by multiple 89 switches. You should treat all values as read-only. 90 91 You can use function call syntax to return a copy of a `SwitchOptions` with 92 one or more propertise altered. 93 94 ``` 95 new_opts = opts(device_id=6) 96 ``` 97 """ 98 99 p4info: Path | None = None 100 "Path to P4Info protobuf text file." 101 102 p4blob: Path | SupportsBytes | None = None 103 "Path to P4Blob file, or an object that can provide the bytes value." 104 105 p4force: bool = False 106 "If true, always load the P4 program after initial handshake." 107 108 device_id: int = 1 109 "Default P4Runtime device ID." 110 111 initial_election_id: int = 10 112 "Initial P4Runtime election ID." 113 114 channel_credentials: GRPCCredentialsTLS | None = None 115 "P4Runtime channel credentials. Used for TLS support." 116 117 role_name: str = "" 118 "P4Runtime role configuration." 119 120 role_config: pbutil.PBMessage | None = None 121 "P4Runtime role configuration." 122 123 ready_handler: Callable[["Switch"], Coroutine[Any, Any, None]] | None = None 124 "Ready handler async function callback." 125 126 fail_fast: bool = False 127 """If true, log switch errors as CRITICAL and immediately abort when the 128 switch is running in a Controller.""" 129 130 def __call__(self, **kwds: Any) -> Self: 131 return dataclasses.replace(self, **kwds)
Represents the configuration options for a Switch.
opts = SwitchOptions(
p4info=Path("basic.p4info.txtpb"),
p4blob=Path("basic.json"),
ready_handler=on_ready,
)
Each SwitchOptions object is immutable and may be shared by multiple
switches. You should treat all values as read-only.
You can use function call syntax to return a copy of a SwitchOptions with
one or more propertise altered.
new_opts = opts(device_id=6)
Path to P4Blob file, or an object that can provide the bytes value.
43@dataclass 44class SwitchPort: 45 "Represents a switch port." 46 47 id: int 48 name: str 49 oper_status: OperStatus = OperStatus.UNKNOWN 50 51 @property 52 def up(self) -> bool: 53 "Return true if port is basically up." 54 return self.oper_status == OperStatus.UP
Represents a switch port.
57class SwitchPortList: 58 "Represents a list of switch ports." 59 60 _ports: dict[str, SwitchPort] 61 _subscription: GNMISubscription | None = None 62 63 def __init__(self): 64 self._ports = {} 65 66 def __getitem__(self, key: str) -> SwitchPort: 67 "Retrieve interface by ID." 68 return self._ports[key] 69 70 def __len__(self) -> int: 71 "Return number of switch ports." 72 return len(self._ports) 73 74 def __iter__(self) -> Iterator[SwitchPort]: 75 "Iterate over switch ports." 76 return iter(self._ports.values()) 77 78 async def subscribe(self, client: GNMIClient) -> None: 79 """Obtain the initial list of ports and subscribe to switch port status 80 updates using GNMI.""" 81 assert self._subscription is None 82 83 self._ports = await self._get_ports(client) 84 if self._ports: 85 self._subscription = await self._get_subscription(client) 86 else: 87 LOGGER.warning("No switch ports exist") 88 89 async def listen(self, switch: "_sw.Switch | None" = None) -> None: 90 "Listen for switch port updates." 91 assert self._subscription is not None 92 93 async for update in self._subscription.updates(): 94 self._update(update, switch) 95 96 def close(self) -> None: 97 "Close the switch port subscription." 98 if self._subscription is not None: 99 self._subscription.cancel() 100 self._subscription = None 101 self._ports = {} 102 103 async def _get_ports(self, client: GNMIClient) -> dict[str, SwitchPort]: 104 "Retrieve ID and name of each port." 105 ports: dict[str, SwitchPort] = {} 106 107 result = await client.get(_ifIndex) 108 for update in result: 109 path = update.path 110 assert path.last == _ifIndex.last 111 112 port = SwitchPort(update.value, path["name"]) 113 ports[port.name] = port 114 115 return ports 116 117 async def _get_subscription(self, client: GNMIClient) -> GNMISubscription: 118 sub = client.subscribe() 119 120 # Subscribe to change notifications. 121 for port in self._ports.values(): 122 sub.on_change(_ifOperStatus.set(name=port.name)) 123 124 # Synchronize initial settings for ports. 125 async for update in sub.synchronize(): 126 self._update(update, None) 127 128 return sub 129 130 def _update(self, update: GNMIUpdate, switch: "_sw.Switch | None"): 131 path = update.path 132 if path.last == _ifOperStatus.last: 133 status = OperStatus(update.value) 134 self._update_port(path["name"], status, switch) 135 else: 136 LOGGER.warning(f"PortList: unknown gNMI path: {path}") 137 138 def _update_port(self, name: str, status: OperStatus, switch: "_sw.Switch | None"): 139 port = self._ports[name] 140 141 prev_up = port.up 142 port.oper_status = status 143 curr_up = port.up 144 145 if switch is not None and curr_up != prev_up: 146 if curr_up: 147 switch.ee.emit(_sw.SwitchEvent.PORT_UP, switch, port) 148 else: 149 switch.ee.emit(_sw.SwitchEvent.PORT_DOWN, switch, port)
Represents a list of switch ports.
66 def __getitem__(self, key: str) -> SwitchPort: 67 "Retrieve interface by ID." 68 return self._ports[key]
Retrieve interface by ID.
74 def __iter__(self) -> Iterator[SwitchPort]: 75 "Iterate over switch ports." 76 return iter(self._ports.values())
Iterate over switch ports.
78 async def subscribe(self, client: GNMIClient) -> None: 79 """Obtain the initial list of ports and subscribe to switch port status 80 updates using GNMI.""" 81 assert self._subscription is None 82 83 self._ports = await self._get_ports(client) 84 if self._ports: 85 self._subscription = await self._get_subscription(client) 86 else: 87 LOGGER.warning("No switch ports exist")
Obtain the initial list of ports and subscribe to switch port status updates using GNMI.
119class GNMIClient: 120 """Async GNMI client. 121 122 This client implements `get`, `set`, `subscribe` and `capabilities`. 123 124 The API depends on the protobuf definition of `gnmi.TypedValue`. 125 126 Get usage: 127 ``` 128 client = GNMIClient('127.0.0.1:9339') 129 await client.open() 130 131 path = GNMIPath("interfaces/interface") 132 async for update in client.get(path): 133 print(update) 134 ``` 135 136 Subscribe usage: 137 ``` 138 path = GNMIPath("interfaces/interface[name=eth1]/state/oper-status") 139 sub = client.subscribe() 140 sub.on_change(path) 141 142 async for initial_state in sub.synchronize(): 143 print(initial_state) 144 145 async for update in sub.updates(): 146 print(update) 147 ``` 148 149 Set usage: 150 ``` 151 enabled = GNMIPath("interfaces/interface[name=eth1]/config/enabled") 152 153 await client.set(update={ 154 enabled: gnmi.TypedValue(boolValue=True), 155 }) 156 ``` 157 """ 158 159 _address: str 160 _credentials: GRPCCredentialsTLS | None 161 _channel: grpc.aio.Channel | None = None 162 _stub: gnmi_grpc.gNMIStub | None = None 163 _channel_reused: bool = False 164 165 def __init__( 166 self, 167 address: str, 168 credentials: GRPCCredentialsTLS | None = None, 169 ): 170 self._address = address 171 self._credentials = credentials 172 173 async def __aenter__(self) -> Self: 174 await self.open() 175 return self 176 177 async def __aexit__(self, *_args: Any) -> bool | None: 178 await self.close() 179 180 async def open( 181 self, 182 *, 183 channel: grpc.aio.Channel | None = None, 184 ) -> None: 185 """Open the client channel. 186 187 Note: This method is `async` for forward-compatible reasons. 188 """ 189 if self._channel is not None: 190 raise RuntimeError("GNMIClient: client is already open") 191 192 assert self._stub is None 193 194 if channel is not None: 195 self._channel = channel 196 self._channel_reused = True 197 else: 198 self._channel = grpc_channel( 199 self._address, 200 credentials=self._credentials, 201 client_type="GNMIClient", 202 ) 203 204 self._stub = gnmi_grpc.gNMIStub(self._channel) 205 206 async def close(self) -> None: 207 "Close the client channel." 208 if self._channel is not None: 209 if not self._channel_reused: 210 LOGGER.debug("GNMIClient: close channel %r", self._address) 211 await self._channel.close() 212 213 self._channel = None 214 self._stub = None 215 self._channel_reused = False 216 217 async def get( 218 self, 219 *path: GNMIPath, 220 prefix: GNMIPath | None = None, 221 config: bool = False, 222 ) -> Sequence[GNMIUpdate]: 223 "Retrieve value(s) using a GetRequest." 224 if self._stub is None: 225 raise RuntimeError("GNMIClient: client is not open") 226 227 request = gnmi.GetRequest( 228 path=(i.path for i in path), 229 encoding=gnmi.Encoding.PROTO, 230 ) 231 232 if prefix is not None: 233 request.prefix.CopyFrom(prefix.path) 234 235 if config: 236 request.type = gnmi.GetRequest.CONFIG 237 238 self._log_msg(request) 239 try: 240 reply = cast( 241 gnmi.GetResponse, 242 await self._stub.Get(request), # type: ignore 243 ) 244 except grpc.RpcError as ex: 245 raise GNMIClientError(ex) from None 246 247 self._log_msg(reply) 248 249 result: list[GNMIUpdate] = [] 250 for notification in reply.notification: 251 for update in _read_updates(notification): 252 result.append(update) 253 254 return result 255 256 def subscribe( 257 self, 258 *, 259 prefix: GNMIPath | None = None, 260 ) -> "GNMISubscription": 261 """Subscribe to gNMI change notifications. 262 263 Usage: 264 ``` 265 sub = client.subscribe() 266 sub.on_change(path1, ...) 267 sub.sample(path3, path4, sample_interval=1000000000) 268 269 async for update in sub.synchronize(): 270 # do something with initial state 271 272 async for update in sub.updates(): 273 # do something with updates 274 ``` 275 276 You can also subscribe in "ONCE" mode: 277 ``` 278 sub = client.subscribe() 279 sub.once(path1, path2, ...) 280 281 async for info in sub.synchronize(): 282 # do something with info 283 ``` 284 285 The subscription object is not re-entrant, but a fully consumed 286 subscription may be reused. 287 """ 288 if self._stub is None: 289 raise RuntimeError("GNMIClient: client is not open") 290 291 return GNMISubscription(self, prefix) 292 293 async def capabilities(self) -> gnmi.CapabilityResponse: 294 "Issue a CapabilitiesRequest." 295 if self._stub is None: 296 raise RuntimeError("GNMIClient: client is not open") 297 298 request = gnmi.CapabilityRequest() 299 300 self._log_msg(request) 301 try: 302 reply = cast( 303 gnmi.CapabilityResponse, 304 await self._stub.Capabilities(request), # type: ignore 305 ) 306 except grpc.RpcError as ex: 307 raise GNMIClientError(ex) from None 308 309 self._log_msg(reply) 310 return reply 311 312 async def set( 313 self, 314 *, 315 update: Sequence[tuple[GNMIPath, GNMISetValueType]] | None = None, 316 replace: Sequence[tuple[GNMIPath, GNMISetValueType]] | None = None, 317 delete: Sequence[GNMIPath] | None = None, 318 prefix: GNMIPath | None = None, 319 ) -> int: 320 """Set value(s) using SetRequest. 321 322 Returns the timestamp from the successful `SetResponse`. 323 """ 324 if self._stub is None: 325 raise RuntimeError("GNMIClient: client is not open") 326 327 if update is not None: 328 updates = [gnmi_update(path, value) for path, value in update] 329 else: 330 updates = None 331 332 if replace is not None: 333 replaces = [gnmi_update(path, value) for path, value in replace] 334 else: 335 replaces = None 336 337 if delete is not None: 338 deletes = [path.path for path in delete] 339 else: 340 deletes = None 341 342 request = gnmi.SetRequest( 343 update=updates, 344 replace=replaces, 345 delete=deletes, 346 ) 347 348 if prefix is not None: 349 request.prefix.CopyFrom(prefix.path) 350 351 self._log_msg(request) 352 try: 353 reply = cast( 354 gnmi.SetResponse, 355 await self._stub.Set(request), # type: ignore 356 ) 357 except grpc.RpcError as ex: 358 raise GNMIClientError(ex) from None 359 360 self._log_msg(reply) 361 362 # According to the comments in the current protobuf, I expect all error 363 # results to be raised as an exception. The only useful value in a 364 # successful response is the timestamp. 365 366 if reply.HasField("message"): 367 raise NotImplementedError("SetResponse error not supported") 368 369 for result in reply.response: 370 if result.HasField("message"): 371 raise NotImplementedError("SetResponse suberror not supported") 372 373 return reply.timestamp 374 375 def _log_msg(self, msg: "pbutil.PBMessage"): 376 "Log a gNMI message." 377 pbutil.log_msg(self._channel, msg, None)
Async GNMI client.
This client implements get, set, subscribe and capabilities.
The API depends on the protobuf definition of gnmi.TypedValue.
Get usage:
client = GNMIClient('127.0.0.1:9339')
await client.open()
path = GNMIPath("interfaces/interface")
async for update in client.get(path):
print(update)
Subscribe usage:
path = GNMIPath("interfaces/interface[name=eth1]/state/oper-status")
sub = client.subscribe()
sub.on_change(path)
async for initial_state in sub.synchronize():
print(initial_state)
async for update in sub.updates():
print(update)
Set usage:
enabled = GNMIPath("interfaces/interface[name=eth1]/config/enabled")
await client.set(update={
enabled: gnmi.TypedValue(boolValue=True),
})
180 async def open( 181 self, 182 *, 183 channel: grpc.aio.Channel | None = None, 184 ) -> None: 185 """Open the client channel. 186 187 Note: This method is `async` for forward-compatible reasons. 188 """ 189 if self._channel is not None: 190 raise RuntimeError("GNMIClient: client is already open") 191 192 assert self._stub is None 193 194 if channel is not None: 195 self._channel = channel 196 self._channel_reused = True 197 else: 198 self._channel = grpc_channel( 199 self._address, 200 credentials=self._credentials, 201 client_type="GNMIClient", 202 ) 203 204 self._stub = gnmi_grpc.gNMIStub(self._channel)
Open the client channel.
Note: This method is async for forward-compatible reasons.
206 async def close(self) -> None: 207 "Close the client channel." 208 if self._channel is not None: 209 if not self._channel_reused: 210 LOGGER.debug("GNMIClient: close channel %r", self._address) 211 await self._channel.close() 212 213 self._channel = None 214 self._stub = None 215 self._channel_reused = False
Close the client channel.
217 async def get( 218 self, 219 *path: GNMIPath, 220 prefix: GNMIPath | None = None, 221 config: bool = False, 222 ) -> Sequence[GNMIUpdate]: 223 "Retrieve value(s) using a GetRequest." 224 if self._stub is None: 225 raise RuntimeError("GNMIClient: client is not open") 226 227 request = gnmi.GetRequest( 228 path=(i.path for i in path), 229 encoding=gnmi.Encoding.PROTO, 230 ) 231 232 if prefix is not None: 233 request.prefix.CopyFrom(prefix.path) 234 235 if config: 236 request.type = gnmi.GetRequest.CONFIG 237 238 self._log_msg(request) 239 try: 240 reply = cast( 241 gnmi.GetResponse, 242 await self._stub.Get(request), # type: ignore 243 ) 244 except grpc.RpcError as ex: 245 raise GNMIClientError(ex) from None 246 247 self._log_msg(reply) 248 249 result: list[GNMIUpdate] = [] 250 for notification in reply.notification: 251 for update in _read_updates(notification): 252 result.append(update) 253 254 return result
Retrieve value(s) using a GetRequest.
256 def subscribe( 257 self, 258 *, 259 prefix: GNMIPath | None = None, 260 ) -> "GNMISubscription": 261 """Subscribe to gNMI change notifications. 262 263 Usage: 264 ``` 265 sub = client.subscribe() 266 sub.on_change(path1, ...) 267 sub.sample(path3, path4, sample_interval=1000000000) 268 269 async for update in sub.synchronize(): 270 # do something with initial state 271 272 async for update in sub.updates(): 273 # do something with updates 274 ``` 275 276 You can also subscribe in "ONCE" mode: 277 ``` 278 sub = client.subscribe() 279 sub.once(path1, path2, ...) 280 281 async for info in sub.synchronize(): 282 # do something with info 283 ``` 284 285 The subscription object is not re-entrant, but a fully consumed 286 subscription may be reused. 287 """ 288 if self._stub is None: 289 raise RuntimeError("GNMIClient: client is not open") 290 291 return GNMISubscription(self, prefix)
Subscribe to gNMI change notifications.
Usage:
sub = client.subscribe()
sub.on_change(path1, ...)
sub.sample(path3, path4, sample_interval=1000000000)
async for update in sub.synchronize():
# do something with initial state
async for update in sub.updates():
# do something with updates
You can also subscribe in "ONCE" mode:
sub = client.subscribe()
sub.once(path1, path2, ...)
async for info in sub.synchronize():
# do something with info
The subscription object is not re-entrant, but a fully consumed subscription may be reused.
293 async def capabilities(self) -> gnmi.CapabilityResponse: 294 "Issue a CapabilitiesRequest." 295 if self._stub is None: 296 raise RuntimeError("GNMIClient: client is not open") 297 298 request = gnmi.CapabilityRequest() 299 300 self._log_msg(request) 301 try: 302 reply = cast( 303 gnmi.CapabilityResponse, 304 await self._stub.Capabilities(request), # type: ignore 305 ) 306 except grpc.RpcError as ex: 307 raise GNMIClientError(ex) from None 308 309 self._log_msg(reply) 310 return reply
Issue a CapabilitiesRequest.
312 async def set( 313 self, 314 *, 315 update: Sequence[tuple[GNMIPath, GNMISetValueType]] | None = None, 316 replace: Sequence[tuple[GNMIPath, GNMISetValueType]] | None = None, 317 delete: Sequence[GNMIPath] | None = None, 318 prefix: GNMIPath | None = None, 319 ) -> int: 320 """Set value(s) using SetRequest. 321 322 Returns the timestamp from the successful `SetResponse`. 323 """ 324 if self._stub is None: 325 raise RuntimeError("GNMIClient: client is not open") 326 327 if update is not None: 328 updates = [gnmi_update(path, value) for path, value in update] 329 else: 330 updates = None 331 332 if replace is not None: 333 replaces = [gnmi_update(path, value) for path, value in replace] 334 else: 335 replaces = None 336 337 if delete is not None: 338 deletes = [path.path for path in delete] 339 else: 340 deletes = None 341 342 request = gnmi.SetRequest( 343 update=updates, 344 replace=replaces, 345 delete=deletes, 346 ) 347 348 if prefix is not None: 349 request.prefix.CopyFrom(prefix.path) 350 351 self._log_msg(request) 352 try: 353 reply = cast( 354 gnmi.SetResponse, 355 await self._stub.Set(request), # type: ignore 356 ) 357 except grpc.RpcError as ex: 358 raise GNMIClientError(ex) from None 359 360 self._log_msg(reply) 361 362 # According to the comments in the current protobuf, I expect all error 363 # results to be raised as an exception. The only useful value in a 364 # successful response is the timestamp. 365 366 if reply.HasField("message"): 367 raise NotImplementedError("SetResponse error not supported") 368 369 for result in reply.response: 370 if result.HasField("message"): 371 raise NotImplementedError("SetResponse suberror not supported") 372 373 return reply.timestamp
Set value(s) using SetRequest.
Returns the timestamp from the successful SetResponse.
25class GNMIPath: 26 """Concrete class for working with `gnmi.Path` objects. 27 28 A `GNMIPath` should be treated as an immutable object. You can access 29 the wrapped `gnmi.Path` protobuf class using the `.path` property. 30 `GNMIPath` objects are hashable, but you must be careful that you do not 31 mutate them via the underlying `gnmi.Path`. 32 33 You can construct a GNMIPath from a string: 34 ``` 35 path = GNMIPath("interfaces/interface[name=eth1]/state") 36 ``` 37 38 You can construct a GNMIPath object from a `gnmi.Path` directly. 39 ``` 40 path = GNMIPath(update.path) 41 ``` 42 43 You can create paths by using an existing path as a template, without 44 modifying the original path. Use the `set` method: 45 ``` 46 operStatus = GNMIPath("interfaces/interface/state/oper-status") 47 path = operStatus.set("interface", name="eth1") 48 ``` 49 50 Use [] to access the name/key of path elements: 51 ``` 52 path[1] == "interface" 53 path["interface", "name"] == "eth1" 54 path["name"] == "eth1" 55 ``` 56 """ 57 58 path: gnmi.Path 59 "The underlying `gnmi.Path` protobuf representation." 60 61 def __init__( 62 self, 63 path: "gnmi.Path | str | GNMIPath" = "", 64 *, 65 origin: str = "", 66 target: str = "", 67 ): 68 "Construct a `GNMIPath` from a string or `gnmi.Path`." 69 if isinstance(path, str): 70 path = gnmistring.parse(path) 71 elif isinstance(path, GNMIPath): 72 path = _copy_path(path.path) 73 74 assert isinstance(path, gnmi.Path) 75 76 if origin: 77 path.origin = origin 78 79 if target: 80 path.target = target 81 82 self.path = path 83 84 @property 85 def first(self) -> str: 86 "Return the first element of the path." 87 return self.path.elem[0].name 88 89 @property 90 def last(self) -> str: 91 "Return the last element of the path." 92 return self.path.elem[-1].name 93 94 @property 95 def origin(self) -> str: 96 "Return the path's origin." 97 return self.path.origin 98 99 @property 100 def target(self) -> str: 101 "Return the path's target." 102 return self.path.target 103 104 def set(self, __elem: str | int | None = None, **kwds: Any) -> "GNMIPath": 105 "Construct a new GNMIPath with keys set for the given elem." 106 if __elem is None: 107 return self._rekey(kwds) 108 109 if isinstance(__elem, str): 110 elem = _find_index(__elem, self.path) 111 else: 112 elem = __elem 113 114 result = self.copy() 115 keys = result.path.elem[elem].key 116 keys.clear() 117 keys.update({key: str(val) for key, val in kwds.items()}) 118 return result 119 120 def copy(self) -> "GNMIPath": 121 "Return a copy of the path." 122 return GNMIPath(_copy_path(self.path)) 123 124 @overload 125 def __getitem__( 126 self, key: int | str | tuple[int | str, str] 127 ) -> str: ... # pragma: no cover 128 129 @overload 130 def __getitem__(self, key: slice) -> "GNMIPath": ... # pragma: no cover 131 132 def __getitem__( 133 self, 134 key: int | str | tuple[int | str, str] | slice, 135 ) -> "str | GNMIPath": 136 "Return the specified element or key value." 137 match key: 138 case int(idx): 139 return self.path.elem[idx].name 140 case str(name): 141 return _retrieve_key(name, self.path) 142 case (int(idx), str(k)): 143 result = self.path.elem[idx].key.get(k) 144 if result is None: 145 raise KeyError(k) 146 return result 147 case (str(name), str(k)): 148 result = self.path.elem[_find_index(name, self.path)].key.get(k) 149 if result is None: 150 raise KeyError(k) 151 return result 152 case slice() as s: 153 return self._slice(s.start, s.stop, s.step) 154 case other: # pyright: ignore[reportUnnecessaryComparison] 155 raise TypeError(f"invalid key type: {other!r}") 156 157 def __eq__(self, rhs: Any) -> bool: 158 "Return True if path's are equal." 159 if not isinstance(rhs, GNMIPath): 160 return False 161 return self.path == rhs.path # pyright: ignore[reportUnknownVariableType] 162 163 def __hash__(self) -> int: 164 "Return hash of path." 165 return hash(tuple(_to_tuple(elem) for elem in self.path.elem)) 166 167 def __contains__(self, name: str) -> bool: 168 "Return True if element name is in path." 169 for elem in self.path.elem: 170 if elem.name == name: 171 return True 172 return False 173 174 def __repr__(self) -> str: 175 "Return string representation of path." 176 path = gnmistring.to_str(self.path) 177 if self.target or self.origin: 178 return f"GNMIPath({path!r}, origin={self.origin!r}, target={self.target!r})" 179 return f"GNMIPath({path!r})" 180 181 def __str__(self) -> str: 182 "Return path as string." 183 return gnmistring.to_str(self.path) 184 185 def __len__(self) -> int: 186 "Return the length of the path." 187 return len(self.path.elem) 188 189 def __truediv__(self, rhs: "GNMIPath | str") -> "GNMIPath": 190 "Append values to end of path." 191 if not isinstance(rhs, GNMIPath): 192 rhs = GNMIPath(rhs) 193 194 result = gnmi.Path( 195 elem=itertools.chain(self.path.elem, rhs.path.elem), 196 origin=self.origin, 197 target=self.target, 198 ) 199 return GNMIPath(result) 200 201 def __rtruediv__(self, lhs: str) -> "GNMIPath": 202 "Prepend values to the beginning of the path." 203 path = GNMIPath(lhs) 204 205 result = gnmi.Path(elem=itertools.chain(path.path.elem, self.path.elem)) 206 return GNMIPath(result) 207 208 def _slice( 209 self, start: int | None, stop: int | None, step: int | None 210 ) -> "GNMIPath": 211 "Return specified slice of GNMIPath." 212 if start is None: 213 start = 0 214 215 if stop is None: 216 stop = len(self) 217 elif stop < 0: 218 stop = len(self) + stop 219 220 if step is None: 221 step = 1 222 223 path = gnmi.Path() 224 for i in range(start, stop, step): 225 elem = self.path.elem[i] 226 path.elem.append(gnmi.PathElem(name=elem.name, key=elem.key)) 227 228 return GNMIPath(path) 229 230 def _rekey(self, keys: dict[str, Any]) -> "GNMIPath": 231 """Construct a new path with specified keys replaced.""" 232 if not keys: 233 raise ValueError("empty keys") 234 235 result = self.copy() 236 237 found = False 238 for elem in result.path.elem: 239 for key, value in keys.items(): 240 if key in elem.key: 241 elem.key[key] = str(value) 242 found = True 243 244 if not found: 245 raise ValueError(f"no keys found in path: {keys!r}") 246 247 return result
Concrete class for working with gnmi.Path objects.
A GNMIPath should be treated as an immutable object. You can access
the wrapped gnmi.Path protobuf class using the .path property.
GNMIPath objects are hashable, but you must be careful that you do not
mutate them via the underlying gnmi.Path.
You can construct a GNMIPath from a string:
path = GNMIPath("interfaces/interface[name=eth1]/state")
You can construct a GNMIPath object from a gnmi.Path directly.
path = GNMIPath(update.path)
You can create paths by using an existing path as a template, without
modifying the original path. Use the set method:
operStatus = GNMIPath("interfaces/interface/state/oper-status")
path = operStatus.set("interface", name="eth1")
Use [] to access the name/key of path elements:
path[1] == "interface"
path["interface", "name"] == "eth1"
path["name"] == "eth1"
61 def __init__( 62 self, 63 path: "gnmi.Path | str | GNMIPath" = "", 64 *, 65 origin: str = "", 66 target: str = "", 67 ): 68 "Construct a `GNMIPath` from a string or `gnmi.Path`." 69 if isinstance(path, str): 70 path = gnmistring.parse(path) 71 elif isinstance(path, GNMIPath): 72 path = _copy_path(path.path) 73 74 assert isinstance(path, gnmi.Path) 75 76 if origin: 77 path.origin = origin 78 79 if target: 80 path.target = target 81 82 self.path = path
Construct a GNMIPath from a string or gnmi.Path.
84 @property 85 def first(self) -> str: 86 "Return the first element of the path." 87 return self.path.elem[0].name
Return the first element of the path.
89 @property 90 def last(self) -> str: 91 "Return the last element of the path." 92 return self.path.elem[-1].name
Return the last element of the path.
99 @property 100 def target(self) -> str: 101 "Return the path's target." 102 return self.path.target
Return the path's target.
104 def set(self, __elem: str | int | None = None, **kwds: Any) -> "GNMIPath": 105 "Construct a new GNMIPath with keys set for the given elem." 106 if __elem is None: 107 return self._rekey(kwds) 108 109 if isinstance(__elem, str): 110 elem = _find_index(__elem, self.path) 111 else: 112 elem = __elem 113 114 result = self.copy() 115 keys = result.path.elem[elem].key 116 keys.clear() 117 keys.update({key: str(val) for key, val in kwds.items()}) 118 return result
Construct a new GNMIPath with keys set for the given elem.
120 def copy(self) -> "GNMIPath": 121 "Return a copy of the path." 122 return GNMIPath(_copy_path(self.path))
Return a copy of the path.
132 def __getitem__( 133 self, 134 key: int | str | tuple[int | str, str] | slice, 135 ) -> "str | GNMIPath": 136 "Return the specified element or key value." 137 match key: 138 case int(idx): 139 return self.path.elem[idx].name 140 case str(name): 141 return _retrieve_key(name, self.path) 142 case (int(idx), str(k)): 143 result = self.path.elem[idx].key.get(k) 144 if result is None: 145 raise KeyError(k) 146 return result 147 case (str(name), str(k)): 148 result = self.path.elem[_find_index(name, self.path)].key.get(k) 149 if result is None: 150 raise KeyError(k) 151 return result 152 case slice() as s: 153 return self._slice(s.start, s.stop, s.step) 154 case other: # pyright: ignore[reportUnnecessaryComparison] 155 raise TypeError(f"invalid key type: {other!r}")
Return the specified element or key value.
385class GNMISubscription: 386 """Represents a gNMI subscription stream. 387 388 Returned from `GNMIClient.subscribe()`. 389 """ 390 391 _client: GNMIClient 392 _stream: _StreamTypeAlias | None 393 394 def __init__(self, client: GNMIClient, prefix: GNMIPath | None = None): 395 "Initialize a GNMISubscription." 396 self._client = client 397 self._stream = None 398 self._sublist = gnmi.SubscriptionList( 399 mode=gnmi.SubscriptionList.Mode.STREAM, 400 ) 401 if prefix is not None: 402 self._sublist.prefix.CopyFrom(prefix.path) 403 404 def once(self, *paths: GNMIPath) -> None: 405 "Subscribe in `ONCE` mode to given paths." 406 self._sublist.mode = gnmi.SubscriptionList.Mode.ONCE 407 408 for path in paths: 409 sub = gnmi.Subscription(path=path.path) 410 self._sublist.subscription.append(sub) 411 412 def on_change(self, *paths: GNMIPath) -> None: 413 "Subscribe in `ON_CHANGE` mode to given paths." 414 assert self._sublist.mode == gnmi.SubscriptionList.Mode.STREAM 415 416 for path in paths: 417 sub = gnmi.Subscription( 418 path=path.path, 419 mode=gnmi.SubscriptionMode.ON_CHANGE, 420 ) 421 self._sublist.subscription.append(sub) 422 423 def sample( 424 self, 425 *paths: GNMIPath, 426 sample_interval: int, 427 suppress_redundant: bool = False, 428 heartbeat_interval: int = 0, 429 ) -> None: 430 "Subscribe in `SAMPLE` mode to given paths." 431 assert self._sublist.mode == gnmi.SubscriptionList.Mode.STREAM 432 433 for path in paths: 434 sub = gnmi.Subscription( 435 path=path.path, 436 mode=gnmi.SubscriptionMode.SAMPLE, 437 sample_interval=sample_interval, 438 suppress_redundant=suppress_redundant, 439 heartbeat_interval=heartbeat_interval, 440 ) 441 self._sublist.subscription.append(sub) 442 443 async def synchronize(self) -> AsyncIterator[GNMIUpdate]: 444 """Async iterator for initial subscription updates. 445 446 Retrieve all updates up to `sync_response` message. 447 """ 448 if self._stream is None: 449 await self._subscribe() 450 451 try: 452 async for result in self._read(True): 453 yield result 454 except grpc.RpcError as ex: 455 raise GNMIClientError(ex) from None 456 457 async def updates(self) -> AsyncIterator[GNMIUpdate]: 458 "Async iterator for all remaining subscription updates." 459 if self._stream is None: 460 await self._subscribe() 461 462 try: 463 async for result in self._read(False): 464 yield result 465 except grpc.RpcError as ex: 466 raise GNMIClientError(ex) from None 467 468 def cancel(self) -> None: 469 "Cancel the subscription." 470 if self._stream is not None: 471 self._stream.cancel() 472 self._stream = None 473 474 async def _subscribe(self): 475 assert self._stream is None 476 assert self._client._stub is not None 477 478 self._stream = cast( 479 _StreamTypeAlias, 480 self._client._stub.Subscribe(wait_for_ready=True), # type: ignore 481 ) 482 483 request = gnmi.SubscribeRequest(subscribe=self._sublist) 484 self._client._log_msg(request) 485 try: 486 await self._stream.write(request) 487 except grpc.RpcError as ex: 488 raise GNMIClientError(ex) from None 489 490 async def _read(self, stop_at_sync: bool) -> AsyncIterator[GNMIUpdate]: 491 assert self._stream is not None 492 493 while True: 494 msg = cast( 495 gnmi.SubscribeResponse, 496 await self._stream.read(), 497 ) 498 if msg is GRPC_EOF: 499 LOGGER.warning("gNMI _read: unexpected EOF") 500 return 501 502 self._client._log_msg(msg) 503 504 match msg.WhichOneof("response"): 505 case "update": 506 for update in _read_updates(msg.update): 507 yield update 508 case "sync_response": 509 if stop_at_sync: 510 if self._is_once(): 511 # FIXME? I expected Stratum to issue an EOF after 512 # the sync_response in "ONCE" mode, but it doesn't. 513 # For now, cancel the stream explictly. 514 self.cancel() 515 # Let grpc react to the cancellation immediately. 516 await asyncio.sleep(0) 517 return # All done! 518 LOGGER.warning("gNMI _read: ignored sync_response") 519 case other: 520 LOGGER.warning("gNMI _read: unexpected oneof: %s", other) 521 522 def _is_once(self) -> bool: 523 "Return true if the subscription is in ONCE mode." 524 return self._sublist.mode == gnmi.SubscriptionList.Mode.ONCE
Represents a gNMI subscription stream.
Returned from GNMIClient.subscribe().
394 def __init__(self, client: GNMIClient, prefix: GNMIPath | None = None): 395 "Initialize a GNMISubscription." 396 self._client = client 397 self._stream = None 398 self._sublist = gnmi.SubscriptionList( 399 mode=gnmi.SubscriptionList.Mode.STREAM, 400 ) 401 if prefix is not None: 402 self._sublist.prefix.CopyFrom(prefix.path)
Initialize a GNMISubscription.
404 def once(self, *paths: GNMIPath) -> None: 405 "Subscribe in `ONCE` mode to given paths." 406 self._sublist.mode = gnmi.SubscriptionList.Mode.ONCE 407 408 for path in paths: 409 sub = gnmi.Subscription(path=path.path) 410 self._sublist.subscription.append(sub)
Subscribe in ONCE mode to given paths.
412 def on_change(self, *paths: GNMIPath) -> None: 413 "Subscribe in `ON_CHANGE` mode to given paths." 414 assert self._sublist.mode == gnmi.SubscriptionList.Mode.STREAM 415 416 for path in paths: 417 sub = gnmi.Subscription( 418 path=path.path, 419 mode=gnmi.SubscriptionMode.ON_CHANGE, 420 ) 421 self._sublist.subscription.append(sub)
Subscribe in ON_CHANGE mode to given paths.
423 def sample( 424 self, 425 *paths: GNMIPath, 426 sample_interval: int, 427 suppress_redundant: bool = False, 428 heartbeat_interval: int = 0, 429 ) -> None: 430 "Subscribe in `SAMPLE` mode to given paths." 431 assert self._sublist.mode == gnmi.SubscriptionList.Mode.STREAM 432 433 for path in paths: 434 sub = gnmi.Subscription( 435 path=path.path, 436 mode=gnmi.SubscriptionMode.SAMPLE, 437 sample_interval=sample_interval, 438 suppress_redundant=suppress_redundant, 439 heartbeat_interval=heartbeat_interval, 440 ) 441 self._sublist.subscription.append(sub)
Subscribe in SAMPLE mode to given paths.
443 async def synchronize(self) -> AsyncIterator[GNMIUpdate]: 444 """Async iterator for initial subscription updates. 445 446 Retrieve all updates up to `sync_response` message. 447 """ 448 if self._stream is None: 449 await self._subscribe() 450 451 try: 452 async for result in self._read(True): 453 yield result 454 except grpc.RpcError as ex: 455 raise GNMIClientError(ex) from None
Async iterator for initial subscription updates.
Retrieve all updates up to sync_response message.
457 async def updates(self) -> AsyncIterator[GNMIUpdate]: 458 "Async iterator for all remaining subscription updates." 459 if self._stream is None: 460 await self._subscribe() 461 462 try: 463 async for result in self._read(False): 464 yield result 465 except grpc.RpcError as ex: 466 raise GNMIClientError(ex) from None
Async iterator for all remaining subscription updates.
63@dataclass 64class GNMIUpdate: 65 "Represents a gNMI update returned from GNMIClient." 66 67 timestamp: int 68 path: GNMIPath 69 typed_value: gnmi.TypedValue | None 70 71 @property 72 def value(self) -> Any: 73 "Return the value as a Python value." 74 if self.typed_value is None: 75 return None 76 77 attr = self.typed_value.WhichOneof("value") 78 if attr is None: 79 raise ValueError("typed_value is not set") 80 return getattr(self.typed_value, attr) 81 82 def __repr__(self) -> str: 83 "Override repr to strip newline from end of `TypedValue`." 84 value = repr(self.typed_value).rstrip() 85 return f"GNMIUpdate(timestamp={self.timestamp!r}, path={self.path!r}, typed_value=`{value}`)"
Represents a gNMI update returned from GNMIClient.
71 @property 72 def value(self) -> Any: 73 "Return the value as a Python value." 74 if self.typed_value is None: 75 return None 76 77 attr = self.typed_value.WhichOneof("value") 78 if attr is None: 79 raise ValueError("typed_value is not set") 80 return getattr(self.typed_value, attr)
Return the value as a Python value.
118@dataclass(kw_only=True) 119class GRPCCredentialsTLS: 120 "GRPC channel credentials for Transport Layer Security (TLS)." 121 122 cacert: Path | bytes | None 123 """Certificate authority used to authenticate the certificate at the other 124 end of the connection.""" 125 126 cert: Path | bytes | None 127 "Certificate that identifies this side of the connection." 128 129 private_key: Path | bytes | None 130 "Private key associated with this side's certificate identity." 131 132 target_name_override: str = "" 133 "Override the target name used for TLS host name checking (useful for testing)." 134 135 call_credentials: grpc.AuthMetadataPlugin | None = None 136 """Optional GRPC call credentials for the client channel. Be aware that the 137 auth plugin's callback takes place in a different thread.""" 138 139 def to_client_credentials(self) -> grpc.ChannelCredentials: 140 "Create native SSL client credentials object." 141 root_certificates = _coerce_tls_path(self.cacert) 142 certificate_chain = _coerce_tls_path(self.cert) 143 private_key = _coerce_tls_path(self.private_key) 144 145 return self._compose_credentials( 146 grpc.ssl_channel_credentials( 147 root_certificates=root_certificates, 148 private_key=private_key, 149 certificate_chain=certificate_chain, 150 ) 151 ) 152 153 def to_server_credentials(self) -> grpc.ServerCredentials: 154 """Create native SSL server credentials object. 155 156 On the server side, we ignore the `call_credentials`. 157 """ 158 root_certificates = _coerce_tls_path(self.cacert) 159 certificate_chain = _coerce_tls_path(self.cert) 160 private_key = _coerce_tls_path(self.private_key) 161 162 if not private_key: 163 raise ValueError("Empty private key in server credentials") 164 165 if not certificate_chain: 166 raise ValueError("Empty certificate chain in server credential") 167 168 return grpc.ssl_server_credentials( 169 private_key_certificate_chain_pairs=[(private_key, certificate_chain)], 170 root_certificates=root_certificates, 171 require_client_auth=True, 172 ) 173 174 def _compose_credentials( 175 self, channel_cred: grpc.ChannelCredentials 176 ) -> grpc.ChannelCredentials: 177 "Compose call credentials with channel credentials." 178 if not self.call_credentials: 179 return channel_cred 180 181 call_cred = grpc.metadata_call_credentials(self.call_credentials) 182 return grpc.composite_channel_credentials(channel_cred, call_cred)
GRPC channel credentials for Transport Layer Security (TLS).
Certificate authority used to authenticate the certificate at the other end of the connection.
Private key associated with this side's certificate identity.
Override the target name used for TLS host name checking (useful for testing).
Optional GRPC call credentials for the client channel. Be aware that the auth plugin's callback takes place in a different thread.
139 def to_client_credentials(self) -> grpc.ChannelCredentials: 140 "Create native SSL client credentials object." 141 root_certificates = _coerce_tls_path(self.cacert) 142 certificate_chain = _coerce_tls_path(self.cert) 143 private_key = _coerce_tls_path(self.private_key) 144 145 return self._compose_credentials( 146 grpc.ssl_channel_credentials( 147 root_certificates=root_certificates, 148 private_key=private_key, 149 certificate_chain=certificate_chain, 150 ) 151 )
Create native SSL client credentials object.
153 def to_server_credentials(self) -> grpc.ServerCredentials: 154 """Create native SSL server credentials object. 155 156 On the server side, we ignore the `call_credentials`. 157 """ 158 root_certificates = _coerce_tls_path(self.cacert) 159 certificate_chain = _coerce_tls_path(self.cert) 160 private_key = _coerce_tls_path(self.private_key) 161 162 if not private_key: 163 raise ValueError("Empty private key in server credentials") 164 165 if not certificate_chain: 166 raise ValueError("Empty certificate chain in server credential") 167 168 return grpc.ssl_server_credentials( 169 private_key_certificate_chain_pairs=[(private_key, certificate_chain)], 170 root_certificates=root_certificates, 171 require_client_auth=True, 172 )
Create native SSL server credentials object.
On the server side, we ignore the call_credentials.
42class GRPCStatusCode(_EnumBase): 43 "IntEnum equivalent to `grpc.StatusCode`." 44 45 OK = rpc_code.OK 46 CANCELLED = rpc_code.CANCELLED 47 UNKNOWN = rpc_code.UNKNOWN 48 FAILED_PRECONDITION = rpc_code.FAILED_PRECONDITION 49 INVALID_ARGUMENT = rpc_code.INVALID_ARGUMENT 50 DEADLINE_EXCEEDED = rpc_code.DEADLINE_EXCEEDED 51 NOT_FOUND = rpc_code.NOT_FOUND 52 ALREADY_EXISTS = rpc_code.ALREADY_EXISTS 53 PERMISSION_DENIED = rpc_code.PERMISSION_DENIED 54 UNAUTHENTICATED = rpc_code.UNAUTHENTICATED 55 RESOURCE_EXHAUSTED = rpc_code.RESOURCE_EXHAUSTED 56 ABORTED = rpc_code.ABORTED 57 OUT_OF_RANGE = rpc_code.OUT_OF_RANGE 58 UNIMPLEMENTED = rpc_code.UNIMPLEMENTED 59 INTERNAL = rpc_code.INTERNAL 60 UNAVAILABLE = rpc_code.UNAVAILABLE 61 DATA_LOSS = rpc_code.DATA_LOSS 62 63 @classmethod 64 def from_status_code(cls, val: grpc.StatusCode) -> "GRPCStatusCode": 65 "Create corresponding GRPCStatusCode from a grpc.StatusCode object." 66 n: Any = val.value[0] 67 assert isinstance(n, int) 68 return GRPCStatusCode(n) 69 70 @staticmethod 71 def _validate_enum() -> None: 72 "Verify that GRPCStatusCode covers every possible grpc.StatusCode." 73 for value in grpc.StatusCode: 74 n: Any = value.value[0] 75 assert isinstance(n, int) 76 assert GRPCStatusCode[value.name].value == n, value.name
IntEnum equivalent to grpc.StatusCode.
63 @classmethod 64 def from_status_code(cls, val: grpc.StatusCode) -> "GRPCStatusCode": 65 "Create corresponding GRPCStatusCode from a grpc.StatusCode object." 66 n: Any = val.value[0] 67 assert isinstance(n, int) 68 return GRPCStatusCode(n)
Create corresponding GRPCStatusCode from a grpc.StatusCode object.