finsy
Finsy P4Runtime Controller Library
Finsy is a P4Runtime controller library written in Python using asyncio. Finsy includes support for gNMI.
Check out the examples directory for some demonstration programs.
Installation
Finsy requires Python 3.10 or later. To install the latest version, run `pip install finsy`.
P4Runtime Scripts
With Finsy, you can write a Python script that reads/writes P4Runtime entities for a single switch.
Here is a complete example that retrieves the P4Info from a switch:
```python
import finsy as fy

async def main():
    async with fy.Switch("sw1", "127.0.0.1:50001") as sw1:
        # Print out a description of the switch's P4Info, if one is configured.
        print(sw1.p4info)

fy.run(main())
```
Here is another example that prints out all non-default table entries.
```python
import finsy as fy

async def main():
    async with fy.Switch("sw1", "127.0.0.1:50001") as sw1:
        # Do a wildcard read for table entries.
        async for entry in sw1.read(fy.P4TableEntry()):
            print(entry)

fy.run(main())
```
P4Runtime Controller
You can also write a P4Runtime controller that manages multiple switches independently. Your controller can react to events from the Switch by changing the contents of P4 tables.
Each switch is managed by an async ready_handler function. Your ready_handler function can read or
update various P4Runtime entities in the switch. It can also create tasks to listen for
packets or digests.
When you write P4Runtime updates to the switch, you use a unary operator (`+`, `-`, `~`) to specify the operation: INSERT (`+`), DELETE (`-`) or MODIFY (`~`).
```python
import finsy as fy

async def ready_handler(sw: fy.Switch):
    await sw.delete_all()
    await sw.write(
        [
            # Insert (+) multicast group with ports 1, 2, 3 and CONTROLLER.
            +fy.P4MulticastGroupEntry(1, replicas=[1, 2, 3, 255]),
            # Modify (~) default table entry to flood all unmatched packets.
            ~fy.P4TableEntry(
                "ipv4",
                action=fy.Action("flood"),
                is_default_action=True,
            ),
        ]
    )

    async for packet in sw.read_packets():
        print(f"{sw.name}: {packet}")
```
Use the SwitchOptions class to specify each switch's settings, including the p4info/p4blob and
ready_handler. Use the Controller class to drive multiple switch connections. Each switch will call back
into your ready_handler function after the P4Runtime connection is established.
```python
from pathlib import Path

options = fy.SwitchOptions(
    p4info=Path("hello.p4info.txt"),
    p4blob=Path("hello.json"),
    ready_handler=ready_handler,
)

controller = fy.Controller([
    fy.Switch("sw1", "127.0.0.1:50001", options),
    fy.Switch("sw2", "127.0.0.1:50002", options),
    fy.Switch("sw3", "127.0.0.1:50003", options),
])

fy.run(controller.run())
```
Your ready_handler can spawn concurrent tasks with the Switch.create_task method. Tasks created this way have their lifetimes managed by the switch object. If the switch disconnects or its role changes to backup, the task running your ready_handler (and any tasks it spawned) will be cancelled, and the ready_handler will begin again once the switch is ready.
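For example, a ready_handler can hand the packet loop off to a managed task and return; this is a minimal sketch (the helper name `_read_packets` is illustrative):

```python
import finsy as fy

async def _read_packets(sw: fy.Switch):
    # Runs until the switch disconnects, becomes a backup, or the
    # controller stops; the switch cancels the task automatically.
    async for packet in sw.read_packets():
        print(f"{sw.name}: {packet}")

async def ready_handler(sw: fy.Switch):
    await sw.delete_all()
    # Start a managed task whose lifetime is tied to this switch.
    sw.create_task(_read_packets(sw))
```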
For more examples, see the examples directory.
Switch Read/Write API
The Switch class provides the API for interacting with P4Runtime switches. You will control
a Switch object with a "ready handler" function. The ready handler is an
async function that is called when the switch is ready to accept commands.
Your ready handler will typically write some control entities to the switch, then
listen for incoming events and react to them with more writes. You may occasionally read entities
from the switch.
When your ready handler is invoked, the P4Runtime channel is already established, client arbitration has completed, and the pipeline is configured as specified in SwitchOptions.
Here is an example skeleton program. The ready handler is named ready().
```python
async def ready(switch: fy.Switch):
    # Check if switch is the primary. If not, we may want to proceed
    # in read-only mode. In this example, ignore switch if it's a backup.
    if not switch.is_primary:
        return

    # If we're reconnecting to a switch, it will already have runtime state.
    # In this example, we just delete all entities and start over.
    await switch.delete_all()

    # Provision the pipeline with one or more `write` transactions. Each
    # `write` is a single WriteRequest which may contain multiple updates.
    await switch.write(
        # [Next section will cover what goes here.]
    )

    # Listen for events and respond to them. This "infinite" loop will
    # continue until the Switch disconnects, changes primary/backup status,
    # or the controller is stopped.
    async for packet in switch.read_packets():
        await handle_packet(switch, packet)
```
The Switch class provides a switch.create_task method to start a managed task.
Tasks let you perform concurrent operations on the same switch. The last stanza
above, which reads packets in an infinite loop, could have been written as a
separate task, as shown below. It's okay for the ready handler function to
return early; any tasks it created will still run.
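Here is the same skeleton with the packet loop moved into a managed task; a sketch assuming `handle_packet` is defined elsewhere:

```python
async def _packet_loop(switch: fy.Switch):
    async for packet in switch.read_packets():
        await handle_packet(switch, packet)

async def ready(switch: fy.Switch):
    if not switch.is_primary:
        return
    await switch.delete_all()

    # Return early; the managed task keeps reading packets until the
    # switch disconnects, changes primary/backup status, or the
    # controller is stopped.
    switch.create_task(_packet_loop(switch))
```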
Writes
Use the write() method to write one or more P4Runtime updates and packets.
A P4Runtime update supports one of three operations: INSERT, MODIFY or DELETE. Some entities support all three operations. Other entities only support MODIFY.
| Entity | Operations Permitted | Related Classes |
|---|---|---|
| P4TableEntry | INSERT, MODIFY, DELETE | Match, Action, IndirectAction, P4MeterConfig, P4CounterData, P4MeterCounterData |
| P4ActionProfileMember | INSERT, MODIFY, DELETE | |
| P4ActionProfileGroup | INSERT, MODIFY, DELETE | P4Member |
| P4MulticastGroupEntry | INSERT, MODIFY, DELETE | |
| P4CloneSessionEntry | INSERT, MODIFY, DELETE | |
| P4DigestEntry | INSERT, MODIFY, DELETE | |
| P4ExternEntry | INSERT, MODIFY, DELETE | |
| P4RegisterEntry | MODIFY | |
| P4CounterEntry | MODIFY | P4CounterData |
| P4DirectCounterEntry | MODIFY | P4CounterData |
| P4MeterEntry | MODIFY | P4MeterConfig, P4MeterCounterData |
| P4DirectMeterEntry | MODIFY | |
| P4ValueSetEntry | MODIFY | P4ValueSetMember |
Insert/Modify/Delete Updates
To specify the operation, use a unary + (INSERT), ~ (MODIFY), or - (DELETE). If you
do not specify the operation, write will raise a ValueError exception.
Here is an example showing how to insert and delete two different entities in the same WriteRequest.
```python
await switch.write([
    +fy.P4TableEntry(  # unary + means INSERT
        "ipv4",
        match=fy.Match(dest="192.168.1.0/24"),
        action=fy.Action("forward", port=1),
    ),
    -fy.P4TableEntry(  # unary - means DELETE
        "ipv4",
        match=fy.Match(dest="192.168.2.0/24"),
        action=fy.Action("forward", port=2),
    ),
])
```
You should not insert, modify or delete the same entry in the same WriteRequest.
If you are performing the same operation on all entities, you can use the Switch
insert, delete, or modify methods.
```python
await switch.insert([
    fy.P4MulticastGroupEntry(1, replicas=[1, 2, 3]),
    fy.P4MulticastGroupEntry(2, replicas=[4, 5, 6]),
])
```
Modify-Only Updates
For entities that only support the modify operation, you do not need to specify the operation. (You can
optionally use ~.)
```python
await switch.write([
    fy.P4RegisterEntry("reg1", index=0, data=0),
    fy.P4RegisterEntry("reg1", index=1, data=1),
    fy.P4RegisterEntry("reg1", index=2, data=2),
])
```
You can also use the modify method:
```python
await switch.modify([
    fy.P4RegisterEntry("reg1", index=0, data=0),
    fy.P4RegisterEntry("reg1", index=1, data=1),
    fy.P4RegisterEntry("reg1", index=2, data=2),
])
```
If you pass a modify-only entity to the insert or delete methods, the P4Runtime server will
return an error.
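For example, here is a sketch of handling that error for a register INSERT, assuming the failure surfaces as a fy.P4ClientError (the exact status code depends on the P4Runtime server):

```python
import finsy as fy

async def bad_insert(switch: fy.Switch):
    try:
        # P4RegisterEntry is modify-only, so the server rejects an INSERT.
        await switch.insert([fy.P4RegisterEntry("reg1", index=0, data=0)])
    except fy.P4ClientError as ex:
        print(f"insert failed: code={ex.code!r} message={ex.message!r}")
```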
Sending Packets
Use the write method to send a packet.
```python
await switch.write([fy.P4PacketOut(b"a payload.....", port=3)])
```
You can include other entities in the same call. Any non-update objects (e.g. P4PacketOut, P4DigestListAck) will be sent before the WriteRequest.
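For example, this sketch sends a packet and inserts a table entry in a single write call; the P4PacketOut is sent on the stream before the WriteRequest (the table name, match field, and action are illustrative):

```python
await switch.write([
    fy.P4PacketOut(b"hello", port=1),
    +fy.P4TableEntry(
        "ipv4",
        match=fy.Match(dest="192.168.3.0/24"),
        action=fy.Action("forward", port=1),
    ),
])
```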
Listening for Packets
To receive packets, use the async iterator Switch.read_packets().
read_packets can filter for a specific eth_type. In the example below, pkt is a P4PacketIn object.
```python
# Read packets filtering only for ARP (eth_type == 0x0806).
async for pkt in switch.read_packets(eth_types={0x0806}):
    # You can access the packet payload `pkt.payload` or any metadata value,
    # e.g. `pkt['ingress_port']`.
    print(pkt.payload)
    print(pkt['ingress_port'])
```
Listening for Digests
To receive digests, use the async iterator Switch.read_digests. You must specify
the name of the digest from your P4 program.
```python
async for digest in switch.read_digests("digest_t"):
    # You can access the digest metadata, e.g. `digest['ingress_port']`.
    # Your code may need to update table entries based on the digest data.
    # To ack the digest, write `digest.ack()`.
    await switch.write([entry, ...])
    await switch.write([digest.ack()])
```
To acknowledge the digest entry, you can write digest.ack().
Listening for Idle Timeouts
To receive idle timeout notifications, use the async iterator Switch.read_idle_timeouts.
You will receive a P4IdleTimeoutNotification which contains multiple table
entries -- one for each entry that timed out.
```python
async for timeout in switch.read_idle_timeouts():
    for entry in timeout.table_entry:
        print(timeout.timestamp, entry)
```
Other Events
A P4 switch may report other events using the EventEmitter API. See
the SwitchEvent class for the event types. Each switch has a switch.ee
attribute that lets your code register for event callbacks.
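For example, here is a sketch that registers a one-shot callback for the CONTROLLER_LEAVE event; other event types are listed in the SwitchEvent class:

```python
import finsy as fy

def _on_leave(sw: fy.Switch) -> None:
    print(f"{sw.name} left the controller")

# Register a one-shot callback on the switch's event emitter.
switch.ee.once(fy.SwitchEvent.CONTROLLER_LEAVE, _on_leave)
```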
Development and Testing
Perform these steps to set up your local environment for Finsy development, or try the codespace. Finsy requires Python 3.10 or later. If poetry is not installed, follow these directions to install it.
Clone and Prepare a Virtual Environment
The poetry install command installs all development dependencies into the
virtual environment (venv).
```
$ git clone https://github.com/byllyfish/finsy.git
$ cd finsy
$ python3 -m venv .venv
$ poetry install
```
Run Unit Tests
When you run pytest from the top level of the repository, you will run the unit tests.
```
$ poetry run pytest
```
Run Integration Tests
When you run pytest from within the examples directory, you will run the integration
tests instead of the unit tests. The integration tests run the example programs against a
Mininet network. Docker or podman is required.
```
$ cd examples
$ poetry run pytest
```
API Documentation
```python
"""
.. include:: ../README.md

# API Documentation
"""

# Copyright (c) 2022-2023 Bill Fisher
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = "0.28.0"

import sys

if sys.version_info < (3, 10):  # pragma: no cover
    raise RuntimeError("Requires Python 3.10 or later.")

from .controller import Controller
from .gnmiclient import GNMIClient, GNMISubscription, GNMIUpdate
from .gnmipath import GNMIPath
from .grpcutil import GRPCCredentialsTLS, GRPCStatusCode
from .log import LoggerAdapter
from .macaddr import MACAddress
from .p4client import P4Client, P4ClientError, P4Error
from .p4entity import (
    P4ActionProfileGroup,
    P4ActionProfileMember,
    P4CloneSessionEntry,
    P4CounterData,
    P4CounterEntry,
    P4DigestEntry,
    P4DigestList,
    P4DigestListAck,
    P4DirectCounterEntry,
    P4DirectMeterEntry,
    P4ExternEntry,
    P4IndirectAction,
    P4Member,
    P4MeterConfig,
    P4MeterCounterData,
    P4MeterEntry,
    P4MulticastGroupEntry,
    P4PacketIn,
    P4PacketOut,
    P4RegisterEntry,
    P4TableAction,
    P4TableEntry,
    P4TableMatch,
    P4ValueSetEntry,
)
from .p4schema import (
    P4ActionSelectionMode,
    P4ActionSizeSemantics,
    P4ConfigAction,
    P4CounterUnit,
    P4Schema,
)
from .ports import SwitchPort, SwitchPortList
from .runner import run
from .switch import Switch, SwitchEvent, SwitchOptions

Match = P4TableMatch
"`Match` is an alias for P4TableMatch."

Action = P4TableAction
"`Action` is an alias for P4TableAction."

IndirectAction = P4IndirectAction
"`IndirectAction` is an alias for P4IndirectAction."

__all__ = [
    "run",
    "Controller",
    "LoggerAdapter",
    "MACAddress",
    "P4ActionProfileGroup",
    "P4ActionProfileMember",
    "P4Client",
    "P4ClientError",
    "P4CloneSessionEntry",
    "P4CounterData",
    "P4CounterEntry",
    "P4CounterUnit",
    "P4DigestEntry",
    "P4DigestList",
    "P4DigestListAck",
    "P4DirectCounterEntry",
    "P4DirectMeterEntry",
    "P4Error",
    "P4ExternEntry",
    "IndirectAction",  # alias for P4IndirectAction
    "P4IndirectAction",
    "P4Member",
    "P4MeterConfig",
    "P4MeterCounterData",
    "P4MeterEntry",
    "P4MulticastGroupEntry",
    "P4PacketIn",
    "P4PacketOut",
    "P4RegisterEntry",
    "Action",  # alias for P4TableAction
    "P4TableAction",
    "P4TableEntry",
    "Match",  # alias for P4TableMatch
    "P4TableMatch",
    "P4ValueSetEntry",
    "P4ActionSelectionMode",
    "P4ActionSizeSemantics",
    "P4ConfigAction",
    "P4Schema",
    "Switch",
    "SwitchEvent",
    "SwitchOptions",
    "SwitchPort",
    "SwitchPortList",
    "GNMIClient",
    "GNMIPath",
    "GNMISubscription",
    "GNMIUpdate",
    "GRPCCredentialsTLS",
    "GRPCStatusCode",
]
```
```python
def run(coro: Coroutine[Any, Any, None]) -> None: ...
```
finsy.run provides a useful wrapper around asyncio.run.
This function implements common boilerplate for running a Finsy application.
- Set up basic logging to stderr at the INFO log level.
- Set up a signal handler for SIGTERM that shuts down gracefully.
- Set up caching of P4Info data so common definitions are re-used.
Example:
```python
import finsy as fy

async def main():
    async with fy.Switch("sw", "127.0.0.1:50001") as sw:
        print(sw.p4info)

if __name__ == "__main__":
    fy.run(main())
```
If you choose to use asyncio.run instead, your P4Schema/P4Info objects
will not be eligible for sharing. You can create your own P4SchemaCache
context manager to implement this.
```python
@final
class Controller:
    def __len__(self) -> int: ...

    @staticmethod
    def current() -> "Controller": ...
```
Represents a collection of P4Runtime switches.
Each Switch in the Controller is identified by its name. Each name must
be unique.
```python
switches = [
    fy.Switch("sw1", "10.0.0.1:50000"),
    fy.Switch("sw2", "10.0.0.2:50000"),
]
controller = fy.Controller(switches)
await controller.run()
```
A Controller can be running or stopped. There are two ways to run a
Controller. You can use the Controller.run() method, or you can use
a Controller as a context manager.
```python
controller = fy.Controller(switches)
async with controller:
    # Let controller run for 60 seconds.
    await asyncio.sleep(60)
```
You can add or remove switches regardless of whether a Controller is
running or not. If the Controller is not running, adding or removing a Switch is
instantaneous. If the Controller is running, adding a Switch will start
it running asynchronously. Removing a Switch will schedule the Switch to stop,
but defer actual removal until the Switch has stopped asynchronously.
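For example, a sketch of adding and removing a switch while the controller is running (the name, address, and `options` object are illustrative):

```python
sw4 = fy.Switch("sw4", "10.0.0.4:50000", options)
controller.add(sw4)  # the new switch starts running asynchronously

event = controller.remove(sw4)
await event.wait()  # resolves once the switch has fully stopped
```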
When a switch starts inside a Controller, it fires the CONTROLLER_ENTER
event. When a switch stops inside a Controller, it fires the CONTROLLER_LEAVE
event.
A Controller supports these methods to access its contents:
- len(controller): Return number of switches in the controller.
- controller[name]: Return the switch with the given name.
- controller.get(name): Return the switch with the given name, or None if not found.
You can iterate over the switches in a Controller using a for loop:
```python
for switch in controller:
    print(switch.name)
```
Any task or sub-task running inside a controller can retrieve its
Controller object using the Controller.current() method.
```python
def __init__(self, switches: Iterable[Switch] = ()): ...
```
Initialize Controller object with an initial list of switches.
Arguments:
- switches: a collection or other iterable of Switch objects.
```python
@property
def running(self) -> bool: ...
```
True if Controller is running.
```python
async def run(self) -> None: ...
```
Run the controller.
```python
def stop(self) -> None: ...
```
Stop the controller if it is running.
```python
async def __aenter__(self) -> Self: ...
```
Run the controller as a context manager (see also run()).
```python
def add(self, switch: Switch) -> None: ...
```
Add a switch to the controller.
If the controller is running, tell the switch to start.
Arguments:
- switch: the Switch object.
```python
def remove(self, switch: Switch) -> asyncio.Event: ...
```
Remove a switch from the controller.
If the controller is running, tell the switch to stop and schedule it for removal when it fully stops.
Arguments:
- switch: the Switch object.
```python
def __iter__(self) -> Iterator[Switch]: ...
```
Iterate over the switches.
```python
def __getitem__(self, name: str) -> Switch: ...
```
Retrieve a switch by name.
```python
def get(self, name: str) -> Switch | None: ...
```
Retrieve a switch by name, or return None if not found.
```python
class LoggerAdapter(_BaseLoggerAdapter): ...
```
Custom log adapter to include the name of the current task.
```python
def process(
    self,
    msg: Any,
    kwargs: MutableMapping[str, Any],
) -> tuple[Any, MutableMapping[str, Any]]: ...
```
Process the logging message and keyword arguments passed in to a logging call to insert contextual information.
```python
def info(self, msg: Any, *args: Any, **kwargs: Any) -> None: ...
```
INFO level uses a concise task name representation for readability.
```python
@functools.total_ordering
class MACAddress: ...
```
Concrete class for a MAC address.
```python
def __init__(self, value: object) -> None: ...
```
Construct a MAC address from a string, integer or bytes object.
```python
@property
def packed(self) -> bytes: ...
```
Return the MAC address as a byte string.
```python
@property
def max_prefixlen(self) -> int: ...
```
Return the maximum prefix length (48 bits).
```python
@property
def is_multicast(self) -> bool: ...
```
Return true if MAC address has the multicast bit set.
```python
@property
def is_private(self) -> bool: ...
```
Return true if MAC address has the locally administered bit set.
```python
@property
def is_global(self) -> bool: ...
```
Return true if the locally administered bit is not set.
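A short usage sketch of MACAddress and the properties above:

```python
from finsy import MACAddress

mac = MACAddress("0e:00:00:00:00:01")
print(int(mac), mac.packed.hex(), mac.max_prefixlen)
print(mac.is_multicast, mac.is_private, mac.is_global)
```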
```python
@decodable("action_profile_group")
@dataclass(slots=True)
class P4ActionProfileGroup(_P4Writable):
    action_profile_id: str = ""
    _: KW_ONLY
    group_id: int = 0
    max_size: int = 0
    members: Sequence[P4Member] | None = None
```
Represents a P4Runtime ActionProfileGroup.
```python
def encode(self, schema: P4Schema) -> p4r.Entity: ...
```
Encode P4ActionProfileGroup as protobuf.
```python
@classmethod
def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: ...
```
Decode protobuf to ActionProfileGroup data.
```python
def action_str(self, _schema: P4Schema) -> str: ...
```
Return string representation of the weighted members.
```python
@decodable("action_profile_member")
@dataclass(slots=True)
class P4ActionProfileMember(_P4Writable):
    action_profile_id: str = ""
    _: KW_ONLY
    member_id: int = 0
    action: P4TableAction | None = None

    def action_str(self, schema: P4Schema) -> str:
        "Format the action as a human-readable, canonical string."
```
Represents a P4Runtime ActionProfileMember.
```python
def encode(self, schema: P4Schema) -> p4r.Entity: ...
```
Encode P4ActionProfileMember as protobuf.
```python
@classmethod
def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: ...
```
Decode protobuf to ActionProfileMember data.
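For example, a sketch of provisioning an action profile member and a group that references it; the profile name "ap", the ids, and the P4Member constructor arguments are assumptions:

```python
import finsy as fy

await switch.write([
    +fy.P4ActionProfileMember(
        "ap",  # action profile name (illustrative)
        member_id=1,
        action=fy.Action("forward", port=1),
    ),
    +fy.P4ActionProfileGroup(
        "ap",
        group_id=1,
        max_size=4,
        members=[fy.P4Member(member_id=1, weight=1)],  # constructor args assumed
    ),
])
```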
```python
class P4Client:
    def __init__(
        self,
        address: str,
        credentials: GRPCCredentialsTLS | None = None,
        *,
        wait_for_ready: bool = True,
    ) -> None: ...

    async def __aenter__(self) -> Self: ...
    async def __aexit__(self, *args: Any) -> bool | None: ...
```
Implements a P4Runtime client.
```python
@property
def channel(self) -> grpc.aio.Channel | None: ...
```
Return the GRPC channel object, or None if the channel is not open.
```python
async def open(
    self,
    *,
    schema: P4Schema | None = None,
    complete_request: Callable[[pbutil.PBMessage], None] | None = None,
) -> None: ...
```
Open the client channel.
Note: This method is async for forward-compatible reasons.
```python
async def close(self) -> None: ...
```
Close the client channel.
```python
async def send(self, msg: p4r.StreamMessageRequest) -> None: ...
```
Send a message to the stream.
```python
async def receive(self) -> p4r.StreamMessageResponse: ...
```
Read a message from the stream.
```python
async def request(self, msg: pbutil.PBMessage) -> pbutil.PBMessage: ...
```
Send a unary-unary P4Runtime request and wait for the response.
```python
async def request_iter(
    self, msg: p4r.ReadRequest
) -> AsyncIterator[p4r.ReadResponse]: ...
```
Send a unary-stream P4Runtime read request and wait for the responses.
```python
class P4ClientError(Exception): ...
```
Wrap grpc.RpcError.
```python
def __init__(
    self,
    error: grpc.RpcError,
    operation: str,
    *,
    msg: pbutil.PBMessage | None = None,
    schema: P4Schema | None = None,
): ...
```
```python
@property
def code(self) -> GRPCStatusCode: ...
```
GRPC status code.
```python
@property
def message(self) -> str: ...
```
GRPC status message.
```python
@property
def details(self) -> dict[int, P4Error]: ...
```
Optional details about P4Runtime Write updates that failed.
```python
@property
def is_not_found_only(self) -> bool: ...
```
Return True if the only sub-errors are NOT_FOUND.
```python
@property
def is_election_id_used(self) -> bool: ...
```
Return true if error is that election ID is in use.
```python
@property
def is_pipeline_missing(self) -> bool: ...
```
Return true if error is that no pipeline config is set.
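For example, a sketch of tolerating NOT_FOUND sub-errors when deleting entries that may already be gone, assuming the Switch propagates the underlying fy.P4ClientError:

```python
import finsy as fy

async def delete_if_present(switch: fy.Switch, entries):
    try:
        await switch.delete(entries)
    except fy.P4ClientError as ex:
        # Ignore the failure only when every sub-error is NOT_FOUND.
        if not ex.is_not_found_only:
            raise
```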
```python
@decodable("clone_session_entry")
@dataclass(slots=True)
class P4CloneSessionEntry(_P4Writable):
    session_id: int = 0
    _: KW_ONLY
    class_of_service: int = 0
    packet_length_bytes: int = 0
    replicas: Sequence[_ReplicaType] = ()

    def replicas_str(self) -> str:
        "Format the replicas as a human-readable, canonical string."
```
Represents a P4Runtime CloneSessionEntry.
```python
def encode(self, schema: P4Schema) -> p4r.Entity: ...
```
Encode CloneSessionEntry data as protobuf.
```python
@classmethod
def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: ...
```
Decode protobuf to CloneSessionEntry data.
```python
@dataclass(kw_only=True, slots=True)
class P4CounterData:
    byte_count: int = 0
    "The number of octets."
    packet_count: int = 0
    "The number of packets."

    def encode(self) -> p4r.CounterData:
        "Encode object as CounterData."

    @classmethod
    def decode(cls, msg: p4r.CounterData) -> Self:
        "Decode CounterData."
```
Represents a P4Runtime object that keeps statistics of bytes and packets.
Attributes:
- byte_count (int): the number of octets
- packet_count (int): the number of packets
See Also:
- P4TableEntry
- P4MeterCounterData
- P4CounterEntry
- P4DirectCounterEntry
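For illustration only, counter data is a plain keyword-only dataclass; a minimal sketch, assuming the class is exported at the top level of finsy:

import finsy as fy

data = fy.P4CounterData(byte_count=1500, packet_count=1)
print(data.byte_count, data.packet_count)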
@decodable("counter_entry")
@dataclass(slots=True)
class P4CounterEntry(_P4ModifyOnly):
    "Represents a P4Runtime CounterEntry."

    counter_id: str = ""
    _: KW_ONLY
    index: int | None = None
    data: P4CounterData | None = None

    @property
    def packet_count(self) -> int:
        "Packet count from counter data (or 0 if there is no data)."
        if self.data is not None:
            return self.data.packet_count
        return 0

    @property
    def byte_count(self) -> int:
        "Byte count from counter data (or 0 if there is no data)."
        if self.data is not None:
            return self.data.byte_count
        return 0

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4CounterEntry as protobuf."
        if not self.counter_id:
            return p4r.Entity(counter_entry=p4r.CounterEntry())

        counter = schema.counters[self.counter_id]

        if self.index is not None:
            index = p4r.Index(index=self.index)
        else:
            index = None

        if self.data is not None:
            data = self.data.encode()
        else:
            data = None

        entry = p4r.CounterEntry(
            counter_id=counter.id,
            index=index,
            data=data,
        )
        return p4r.Entity(counter_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4CounterEntry."
        entry = msg.counter_entry
        if not entry.counter_id:
            return cls()

        counter = schema.counters[entry.counter_id]

        if entry.HasField("index"):
            index = entry.index.index
        else:
            index = None

        if entry.HasField("data"):
            data = P4CounterData.decode(entry.data)
        else:
            data = None

        return cls(counter_id=counter.alias, index=index, data=data)
Represents a P4Runtime CounterEntry.
Packet count from counter data (or 0 if there is no data).
Byte count from counter data (or 0 if there is no data).
Encode P4CounterEntry as protobuf.
Decode protobuf to P4CounterEntry.
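A minimal read sketch; the counter name "igPortsCounts" is a placeholder for whatever indexed counter your P4 program defines, and reading with only the counter name is assumed to return every populated index:

import finsy as fy

async def dump_counters(sw: fy.Switch):
    async for entry in sw.read(fy.P4CounterEntry("igPortsCounts")):
        print(entry.index, entry.packet_count, entry.byte_count)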
class P4CounterUnit(_EnumBase):
    "IntEnum equivalent to `p4i.CounterSpec.Unit`."

    UNSPECIFIED = p4i.CounterSpec.Unit.UNSPECIFIED
    BYTES = p4i.CounterSpec.Unit.BYTES
    PACKETS = p4i.CounterSpec.Unit.PACKETS
    BOTH = p4i.CounterSpec.Unit.BOTH
IntEnum equivalent to p4i.CounterSpec.Unit.
@decodable("digest_entry")
@dataclass(slots=True)
class P4DigestEntry(_P4Writable):
    "Represents a P4Runtime DigestEntry."

    digest_id: str = ""
    _: KW_ONLY
    max_list_size: int = 0
    max_timeout_ns: int = 0
    ack_timeout_ns: int = 0

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode DigestEntry data as protobuf."
        if not self.digest_id:
            return p4r.Entity(digest_entry=p4r.DigestEntry())

        digest = schema.digests[self.digest_id]

        if self.max_list_size == self.max_timeout_ns == self.ack_timeout_ns == 0:
            config = None
        else:
            config = p4r.DigestEntry.Config(
                max_timeout_ns=self.max_timeout_ns,
                max_list_size=self.max_list_size,
                ack_timeout_ns=self.ack_timeout_ns,
            )

        entry = p4r.DigestEntry(digest_id=digest.id, config=config)
        return p4r.Entity(digest_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to DigestEntry data."
        entry = msg.digest_entry
        if entry.digest_id == 0:
            return cls()

        digest = schema.digests[entry.digest_id]

        config = entry.config
        return cls(
            digest.alias,
            max_list_size=config.max_list_size,
            max_timeout_ns=config.max_timeout_ns,
            ack_timeout_ns=config.ack_timeout_ns,
        )
Represents a P4Runtime DigestEntry.
Encode DigestEntry data as protobuf.
Decode protobuf to DigestEntry data.
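A hedged sketch of enabling digest delivery from a ready handler; the digest name "digest_t" is a placeholder that must match the digest declared in your P4 program:

import finsy as fy

async def enable_digests(sw: fy.Switch):
    # Insert (+) a digest config so the switch starts sending digest lists.
    await sw.write(
        [
            +fy.P4DigestEntry(
                "digest_t",
                max_list_size=1,
                max_timeout_ns=0,
                ack_timeout_ns=0,
            )
        ]
    )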
@decodable("digest")
@dataclass(slots=True)
class P4DigestList:
    "Represents a P4Runtime DigestList."

    digest_id: str
    _: KW_ONLY
    list_id: int
    timestamp: int
    data: list[_DataValueType]

    @classmethod
    def decode(cls, msg: p4r.StreamMessageResponse, schema: P4Schema) -> Self:
        "Decode protobuf to DigestList data."
        digest_list = msg.digest
        digest = schema.digests[digest_list.digest_id]

        type_spec = digest.type_spec
        return cls(
            digest_id=digest.alias,
            list_id=digest_list.list_id,
            timestamp=digest_list.timestamp,
            data=[type_spec.decode_data(item) for item in digest_list.data],
        )

    def __len__(self) -> int:
        "Return number of values in digest list."
        return len(self.data)

    def __getitem__(self, key: int) -> _DataValueType:
        "Retrieve value at given index from digest list."
        return self.data[key]

    def __iter__(self) -> Iterator[_DataValueType]:
        "Iterate over values in digest list."
        return iter(self.data)

    def ack(self) -> "P4DigestListAck":
        "Return the corresponding DigestListAck message."
        return P4DigestListAck(self.digest_id, self.list_id)
Represents a P4Runtime DigestList.
Decode protobuf to DigestList data.
Return number of values in digest list.
Retrieve value at given index from digest list.
Iterate over values in digest list.
@dataclass(slots=True)
class P4DigestListAck:
    "Represents a P4Runtime DigestListAck."

    digest_id: str
    list_id: int

    def encode_update(self, schema: P4Schema) -> p4r.StreamMessageRequest:
        "Encode DigestListAck data as protobuf."
        digest = schema.digests[self.digest_id]

        return p4r.StreamMessageRequest(
            digest_ack=p4r.DigestListAck(
                digest_id=digest.id,
                list_id=self.list_id,
            )
        )
Represents a P4Runtime DigestListAck.
Encode DigestListAck data as protobuf.
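A hedged sketch of consuming digest lists and acknowledging them. It assumes a read_digests() helper analogous to read_packets(), that Switch.write also accepts outgoing stream messages such as the ack, and a digest named "digest_t"; check the examples directory for the library's actual pattern.

import finsy as fy

async def watch_digests(sw: fy.Switch):
    async for digest in sw.read_digests("digest_t"):
        for item in digest:
            print(sw.name, item)
        # Acknowledge the digest list so the switch can send the next one.
        await sw.write([digest.ack()])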
@decodable("direct_counter_entry")
@dataclass(slots=True)
class P4DirectCounterEntry(_P4ModifyOnly):
    "Represents a P4Runtime DirectCounterEntry."

    counter_id: str = ""
    _: KW_ONLY
    table_entry: P4TableEntry | None = None
    data: P4CounterData | None = None

    @property
    def table_id(self) -> str:
        "Return table_id of related table."
        if self.table_entry is None:
            return ""
        return self.table_entry.table_id

    @property
    def packet_count(self) -> int:
        "Packet count from counter data (or 0 if there is no data)."
        if self.data is not None:
            return self.data.packet_count
        return 0

    @property
    def byte_count(self) -> int:
        "Byte count from counter data (or 0 if there is no data)."
        if self.data is not None:
            return self.data.byte_count
        return 0

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4DirectCounterEntry as protobuf."
        if self.table_entry is None:
            # Use `counter_id` to construct a `P4TableEntry` with the proper
            # table name.
            if self.counter_id:
                tb_name = schema.direct_counters[self.counter_id].direct_table_name
                table_entry = P4TableEntry(tb_name)
            else:
                table_entry = P4TableEntry()
        else:
            table_entry = self.table_entry

        if self.data is not None:
            data = self.data.encode()
        else:
            data = None

        entry = p4r.DirectCounterEntry(
            table_entry=table_entry.encode_entry(schema),
            data=data,
        )
        return p4r.Entity(direct_counter_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4DirectCounterEntry."
        entry = msg.direct_counter_entry

        if entry.HasField("table_entry"):
            table_entry = P4TableEntry.decode_entry(entry.table_entry, schema)
        else:
            table_entry = None

        if entry.HasField("data"):
            data = P4CounterData.decode(entry.data)
        else:
            data = None

        # Determine `counter_id` from table_entry.
        counter_id = ""
        if table_entry is not None and table_entry.table_id:
            direct_counter = schema.tables[table_entry.table_id].direct_counter
            assert direct_counter is not None
            counter_id = direct_counter.alias

        return cls(counter_id, table_entry=table_entry, data=data)
Represents a P4Runtime DirectCounterEntry.
Return table_id of related table.
Packet count from counter data (or 0 if there is no data).
Byte count from counter data (or 0 if there is no data).
Encode P4DirectCounterEntry as protobuf.
Decode protobuf to P4DirectCounterEntry.
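A hedged read sketch; "ipv4_counter" is a placeholder for a direct counter declared in your P4 program, and passing only the counter name is assumed to read the counter for every entry of its table:

import finsy as fy

async def read_hit_counts(sw: fy.Switch):
    async for entry in sw.read(fy.P4DirectCounterEntry("ipv4_counter")):
        print(entry.table_id, entry.packet_count, entry.byte_count)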
@decodable("direct_meter_entry")
@dataclass(kw_only=True, slots=True)
class P4DirectMeterEntry(_P4ModifyOnly):
    "Represents a P4Runtime DirectMeterEntry."

    table_entry: P4TableEntry | None = None
    config: P4MeterConfig | None = None
    counter_data: P4MeterCounterData | None = None

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4DirectMeterEntry as protobuf."
        if self.table_entry is not None:
            table_entry = self.table_entry.encode_entry(schema)
        else:
            table_entry = None

        if self.config is not None:
            config = self.config.encode()
        else:
            config = None

        if self.counter_data is not None:
            counter_data = self.counter_data.encode()
        else:
            counter_data = None

        entry = p4r.DirectMeterEntry(
            table_entry=table_entry,
            config=config,
            counter_data=counter_data,
        )
        return p4r.Entity(direct_meter_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4DirectMeterEntry."
        entry = msg.direct_meter_entry

        if entry.HasField("table_entry"):
            table_entry = P4TableEntry.decode_entry(entry.table_entry, schema)
        else:
            table_entry = None

        if entry.HasField("config"):
            config = P4MeterConfig.decode(entry.config)
        else:
            config = None

        if entry.HasField("counter_data"):
            counter_data = P4MeterCounterData.decode(entry.counter_data)
        else:
            counter_data = None

        return cls(
            table_entry=table_entry,
            config=config,
            counter_data=counter_data,
        )
Represents a P4Runtime DirectMeterEntry.
Encode P4DirectMeterEntry as protobuf.
Decode protobuf to P4DirectMeterEntry.
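A hedged sketch of configuring the direct meter attached to one table entry; the table, match field, and rate values are placeholders:

import finsy as fy

async def set_meter(sw: fy.Switch):
    # Modify (~) the direct meter for one "ipv4" entry.
    await sw.write(
        [
            ~fy.P4DirectMeterEntry(
                table_entry=fy.P4TableEntry(
                    "ipv4",
                    match=fy.Match(ipv4_dst="10.0.0.1"),
                ),
                config=fy.P4MeterConfig(
                    cir=125_000, cburst=1500, pir=250_000, pburst=3000
                ),
            )
        ]
    )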
@dataclass
class P4Error:
    "P4Runtime Error message used to report a single P4-entity error."

    canonical_code: GRPCStatusCode
    message: str
    space: str
    code: int
    subvalue: pbutil.PBMessage | None = None
P4Runtime Error message used to report a single P4-entity error.
@decodable("extern_entry")
@dataclass(kw_only=True, slots=True)
class P4ExternEntry(_P4Writable):
    "Represents a P4Runtime ExternEntry."

    extern_type_id: str
    extern_id: str
    entry: pbutil.PBAny

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode ExternEntry data as protobuf."
        extern = schema.externs[self.extern_type_id, self.extern_id]
        entry = p4r.ExternEntry(
            extern_type_id=extern.extern_type_id,
            extern_id=extern.id,
            entry=self.entry,
        )
        return p4r.Entity(extern_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to ExternEntry data."
        entry = msg.extern_entry
        extern = schema.externs[entry.extern_type_id, entry.extern_id]
        return cls(
            extern_type_id=extern.extern_type_name,
            extern_id=extern.name,
            entry=entry.entry,
        )
Represents a P4Runtime ExternEntry.
Encode ExternEntry data as protobuf.
Decode protobuf to ExternEntry data.
IndirectAction is an alias for P4IndirectAction.
620@dataclass(slots=True) 621class P4IndirectAction: 622 """Represents a P4Runtime Action reference for an indirect table. 623 624 An indirect action can be either: 625 626 1. a "one-shot" action (action_set) 627 2. a reference to an action profile member (member_id) 628 3. a reference to an action profile group (group_id) 629 630 Only one of action_set, member_id or group_id may be configured. The other 631 values must be None. 632 633 Attributes: 634 action_set: sequence of weighted actions for one-shot 635 member_id: ID of action profile member 636 group_id: ID of action profile group 637 638 Examples: 639 640 ```python 641 # Construct a one-shot action profile. 642 one_shot = P4IndirectAction( 643 2 * P4TableAction("forward", port=1), 644 1 * P4TableAction("forward", port=2), 645 ) 646 647 # Refer to an action profile member by ID. 648 member_action = P4IndirectAction(member_id=1) 649 650 # Refer to an action profile group by ID. 651 group_action = P4IndirectAction(group_id=2) 652 ``` 653 654 References: 655 - "9.1.2. Action Specification", 656 - "9.2.3. One Shot Action Selector Programming" 657 658 See Also: 659 - P4TableEntry 660 """ 661 662 action_set: Sequence[P4WeightedAction] | None = None 663 "Sequence of weighted actions defining one-shot action profile." 664 _: KW_ONLY 665 member_id: int | None = None 666 "ID of action profile member." 667 group_id: int | None = None 668 "ID of action profile group." 669 selection_mode: P4ActionSelectionMode = ( 670 P4ActionSelectionMode.DEFAULT_MODE_DETERMINED_BY_ACTION_SELECTOR 671 ) 672 "Action selection mode for one-shot action profile (1.5.0)." 673 size_semantics: P4ActionSizeSemantics = ( 674 P4ActionSizeSemantics.DEFAULT_SIZE_DETERMINED_BY_ACTION_SELECTOR 675 ) 676 "Action size semantics for one-shot action profile (1.5.0)." 677 678 def __post_init__(self) -> None: 679 if not self._check_invariant(): 680 raise ValueError( 681 "exactly one of action_set, member_id, or group_id must be set" 682 ) 683 684 def _check_invariant(self) -> bool: 685 "Return true if instance satisfies class invariant." 686 if self.action_set is not None: 687 return self.member_id is None and self.group_id is None 688 if self.member_id is not None: 689 return self.group_id is None 690 return self.group_id is not None 691 692 def encode_table_action(self, table: P4Table) -> p4r.TableAction: 693 "Encode object as a TableAction." 694 if self.action_set is not None: 695 return p4r.TableAction( 696 action_profile_action_set=self.encode_action_set(table) 697 ) 698 699 if self.member_id is not None: 700 return p4r.TableAction(action_profile_member_id=self.member_id) 701 702 assert self.group_id is not None 703 return p4r.TableAction(action_profile_group_id=self.group_id) 704 705 def encode_action_set(self, table: P4Table) -> p4r.ActionProfileActionSet: 706 "Encode object as an ActionProfileActionSet." 
707 assert self.action_set is not None 708 709 profile_actions: list[p4r.ActionProfileAction] = [] 710 for weight, table_action in self.action_set: 711 action = table_action.encode_action(table) 712 713 match weight: 714 case int(weight_value): 715 watch_port = None 716 case (weight_value, int(watch)): 717 watch_port = encode_watch_port(watch) 718 case _: 719 raise ValueError(f"unexpected action weight: {weight!r}") 720 721 profile = p4r.ActionProfileAction(action=action, weight=weight_value) 722 if watch_port is not None: 723 profile.watch_port = watch_port 724 profile_actions.append(profile) 725 726 return p4r.ActionProfileActionSet( 727 action_profile_actions=profile_actions, 728 action_selection_mode=self.selection_mode.vt(), 729 size_semantics=self.size_semantics.vt(), 730 ) 731 732 @classmethod 733 def decode_action_set(cls, msg: p4r.ActionProfileActionSet, table: P4Table) -> Self: 734 "Decode ActionProfileActionSet." 735 action_set = list[P4WeightedAction]() 736 737 for action in msg.action_profile_actions: 738 match action.WhichOneof("watch_kind"): 739 case "watch_port": 740 weight = (action.weight, decode_watch_port(action.watch_port)) 741 case None: 742 weight = action.weight 743 case other: 744 # "watch" (deprecated) is not supported 745 raise ValueError(f"unexpected oneof: {other!r}") 746 747 table_action = P4TableAction.decode_action(action.action, table) 748 action_set.append((weight, table_action)) 749 750 return cls( 751 action_set, 752 selection_mode=P4ActionSelectionMode(msg.action_selection_mode), 753 size_semantics=P4ActionSizeSemantics(msg.size_semantics), 754 ) 755 756 def format_str(self, table: P4Table) -> str: 757 """Format the indirect table action as a human-readable string.""" 758 if self.action_set is not None: 759 weighted_actions = [ 760 f"{weight}*{action.format_str(table)}" 761 for weight, action in self.action_set 762 ] 763 props = self._format_properties() 764 if props: 765 props = f" <{props}>" 766 return " ".join(weighted_actions) + props 767 768 # Use the name of the action_profile, if we can get it. If not, just 769 # use the placeholder "__indirect". 770 if table.action_profile is not None: 771 profile_name = f"@{table.action_profile.alias}" 772 else: 773 profile_name = "__indirect" 774 775 if self.member_id is not None: 776 return f"{profile_name}[[{self.member_id:#x}]]" 777 778 return f"{profile_name}[{self.group_id:#x}]" 779 780 def __repr__(self) -> str: 781 "Customize representation to make it more concise." 782 if self.action_set is not None: 783 props = self._repr_properties() 784 if props: 785 return f"P4IndirectAction(action_set={self.action_set!r}, {props})" 786 else: 787 return f"P4IndirectAction(action_set={self.action_set!r})" 788 if self.member_id is not None: 789 return f"P4IndirectAction(member_id={self.member_id!r})" 790 return f"P4IndirectAction(group_id={self.group_id!r})" 791 792 def _repr_properties(self) -> str: 793 "Return string description of properties used in __repr__." 794 result: list[str] = [] 795 if ( 796 self.selection_mode 797 != P4ActionSelectionMode.DEFAULT_MODE_DETERMINED_BY_ACTION_SELECTOR 798 ): 799 result.append(f"selection_mode={self.selection_mode!r}") 800 if ( 801 self.size_semantics 802 != P4ActionSizeSemantics.DEFAULT_SIZE_DETERMINED_BY_ACTION_SELECTOR 803 ): 804 result.append(f"size_semantics={self.size_semantics!r}") 805 return ", ".join(result) 806 807 def _format_properties(self) -> str: 808 "Return string description of properties used in format_str." 
809 result: list[str] = [] 810 if ( 811 self.selection_mode 812 != P4ActionSelectionMode.DEFAULT_MODE_DETERMINED_BY_ACTION_SELECTOR 813 ): 814 result.append(f"{self.selection_mode.name}") 815 if ( 816 self.size_semantics 817 != P4ActionSizeSemantics.DEFAULT_SIZE_DETERMINED_BY_ACTION_SELECTOR 818 ): 819 result.append(f"{self.size_semantics.name}") 820 return ", ".join(result)
Represents a P4Runtime Action reference for an indirect table.
An indirect action can be either:
- a "one-shot" action (action_set)
- a reference to an action profile member (member_id)
- a reference to an action profile group (group_id)
Only one of action_set, member_id or group_id may be configured. The other values must be None.
Attributes:
- action_set: sequence of weighted actions for one-shot
- member_id: ID of action profile member
- group_id: ID of action profile group
Examples:
# Construct a one-shot action profile.
one_shot = P4IndirectAction(
2 * P4TableAction("forward", port=1),
1 * P4TableAction("forward", port=2),
)
# Refer to an action profile member by ID.
member_action = P4IndirectAction(member_id=1)
# Refer to an action profile group by ID.
group_action = P4IndirectAction(group_id=2)
References:
- "9.1.2. Action Specification",
- "9.2.3. One Shot Action Selector Programming"
See Also:
- P4TableEntry
action_set: Sequence of weighted actions defining a one-shot action profile.
Encode object as a TableAction.
Encode object as an ActionProfileActionSet.
Decode ActionProfileActionSet.
Format the indirect table action as a human-readable string.
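As a hedged usage sketch beyond the docstring examples above, an indirect action can be attached to a table entry for a table bound to an action profile. The table name "wcmp" is a placeholder, and member 1 is assumed to have been inserted into the action profile already:

import finsy as fy

update = +fy.P4TableEntry(
    "wcmp",
    match=fy.Match(ipv4_dst="10.0.0.0/8"),
    action=fy.P4IndirectAction(member_id=1),
)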
@dataclass(slots=True)
class P4Member:
    """Represents an ActionProfileGroup Member.

    See Also:
        - P4ActionProfileGroup
    """

    member_id: int
    _: KW_ONLY
    weight: P4Weight

    def encode(self) -> p4r.ActionProfileGroup.Member:
        "Encode P4Member as protobuf."
        match self.weight:
            case int(weight):
                watch_port = None
            case (int(weight), int(watch)):
                watch_port = encode_watch_port(watch)
            case other:  # pyright: ignore[reportUnnecessaryComparison]
                raise ValueError(f"unexpected weight: {other!r}")

        member = p4r.ActionProfileGroup.Member(
            member_id=self.member_id,
            weight=weight,
        )

        if watch_port is not None:
            member.watch_port = watch_port
        return member

    @classmethod
    def decode(cls, msg: p4r.ActionProfileGroup.Member) -> Self:
        "Decode protobuf to P4Member."
        match msg.WhichOneof("watch_kind"):
            case "watch_port":
                weight = (msg.weight, decode_watch_port(msg.watch_port))
            case None:
                weight = msg.weight
            case other:
                # "watch" (deprecated) is not supported
                raise ValueError(f"unknown oneof: {other!r}")

        return cls(member_id=msg.member_id, weight=weight)
Represents an ActionProfileGroup Member.
See Also:
- P4ActionProfileGroup
Encode P4Member as protobuf.
Decode protobuf to P4Member.
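For illustration, the weight can be a plain integer or a (weight, watch_port) tuple, mirroring the encode/decode logic above; the IDs and ports here are examples:

import finsy as fy

m1 = fy.P4Member(1, weight=2)
m2 = fy.P4Member(2, weight=(1, 3))  # weight 1, watch_port 3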
@dataclass(kw_only=True, slots=True)
class P4MeterConfig:
    """Represents a P4Runtime MeterConfig.

    Attributes:
        cir (int): Committed information rate (units/sec).
        cburst (int): Committed burst size.
        pir (int): Peak information rate (units/sec).
        pburst (int): Peak burst size.
        eburst (int): Excess burst size (only used by srTCM). [default=0]

    Example:
        config = P4MeterConfig(cir=10, cburst=20, pir=10, pburst=20)

    See Also:
        - P4TableEntry
        - P4MeterEntry
        - P4DirectMeterEntry
    """

    cir: int
    "Committed information rate (units/sec)."
    cburst: int
    "Committed burst size."
    pir: int
    "Peak information rate (units/sec)."
    pburst: int
    "Peak burst size."
    eburst: int = 0
    "Excess burst size (only used by srTCM)."

    def encode(self) -> p4r.MeterConfig:
        "Encode object as MeterConfig."
        return p4r.MeterConfig(
            cir=self.cir,
            cburst=self.cburst,
            pir=self.pir,
            pburst=self.pburst,
            eburst=self.eburst,
        )

    @classmethod
    def decode(cls, msg: p4r.MeterConfig) -> Self:
        "Decode MeterConfig."
        return cls(
            cir=msg.cir,
            cburst=msg.cburst,
            pir=msg.pir,
            pburst=msg.pburst,
            eburst=msg.eburst,
        )
Represents a P4Runtime MeterConfig.
Attributes:
- cir (int): Committed information rate (units/sec).
- cburst (int): Committed burst size.
- pir (int): Peak information rate (units/sec).
- pburst (int): Peak burst size.
- eburst (int): Excess burst size (only used by srTCM). [default=0]
Example:
config = P4MeterConfig(cir=10, cburst=20, pir=10, pburst=20)
See Also:
- P4TableEntry
- P4MeterEntry
- P4DirectMeterEntry
Encode object as MeterConfig.
Decode MeterConfig.
@dataclass(kw_only=True, slots=True)
class P4MeterCounterData:
    """Represents a P4Runtime MeterCounterData that stores per-color counters.

    Attributes:
        green (CounterData): counter data for packets marked GREEN.
        yellow (CounterData): counter data for packets marked YELLOW.
        red (CounterData): counter data for packets marked RED.

    See Also:
        - P4TableEntry
        - P4MeterEntry
        - P4DirectMeterEntry
    """

    green: P4CounterData
    "Counter of packets marked GREEN."
    yellow: P4CounterData
    "Counter of packets marked YELLOW."
    red: P4CounterData
    "Counter of packets marked RED."

    def encode(self) -> p4r.MeterCounterData:
        "Encode object as MeterCounterData."
        return p4r.MeterCounterData(
            green=self.green.encode(),
            yellow=self.yellow.encode(),
            red=self.red.encode(),
        )

    @classmethod
    def decode(cls, msg: p4r.MeterCounterData) -> Self:
        "Decode MeterCounterData."
        return cls(
            green=P4CounterData.decode(msg.green),
            yellow=P4CounterData.decode(msg.yellow),
            red=P4CounterData.decode(msg.red),
        )
Represents a P4Runtime MeterCounterData that stores per-color counters.
Attributes:
- green (CounterData): counter data for packets marked GREEN.
- yellow (CounterData): counter data for packets marked YELLOW.
- red (CounterData): counter data for packets marked RED.
See Also:
- P4TableEntry
- P4MeterEntry
- P4DirectMeterEntry
Encode object as MeterCounterData.
Decode MeterCounterData.
@decodable("meter_entry")
@dataclass(slots=True)
class P4MeterEntry(_P4ModifyOnly):
    "Represents a P4Runtime MeterEntry."

    meter_id: str = ""
    _: KW_ONLY
    index: int | None = None
    config: P4MeterConfig | None = None
    counter_data: P4MeterCounterData | None = None

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4MeterEntry to protobuf."
        if not self.meter_id:
            return p4r.Entity(meter_entry=p4r.MeterEntry())

        meter = schema.meters[self.meter_id]

        if self.index is not None:
            index = p4r.Index(index=self.index)
        else:
            index = None

        if self.config is not None:
            config = self.config.encode()
        else:
            config = None

        if self.counter_data is not None:
            counter_data = self.counter_data.encode()
        else:
            counter_data = None

        entry = p4r.MeterEntry(
            meter_id=meter.id,
            index=index,
            config=config,
            counter_data=counter_data,
        )
        return p4r.Entity(meter_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4MeterEntry."
        entry = msg.meter_entry
        if not entry.meter_id:
            return cls()

        meter = schema.meters[entry.meter_id]

        if entry.HasField("index"):
            index = entry.index.index
        else:
            index = None

        if entry.HasField("config"):
            config = P4MeterConfig.decode(entry.config)
        else:
            config = None

        if entry.HasField("counter_data"):
            counter_data = P4MeterCounterData.decode(entry.counter_data)
        else:
            counter_data = None

        return cls(
            meter_id=meter.alias,
            index=index,
            config=config,
            counter_data=counter_data,
        )
Represents a P4Runtime MeterEntry.
Encode P4MeterEntry to protobuf.
Decode protobuf to P4MeterEntry.
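A hedged sketch of configuring one cell of an indexed meter; "my_meter" and the rate values are placeholders for your P4 program:

import finsy as fy

async def configure_meter(sw: fy.Switch):
    # Modify (~) cell 0 of the indexed meter.
    await sw.write(
        [
            ~fy.P4MeterEntry(
                "my_meter",
                index=0,
                config=fy.P4MeterConfig(cir=10, cburst=20, pir=10, pburst=20),
            )
        ]
    )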
@decodable("multicast_group_entry")
@dataclass(slots=True)
class P4MulticastGroupEntry(_P4Writable):
    "Represents a P4Runtime MulticastGroupEntry."

    multicast_group_id: int = 0
    _: KW_ONLY
    replicas: Sequence[_ReplicaType] = ()
    metadata: bytes = b""

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode MulticastGroupEntry data as protobuf."
        entry = p4r.MulticastGroupEntry(
            multicast_group_id=self.multicast_group_id,
            replicas=[encode_replica(replica) for replica in self.replicas],
            metadata=self.metadata,
        )
        return p4r.Entity(
            packet_replication_engine_entry=p4r.PacketReplicationEngineEntry(
                multicast_group_entry=entry
            )
        )

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to MulticastGroupEntry data."
        entry = msg.packet_replication_engine_entry.multicast_group_entry
        return cls(
            multicast_group_id=entry.multicast_group_id,
            replicas=tuple(decode_replica(replica) for replica in entry.replicas),
            metadata=entry.metadata,
        )

    def replicas_str(self) -> str:
        "Format the replicas as a human-readable, canonical string."
        return " ".join(format_replica(rep) for rep in self.replicas)
Represents a P4Runtime MulticastGroupEntry.
Encode MulticastGroupEntry data as protobuf.
Decode protobuf to MulticastGroupEntry data.
@decodable("packet")
@dataclass(slots=True)
class P4PacketIn:
    "Represents a P4Runtime PacketIn."

    payload: bytes
    _: KW_ONLY
    metadata: _MetadataDictType

    @classmethod
    def decode(cls, msg: p4r.StreamMessageResponse, schema: P4Schema) -> Self:
        "Decode protobuf to PacketIn data."
        packet = msg.packet
        cpm = schema.controller_packet_metadata.get("packet_in")
        if cpm is None:
            # There is no controller metadata. Warn if message has any.
            pkt_meta = packet.metadata
            if pkt_meta:
                LOGGER.warning("P4PacketIn unexpected metadata: %r", pkt_meta)
            return cls(packet.payload, metadata={})

        return cls(
            packet.payload,
            metadata=cpm.decode(packet.metadata),
        )

    def __getitem__(self, key: str) -> Any:
        "Retrieve metadata value."
        return self.metadata[key]

    def __repr__(self) -> str:
        "Return friendlier hexadecimal description of packet."
        if self.metadata:
            return f"P4PacketIn(metadata={self.metadata!r}, payload=h'{self.payload.hex()}')"
        return f"P4PacketIn(payload=h'{self.payload.hex()}')"
Represents a P4Runtime PacketIn.
Decode protobuf to PacketIn data.
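A hedged sketch of handling incoming packets; the metadata key "ingress_port" is only an example and must match your P4 program's packet_in header:

import finsy as fy

async def watch_packets(sw: fy.Switch):
    async for packet in sw.read_packets():
        print(sw.name, packet["ingress_port"], packet.payload.hex())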
@dataclass(slots=True)
class P4PacketOut:
    "Represents a P4Runtime PacketOut."

    payload: bytes
    _: KW_ONLY
    metadata: _MetadataDictType

    def __init__(self, __payload: bytes, /, **metadata: Any):
        self.payload = __payload
        self.metadata = metadata

    def encode_update(self, schema: P4Schema) -> p4r.StreamMessageRequest:
        "Encode PacketOut data as protobuf."
        cpm = schema.controller_packet_metadata["packet_out"]
        return p4r.StreamMessageRequest(
            packet=p4r.PacketOut(
                payload=self.payload,
                metadata=cpm.encode(self.metadata),
            )
        )

    def __getitem__(self, key: str) -> Any:
        "Retrieve metadata value."
        return self.metadata[key]

    def __repr__(self) -> str:
        "Return friendlier hexadecimal description of packet."
        if self.metadata:
            return f"P4PacketOut(metadata={self.metadata!r}, payload=h'{self.payload.hex()}')"
        return f"P4PacketOut(payload=h'{self.payload.hex()}')"
Represents a P4Runtime PacketOut.
Encode PacketOut data as protobuf.
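A hedged sketch of sending a packet out. It assumes Switch.write also accepts outgoing stream messages such as P4PacketOut, and the metadata keyword "egress_port" is only an example that must match your P4 program's packet_out header; see the examples directory for the library's actual pattern.

import finsy as fy

async def send_hello(sw: fy.Switch):
    pkt = fy.P4PacketOut(b"\x01\x02\x03\x04", egress_port=1)
    await sw.write([pkt])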
@decodable("register_entry")
@dataclass(slots=True)
class P4RegisterEntry(_P4ModifyOnly):
    "Represents a P4Runtime RegisterEntry."

    register_id: str = ""
    _: KW_ONLY
    index: int | None = None
    data: _DataValueType | None = None

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode RegisterEntry data as protobuf."
        if not self.register_id:
            return p4r.Entity(register_entry=p4r.RegisterEntry())

        register = schema.registers[self.register_id]

        if self.index is not None:
            index = p4r.Index(index=self.index)
        else:
            index = None

        if self.data is not None:
            data = register.type_spec.encode_data(self.data)
        else:
            data = None

        entry = p4r.RegisterEntry(
            register_id=register.id,
            index=index,
            data=data,
        )
        return p4r.Entity(register_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to RegisterEntry data."
        entry = msg.register_entry
        if entry.register_id == 0:
            return cls()

        register = schema.registers[entry.register_id]

        if entry.HasField("index"):
            index = entry.index.index
        else:
            index = None

        if entry.HasField("data"):
            data = register.type_spec.decode_data(entry.data)
        else:
            data = None

        return cls(
            register.alias,
            index=index,
            data=data,
        )
Represents a P4Runtime RegisterEntry.
Encode RegisterEntry data as protobuf.
Decode protobuf to RegisterEntry data.
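A hedged sketch of updating and reading back one register cell; the register name "pkt_counts" and the value type are placeholders that depend on your P4 program:

import finsy as fy

async def update_register(sw: fy.Switch):
    # Modify (~) cell 3 of the register.
    await sw.write([~fy.P4RegisterEntry("pkt_counts", index=3, data=42)])

    # Read the same cell back.
    async for entry in sw.read(fy.P4RegisterEntry("pkt_counts", index=3)):
        print(entry.data)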
Action is an alias for P4TableAction.
441@dataclass(init=False, slots=True) 442class P4TableAction: 443 """Represents a P4Runtime Action reference for a direct table. 444 445 Attributes: 446 name (str): the name of the action. 447 args (dict[str, Any]): the action's arguments as a dictionary. 448 449 Example: 450 If the name of the action is "ipv4_forward" and it takes a single 451 "port" parameter, you can construct the action as: 452 453 ``` 454 action = P4TableAction("ipv4_forward", port=1) 455 ``` 456 457 Reference "9.1.2 Action Specification": 458 The Action Protobuf has fields: (action_id, params). Finsy translates 459 `name` to the appropriate `action_id` as determined by P4Info. It also 460 translates each named argument in `args` to the appropriate `param_id`. 461 462 See Also: 463 To specify an action for an indirect table, use `P4IndirectAction`. 464 Note that P4TableAction will automatically be promoted to an "indirect" 465 action if needed. 466 467 Operators: 468 A `P4TableAction` supports the multiplication operator (*) for 469 constructing "weighted actions". A weighted action is used in specifying 470 indirect actions. Here is an action with a weight of 3: 471 472 ``` 473 weighted_action = 3 * P4TableAction("ipv4_forward", port=1) 474 ``` 475 476 To specify a weight with a `watch_port`, use a tuple `(weight, port)`. 477 The weight is always a positive integer. 478 479 See Also: 480 - P4TableEntry 481 """ 482 483 name: str 484 "The name of the action." 485 args: dict[str, Any] 486 "The action's arguments as a dictionary." 487 488 def __init__(self, __name: str, /, **args: Any): 489 self.name = __name 490 self.args = args 491 492 def encode_table_action(self, table: P4Table) -> p4r.TableAction: 493 """Encode TableAction data as protobuf. 494 495 If the table is indirect, promote the action to a "one-shot" indirect 496 action. 497 """ 498 try: 499 action = table.actions[self.name] 500 except Exception as ex: 501 raise ValueError(f"{table.name!r}: {ex}") from ex 502 503 action_p4 = self._encode_action(action) 504 505 if table.action_profile is not None: 506 # Promote action to ActionProfileActionSet entry with weight=1. 507 return p4r.TableAction( 508 action_profile_action_set=p4r.ActionProfileActionSet( 509 action_profile_actions=[ 510 p4r.ActionProfileAction(action=action_p4, weight=1) 511 ] 512 ) 513 ) 514 515 return p4r.TableAction(action=action_p4) 516 517 def _fail_missing_params(self, action: P4ActionRef | P4Action) -> NoReturn: 518 "Report missing parameters." 519 seen = {param.name for param in action.params} 520 for name in self.args: 521 param = action.params[name] 522 seen.remove(param.name) 523 524 raise ValueError(f"Action {action.alias!r}: missing parameters {seen}") 525 526 def encode_action(self, schema: P4Schema | P4Table) -> p4r.Action: 527 "Encode Action data as protobuf." 528 action = schema.actions[self.name] 529 return self._encode_action(action) 530 531 def _encode_action(self, action: P4ActionRef | P4Action) -> p4r.Action: 532 "Helper to encode an action." 533 aps = action.params 534 try: 535 params = [ 536 aps[name].encode_param(value) for name, value in self.args.items() 537 ] 538 except ValueError as ex: 539 raise ValueError(f"{action.alias!r}: {ex}") from ex 540 541 # Check for missing action parameters. We always accept an action with 542 # no parameters (for wildcard ReadRequests). 
543 param_count = len(params) 544 if param_count > 0 and param_count != len(aps): 545 self._fail_missing_params(action) 546 547 return p4r.Action(action_id=action.id, params=params) 548 549 @classmethod 550 def decode_table_action( 551 cls, msg: p4r.TableAction, table: P4Table 552 ) -> Self | "P4IndirectAction": 553 "Decode protobuf to TableAction data." 554 match msg.WhichOneof("type"): 555 case "action": 556 return cls.decode_action(msg.action, table) 557 case "action_profile_member_id": 558 return P4IndirectAction(member_id=msg.action_profile_member_id) 559 case "action_profile_group_id": 560 return P4IndirectAction(group_id=msg.action_profile_group_id) 561 case "action_profile_action_set": 562 return P4IndirectAction.decode_action_set( 563 msg.action_profile_action_set, table 564 ) 565 case other: 566 raise ValueError(f"unknown oneof: {other!r}") 567 568 @classmethod 569 def decode_action(cls, msg: p4r.Action, parent: P4Schema | P4Table) -> Self: 570 "Decode protobuf to Action data." 571 action = parent.actions[msg.action_id] 572 args = {} 573 for param in msg.params: 574 action_param = action.params[param.param_id] 575 value = action_param.decode_param(param) 576 args[action_param.name] = value 577 578 return cls(action.alias, **args) 579 580 def format_str(self, schema: P4Schema | P4Table) -> str: 581 """Format the table action as a human-readable string. 582 583 The result is formatted to look like a function call: 584 585 ``` 586 name(param1=value1, ...) 587 ``` 588 589 Where `name` is the action name, and `(param<N>, value<N>)` are the 590 action parameters. The format of `value<N>` is schema-dependent. 591 """ 592 aps = schema.actions[self.name].params 593 args = [ 594 f"{key}={aps[key].format_param(value)}" for key, value in self.args.items() 595 ] 596 597 return f"{self.name}({', '.join(args)})" 598 599 def __mul__(self, weight: P4Weight) -> P4WeightedAction: 600 "Make a weighted action." 601 if not isinstance( 602 weight, (int, tuple) 603 ): # pyright: ignore[reportUnnecessaryIsInstance] 604 raise NotImplementedError("expected P4Weight") 605 return (weight, self) 606 607 def __rmul__(self, weight: P4Weight) -> P4WeightedAction: 608 "Make a weighted action." 609 if not isinstance( 610 weight, (int, tuple) 611 ): # pyright: ignore[reportUnnecessaryIsInstance] 612 raise NotImplementedError("expected P4Weight") 613 return (weight, self) 614 615 def __call__(self, **params: Any) -> Self: 616 "Return a new action with the updated parameters." 617 return self.__class__(self.name, **(self.args | params))
Represents a P4Runtime Action reference for a direct table.
Attributes:
- name (str): the name of the action.
- args (dict[str, Any]): the action's arguments as a dictionary.
Example:
If the name of the action is "ipv4_forward" and it takes a single "port" parameter, you can construct the action as:
action = P4TableAction("ipv4_forward", port=1)
Reference "9.1.2 Action Specification":
The Action Protobuf has fields: (action_id, params). Finsy translates name to the appropriate action_id as determined by P4Info. It also translates each named argument in args to the appropriate param_id.
See Also:
To specify an action for an indirect table, use P4IndirectAction. Note that P4TableAction will automatically be promoted to an "indirect" action if needed.
Operators:
A P4TableAction supports the multiplication operator (*) for constructing "weighted actions". A weighted action is used in specifying indirect actions. Here is an action with a weight of 3:
weighted_action = 3 * P4TableAction("ipv4_forward", port=1)
To specify a weight with a watch_port, use a tuple (weight, port). The weight is always a positive integer.
See Also:
- P4TableEntry
Encode TableAction data as protobuf.
If the table is indirect, promote the action to a "one-shot" indirect action.
Encode Action data as protobuf.
Decode protobuf to TableAction data.
    @classmethod
    def decode_action(cls, msg: p4r.Action, parent: P4Schema | P4Table) -> Self:
Decode protobuf to Action data.
    def format_str(self, schema: P4Schema | P4Table) -> str:
Format the table action as a human-readable string.
The result is formatted to look like a function call:
name(param1=value1, ...)
Where name is the action name, and (param<N>, value<N>) are the
action parameters. The format of value<N> is schema-dependent.
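A short sketch of the formatter in use; the exact rendering of each value (decimal, hex, address notation) depends on the P4Info type, so the output shown in the comment is only indicative.
```
action = fy.P4TableAction("ipv4_forward", port=1)
print(action.format_str(sw.p4info))  # e.g. "ipv4_forward(port=0x1)"
```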
@decodable("table_entry")
@dataclass(slots=True)
class P4TableEntry(_P4Writable):
    "Represents a P4Runtime table entry."

    table_id: str = ""
    "Name of the table."
    _: KW_ONLY
    match: P4TableMatch | None = None
    "Entry's match fields."
    action: P4TableAction | P4IndirectAction | None = None
    "Entry's action."
    is_default_action: bool = False
    "True if entry is the default table entry."
    priority: int = 0
    "Priority of a table entry when match implies TCAM lookup."
    metadata: bytes = b""
    "Arbitrary controller cookie. (1.2.0)."
    controller_metadata: int = 0
    "Deprecated controller cookie (< 1.2.0)."
    meter_config: P4MeterConfig | None = None
    "Meter configuration."
    counter_data: P4CounterData | None = None
    "Counter data for table entry."
    meter_counter_data: P4MeterCounterData | None = None
    "Meter counter data (1.4.0)."
    idle_timeout_ns: int = 0
    "Idle timeout in nanoseconds."
    time_since_last_hit: int | None = None
    "Nanoseconds since entry last matched."
    is_const: bool = False
    "True if entry is constant (1.4.0)."

    def _encode_empty(self) -> p4r.TableEntry:
        "Encode an empty wildcard request."
        if self.counter_data is not None:
            counter_data = self.counter_data.encode()
        else:
            counter_data = None

        # FIXME: time_since_last_hit not supported for wildcard reads?
        if self.time_since_last_hit is not None:
            time_since_last_hit = p4r.TableEntry.IdleTimeout(
                elapsed_ns=self.time_since_last_hit
            )
        else:
            time_since_last_hit = None

        return p4r.TableEntry(
            counter_data=counter_data,
            time_since_last_hit=time_since_last_hit,
        )
Represents a P4Runtime table entry.
Attributes:
- table_id (str): Name of the table.
- match (P4TableMatch | None): Entry's match fields.
- action (P4TableAction | P4IndirectAction | None): Entry's action.
- is_default_action (bool): True if entry is the default table entry.
- priority (int): Priority of a table entry when match implies TCAM lookup.
- metadata (bytes): Arbitrary controller cookie (1.2.0).
- controller_metadata (int): Deprecated controller cookie (< 1.2.0).
- meter_config (P4MeterConfig | None): Meter configuration.
- counter_data (P4CounterData | None): Counter data for table entry.
- meter_counter_data (P4MeterCounterData | None): Meter counter data (1.4.0).
- idle_timeout_ns (int): Idle timeout in nanoseconds.
- time_since_last_hit (int | None): Nanoseconds since entry last matched.
- is_const (bool): True if entry is constant (1.4.0).
The most commonly used fields are table_id, match, action, is_default_action, and priority. See the P4Runtime Spec for usage examples regarding the other attributes.
When writing a P4TableEntry, you can specify the type of update using '+', '-', and '~'.
Examples:
# Specify all tables when using "read".
entry = fy.P4TableEntry()
# Specify the table named "ipv4" when using "read".
entry = fy.P4TableEntry("ipv4")
# Specify the default entry in the "ipv4" table when using "read".
entry = fy.P4TableEntry("ipv4", is_default_action=True)
# Insert an entry into the "ipv4" table.
update = +fy.P4TableEntry(
"ipv4",
match=fy.Match(ipv4_dst="10.0.0.0/8"),
action=fy.Action("forward", port=1),
)
# Modify the default action in the "ipv4" table.
update = ~fy.P4TableEntry(
"ipv4",
action=fy.Action("forward", port=5),
is_default_action=True
)
Operators:
You can retrieve a match field from a table entry using []. For example, entry["ipv4_dst"] is the same as entry.match["ipv4_dst"].
Formatting Helpers:
The match_str and action_str methods provide P4Info-aware formatting of the match and action attributes.
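The sketch below ties these pieces together inside a ready handler: it reads a hypothetical "ipv4" table (with a hypothetical "ipv4_dst" match field) and prints each entry with the P4Info-aware helpers.
```
import finsy as fy

async def ready_handler(sw: fy.Switch):
    async for entry in sw.read(fy.P4TableEntry("ipv4")):
        dst = entry["ipv4_dst"]  # same as entry.match["ipv4_dst"]
        print(dst, entry.match_str(sw.p4info), "->", entry.action_str(sw.p4info))
```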
    def __getitem__(self, key: str) -> Any:
        "Convenience accessor to retrieve a value from the `match` property."
        if self.match is not None:
            return self.match[key]
        raise KeyError(key)
Convenience accessor to retrieve a value from the match property.
    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode TableEntry data as protobuf."
        return p4r.Entity(table_entry=self.encode_entry(schema))
Encode TableEntry data as protobuf.
    def encode_entry(self, schema: P4Schema) -> p4r.TableEntry:
        "Encode TableEntry data as protobuf."
        if not self.table_id:
            return self._encode_empty()

        table = schema.tables[self.table_id]

        if self.match:
            match = self.match.encode(table)
        else:
            match = None

        if self.action:
            action = self.action.encode_table_action(table)
        else:
            action = None

        if self.meter_config:
            meter_config = self.meter_config.encode()
        else:
            meter_config = None

        if self.counter_data:
            counter_data = self.counter_data.encode()
        else:
            counter_data = None

        if self.meter_counter_data:
            meter_counter_data = self.meter_counter_data.encode()
        else:
            meter_counter_data = None

        if self.time_since_last_hit is not None:
            time_since_last_hit = p4r.TableEntry.IdleTimeout(
                elapsed_ns=self.time_since_last_hit
            )
        else:
            time_since_last_hit = None

        return p4r.TableEntry(
            table_id=table.id,
            match=match,
            action=action,
            priority=self.priority,
            controller_metadata=self.controller_metadata,
            meter_config=meter_config,
            counter_data=counter_data,
            meter_counter_data=meter_counter_data,
            is_default_action=self.is_default_action,
            idle_timeout_ns=self.idle_timeout_ns,
            time_since_last_hit=time_since_last_hit,
            metadata=self.metadata,
            is_const=self.is_const,
        )
Encode TableEntry data as protobuf.
    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to TableEntry data."
        return cls.decode_entry(msg.table_entry, schema)
Decode protobuf to TableEntry data.
    @classmethod
    def decode_entry(cls, entry: p4r.TableEntry, schema: P4Schema) -> Self:
        "Decode protobuf to TableEntry data."
        if entry.table_id == 0:
            return cls("")

        table = schema.tables[entry.table_id]

        if entry.match:
            match = P4TableMatch.decode(entry.match, table)
        else:
            match = None

        if entry.HasField("action"):
            action = P4TableAction.decode_table_action(entry.action, table)
        else:
            action = None

        if entry.HasField("time_since_last_hit"):
            last_hit = entry.time_since_last_hit.elapsed_ns
        else:
            last_hit = None

        if entry.HasField("meter_config"):
            meter_config = P4MeterConfig.decode(entry.meter_config)
        else:
            meter_config = None

        if entry.HasField("counter_data"):
            counter_data = P4CounterData.decode(entry.counter_data)
        else:
            counter_data = None

        if entry.HasField("meter_counter_data"):
            meter_counter_data = P4MeterCounterData.decode(entry.meter_counter_data)
        else:
            meter_counter_data = None

        return cls(
            table_id=table.alias,
            match=match,
            action=action,
            priority=entry.priority,
            controller_metadata=entry.controller_metadata,
            meter_config=meter_config,
            counter_data=counter_data,
            meter_counter_data=meter_counter_data,
            is_default_action=entry.is_default_action,
            idle_timeout_ns=entry.idle_timeout_ns,
            time_since_last_hit=last_hit,
            metadata=entry.metadata,
            is_const=entry.is_const,
        )
Decode protobuf to TableEntry data.
    def match_dict(
        self,
        schema: P4Schema,
        *,
        wildcard: str | None = None,
    ) -> dict[str, str]:
        """Format the match fields as a dictionary of strings.

        If `wildcard` is None, only include match fields that have values. If
        `wildcard` is set, include all field names but replace unset values
        with the given wildcard value (e.g. "*").
        """
        table = schema.tables[self.table_id]
        if self.match is not None:
            return self.match.format_dict(table, wildcard=wildcard)
        return P4TableMatch().format_dict(table, wildcard=wildcard)
Format the match fields as a dictionary of strings.
If wildcard is None, only include match fields that have values. If wildcard is set, include all field names but replace unset values with the given wildcard value (e.g. "*").
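For instance, with the wildcard set to "*", every match field defined for the table appears in the result; the field names shown in the comment are placeholders and `entry`/`sw` are assumed from the earlier sketch.
```
fields = entry.match_dict(sw.p4info, wildcard="*")
# e.g. {"ipv4_dst": "10.0.0.0/8", "in_port": "*"}
print(fields)
```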
    def match_str(
        self,
        schema: P4Schema,
        *,
        wildcard: str | None = None,
    ) -> str:
        "Format the match fields as a human-readable, canonical string."
        table = schema.tables[self.table_id]
        if self.match is not None:
            return self.match.format_str(table, wildcard=wildcard)
        return P4TableMatch().format_str(table, wildcard=wildcard)
Format the match fields as a human-readable, canonical string.
    def action_str(self, schema: P4Schema) -> str:
        "Format the actions as a human-readable, canonical string."
        table = schema.tables[self.table_id]
        if self.action is None:
            return NOACTION_STR
        return self.action.format_str(table)
Format the actions as a human-readable, canonical string.
Match is an alias for P4TableMatch.
class P4TableMatch(dict[str, Any]):
    "Represents a set of P4Runtime field matches."
Represents a set of P4Runtime field matches.
Each match field is stored as a dictionary key, where the key is the name of the match field. The field's value should be appropriate for the type of match (EXACT, LPM, TERNARY, etc.)
Construct a match similar to a dictionary.
Example:
# Keyword arguments:
match = P4TableMatch(ipv4_dst="10.0.0.1")
# Dictionary argument:
match = P4TableMatch({"ipv4_dst": "10.0.0.1"})
# List of 2-tuples:
match = P4TableMatch([("ipv4_dst", "10.0.0.1")])
P4TableMatch is implemented as a subclass of dict. It supports all of the
standard dictionary methods:
match = P4TableMatch()
match["ipv4_dst"] = "10.0.0.1"
assert len(match) == 1
Reference "9.1.1 Match Format":
Each match field is translated to a FieldMatch Protobuf message by
translating the entry key to a field_id. The type of match (EXACT,
LPM, TERNARY, OPTIONAL, or RANGE) is determined by the P4Info, and the
value is converted to the Protobuf representation.
Supported String Values:
EXACT: "255", "0xFF", "10.0.0.1", "2000::1", "00:00:00:00:00:01"
LPM: "255/8", "0xFF/8", "10.0.0.1/32", "2000::1/128",
"00:00:00:00:00:01/48"
(+ all exact formats are promoted to all-1 masks)
TERNARY: "255/&255", "0xFF/&0xFF", "10.0.0.1/&255.255.255.255",
"2000::1/&128", "00:00:00:00:00:01/&48"
(+ all exact formats are promoted to all-1 masks)
(+ all lpm formats are promoted to the specified contiguous mask)
RANGE: "0...255", "0x00...0xFF", "10.0.0.1...10.0.0.9",
"2000::1...2001::9", "00:00:00:00:00:01...00:00:00:00:00:09"
(+ all exact formats are promoted to single-value ranges)
OPTIONAL: Same as exact format.
See the p4values.py module for all supported value classes.
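The sketch below constructs a match using several of the string formats listed above; the field names are placeholders, and each field's match kind must agree with your P4 program.
```
match = fy.Match(
    eth_dst="00:00:00:00:00:01",   # EXACT
    ipv4_dst="10.0.0.0/8",         # LPM
    eth_type="0x800/&0xFFFF",      # TERNARY (value/&mask)
    l4_port="1000...2000",         # RANGE
)
```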
TODO:
- Change range delimiter to '-' (and drop '-' delimited MAC's).
- Consider supporting ternary values with just '/' (and drop support for decimal masks; mask must be hexadecimal number).
See Also:
- P4TableEntry
    def encode(self, table: P4Table) -> list[p4r.FieldMatch]:
        "Encode TableMatch data as a list of Protobuf fields."
        result: list[p4r.FieldMatch] = []
        match_fields = table.match_fields

        for key, value in self.items():
            try:
                field = match_fields[key].encode_field(value)
                if field is not None:
                    result.append(field)
            except Exception as ex:
                raise ValueError(f"{table.name!r}: Match field {key!r}: {ex}") from ex

        return result
Encode TableMatch data as a list of Protobuf fields.
    @classmethod
    def decode(cls, msgs: Iterable[p4r.FieldMatch], table: P4Table) -> Self:
        "Decode Protobuf fields as TableMatch data."
        result: dict[str, Any] = {}
        match_fields = table.match_fields

        for field in msgs:
            fld = match_fields[field.field_id]
            result[fld.alias] = fld.decode_field(field)

        return cls(result)
Decode Protobuf fields as TableMatch data.
    def format_dict(
        self,
        table: P4Table,
        *,
        wildcard: str | None = None,
    ) -> dict[str, str]:
        """Format the table match fields as a human-readable dictionary.

        The result is a dictionary showing the TableMatch data for fields
        included in the match. If `wildcard` is specified, all fields defined
        in P4Info will be included with their value set to the wildcard string.

        Values are formatted using the format/type specified in P4Info.
        """
        result: dict[str, str] = {}

        for fld in table.match_fields:
            value = self.get(fld.alias, None)
            if value is not None:
                result[fld.alias] = fld.format_field(value)
            elif wildcard is not None:
                result[fld.alias] = wildcard

        return result
Format the table match fields as a human-readable dictionary.
The result is a dictionary showing the TableMatch data for fields
included in the match. If wildcard is specified, all fields defined
in P4Info will be included with their value set to the wildcard string.
Values are formatted using the format/type specified in P4Info.
    def format_str(
        self,
        table: P4Table,
        *,
        wildcard: str | None = None,
    ) -> str:
        """Format the table match fields as a human-readable string.

        The result is a string showing the TableMatch data for fields included
        in the match. If `wildcard` is specified, all fields defined in P4Info
        will be included with their value set to the wildcard string.

        All fields are formatted as "name=value" and they are delimited by
        spaces.

        Values are formatted using the format/type specified in P4Info.
        """
        result: list[str] = []

        for fld in table.match_fields:
            value = self.get(fld.alias, None)
            if value is not None:
                result.append(f"{fld.alias}={fld.format_field(value)}")
            elif wildcard is not None:
                result.append(f"{fld.alias}={wildcard}")

        return " ".join(result)
Format the table match fields as a human-readable string.
The result is a string showing the TableMatch data for fields included
in the match. If wildcard is specified, all fields defined in P4Info
will be included with their value set to the wildcard string.
All fields are formatted as "name=value" and they are delimited by spaces.
Values are formatted using the format/type specified in P4Info.
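A brief sketch of the formatter against a specific table; "ipv4" is a placeholder table name, `sw` and `match` are assumed from the earlier sketches, and the rendered values depend on the P4Info formats.
```
table = sw.p4info.tables["ipv4"]
print(match.format_str(table))                 # only fields present in the match
print(match.format_str(table, wildcard="*"))   # every field, unset ones shown as "*"
```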
@decodable("value_set_entry")
@dataclass(slots=True)
class P4ValueSetEntry(_P4ModifyOnly):
    "Represents a P4Runtime ValueSetEntry."

    value_set_id: str
    _: KW_ONLY
    members: list[P4ValueSetMember]

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4ValueSetEntry as protobuf."
        value_set = schema.value_sets[self.value_set_id]
        members = [
            p4r.ValueSetMember(match=member.encode(value_set))
            for member in self.members
        ]

        return p4r.Entity(
            value_set_entry=p4r.ValueSetEntry(
                value_set_id=value_set.id, members=members
            )
        )

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4ValueSetEntry."
        entry = msg.value_set_entry
        value_set = schema.value_sets[entry.value_set_id]

        members = [
            P4ValueSetMember.decode(member.match, value_set) for member in entry.members
        ]

        return cls(value_set.alias, members=members)
Represents a P4Runtime ValueSetEntry.
    def encode(self, schema: P4Schema) -> p4r.Entity:
Encode P4ValueSetEntry as protobuf.
    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
Decode protobuf to P4ValueSetEntry.
class P4ActionSelectionMode(_EnumBase):
    "IntEnum equivalent to `p4r.ActionProfileActionSet.ActionSelectionMode` (1.5.0)."

    DEFAULT_MODE_DETERMINED_BY_ACTION_SELECTOR = (
        p4r.ActionProfileActionSet.ActionSelectionMode.DEFAULT_MODE_DETERMINED_BY_ACTION_SELECTOR
    )
    HASH = p4r.ActionProfileActionSet.ActionSelectionMode.HASH
    RANDOM = p4r.ActionProfileActionSet.ActionSelectionMode.RANDOM

    def vt(self) -> p4r.ActionProfileActionSet.ActionSelectionMode.ValueType:
        "Cast `self` to `ValueType`."
        return cast(p4r.ActionProfileActionSet.ActionSelectionMode.ValueType, self)
IntEnum equivalent to p4r.ActionProfileActionSet.ActionSelectionMode (1.5.0).
class P4ActionSizeSemantics(_EnumBase):
    "IntEnum equivalent to `p4r.ActionProfileActionSet.SizeSemantics` (1.5.0)."

    DEFAULT_SIZE_DETERMINED_BY_ACTION_SELECTOR = (
        p4r.ActionProfileActionSet.SizeSemantics.DEFAULT_SIZE_DETERMINED_BY_ACTION_SELECTOR
    )
    SUM_OF_WEIGHTS = p4r.ActionProfileActionSet.SizeSemantics.SUM_OF_WEIGHTS
    SUM_OF_MEMBERS = p4r.ActionProfileActionSet.SizeSemantics.SUM_OF_MEMBERS

    def vt(self) -> p4r.ActionProfileActionSet.SizeSemantics.ValueType:
        "Cast `self` to `ValueType`."
        return cast(p4r.ActionProfileActionSet.SizeSemantics.ValueType, self)
IntEnum equivalent to p4r.ActionProfileActionSet.SizeSemantics (1.5.0).
class P4ConfigAction(_EnumBase):
    "IntEnum equivalent to `p4r.SetForwardingPipelineConfigRequest.Action`."

    UNSPECIFIED = p4r.SetForwardingPipelineConfigRequest.Action.UNSPECIFIED
    VERIFY = p4r.SetForwardingPipelineConfigRequest.Action.VERIFY
    VERIFY_AND_SAVE = p4r.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_SAVE
    VERIFY_AND_COMMIT = p4r.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT
    COMMIT = p4r.SetForwardingPipelineConfigRequest.Action.COMMIT
    RECONCILE_AND_COMMIT = (
        p4r.SetForwardingPipelineConfigRequest.Action.RECONCILE_AND_COMMIT
    )

    def vt(self) -> p4r.SetForwardingPipelineConfigRequest.Action.ValueType:
        "Cast `self` to `ValueType`."
        return cast(p4r.SetForwardingPipelineConfigRequest.Action.ValueType, self)
IntEnum equivalent to p4r.SetForwardingPipelineConfigRequest.Action.
class P4Schema(_ReprMixin):
    "Represents a P4Info file and its associated P4 blob (optional)."

    _p4info: p4i.P4Info | None
    _p4blob: Path | bytes | SupportsBytes | None
    _p4defs: _P4Defs  # possibly shared in-memory representation
    _p4cookie: int = 0

    def __init__(
        self,
        p4info: p4i.P4Info | Path | None = None,
        p4blob: Path | bytes | SupportsBytes | None = None,
    ):
        self._p4blob = p4blob
        self._p4info, self._p4defs, self._p4cookie = P4SchemaCache.load_p4info(
            p4info,
            self._p4blob,
        )

    @property
    def is_authoritative(self) -> bool:
        "True if both p4info and p4blob are configured."
        return self._p4info is not None and self._p4blob is not None

    @property
    def p4cookie(self) -> int:
        """Cookie value for p4info and p4blob."""
        return self._p4cookie

    @property
    def type_info(self) -> "P4TypeInfo":
        "Type Info object."
        return self._p4defs.type_info

    @property
    def externs(self) -> "P4ExternMap":
        "Collection of P4 extern instances."
        return self._p4defs.externs

    def __str__(self) -> str:
        if self._p4info is None:
            return "<P4Info: No pipeline configured>"
        return str(P4SchemaDescription(self))
Represents a P4Info file and its associated P4 blob (optional).
p4 = P4Schema(Path("basic.p4info.txtpb"))
This class parses the P4Info contents to produce an in-memory representation
of the tables, actions, types, etc. inside. This in-memory graph of the
contents of the P4Info file may be shared when we parse identical
P4Info files. The sharing of P4Info data is controlled by the
P4SchemaCache class.
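A minimal sketch of loading and inspecting a schema, assuming P4Schema is imported from the library as in the example above; the file paths are placeholders for your own pipeline artifacts.
```
from pathlib import Path

p4 = P4Schema(Path("basic.p4info.txtpb"), Path("basic.json"))
print(p4.name, p4.version, p4.arch)
for table in p4.tables:
    print(table.alias)
```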
    @property
    def exists(self) -> bool:
        "True if p4info is configured."
        return self._p4info is not None
True if p4info is configured.
    @property
    def p4info(self) -> p4i.P4Info:
        "P4Info value."
        if self._p4info is None:
            raise ValueError("No P4Info configured.")
        return self._p4info
P4Info value.
    def set_p4info(self, p4info: p4i.P4Info) -> None:
        "Set P4Info using value returned from switch."
        self._p4info, self._p4defs, self._p4cookie = P4SchemaCache.load_p4info(
            p4info,
            self._p4blob,
        )
Set P4Info using value returned from switch.
    def has_p4info(self, p4info: p4i.P4Info) -> bool:
        "Return true if the current P4Info equals the given P4Info."
        if self._p4info is None:
            return False
        return self._p4info.SerializeToString(
            deterministic=True
        ) == p4info.SerializeToString(deterministic=True)
Return true if the current P4Info equals the given P4Info.
    @property
    def p4blob(self) -> bytes:
        "P4Blob value a.k.a p4_device_config."
        return _blob_bytes(self._p4blob)
P4Blob value a.k.a p4_device_config.
    def get_pipeline_config(self) -> p4r.ForwardingPipelineConfig:
        """The forwarding pipeline configuration."""
        return p4r.ForwardingPipelineConfig(
            p4info=self.p4info,
            p4_device_config=self.p4blob,
            cookie=p4r.ForwardingPipelineConfig.Cookie(cookie=self.p4cookie),
        )
The forwarding pipeline configuration.
    def get_pipeline_info(self) -> str:
        "Concise string description of the pipeline (suitable for logging)."
        if self.exists:
            pipeline = self.name
            version = self.version
            arch = self.arch
            return f"{pipeline=} {version=} {arch=}"

        return "<No pipeline exists>"
Concise string description of the pipeline (suitable for logging).
    @property
    def name(self) -> str:
        "Name from pkg_info."
        if self._p4info is None:
            return ""
        return self._p4info.pkg_info.name
Name from pkg_info.
    @property
    def version(self) -> str:
        "Version from pkg_info."
        if self._p4info is None:
            return ""
        return self._p4info.pkg_info.version
Version from pkg_info.
    @property
    def arch(self) -> str:
        "Arch from pkg_info."
        if self._p4info is None:
            return ""
        return self._p4info.pkg_info.arch
Arch from pkg_info.
    @property
    def pkg_info(self) -> p4i.PkgInfo:
        """Protobuf message containing original `PkgInfo` header.

        Use this to access less frequently used fields like `contact`, `url`,
        and `platform_properties`.
        """
        if self._p4info is None:
            raise ValueError("P4Info: No pipeline configured")
        return self._p4info.pkg_info
Protobuf message containing original PkgInfo header.
Use this to access less frequently used fields like contact, url,
and platform_properties.
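For example, a short sketch of reading those extra fields from the schema loaded above; the attribute names follow the P4Runtime PkgInfo message.
```
info = p4.pkg_info
print(info.contact, info.url)
```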
    @property
    def tables(self) -> P4EntityMap["P4Table"]:
        "Collection of P4 tables."
        return self._p4defs.tables
Collection of P4 tables.
    @property
    def actions(self) -> P4EntityMap["P4Action"]:
        "Collection of P4 actions."
        return self._p4defs.actions
Collection of P4 actions.
    @property
    def action_profiles(self) -> P4EntityMap["P4ActionProfile"]:
        "Collection of P4 action profiles."
        return self._p4defs.action_profiles
Collection of P4 action profiles.
    @property
    def controller_packet_metadata(self) -> P4EntityMap["P4ControllerPacketMetadata"]:
        "Collection of P4 controller packet metadata."
        return self._p4defs.controller_packet_metadata
Collection of P4 controller packet metadata.
    @property
    def direct_counters(self) -> P4EntityMap["P4DirectCounter"]:
        "Collection of P4 direct counters."
        return self._p4defs.direct_counters
Collection of P4 direct counters.
    @property
    def direct_meters(self) -> P4EntityMap["P4DirectMeter"]:
        "Collection of P4 direct meters."
        return self._p4defs.direct_meters
Collection of P4 direct meters.
    @property
    def counters(self) -> P4EntityMap["P4Counter"]:
        "Collection of P4 counters."
        return self._p4defs.counters
Collection of P4 counters.
    @property
    def meters(self) -> P4EntityMap["P4Meter"]:
        "Collection of P4 meters."
        return self._p4defs.meters
Collection of P4 meters.
    @property
    def registers(self) -> P4EntityMap["P4Register"]:
        "Collection of P4 registers."
        return self._p4defs.registers
Collection of P4 registers.
    @property
    def digests(self) -> P4EntityMap["P4Digest"]:
        "Collection of P4 digests."
        return self._p4defs.digests
Collection of P4 digests.
    @property
    def value_sets(self) -> P4EntityMap["P4ValueSet"]:
        "Collection of P4 value sets."
        return self._p4defs.value_sets
Collection of P4 value sets.
154@final 155class Switch: 156 """Represents a P4Runtime Switch. 157 158 A `Switch` is constructed with a `name`, `address` and an optional 159 `SwitchOptions` configuration. 160 161 The `name` is up to the user but should uniquely identify the switch. 162 163 The `address` identifies the target endpoint of the GRPC channel. It should 164 have the format `<ADDRESS>:<PORT>` where `<ADDRESS>` can be a domain name, 165 IPv4 address, or IPv6 address in square brackets, and `<PORT>` is the TCP 166 port number. 167 168 The `options` is a `SwitchOptions` object that specifies how the `Switch` 169 will behave. 170 171 ``` 172 opts = SwitchOptions(p4info=..., p4blob=...) 173 sw1 = Switch('sw1', '10.0.0.1:50000', opts) 174 ``` 175 176 The `stash` is an optional dictionary for storing arbitrary per-switch 177 information. The dictionary keys are strings. You can use this to store 178 anything that you need to manage the switch. 179 180 Each switch object has an event emitter `ee`. Use the EventEmitter to listen 181 for port change events like PORT_UP and PORT_DOWN. See the `SwitchEvent` 182 class for a list of other switch events. 183 """ 184 185 _name: str 186 _address: str 187 _options: SwitchOptions 188 _stash: dict[str, Any] 189 _ee: "SwitchEmitter" 190 _p4client: P4Client | None 191 _p4schema: P4Schema 192 _tasks: "SwitchTasks | None" 193 _packet_queues: list[tuple[Callable[[bytes], bool], Queue[p4entity.P4PacketIn]]] 194 _digest_queues: dict[str, Queue[p4entity.P4DigestList]] 195 _timeout_queue: Queue[p4entity.P4IdleTimeoutNotification] | None 196 _arbitrator: "Arbitrator" 197 _gnmi_client: GNMIClient | None 198 _ports: SwitchPortList 199 _is_channel_up: bool = False 200 _api_version: ApiVersion = ApiVersion(1, 0, 0, "") 201 _control_task: asyncio.Task[Any] | None = None 202 203 def __init__( 204 self, 205 name: str, 206 address: str, 207 options: SwitchOptions | None = None, 208 *, 209 stash: dict[str, Any] | None = None, 210 ) -> None: 211 """Initialize switch with name, address and options. 212 213 Args: 214 name: A human-readable name to uniquely identify the switch. 215 address: The target address of the P4Runtime GRPC channel. 216 Format is `<ADDRESS>:<PORT>` where `<ADDRESS>` is a DNS name or 217 IP address, and `<PORT>` is the TCP port number. 218 options: Configuration options for the switch. 219 stash: Optional user-controlled dictionary. 220 Used to store information that user code needs to access or share. 221 """ 222 if options is None: 223 options = SwitchOptions() 224 225 self._name = name 226 self._address = address 227 self._options = options 228 self._stash = stash or {} 229 self._ee = SwitchEmitter(self) 230 self._p4client = None 231 self._p4schema = P4Schema(options.p4info, options.p4blob) 232 self._tasks = None 233 self._packet_queues = [] 234 self._digest_queues = {} 235 self._timeout_queue = None 236 self._arbitrator = Arbitrator( 237 options.initial_election_id, options.role_name, options.role_config 238 ) 239 self._gnmi_client = None 240 self._ports = SwitchPortList() 241 242 @property 243 def name(self) -> str: 244 "Name of the switch." 245 return self._name 246 247 @property 248 def address(self) -> str: 249 """Address of the switch formatted as a GRPC target. 250 251 Format is `<ADDRESS>:<PORT>` where `<ADDRESS>` is a DNS name or IP 252 address, and `<PORT>` is the TCP port number. 253 """ 254 return self._address 255 256 @property 257 def options(self) -> SwitchOptions: 258 "Switch options." 
259 return self._options 260 261 @options.setter 262 def options(self, opts: SwitchOptions) -> None: 263 "Set switch options to a new value." 264 if self._p4client is not None: 265 raise RuntimeError("Cannot change switch options while client is open.") 266 267 self._options = opts 268 self._p4schema = P4Schema(opts.p4info, opts.p4blob) 269 self._arbitrator = Arbitrator( 270 opts.initial_election_id, opts.role_name, opts.role_config 271 ) 272 273 @property 274 def stash(self) -> dict[str, Any]: 275 "Switch stash. Used to store per-switch data for any purpose." 276 return self._stash 277 278 @property 279 def ee(self) -> "SwitchEmitter": 280 "Switch event emitter. See `SwitchEvent` for more details on events." 281 return self._ee 282 283 @property 284 def device_id(self) -> int: 285 "Switch's device ID." 286 return self._options.device_id 287 288 @property 289 def is_up(self) -> bool: 290 "True if switch is UP." 291 return self._is_channel_up 292 293 @property 294 def is_primary(self) -> bool: 295 "True if switch is primary." 296 return self._arbitrator.is_primary 297 298 @property 299 def primary_id(self) -> int: 300 "Election ID of switch that is currently primary." 301 return self._arbitrator.primary_id 302 303 @property 304 def election_id(self) -> int: 305 "Switch's current election ID." 306 return self._arbitrator.election_id 307 308 @property 309 def role_name(self) -> str: 310 "Switch's current role name." 311 return self._arbitrator.role_name 312 313 @property 314 def p4info(self) -> P4Schema: 315 "Switch's P4 schema." 316 return self._p4schema 317 318 @property 319 def gnmi_client(self) -> GNMIClient | None: 320 "Switch's gNMI client." 321 return self._gnmi_client 322 323 @property 324 def ports(self) -> SwitchPortList: 325 "Switch's list of interfaces." 326 return self._ports 327 328 @property 329 def api_version(self) -> ApiVersion: 330 "P4Runtime protocol version." 331 return self._api_version 332 333 @overload 334 async def read( 335 self, 336 entities: _ET, 337 ) -> AsyncGenerator[_ET, None]: 338 "Overload for read of a single P4Entity subtype." 339 ... # pragma: no cover 340 341 @overload 342 async def read( 343 self, 344 entities: Iterable[_ET], 345 ) -> AsyncGenerator[_ET, None]: 346 "Overload for read of an iterable of the same P4Entity subtype." 347 ... # pragma: no cover 348 349 @overload 350 async def read( 351 self, 352 entities: Iterable[p4entity.P4EntityList], 353 ) -> AsyncGenerator[p4entity.P4Entity, None]: 354 "Most general overload: we can't determine the return type exactly." 355 ... # pragma: no cover 356 357 async def read( 358 self, 359 entities: Iterable[p4entity.P4EntityList] | p4entity.P4Entity, 360 ) -> AsyncGenerator[p4entity.P4Entity, None]: 361 "Async iterator that reads entities from the switch." 362 assert self._p4client is not None 363 364 if not entities: 365 return 366 367 if isinstance(entities, p4entity.P4Entity): 368 entities = [entities] 369 370 request = p4r.ReadRequest( 371 device_id=self.device_id, 372 entities=p4entity.encode_entities(entities, self.p4info), 373 ) 374 375 async for reply in self._p4client.request_iter(request): 376 for ent in reply.entities: 377 yield p4entity.decode_entity(ent, self.p4info) 378 379 async def read_packets( 380 self, 381 *, 382 queue_size: int = _DEFAULT_QUEUE_SIZE, 383 eth_types: Iterable[int] | None = None, 384 ) -> AsyncIterator["p4entity.P4PacketIn"]: 385 "Async iterator for incoming packets (P4PacketIn)." 
386 LOGGER.debug("read_packets: opening queue: eth_types=%r", eth_types) 387 388 if eth_types is None: 389 390 def _pkt_filter(_payload: bytes) -> bool: 391 return True 392 393 else: 394 _filter = {eth.to_bytes(2, "big") for eth in eth_types} 395 396 def _pkt_filter(_payload: bytes) -> bool: 397 return _payload[12:14] in _filter 398 399 queue = Queue[p4entity.P4PacketIn](queue_size) 400 queue_filter = (_pkt_filter, queue) 401 self._packet_queues.append(queue_filter) 402 403 try: 404 while True: 405 yield await queue.get() 406 finally: 407 LOGGER.debug("read_packets: closing queue: eth_types=%r", eth_types) 408 self._packet_queues.remove(queue_filter) 409 410 async def read_digests( 411 self, 412 digest_id: str, 413 *, 414 queue_size: int = _DEFAULT_QUEUE_SIZE, 415 ) -> AsyncIterator["p4entity.P4DigestList"]: 416 "Async iterator for incoming digest lists (P4DigestList)." 417 LOGGER.debug("read_digests: opening queue: digest_id=%r", digest_id) 418 419 if digest_id in self._digest_queues: 420 raise ValueError(f"queue for digest_id {digest_id!r} already open") 421 422 queue = Queue[p4entity.P4DigestList](queue_size) 423 self._digest_queues[digest_id] = queue 424 try: 425 while True: 426 yield await queue.get() 427 finally: 428 LOGGER.debug("read_digests: closing queue: digest_id=%r", digest_id) 429 del self._digest_queues[digest_id] 430 431 async def read_idle_timeouts( 432 self, 433 *, 434 queue_size: int = _DEFAULT_QUEUE_SIZE, 435 ) -> AsyncIterator["p4entity.P4IdleTimeoutNotification"]: 436 "Async iterator for incoming idle timeouts (P4IdleTimeoutNotification)." 437 LOGGER.debug("read_idle_timeouts: opening queue") 438 439 if self._timeout_queue is not None: 440 raise ValueError("timeout queue already open") 441 442 queue = Queue[p4entity.P4IdleTimeoutNotification](queue_size) 443 self._timeout_queue = queue 444 try: 445 while True: 446 yield await queue.get() 447 finally: 448 LOGGER.debug("read_idle_timeouts: closing queue") 449 self._timeout_queue = None 450 451 async def write( 452 self, 453 entities: Iterable[p4entity.P4UpdateList], 454 *, 455 strict: bool = True, 456 warn_only: bool = False, 457 ) -> None: 458 """Write updates and stream messages to the switch. 459 460 If `strict` is False, MODIFY and DELETE operations will NOT raise an 461 error if the entity does not exist (NOT_FOUND). 462 463 If `warn_only` is True, no operations will raise an error. Instead, 464 the exception will be logged as a WARNING and the method will return 465 normally. 466 """ 467 assert self._p4client is not None 468 469 if not entities: 470 return 471 472 msgs = p4entity.encode_updates(entities, self.p4info) 473 474 updates: list[p4r.Update] = [] 475 for msg in msgs: 476 if isinstance(msg, p4r.StreamMessageRequest): 477 # StreamMessageRequests are transmitted immediately. 478 # TODO: Understand what happens with backpressure? 479 await self._p4client.send(msg) 480 else: 481 updates.append(msg) 482 483 if updates: 484 await self._write_request(updates, strict, warn_only) 485 486 async def insert( 487 self, 488 entities: Iterable[p4entity.P4EntityList], 489 *, 490 warn_only: bool = False, 491 ) -> None: 492 """Insert the specified entities. 493 494 If `warn_only` is True, errors will be logged as warnings instead of 495 raising an exception. 
496 """ 497 if entities: 498 await self._write_request( 499 [ 500 p4r.Update(type=p4r.Update.INSERT, entity=ent) 501 for ent in p4entity.encode_entities(entities, self.p4info) 502 ], 503 True, 504 warn_only, 505 ) 506 507 async def modify( 508 self, 509 entities: Iterable[p4entity.P4EntityList], 510 *, 511 strict: bool = True, 512 warn_only: bool = False, 513 ) -> None: 514 """Modify the specified entities. 515 516 If `strict` is False, NOT_FOUND errors will be ignored. 517 518 If `warn_only` is True, errors will be logged as warnings instead of 519 raising an exception. 520 """ 521 if entities: 522 await self._write_request( 523 [ 524 p4r.Update(type=p4r.Update.MODIFY, entity=ent) 525 for ent in p4entity.encode_entities(entities, self.p4info) 526 ], 527 strict, 528 warn_only, 529 ) 530 531 async def delete( 532 self, 533 entities: Iterable[p4entity.P4EntityList], 534 *, 535 strict: bool = True, 536 warn_only: bool = False, 537 ) -> None: 538 """Delete the specified entities. 539 540 If `strict` is False, NOT_FOUND errors will be ignored. 541 542 If `warn_only` is True, errors will be logged as warnings instead of 543 raising an exception. 544 """ 545 if entities: 546 await self._write_request( 547 [ 548 p4r.Update(type=p4r.Update.DELETE, entity=ent) 549 for ent in p4entity.encode_entities(entities, self.p4info) 550 ], 551 strict, 552 warn_only, 553 ) 554 555 async def delete_all(self) -> None: 556 """Delete all entities if no parameter is passed. Otherwise, delete 557 items that match `entities`. 558 559 This method does not attempt to delete entries in const tables. 560 561 TODO: This method does not affect indirect counters, meters or 562 value_sets. 563 """ 564 await self.delete_many( 565 [ 566 p4entity.P4TableEntry(), 567 p4entity.P4MulticastGroupEntry(), 568 p4entity.P4CloneSessionEntry(), 569 ] 570 ) 571 572 # Reset all default table entries. 573 default_entries = [ 574 p4entity.P4TableEntry(table.alias, is_default_action=True) 575 for table in self.p4info.tables 576 if table.const_default_action is None and table.action_profile is None 577 ] 578 if default_entries: 579 await self.modify(default_entries) 580 581 # Delete all P4ActionProfileGroup's and P4ActionProfileMember's. 582 # We do this after deleting the P4TableEntry's in case a client is using 583 # "one-shot" references; these are incompatible with separate 584 # action profiles. 585 await self.delete_many( 586 [ 587 p4entity.P4ActionProfileGroup(), 588 p4entity.P4ActionProfileMember(), 589 ] 590 ) 591 592 # Delete DigestEntry separately. Wildcard reads are not supported. 593 digest_entries = [ 594 p4entity.P4DigestEntry(digest.alias) for digest in self.p4info.digests 595 ] 596 if digest_entries: 597 await self.delete(digest_entries, strict=False) 598 599 async def delete_many(self, entities: Iterable[p4entity.P4EntityList]) -> None: 600 """Delete entities that match a wildcard read. 601 602 This method always skips over entries in const tables. It is an error 603 to attempt to delete those. 604 """ 605 assert self._p4client is not None 606 607 request = p4r.ReadRequest( 608 device_id=self.device_id, 609 entities=p4entity.encode_entities(entities, self.p4info), 610 ) 611 612 # Compute set of all const table ID's (may be empty). 
613 to_skip = {table.id for table in self.p4info.tables if table.is_const} 614 615 async for reply in self._p4client.request_iter(request): 616 if reply.entities: 617 if to_skip: 618 await self.delete( 619 reply 620 for reply in reply.entities 621 if reply.HasField("table_entry") 622 and reply.table_entry.table_id not in to_skip 623 ) 624 else: 625 await self.delete(reply.entities) 626 627 async def run(self) -> None: 628 "Run the switch's lifecycle repeatedly." 629 assert self._p4client is None 630 assert self._tasks is None 631 632 self._tasks = SwitchTasks(self._options.fail_fast) 633 self._p4client = P4Client(self._address, self._options.channel_credentials) 634 self._switch_start() 635 636 try: 637 while True: 638 # If the switch fails and restarts too quickly, slow it down. 639 async with _throttle_failure(): 640 self.create_task(self._run(), background=True) 641 await self._tasks.wait() 642 self._arbitrator.reset() 643 644 finally: 645 self._p4client = None 646 self._tasks = None 647 self._switch_stop() 648 649 def create_task( 650 self, 651 coro: Coroutine[Any, Any, _T], 652 *, 653 background: bool = False, 654 name: str | None = None, 655 ) -> asyncio.Task[_T]: 656 "Create an asyncio task tied to the Switch's lifecycle." 657 assert self._tasks is not None 658 659 return self._tasks.create_task( 660 coro, 661 switch=self, 662 background=background, 663 name=name, 664 ) 665 666 async def _run(self): 667 "Main Switch task runs the stream." 668 assert not self._is_channel_up 669 assert self._p4client is not None 670 671 try: 672 await self._p4client.open( 673 schema=self.p4info, 674 complete_request=self._arbitrator.complete_request, 675 ) 676 await self._arbitrator.handshake(self) 677 await self._fetch_capabilities() 678 await self._start_gnmi() 679 self._channel_up() 680 681 # Receive messages from the stream until it closes. 682 await self._receive_until_closed() 683 684 finally: 685 await self._stop_gnmi() 686 await self._p4client.close() 687 self._channel_down() 688 689 async def _receive_until_closed(self): 690 "Receive messages from stream until EOF." 691 assert self._p4client is not None 692 693 client = self._p4client 694 695 while True: 696 try: 697 msg = await client.receive() 698 except P4ClientError as ex: 699 if not ex.is_election_id_used: 700 raise 701 # Handle "election ID in use" error. 702 await self._arbitrator.handshake(self, conflict=True) 703 else: 704 await self._handle_stream_message(msg) 705 706 async def _handle_stream_message(self, msg: p4r.StreamMessageResponse): 707 "Handle a P4Runtime StreamMessageResponse." 708 match msg.WhichOneof("update"): 709 case "packet": 710 self._stream_packet_message(msg) 711 case "digest": 712 self._stream_digest_message(msg) 713 case "idle_timeout_notification": 714 self._stream_timeout_message(msg) 715 case "arbitration": 716 await self._arbitrator.update(self, msg.arbitration) 717 case "error": 718 self._stream_error_message(msg) 719 case other: 720 LOGGER.error("_handle_stream_message: unknown update %r", other) 721 722 async def __aenter__(self) -> Self: 723 "Similar to run() but provides a one-time context manager interface." 724 assert self._p4client is None 725 assert self._tasks is None 726 727 self._tasks = SwitchTasks(self._options.fail_fast) 728 self._p4client = P4Client( 729 self._address, 730 self._options.channel_credentials, 731 wait_for_ready=False, 732 ) 733 self._switch_start() 734 735 try: 736 # Start the switch's `_run` task in the background. 
Then, wait for 737 # `_run` task to fire the CHANNEL_READY event. If the `_run` task 738 # cannot connect or fails in some other way, it will finish before 739 # the `ready` future. We need to handle the error in this case. 740 741 run = self.create_task(self._run(), background=True) 742 ready = self.ee.event_future(SwitchEvent.CHANNEL_READY) 743 done, _ = await asyncio.wait( 744 [run, ready], return_when=asyncio.FIRST_COMPLETED 745 ) 746 if run in done: 747 await run 748 749 except BaseException: 750 await self.__aexit__(None, None, None) 751 raise 752 753 return self 754 755 async def __aexit__( 756 self, 757 _exc_type: type[BaseException] | None, 758 _exc_val: BaseException | None, 759 _exc_tb: TracebackType | None, 760 ) -> bool | None: 761 "Similar to run() but provides a one-time context manager interface." 762 assert self._p4client is not None 763 assert self._tasks is not None 764 765 self._tasks.cancel_all() 766 await self._tasks.wait() 767 self._arbitrator.reset() 768 self._p4client = None 769 self._tasks = None 770 self._switch_stop() 771 772 def _switch_start(self): 773 "Called when switch starts its run() cycle." 774 assert not self._is_channel_up 775 776 LOGGER.info( 777 "Switch start (name=%r, address=%r, device_id=%r, role_name=%r, initial_election_id=%r)", 778 self.name, 779 self.address, 780 self.device_id, 781 self.role_name, 782 self.options.initial_election_id, 783 ) 784 self.ee.emit(SwitchEvent.SWITCH_START) 785 786 def _switch_stop(self): 787 "Called when switch stops its run() cycle." 788 assert not self._is_channel_up 789 790 LOGGER.info( 791 "Switch stop (name=%r, address=%r, device_id=%r, role_name=%r)", 792 self.name, 793 self.address, 794 self.device_id, 795 self.role_name, 796 ) 797 self.ee.emit(SwitchEvent.SWITCH_STOP) 798 799 def _channel_up(self): 800 "Called when P4Runtime channel is UP." 801 assert not self._is_channel_up 802 803 ports = " ".join(f"({port.id}){port.name}" for port in self.ports) 804 LOGGER.info( 805 "Channel up (is_primary=%r, role_name=%r, p4r=%s): %s", 806 self.is_primary, 807 self.role_name, 808 self.api_version, 809 ports, 810 ) 811 self._is_channel_up = True 812 self.create_task(self._ready()) 813 814 self.ee.emit(SwitchEvent.CHANNEL_UP, self) 815 816 def _channel_down(self): 817 "Called when P4Runtime channel is DOWN." 818 if not self._is_channel_up: 819 return # do nothing! 820 821 LOGGER.info( 822 "Channel down (is_primary=%r, role_name=%r)", 823 self.is_primary, 824 self.role_name, 825 ) 826 self._is_channel_up = False 827 828 self.ee.emit(SwitchEvent.CHANNEL_DOWN, self) 829 830 def _become_primary(self): 831 "Called when a P4Runtime backup channel becomes the primary." 832 assert self._tasks is not None 833 834 LOGGER.info( 835 "Become primary (is_primary=%r, role_name=%r)", 836 self.is_primary, 837 self.role_name, 838 ) 839 840 self._tasks.cancel_primary() 841 self.create_task(self._ready()) 842 843 self.ee.emit(SwitchEvent.BECOME_PRIMARY, self) 844 845 def _become_backup(self): 846 "Called when a P4Runtime primary channel becomes a backup." 847 assert self._tasks is not None 848 849 LOGGER.info( 850 "Become backup (is_primary=%r, role_name=%r)", 851 self.is_primary, 852 self.role_name, 853 ) 854 855 self._tasks.cancel_primary() 856 self.create_task(self._ready()) 857 858 self.ee.emit(SwitchEvent.BECOME_BACKUP, self) 859 860 def _channel_ready(self): 861 "Called when a P4Runtime channel is READY." 
862 LOGGER.info( 863 "Channel ready (is_primary=%r, role_name=%r): %s", 864 self.is_primary, 865 self.role_name, 866 self.p4info.get_pipeline_info(), 867 ) 868 869 if self._options.ready_handler: 870 self.create_task(self._options.ready_handler(self)) 871 872 self.ee.emit(SwitchEvent.CHANNEL_READY, self) 873 874 def _stream_packet_message(self, msg: p4r.StreamMessageResponse): 875 "Called when a P4Runtime packet-in response is received." 876 packet = p4entity.decode_stream(msg, self.p4info) 877 878 was_queued = False 879 for pkt_filter, queue in self._packet_queues: 880 if not queue.full() and pkt_filter(packet.payload): 881 queue.put_nowait(packet) 882 was_queued = True 883 884 if not was_queued: 885 LOGGER.warning("packet ignored: %r", packet) 886 887 def _stream_digest_message(self, msg: p4r.StreamMessageResponse): 888 "Called when a P4Runtime digest response is received." 889 try: 890 # Decode the digest list message. 891 digest: p4entity.P4DigestList = p4entity.decode_stream(msg, self.p4info) 892 except ValueError as ex: 893 # It's possible to receive a digest for a different P4Info file, or 894 # even before a P4Info is fetched from the switch. 895 LOGGER.warning("digest decode failed: %s", ex) 896 else: 897 # Place the decoded digest list in a queue, if one is waiting. 898 queue = self._digest_queues.get(digest.digest_id) 899 if queue is not None and not queue.full(): 900 queue.put_nowait(digest) 901 else: 902 LOGGER.warning("digest ignored: %r", digest) 903 904 def _stream_timeout_message(self, msg: p4r.StreamMessageResponse): 905 "Called when a P4Runtime timeout notification is received." 906 timeout: p4entity.P4IdleTimeoutNotification = p4entity.decode_stream( 907 msg, self.p4info 908 ) 909 queue = self._timeout_queue 910 911 if queue is not None and not queue.full(): 912 queue.put_nowait(timeout) 913 else: 914 LOGGER.warning("timeout ignored: %r", timeout) 915 916 def _stream_error_message(self, msg: p4r.StreamMessageResponse): 917 "Called when a P4Runtime stream error response is received." 918 assert self._p4client is not None 919 920 # Log the message at the ERROR level. 921 pbutil.log_msg(self._p4client.channel, msg, self.p4info, level=logging.ERROR) 922 923 self.ee.emit(SwitchEvent.STREAM_ERROR, self, msg) 924 925 async def _ready(self): 926 "Prepare the pipeline." 927 if self.p4info.is_authoritative and self.is_primary: 928 await self._set_pipeline() 929 else: 930 await self._get_pipeline() 931 932 self._channel_ready() 933 934 async def _get_pipeline(self): 935 "Get the switch's P4Info." 936 has_pipeline = False 937 938 try: 939 reply = await self._get_pipeline_config_request( 940 response_type=P4ConfigResponseType.P4INFO_AND_COOKIE 941 ) 942 943 if reply.config.HasField("p4info"): 944 has_pipeline = True 945 p4info = reply.config.p4info 946 if not self.p4info.exists: 947 # If we don't have P4Info yet, set it. 948 self.p4info.set_p4info(p4info) 949 elif not self.p4info.has_p4info(p4info): 950 # If P4Info is not identical, log a warning message. 951 LOGGER.warning("Retrieved P4Info is different than expected!") 952 953 except P4ClientError as ex: 954 if not ex.is_pipeline_missing: 955 raise 956 957 if not has_pipeline and self.p4info.exists: 958 LOGGER.warning("Forwarding pipeline is not configured") 959 960 async def _set_pipeline(self): 961 """Set up the pipeline. 962 963 If `p4force` is false (the default), we first retrieve the cookie for 964 the current pipeline and see if it matches the new pipeline's cookie. 
965 If the cookies match, we are done; there is no need to set the pipeline. 966 967 If `p4force` is true, we always load the new pipeline. 968 """ 969 cookie = -1 970 try: 971 if not self.options.p4force: 972 reply = await self._get_pipeline_config_request() 973 if reply.config.HasField("cookie"): 974 cookie = reply.config.cookie.cookie 975 976 except P4ClientError as ex: 977 if not ex.is_pipeline_missing: 978 raise 979 980 if cookie != self.p4info.p4cookie: 981 LOGGER.debug( 982 "cookie %#x does not match expected %#x", cookie, self.p4info.p4cookie 983 ) 984 await self._set_pipeline_config_request( 985 config=self.p4info.get_pipeline_config() 986 ) 987 LOGGER.info("Pipeline installed: %s", self.p4info.get_pipeline_info()) 988 989 async def _get_pipeline_config_request( 990 self, 991 *, 992 response_type: P4ConfigResponseType = P4ConfigResponseType.COOKIE_ONLY, 993 ) -> p4r.GetForwardingPipelineConfigResponse: 994 "Send a GetForwardingPipelineConfigRequest and await the response." 995 assert self._p4client is not None 996 997 return await self._p4client.request( 998 p4r.GetForwardingPipelineConfigRequest( 999 device_id=self.device_id, 1000 response_type=response_type.vt(), 1001 ) 1002 ) 1003 1004 async def _set_pipeline_config_request( 1005 self, 1006 *, 1007 action: P4ConfigAction = P4ConfigAction.VERIFY_AND_COMMIT, 1008 config: p4r.ForwardingPipelineConfig, 1009 ) -> p4r.SetForwardingPipelineConfigResponse: 1010 "Send a SetForwardingPipelineConfigRequest and await the response." 1011 assert self._p4client is not None 1012 1013 return await self._p4client.request( 1014 p4r.SetForwardingPipelineConfigRequest( 1015 device_id=self.device_id, 1016 action=action.vt(), 1017 config=config, 1018 ) 1019 ) 1020 1021 async def _write_request( 1022 self, 1023 updates: list[p4r.Update], 1024 strict: bool, 1025 warn_only: bool, 1026 ): 1027 "Send a P4Runtime WriteRequest." 1028 assert self._p4client is not None 1029 1030 try: 1031 await self._p4client.request( 1032 p4r.WriteRequest( 1033 device_id=self.device_id, 1034 updates=updates, 1035 ) 1036 ) 1037 except P4ClientError as ex: 1038 if strict or not ex.is_not_found_only: 1039 if warn_only: 1040 LOGGER.warning( 1041 "WriteRequest with `warn_only=True` failed", 1042 exc_info=True, 1043 ) 1044 else: 1045 raise 1046 1047 assert (not strict and ex.is_not_found_only) or warn_only 1048 1049 async def _fetch_capabilities(self): 1050 "Check the P4Runtime protocol version supported by the other end." 1051 assert self._p4client is not None 1052 1053 try: 1054 reply = await self._p4client.request(p4r.CapabilitiesRequest()) 1055 self._api_version = ApiVersion.parse(reply.p4runtime_api_version) 1056 1057 # TODO: Do something with the `experimental` option added to 1058 # CapabilitiesResponse in P4Runtime 1.5.0. 1059 1060 except P4ClientError as ex: 1061 if ex.code != GRPCStatusCode.UNIMPLEMENTED: 1062 raise 1063 LOGGER.warning("CapabilitiesRequest is not implemented") 1064 1065 async def _start_gnmi(self): 1066 "Start the associated gNMI client." 
1067 assert self._gnmi_client is None 1068 assert self._p4client is not None 1069 1070 self._gnmi_client = GNMIClient(self._address, self._options.channel_credentials) 1071 await self._gnmi_client.open(channel=self._p4client.channel) 1072 1073 try: 1074 await self._ports.subscribe(self._gnmi_client) 1075 if self._ports: 1076 self.create_task(self._ports.listen(), background=True, name="_ports") 1077 1078 except GNMIClientError as ex: 1079 if ex.code != GRPCStatusCode.UNIMPLEMENTED: 1080 raise 1081 LOGGER.warning("gNMI is not implemented") 1082 await self._gnmi_client.close() 1083 self._gnmi_client = None 1084 1085 async def _stop_gnmi(self): 1086 "Stop the associated gNMI client." 1087 if self._gnmi_client is not None: 1088 self._ports.close() 1089 await self._gnmi_client.close() 1090 self._gnmi_client = None 1091 1092 def __repr__(self) -> str: 1093 "Return string representation of switch." 1094 return f"Switch(name={self._name!r}, address={self._address!r})"
Represents a P4Runtime Switch.
A Switch is constructed with a name, address and an optional
SwitchOptions configuration.
The name is up to the user but should uniquely identify the switch.
The address identifies the target endpoint of the GRPC channel. It should
have the format <ADDRESS>:<PORT> where <ADDRESS> can be a domain name,
IPv4 address, or IPv6 address in square brackets, and <PORT> is the TCP
port number.
The options argument is a SwitchOptions object that specifies how the Switch
will behave.
opts = SwitchOptions(p4info=..., p4blob=...)
sw1 = Switch('sw1', '10.0.0.1:50000', opts)
The stash is an optional dictionary for storing arbitrary per-switch
information. The dictionary keys are strings. You can use this to store
anything that you need to manage the switch.
Each switch object has an event emitter ee. Use the EventEmitter to listen
for port change events like PORT_UP and PORT_DOWN. See the SwitchEvent
class for a list of other switch events.
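For example, here is a minimal sketch (the stash keys and handler body are illustrative, not part of the library) that constructs a switch with options and a per-switch stash, then updates the stash from the ready handler:
import finsy as fy

async def on_ready(sw: fy.Switch):
    # The stash is an ordinary dict scoped to this Switch object.
    sw.stash["packets_seen"] = 0

opts = fy.SwitchOptions(ready_handler=on_ready)
sw1 = fy.Switch("sw1", "10.0.0.1:50000", opts, stash={"site": "lab1"})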
203 def __init__( 204 self, 205 name: str, 206 address: str, 207 options: SwitchOptions | None = None, 208 *, 209 stash: dict[str, Any] | None = None, 210 ) -> None: 211 """Initialize switch with name, address and options. 212 213 Args: 214 name: A human-readable name to uniquely identify the switch. 215 address: The target address of the P4Runtime GRPC channel. 216 Format is `<ADDRESS>:<PORT>` where `<ADDRESS>` is a DNS name or 217 IP address, and `<PORT>` is the TCP port number. 218 options: Configuration options for the switch. 219 stash: Optional user-controlled dictionary. 220 Used to store information that user code needs to access or share. 221 """ 222 if options is None: 223 options = SwitchOptions() 224 225 self._name = name 226 self._address = address 227 self._options = options 228 self._stash = stash or {} 229 self._ee = SwitchEmitter(self) 230 self._p4client = None 231 self._p4schema = P4Schema(options.p4info, options.p4blob) 232 self._tasks = None 233 self._packet_queues = [] 234 self._digest_queues = {} 235 self._timeout_queue = None 236 self._arbitrator = Arbitrator( 237 options.initial_election_id, options.role_name, options.role_config 238 ) 239 self._gnmi_client = None 240 self._ports = SwitchPortList()
Initialize switch with name, address and options.
Arguments:
- name: A human-readable name to uniquely identify the switch.
- address: The target address of the P4Runtime GRPC channel.
Format is <ADDRESS>:<PORT> where <ADDRESS> is a DNS name or IP address, and <PORT> is the TCP port number.
- options: Configuration options for the switch.
- stash: Optional user-controlled dictionary. Used to store information that user code needs to access or share.
247 @property 248 def address(self) -> str: 249 """Address of the switch formatted as a GRPC target. 250 251 Format is `<ADDRESS>:<PORT>` where `<ADDRESS>` is a DNS name or IP 252 address, and `<PORT>` is the TCP port number. 253 """ 254 return self._address
Address of the switch formatted as a GRPC target.
Format is <ADDRESS>:<PORT> where <ADDRESS> is a DNS name or IP
address, and <PORT> is the TCP port number.
256 @property 257 def options(self) -> SwitchOptions: 258 "Switch options." 259 return self._options
Switch options.
273 @property 274 def stash(self) -> dict[str, Any]: 275 "Switch stash. Used to store per-switch data for any purpose." 276 return self._stash
Switch stash. Used to store per-switch data for any purpose.
278 @property 279 def ee(self) -> "SwitchEmitter": 280 "Switch event emitter. See `SwitchEvent` for more details on events." 281 return self._ee
Switch event emitter. See SwitchEvent for more details on events.
283 @property 284 def device_id(self) -> int: 285 "Switch's device ID." 286 return self._options.device_id
Switch's device ID.
288 @property 289 def is_up(self) -> bool: 290 "True if switch is UP." 291 return self._is_channel_up
True if switch is UP.
293 @property 294 def is_primary(self) -> bool: 295 "True if switch is primary." 296 return self._arbitrator.is_primary
True if switch is primary.
298 @property 299 def primary_id(self) -> int: 300 "Election ID of switch that is currently primary." 301 return self._arbitrator.primary_id
Election ID of switch that is currently primary.
303 @property 304 def election_id(self) -> int: 305 "Switch's current election ID." 306 return self._arbitrator.election_id
Switch's current election ID.
308 @property 309 def role_name(self) -> str: 310 "Switch's current role name." 311 return self._arbitrator.role_name
Switch's current role name.
318 @property 319 def gnmi_client(self) -> GNMIClient | None: 320 "Switch's gNMI client." 321 return self._gnmi_client
Switch's gNMI client.
323 @property 324 def ports(self) -> SwitchPortList: 325 "Switch's list of interfaces." 326 return self._ports
Switch's list of interfaces.
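For example (a sketch; port IDs and names depend on the target), the port list can be iterated like any other container of SwitchPort objects:
import finsy as fy

async def show_ports(sw: fy.Switch):
    for port in sw.ports:
        print(port.id, port.name, port.up)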
328 @property 329 def api_version(self) -> ApiVersion: 330 "P4Runtime protocol version." 331 return self._api_version
P4Runtime protocol version.
357 async def read( 358 self, 359 entities: Iterable[p4entity.P4EntityList] | p4entity.P4Entity, 360 ) -> AsyncGenerator[p4entity.P4Entity, None]: 361 "Async iterator that reads entities from the switch." 362 assert self._p4client is not None 363 364 if not entities: 365 return 366 367 if isinstance(entities, p4entity.P4Entity): 368 entities = [entities] 369 370 request = p4r.ReadRequest( 371 device_id=self.device_id, 372 entities=p4entity.encode_entities(entities, self.p4info), 373 ) 374 375 async for reply in self._p4client.request_iter(request): 376 for ent in reply.entities: 377 yield p4entity.decode_entity(ent, self.p4info)
Async iterator that reads entities from the switch.
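As a sketch (the table name "ipv4_lpm" is assumed from a hypothetical P4 program), read also accepts a single entity, so one table can be dumped without wildcarding every table:
import finsy as fy

async def dump_table(sw: fy.Switch):
    async for entry in sw.read(fy.P4TableEntry("ipv4_lpm")):
        print(entry)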
379 async def read_packets( 380 self, 381 *, 382 queue_size: int = _DEFAULT_QUEUE_SIZE, 383 eth_types: Iterable[int] | None = None, 384 ) -> AsyncIterator["p4entity.P4PacketIn"]: 385 "Async iterator for incoming packets (P4PacketIn)." 386 LOGGER.debug("read_packets: opening queue: eth_types=%r", eth_types) 387 388 if eth_types is None: 389 390 def _pkt_filter(_payload: bytes) -> bool: 391 return True 392 393 else: 394 _filter = {eth.to_bytes(2, "big") for eth in eth_types} 395 396 def _pkt_filter(_payload: bytes) -> bool: 397 return _payload[12:14] in _filter 398 399 queue = Queue[p4entity.P4PacketIn](queue_size) 400 queue_filter = (_pkt_filter, queue) 401 self._packet_queues.append(queue_filter) 402 403 try: 404 while True: 405 yield await queue.get() 406 finally: 407 LOGGER.debug("read_packets: closing queue: eth_types=%r", eth_types) 408 self._packet_queues.remove(queue_filter)
Async iterator for incoming packets (P4PacketIn).
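For example (a sketch; 0x0800 is the IPv4 EtherType), eth_types restricts which packet-ins are queued for the iterator:
import finsy as fy

async def watch_ipv4(sw: fy.Switch):
    async for packet in sw.read_packets(eth_types=[0x0800]):
        # Each item is a P4PacketIn; payload is the raw frame.
        print(f"{sw.name}: {len(packet.payload)} bytes")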
410 async def read_digests( 411 self, 412 digest_id: str, 413 *, 414 queue_size: int = _DEFAULT_QUEUE_SIZE, 415 ) -> AsyncIterator["p4entity.P4DigestList"]: 416 "Async iterator for incoming digest lists (P4DigestList)." 417 LOGGER.debug("read_digests: opening queue: digest_id=%r", digest_id) 418 419 if digest_id in self._digest_queues: 420 raise ValueError(f"queue for digest_id {digest_id!r} already open") 421 422 queue = Queue[p4entity.P4DigestList](queue_size) 423 self._digest_queues[digest_id] = queue 424 try: 425 while True: 426 yield await queue.get() 427 finally: 428 LOGGER.debug("read_digests: closing queue: digest_id=%r", digest_id) 429 del self._digest_queues[digest_id]
Async iterator for incoming digest lists (P4DigestList).
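For example, a sketch where "digest_t" stands in for a digest name defined by the P4 program:
import finsy as fy

async def watch_digests(sw: fy.Switch):
    async for digest_list in sw.read_digests("digest_t"):
        print(f"{sw.name}: {digest_list}")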
431 async def read_idle_timeouts( 432 self, 433 *, 434 queue_size: int = _DEFAULT_QUEUE_SIZE, 435 ) -> AsyncIterator["p4entity.P4IdleTimeoutNotification"]: 436 "Async iterator for incoming idle timeouts (P4IdleTimeoutNotification)." 437 LOGGER.debug("read_idle_timeouts: opening queue") 438 439 if self._timeout_queue is not None: 440 raise ValueError("timeout queue already open") 441 442 queue = Queue[p4entity.P4IdleTimeoutNotification](queue_size) 443 self._timeout_queue = queue 444 try: 445 while True: 446 yield await queue.get() 447 finally: 448 LOGGER.debug("read_idle_timeouts: closing queue") 449 self._timeout_queue = None
Async iterator for incoming idle timeouts (P4IdleTimeoutNotification).
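Similarly, a sketch of consuming idle-timeout notifications:
import finsy as fy

async def watch_idle_timeouts(sw: fy.Switch):
    async for notification in sw.read_idle_timeouts():
        print(f"{sw.name}: {notification}")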
451 async def write( 452 self, 453 entities: Iterable[p4entity.P4UpdateList], 454 *, 455 strict: bool = True, 456 warn_only: bool = False, 457 ) -> None: 458 """Write updates and stream messages to the switch. 459 460 If `strict` is False, MODIFY and DELETE operations will NOT raise an 461 error if the entity does not exist (NOT_FOUND). 462 463 If `warn_only` is True, no operations will raise an error. Instead, 464 the exception will be logged as a WARNING and the method will return 465 normally. 466 """ 467 assert self._p4client is not None 468 469 if not entities: 470 return 471 472 msgs = p4entity.encode_updates(entities, self.p4info) 473 474 updates: list[p4r.Update] = [] 475 for msg in msgs: 476 if isinstance(msg, p4r.StreamMessageRequest): 477 # StreamMessageRequests are transmitted immediately. 478 # TODO: Understand what happens with backpressure? 479 await self._p4client.send(msg) 480 else: 481 updates.append(msg) 482 483 if updates: 484 await self._write_request(updates, strict, warn_only)
Write updates and stream messages to the switch.
If strict is False, MODIFY and DELETE operations will NOT raise an
error if the entity does not exist (NOT_FOUND).
If warn_only is True, no operations will raise an error. Instead,
the exception will be logged as a WARNING and the method will return
normally.
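For example, a sketch of a best-effort modify (entry is a previously built table entry; warn_only=True logs a failure instead of raising):
import finsy as fy

async def best_effort_modify(sw: fy.Switch, entry: fy.P4TableEntry):
    await sw.write([~entry], warn_only=True)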
486 async def insert( 487 self, 488 entities: Iterable[p4entity.P4EntityList], 489 *, 490 warn_only: bool = False, 491 ) -> None: 492 """Insert the specified entities. 493 494 If `warn_only` is True, errors will be logged as warnings instead of 495 raising an exception. 496 """ 497 if entities: 498 await self._write_request( 499 [ 500 p4r.Update(type=p4r.Update.INSERT, entity=ent) 501 for ent in p4entity.encode_entities(entities, self.p4info) 502 ], 503 True, 504 warn_only, 505 )
Insert the specified entities.
If warn_only is True, errors will be logged as warnings instead of
raising an exception.
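A sketch of inserting a match/action entry; the table, field, action, and parameter names come from a hypothetical P4 program, not from the library:
import finsy as fy

async def add_route(sw: fy.Switch):
    await sw.insert(
        [
            fy.P4TableEntry(
                "ipv4_lpm",
                match=fy.Match(dst_addr="10.0.0.0/24"),
                action=fy.Action("ipv4_forward", port=1),
            )
        ]
    )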
507 async def modify( 508 self, 509 entities: Iterable[p4entity.P4EntityList], 510 *, 511 strict: bool = True, 512 warn_only: bool = False, 513 ) -> None: 514 """Modify the specified entities. 515 516 If `strict` is False, NOT_FOUND errors will be ignored. 517 518 If `warn_only` is True, errors will be logged as warnings instead of 519 raising an exception. 520 """ 521 if entities: 522 await self._write_request( 523 [ 524 p4r.Update(type=p4r.Update.MODIFY, entity=ent) 525 for ent in p4entity.encode_entities(entities, self.p4info) 526 ], 527 strict, 528 warn_only, 529 )
Modify the specified entities.
If strict is False, NOT_FOUND errors will be ignored.
If warn_only is True, errors will be logged as warnings instead of
raising an exception.
531 async def delete( 532 self, 533 entities: Iterable[p4entity.P4EntityList], 534 *, 535 strict: bool = True, 536 warn_only: bool = False, 537 ) -> None: 538 """Delete the specified entities. 539 540 If `strict` is False, NOT_FOUND errors will be ignored. 541 542 If `warn_only` is True, errors will be logged as warnings instead of 543 raising an exception. 544 """ 545 if entities: 546 await self._write_request( 547 [ 548 p4r.Update(type=p4r.Update.DELETE, entity=ent) 549 for ent in p4entity.encode_entities(entities, self.p4info) 550 ], 551 strict, 552 warn_only, 553 )
Delete the specified entities.
If strict is False, NOT_FOUND errors will be ignored.
If warn_only is True, errors will be logged as warnings instead of
raising an exception.
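And a sketch of an idempotent cleanup, where strict=False turns NOT_FOUND into a no-op (entries is a previously built list):
import finsy as fy

async def cleanup(sw: fy.Switch, entries: list[fy.P4TableEntry]):
    await sw.delete(entries, strict=False)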
555 async def delete_all(self) -> None: 556 """Delete all entities if no parameter is passed. Otherwise, delete 557 items that match `entities`. 558 559 This method does not attempt to delete entries in const tables. 560 561 TODO: This method does not affect indirect counters, meters or 562 value_sets. 563 """ 564 await self.delete_many( 565 [ 566 p4entity.P4TableEntry(), 567 p4entity.P4MulticastGroupEntry(), 568 p4entity.P4CloneSessionEntry(), 569 ] 570 ) 571 572 # Reset all default table entries. 573 default_entries = [ 574 p4entity.P4TableEntry(table.alias, is_default_action=True) 575 for table in self.p4info.tables 576 if table.const_default_action is None and table.action_profile is None 577 ] 578 if default_entries: 579 await self.modify(default_entries) 580 581 # Delete all P4ActionProfileGroup's and P4ActionProfileMember's. 582 # We do this after deleting the P4TableEntry's in case a client is using 583 # "one-shot" references; these are incompatible with separate 584 # action profiles. 585 await self.delete_many( 586 [ 587 p4entity.P4ActionProfileGroup(), 588 p4entity.P4ActionProfileMember(), 589 ] 590 ) 591 592 # Delete DigestEntry separately. Wildcard reads are not supported. 593 digest_entries = [ 594 p4entity.P4DigestEntry(digest.alias) for digest in self.p4info.digests 595 ] 596 if digest_entries: 597 await self.delete(digest_entries, strict=False)
Delete all of the switch's entities: table entries, multicast group and
clone session entries, action profile groups and members, and digest
entries. Default table actions are also reset.
This method does not attempt to delete entries in const tables.
TODO: This method does not affect indirect counters, meters or value_sets.
599 async def delete_many(self, entities: Iterable[p4entity.P4EntityList]) -> None: 600 """Delete entities that match a wildcard read. 601 602 This method always skips over entries in const tables. It is an error 603 to attempt to delete those. 604 """ 605 assert self._p4client is not None 606 607 request = p4r.ReadRequest( 608 device_id=self.device_id, 609 entities=p4entity.encode_entities(entities, self.p4info), 610 ) 611 612 # Compute set of all const table ID's (may be empty). 613 to_skip = {table.id for table in self.p4info.tables if table.is_const} 614 615 async for reply in self._p4client.request_iter(request): 616 if reply.entities: 617 if to_skip: 618 await self.delete( 619 reply 620 for reply in reply.entities 621 if reply.HasField("table_entry") 622 and reply.table_entry.table_id not in to_skip 623 ) 624 else: 625 await self.delete(reply.entities)
Delete entities that match a wildcard read.
This method always skips over entries in const tables. It is an error to attempt to delete those.
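For example, a sketch that clears every entry in a single (hypothetical) table by wildcard, leaving other tables untouched:
import finsy as fy

async def clear_table(sw: fy.Switch):
    await sw.delete_many([fy.P4TableEntry("ipv4_lpm")])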
627 async def run(self) -> None: 628 "Run the switch's lifecycle repeatedly." 629 assert self._p4client is None 630 assert self._tasks is None 631 632 self._tasks = SwitchTasks(self._options.fail_fast) 633 self._p4client = P4Client(self._address, self._options.channel_credentials) 634 self._switch_start() 635 636 try: 637 while True: 638 # If the switch fails and restarts too quickly, slow it down. 639 async with _throttle_failure(): 640 self.create_task(self._run(), background=True) 641 await self._tasks.wait() 642 self._arbitrator.reset() 643 644 finally: 645 self._p4client = None 646 self._tasks = None 647 self._switch_stop()
Run the switch's lifecycle repeatedly.
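A sketch of driving one switch directly with run(), without a Controller:
import finsy as fy

async def main():
    sw = fy.Switch("sw1", "127.0.0.1:50001")
    await sw.run()  # reconnects and repeats the lifecycle until cancelled

fy.run(main())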
649 def create_task( 650 self, 651 coro: Coroutine[Any, Any, _T], 652 *, 653 background: bool = False, 654 name: str | None = None, 655 ) -> asyncio.Task[_T]: 656 "Create an asyncio task tied to the Switch's lifecycle." 657 assert self._tasks is not None 658 659 return self._tasks.create_task( 660 coro, 661 switch=self, 662 background=background, 663 name=name, 664 )
Create an asyncio task tied to the Switch's lifecycle.
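For example, a sketch of spawning a concurrent task from a ready handler; poll_counters is a hypothetical coroutine whose lifetime is then managed by the switch object:
import finsy as fy

async def poll_counters(sw: fy.Switch):
    ...  # hypothetical periodic work

async def on_ready(sw: fy.Switch):
    sw.create_task(poll_counters(sw), name="poll_counters")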
722 async def __aenter__(self) -> Self: 723 "Similar to run() but provides a one-time context manager interface." 724 assert self._p4client is None 725 assert self._tasks is None 726 727 self._tasks = SwitchTasks(self._options.fail_fast) 728 self._p4client = P4Client( 729 self._address, 730 self._options.channel_credentials, 731 wait_for_ready=False, 732 ) 733 self._switch_start() 734 735 try: 736 # Start the switch's `_run` task in the background. Then, wait for 737 # `_run` task to fire the CHANNEL_READY event. If the `_run` task 738 # cannot connect or fails in some other way, it will finish before 739 # the `ready` future. We need to handle the error in this case. 740 741 run = self.create_task(self._run(), background=True) 742 ready = self.ee.event_future(SwitchEvent.CHANNEL_READY) 743 done, _ = await asyncio.wait( 744 [run, ready], return_when=asyncio.FIRST_COMPLETED 745 ) 746 if run in done: 747 await run 748 749 except BaseException: 750 await self.__aexit__(None, None, None) 751 raise 752 753 return self
Similar to run() but provides a one-time context manager interface.
1097class SwitchEvent(str, enum.Enum): 1098 "Events for Switch class." 1099 1100 CONTROLLER_ENTER = "controller_enter" # (switch) 1101 CONTROLLER_LEAVE = "controller_leave" # (switch) 1102 SWITCH_START = "switch_start" # (switch) 1103 SWITCH_STOP = "switch_stop" # (switch) 1104 CHANNEL_UP = "channel_up" # (switch) 1105 CHANNEL_DOWN = "channel_down" # (switch) 1106 CHANNEL_READY = "channel_ready" # (switch) 1107 BECOME_PRIMARY = "become_primary" # (switch) 1108 BECOME_BACKUP = "become_backup" # (switch) 1109 PORT_UP = "port_up" # (switch, port) 1110 PORT_DOWN = "port_down" # (switch, port) 1111 STREAM_ERROR = "stream_error" # (switch, p4r.StreamMessageResponse)
Events for Switch class.
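For example, a sketch that waits for the next PORT_UP event using the emitter's event_future helper (the same mechanism the library uses internally for CHANNEL_READY); it assumes SwitchEvent is importable from the package top level:
import finsy as fy

async def wait_for_port_up(sw: fy.Switch):
    await sw.ee.event_future(fy.SwitchEvent.PORT_UP)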
70@final 71@dataclasses.dataclass(frozen=True) 72class SwitchOptions: 73 """Represents the configuration options for a `Switch`. 74 75 ``` 76 opts = SwitchOptions( 77 p4info=Path("basic.p4info.txtpb"), 78 p4blob=Path("basic.json"), 79 ready_handler=on_ready, 80 ) 81 ``` 82 83 Each `SwitchOptions` object is immutable and may be shared by multiple 84 switches. You should treat all values as read-only. 85 86 You can use function call syntax to return a copy of a `SwitchOptions` with 87 one or more propertise altered. 88 89 ``` 90 new_opts = opts(device_id=6) 91 ``` 92 """ 93 94 p4info: Path | None = None 95 "Path to P4Info protobuf text file." 96 97 p4blob: Path | SupportsBytes | None = None 98 "Path to P4Blob file, or an object that can provide the bytes value." 99 100 p4force: bool = False 101 "If true, always load the P4 program after initial handshake." 102 103 device_id: int = 1 104 "Default P4Runtime device ID." 105 106 initial_election_id: int = 10 107 "Initial P4Runtime election ID." 108 109 channel_credentials: GRPCCredentialsTLS | None = None 110 "P4Runtime channel credentials. Used for TLS support." 111 112 role_name: str = "" 113 "P4Runtime role configuration." 114 115 role_config: pbutil.PBMessage | None = None 116 "P4Runtime role configuration." 117 118 ready_handler: Callable[["Switch"], Coroutine[Any, Any, None]] | None = None 119 "Ready handler async function callback." 120 121 fail_fast: bool = False 122 """If true, log switch errors as CRITICAL and immediately abort when the 123 switch is running in a Controller.""" 124 125 def __call__(self, **kwds: Any) -> Self: 126 return dataclasses.replace(self, **kwds)
Represents the configuration options for a Switch.
opts = SwitchOptions(
p4info=Path("basic.p4info.txtpb"),
p4blob=Path("basic.json"),
ready_handler=on_ready,
)
Each SwitchOptions object is immutable and may be shared by multiple
switches. You should treat all values as read-only.
You can use function call syntax to return a copy of a SwitchOptions with
one or more properties altered.
new_opts = opts(device_id=6)
Path to P4Blob file, or an object that can provide the bytes value.
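For example, a sketch that shares one base SwitchOptions and uses call syntax to vary a single field per switch (paths and device IDs are illustrative):
from pathlib import Path

import finsy as fy

base = fy.SwitchOptions(
    p4info=Path("hello.p4info.txt"),
    p4blob=Path("hello.json"),
)
switches = [
    fy.Switch(f"sw{i}", f"127.0.0.1:5000{i}", base(device_id=i))
    for i in (1, 2, 3)
]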
43@dataclass 44class SwitchPort: 45 "Represents a switch port." 46 47 id: int 48 name: str 49 oper_status: OperStatus = OperStatus.UNKNOWN 50 51 @property 52 def up(self) -> bool: 53 "Return true if port is basically up." 54 return self.oper_status == OperStatus.UP
Represents a switch port.
57class SwitchPortList: 58 "Represents a list of switch ports." 59 60 _ports: dict[str, SwitchPort] 61 _subscription: GNMISubscription | None = None 62 63 def __init__(self): 64 self._ports = {} 65 66 def __getitem__(self, key: str) -> SwitchPort: 67 "Retrieve interface by ID." 68 return self._ports[key] 69 70 def __len__(self) -> int: 71 "Return number of switch ports." 72 return len(self._ports) 73 74 def __iter__(self) -> Iterator[SwitchPort]: 75 "Iterate over switch ports." 76 return iter(self._ports.values()) 77 78 async def subscribe(self, client: GNMIClient) -> None: 79 """Obtain the initial list of ports and subscribe to switch port status 80 updates using GNMI.""" 81 assert self._subscription is None 82 83 self._ports = await self._get_ports(client) 84 if self._ports: 85 self._subscription = await self._get_subscription(client) 86 else: 87 LOGGER.warning("No switch ports exist") 88 89 async def listen(self, switch: "_sw.Switch | None" = None) -> None: 90 "Listen for switch port updates." 91 assert self._subscription is not None 92 93 async for update in self._subscription.updates(): 94 self._update(update, switch) 95 96 def close(self) -> None: 97 "Close the switch port subscription." 98 if self._subscription is not None: 99 self._subscription.cancel() 100 self._subscription = None 101 self._ports = {} 102 103 async def _get_ports(self, client: GNMIClient) -> dict[str, SwitchPort]: 104 "Retrieve ID and name of each port." 105 ports: dict[str, SwitchPort] = {} 106 107 result = await client.get(_ifIndex) 108 for update in result: 109 path = update.path 110 assert path.last == _ifIndex.last 111 112 port = SwitchPort(update.value, path["name"]) 113 ports[port.name] = port 114 115 return ports 116 117 async def _get_subscription(self, client: GNMIClient) -> GNMISubscription: 118 sub = client.subscribe() 119 120 # Subscribe to change notifications. 121 for port in self._ports.values(): 122 sub.on_change(_ifOperStatus.set(name=port.name)) 123 124 # Synchronize initial settings for ports. 125 async for update in sub.synchronize(): 126 self._update(update, None) 127 128 return sub 129 130 def _update(self, update: GNMIUpdate, switch: "_sw.Switch | None"): 131 path = update.path 132 if path.last == _ifOperStatus.last: 133 status = OperStatus(update.value) 134 self._update_port(path["name"], status, switch) 135 else: 136 LOGGER.warning(f"PortList: unknown gNMI path: {path}") 137 138 def _update_port(self, name: str, status: OperStatus, switch: "_sw.Switch | None"): 139 port = self._ports[name] 140 141 prev_up = port.up 142 port.oper_status = status 143 curr_up = port.up 144 145 if switch is not None and curr_up != prev_up: 146 if curr_up: 147 switch.ee.emit(_sw.SwitchEvent.PORT_UP, switch, port) 148 else: 149 switch.ee.emit(_sw.SwitchEvent.PORT_DOWN, switch, port)
Represents a list of switch ports.
66 def __getitem__(self, key: str) -> SwitchPort: 67 "Retrieve interface by ID." 68 return self._ports[key]
Retrieve interface by ID.
74 def __iter__(self) -> Iterator[SwitchPort]: 75 "Iterate over switch ports." 76 return iter(self._ports.values())
Iterate over switch ports.
78 async def subscribe(self, client: GNMIClient) -> None: 79 """Obtain the initial list of ports and subscribe to switch port status 80 updates using GNMI.""" 81 assert self._subscription is None 82 83 self._ports = await self._get_ports(client) 84 if self._ports: 85 self._subscription = await self._get_subscription(client) 86 else: 87 LOGGER.warning("No switch ports exist")
Obtain the initial list of ports and subscribe to switch port status updates using GNMI.
119class GNMIClient: 120 """Async GNMI client. 121 122 This client implements `get`, `set`, `subscribe` and `capabilities`. 123 124 The API depends on the protobuf definition of `gnmi.TypedValue`. 125 126 Get usage: 127 ``` 128 client = GNMIClient('127.0.0.1:9339') 129 await client.open() 130 131 path = GNMIPath("interfaces/interface") 132 async for update in client.get(path): 133 print(update) 134 ``` 135 136 Subscribe usage: 137 ``` 138 path = GNMIPath("interfaces/interface[name=eth1]/state/oper-status") 139 sub = client.subscribe() 140 sub.on_change(path) 141 142 async for initial_state in sub.synchronize(): 143 print(initial_state) 144 145 async for update in sub.updates(): 146 print(update) 147 ``` 148 149 Set usage: 150 ``` 151 enabled = GNMIPath("interfaces/interface[name=eth1]/config/enabled") 152 153 await client.set(update={ 154 enabled: gnmi.TypedValue(boolValue=True), 155 }) 156 ``` 157 """ 158 159 _address: str 160 _credentials: GRPCCredentialsTLS | None 161 _channel: grpc.aio.Channel | None = None 162 _stub: gnmi_grpc.gNMIStub | None = None 163 _channel_reused: bool = False 164 165 def __init__( 166 self, 167 address: str, 168 credentials: GRPCCredentialsTLS | None = None, 169 ): 170 self._address = address 171 self._credentials = credentials 172 173 async def __aenter__(self) -> Self: 174 await self.open() 175 return self 176 177 async def __aexit__(self, *_args: Any) -> bool | None: 178 await self.close() 179 180 async def open( 181 self, 182 *, 183 channel: grpc.aio.Channel | None = None, 184 ) -> None: 185 """Open the client channel. 186 187 Note: This method is `async` for forward-compatible reasons. 188 """ 189 if self._channel is not None: 190 raise RuntimeError("GNMIClient: client is already open") 191 192 assert self._stub is None 193 194 if channel is not None: 195 self._channel = channel 196 self._channel_reused = True 197 else: 198 self._channel = grpc_channel( 199 self._address, 200 credentials=self._credentials, 201 client_type="GNMIClient", 202 ) 203 204 self._stub = gnmi_grpc.gNMIStub(self._channel) 205 206 async def close(self) -> None: 207 "Close the client channel." 208 if self._channel is not None: 209 if not self._channel_reused: 210 LOGGER.debug("GNMIClient: close channel %r", self._address) 211 await self._channel.close() 212 213 self._channel = None 214 self._stub = None 215 self._channel_reused = False 216 217 async def get( 218 self, 219 *path: GNMIPath, 220 prefix: GNMIPath | None = None, 221 config: bool = False, 222 ) -> Sequence[GNMIUpdate]: 223 "Retrieve value(s) using a GetRequest." 224 if self._stub is None: 225 raise RuntimeError("GNMIClient: client is not open") 226 227 request = gnmi.GetRequest( 228 path=(i.path for i in path), 229 encoding=gnmi.Encoding.PROTO, 230 ) 231 232 if prefix is not None: 233 request.prefix.CopyFrom(prefix.path) 234 235 if config: 236 request.type = gnmi.GetRequest.CONFIG 237 238 self._log_msg(request) 239 try: 240 reply = cast( 241 gnmi.GetResponse, 242 await self._stub.Get(request), # type: ignore 243 ) 244 except grpc.RpcError as ex: 245 raise GNMIClientError(ex) from None 246 247 self._log_msg(reply) 248 249 result: list[GNMIUpdate] = [] 250 for notification in reply.notification: 251 for update in _read_updates(notification): 252 result.append(update) 253 254 return result 255 256 def subscribe( 257 self, 258 *, 259 prefix: GNMIPath | None = None, 260 ) -> "GNMISubscription": 261 """Subscribe to gNMI change notifications. 262 263 Usage: 264 ``` 265 sub = client.subscribe() 266 sub.on_change(path1, ...) 
267 sub.sample(path3, path4, sample_interval=1000000000) 268 269 async for update in sub.synchronize(): 270 # do something with initial state 271 272 async for update in sub.updates(): 273 # do something with updates 274 ``` 275 276 You can also subscribe in "ONCE" mode: 277 ``` 278 sub = client.subscribe() 279 sub.once(path1, path2, ...) 280 281 async for info in sub.synchronize(): 282 # do something with info 283 ``` 284 285 The subscription object is not re-entrant, but a fully consumed 286 subscription may be reused. 287 """ 288 if self._stub is None: 289 raise RuntimeError("GNMIClient: client is not open") 290 291 return GNMISubscription(self, prefix) 292 293 async def capabilities(self) -> gnmi.CapabilityResponse: 294 "Issue a CapabilitiesRequest." 295 if self._stub is None: 296 raise RuntimeError("GNMIClient: client is not open") 297 298 request = gnmi.CapabilityRequest() 299 300 self._log_msg(request) 301 try: 302 reply = cast( 303 gnmi.CapabilityResponse, 304 await self._stub.Capabilities(request), # type: ignore 305 ) 306 except grpc.RpcError as ex: 307 raise GNMIClientError(ex) from None 308 309 self._log_msg(reply) 310 return reply 311 312 async def set( 313 self, 314 *, 315 update: Sequence[tuple[GNMIPath, GNMISetValueType]] | None = None, 316 replace: Sequence[tuple[GNMIPath, GNMISetValueType]] | None = None, 317 delete: Sequence[GNMIPath] | None = None, 318 prefix: GNMIPath | None = None, 319 ) -> int: 320 """Set value(s) using SetRequest. 321 322 Returns the timestamp from the successful `SetResponse`. 323 """ 324 if self._stub is None: 325 raise RuntimeError("GNMIClient: client is not open") 326 327 if update is not None: 328 updates = [gnmi_update(path, value) for path, value in update] 329 else: 330 updates = None 331 332 if replace is not None: 333 replaces = [gnmi_update(path, value) for path, value in replace] 334 else: 335 replaces = None 336 337 if delete is not None: 338 deletes = [path.path for path in delete] 339 else: 340 deletes = None 341 342 request = gnmi.SetRequest( 343 update=updates, 344 replace=replaces, 345 delete=deletes, 346 ) 347 348 if prefix is not None: 349 request.prefix.CopyFrom(prefix.path) 350 351 self._log_msg(request) 352 try: 353 reply = cast( 354 gnmi.SetResponse, 355 await self._stub.Set(request), # type: ignore 356 ) 357 except grpc.RpcError as ex: 358 raise GNMIClientError(ex) from None 359 360 self._log_msg(reply) 361 362 # According to the comments in the current protobuf, I expect all error 363 # results to be raised as an exception. The only useful value in a 364 # successful response is the timestamp. 365 366 if reply.HasField("message"): 367 raise NotImplementedError("SetResponse error not supported") 368 369 for result in reply.response: 370 if result.HasField("message"): 371 raise NotImplementedError("SetResponse suberror not supported") 372 373 return reply.timestamp 374 375 def _log_msg(self, msg: "pbutil.PBMessage"): 376 "Log a gNMI message." 377 pbutil.log_msg(self._channel, msg, None)
Async GNMI client.
This client implements get, set, subscribe and capabilities.
The API depends on the protobuf definition of gnmi.TypedValue.
Get usage:
client = GNMIClient('127.0.0.1:9339')
await client.open()
path = GNMIPath("interfaces/interface")
async for update in client.get(path):
print(update)
Subscribe usage:
path = GNMIPath("interfaces/interface[name=eth1]/state/oper-status")
sub = client.subscribe()
sub.on_change(path)
async for initial_state in sub.synchronize():
print(initial_state)
async for update in sub.updates():
print(update)
Set usage:
enabled = GNMIPath("interfaces/interface[name=eth1]/config/enabled")
await client.set(update={
enabled: gnmi.TypedValue(boolValue=True),
})
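The client can also be used as an async context manager, which calls open() and close() automatically; a sketch (the import path and address are assumptions):
from finsy import GNMIClient

async def show_capabilities():
    async with GNMIClient("127.0.0.1:9339") as client:
        print(await client.capabilities())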
180 async def open( 181 self, 182 *, 183 channel: grpc.aio.Channel | None = None, 184 ) -> None: 185 """Open the client channel. 186 187 Note: This method is `async` for forward-compatible reasons. 188 """ 189 if self._channel is not None: 190 raise RuntimeError("GNMIClient: client is already open") 191 192 assert self._stub is None 193 194 if channel is not None: 195 self._channel = channel 196 self._channel_reused = True 197 else: 198 self._channel = grpc_channel( 199 self._address, 200 credentials=self._credentials, 201 client_type="GNMIClient", 202 ) 203 204 self._stub = gnmi_grpc.gNMIStub(self._channel)
Open the client channel.
Note: This method is async for forward-compatibility reasons.
206 async def close(self) -> None: 207 "Close the client channel." 208 if self._channel is not None: 209 if not self._channel_reused: 210 LOGGER.debug("GNMIClient: close channel %r", self._address) 211 await self._channel.close() 212 213 self._channel = None 214 self._stub = None 215 self._channel_reused = False
Close the client channel.
217 async def get( 218 self, 219 *path: GNMIPath, 220 prefix: GNMIPath | None = None, 221 config: bool = False, 222 ) -> Sequence[GNMIUpdate]: 223 "Retrieve value(s) using a GetRequest." 224 if self._stub is None: 225 raise RuntimeError("GNMIClient: client is not open") 226 227 request = gnmi.GetRequest( 228 path=(i.path for i in path), 229 encoding=gnmi.Encoding.PROTO, 230 ) 231 232 if prefix is not None: 233 request.prefix.CopyFrom(prefix.path) 234 235 if config: 236 request.type = gnmi.GetRequest.CONFIG 237 238 self._log_msg(request) 239 try: 240 reply = cast( 241 gnmi.GetResponse, 242 await self._stub.Get(request), # type: ignore 243 ) 244 except grpc.RpcError as ex: 245 raise GNMIClientError(ex) from None 246 247 self._log_msg(reply) 248 249 result: list[GNMIUpdate] = [] 250 for notification in reply.notification: 251 for update in _read_updates(notification): 252 result.append(update) 253 254 return result
Retrieve value(s) using a GetRequest.
256 def subscribe( 257 self, 258 *, 259 prefix: GNMIPath | None = None, 260 ) -> "GNMISubscription": 261 """Subscribe to gNMI change notifications. 262 263 Usage: 264 ``` 265 sub = client.subscribe() 266 sub.on_change(path1, ...) 267 sub.sample(path3, path4, sample_interval=1000000000) 268 269 async for update in sub.synchronize(): 270 # do something with initial state 271 272 async for update in sub.updates(): 273 # do something with updates 274 ``` 275 276 You can also subscribe in "ONCE" mode: 277 ``` 278 sub = client.subscribe() 279 sub.once(path1, path2, ...) 280 281 async for info in sub.synchronize(): 282 # do something with info 283 ``` 284 285 The subscription object is not re-entrant, but a fully consumed 286 subscription may be reused. 287 """ 288 if self._stub is None: 289 raise RuntimeError("GNMIClient: client is not open") 290 291 return GNMISubscription(self, prefix)
Subscribe to gNMI change notifications.
Usage:
sub = client.subscribe()
sub.on_change(path1, ...)
sub.sample(path3, path4, sample_interval=1000000000)
async for update in sub.synchronize():
# do something with initial state
async for update in sub.updates():
# do something with updates
You can also subscribe in "ONCE" mode:
sub = client.subscribe()
sub.once(path1, path2, ...)
async for info in sub.synchronize():
# do something with info
The subscription object is not re-entrant, but a fully consumed subscription may be reused.
293 async def capabilities(self) -> gnmi.CapabilityResponse: 294 "Issue a CapabilitiesRequest." 295 if self._stub is None: 296 raise RuntimeError("GNMIClient: client is not open") 297 298 request = gnmi.CapabilityRequest() 299 300 self._log_msg(request) 301 try: 302 reply = cast( 303 gnmi.CapabilityResponse, 304 await self._stub.Capabilities(request), # type: ignore 305 ) 306 except grpc.RpcError as ex: 307 raise GNMIClientError(ex) from None 308 309 self._log_msg(reply) 310 return reply
Issue a CapabilitiesRequest.
312 async def set( 313 self, 314 *, 315 update: Sequence[tuple[GNMIPath, GNMISetValueType]] | None = None, 316 replace: Sequence[tuple[GNMIPath, GNMISetValueType]] | None = None, 317 delete: Sequence[GNMIPath] | None = None, 318 prefix: GNMIPath | None = None, 319 ) -> int: 320 """Set value(s) using SetRequest. 321 322 Returns the timestamp from the successful `SetResponse`. 323 """ 324 if self._stub is None: 325 raise RuntimeError("GNMIClient: client is not open") 326 327 if update is not None: 328 updates = [gnmi_update(path, value) for path, value in update] 329 else: 330 updates = None 331 332 if replace is not None: 333 replaces = [gnmi_update(path, value) for path, value in replace] 334 else: 335 replaces = None 336 337 if delete is not None: 338 deletes = [path.path for path in delete] 339 else: 340 deletes = None 341 342 request = gnmi.SetRequest( 343 update=updates, 344 replace=replaces, 345 delete=deletes, 346 ) 347 348 if prefix is not None: 349 request.prefix.CopyFrom(prefix.path) 350 351 self._log_msg(request) 352 try: 353 reply = cast( 354 gnmi.SetResponse, 355 await self._stub.Set(request), # type: ignore 356 ) 357 except grpc.RpcError as ex: 358 raise GNMIClientError(ex) from None 359 360 self._log_msg(reply) 361 362 # According to the comments in the current protobuf, I expect all error 363 # results to be raised as an exception. The only useful value in a 364 # successful response is the timestamp. 365 366 if reply.HasField("message"): 367 raise NotImplementedError("SetResponse error not supported") 368 369 for result in reply.response: 370 if result.HasField("message"): 371 raise NotImplementedError("SetResponse suberror not supported") 372 373 return reply.timestamp
Set value(s) using SetRequest.
Returns the timestamp from the successful SetResponse.
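For example, a sketch that deletes one leaf and keeps the returned timestamp (the OpenConfig path and import path are assumptions):
from finsy import GNMIClient, GNMIPath

async def clear_description(client: GNMIClient) -> int:
    desc = GNMIPath("interfaces/interface[name=eth1]/config/description")
    return await client.set(delete=[desc])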
25class GNMIPath: 26 """Concrete class for working with `gnmi.Path` objects. 27 28 A `GNMIPath` should be treated as an immutable object. You can access 29 the wrapped `gnmi.Path` protobuf class using the `.path` property. 30 `GNMIPath` objects are hashable, but you must be careful that you do not 31 mutate them via the underlying `gnmi.Path`. 32 33 You can construct a GNMIPath from a string: 34 ``` 35 path = GNMIPath("interfaces/interface[name=eth1]/state") 36 ``` 37 38 You can construct a GNMIPath object from a `gnmi.Path` directly. 39 ``` 40 path = GNMIPath(update.path) 41 ``` 42 43 You can create paths by using an existing path as a template, without 44 modifying the original path. Use the `set` method: 45 ``` 46 operStatus = GNMIPath("interfaces/interface/state/oper-status") 47 path = operStatus.set("interface", name="eth1") 48 ``` 49 50 Use [] to access the name/key of path elements: 51 ``` 52 path[1] == "interface" 53 path["interface", "name"] == "eth1" 54 path["name"] == "eth1" 55 ``` 56 """ 57 58 path: gnmi.Path 59 "The underlying `gnmi.Path` protobuf representation." 60 61 def __init__( 62 self, 63 path: "gnmi.Path | str | GNMIPath" = "", 64 *, 65 origin: str = "", 66 target: str = "", 67 ): 68 "Construct a `GNMIPath` from a string or `gnmi.Path`." 69 if isinstance(path, str): 70 path = gnmistring.parse(path) 71 elif isinstance(path, GNMIPath): 72 path = _copy_path(path.path) 73 74 assert isinstance(path, gnmi.Path) 75 76 if origin: 77 path.origin = origin 78 79 if target: 80 path.target = target 81 82 self.path = path 83 84 @property 85 def first(self) -> str: 86 "Return the first element of the path." 87 return self.path.elem[0].name 88 89 @property 90 def last(self) -> str: 91 "Return the last element of the path." 92 return self.path.elem[-1].name 93 94 @property 95 def origin(self) -> str: 96 "Return the path's origin." 97 return self.path.origin 98 99 @property 100 def target(self) -> str: 101 "Return the path's target." 102 return self.path.target 103 104 def set(self, __elem: str | int | None = None, **kwds: Any) -> "GNMIPath": 105 "Construct a new GNMIPath with keys set for the given elem." 106 if __elem is None: 107 return self._rekey(kwds) 108 109 if isinstance(__elem, str): 110 elem = _find_index(__elem, self.path) 111 else: 112 elem = __elem 113 114 result = self.copy() 115 keys = result.path.elem[elem].key 116 keys.clear() 117 keys.update({key: str(val) for key, val in kwds.items()}) 118 return result 119 120 def copy(self) -> "GNMIPath": 121 "Return a copy of the path." 122 return GNMIPath(_copy_path(self.path)) 123 124 @overload 125 def __getitem__( 126 self, key: int | str | tuple[int | str, str] 127 ) -> str: ... # pragma: no cover 128 129 @overload 130 def __getitem__(self, key: slice) -> "GNMIPath": ... # pragma: no cover 131 132 def __getitem__( 133 self, 134 key: int | str | tuple[int | str, str] | slice, 135 ) -> "str | GNMIPath": 136 "Return the specified element or key value." 
137 match key: 138 case int(idx): 139 return self.path.elem[idx].name 140 case str(name): 141 return _retrieve_key(name, self.path) 142 case (int(idx), str(k)): 143 result = self.path.elem[idx].key.get(k) 144 if result is None: 145 raise KeyError(k) 146 return result 147 case (str(name), str(k)): 148 result = self.path.elem[_find_index(name, self.path)].key.get(k) 149 if result is None: 150 raise KeyError(k) 151 return result 152 case slice() as s: 153 return self._slice(s.start, s.stop, s.step) 154 case other: # pyright: ignore[reportUnnecessaryComparison] 155 raise TypeError(f"invalid key type: {other!r}") 156 157 def __eq__(self, rhs: Any) -> bool: 158 "Return True if path's are equal." 159 if not isinstance(rhs, GNMIPath): 160 return False 161 return self.path == rhs.path # pyright: ignore[reportUnknownVariableType] 162 163 def __hash__(self) -> int: 164 "Return hash of path." 165 return hash(tuple(_to_tuple(elem) for elem in self.path.elem)) 166 167 def __contains__(self, name: str) -> bool: 168 "Return True if element name is in path." 169 for elem in self.path.elem: 170 if elem.name == name: 171 return True 172 return False 173 174 def __repr__(self) -> str: 175 "Return string representation of path." 176 path = gnmistring.to_str(self.path) 177 if self.target or self.origin: 178 return f"GNMIPath({path!r}, origin={self.origin!r}, target={self.target!r})" 179 return f"GNMIPath({path!r})" 180 181 def __str__(self) -> str: 182 "Return path as string." 183 return gnmistring.to_str(self.path) 184 185 def __len__(self) -> int: 186 "Return the length of the path." 187 return len(self.path.elem) 188 189 def __truediv__(self, rhs: "GNMIPath | str") -> "GNMIPath": 190 "Append values to end of path." 191 if not isinstance(rhs, GNMIPath): 192 rhs = GNMIPath(rhs) 193 194 result = gnmi.Path( 195 elem=itertools.chain(self.path.elem, rhs.path.elem), 196 origin=self.origin, 197 target=self.target, 198 ) 199 return GNMIPath(result) 200 201 def __rtruediv__(self, lhs: str) -> "GNMIPath": 202 "Prepend values to the beginning of the path." 203 path = GNMIPath(lhs) 204 205 result = gnmi.Path(elem=itertools.chain(path.path.elem, self.path.elem)) 206 return GNMIPath(result) 207 208 def _slice( 209 self, start: int | None, stop: int | None, step: int | None 210 ) -> "GNMIPath": 211 "Return specified slice of GNMIPath." 212 if start is None: 213 start = 0 214 215 if stop is None: 216 stop = len(self) 217 elif stop < 0: 218 stop = len(self) + stop 219 220 if step is None: 221 step = 1 222 223 path = gnmi.Path() 224 for i in range(start, stop, step): 225 elem = self.path.elem[i] 226 path.elem.append(gnmi.PathElem(name=elem.name, key=elem.key)) 227 228 return GNMIPath(path) 229 230 def _rekey(self, keys: dict[str, Any]) -> "GNMIPath": 231 """Construct a new path with specified keys replaced.""" 232 if not keys: 233 raise ValueError("empty keys") 234 235 result = self.copy() 236 237 found = False 238 for elem in result.path.elem: 239 for key, value in keys.items(): 240 if key in elem.key: 241 elem.key[key] = str(value) 242 found = True 243 244 if not found: 245 raise ValueError(f"no keys found in path: {keys!r}") 246 247 return result
Concrete class for working with gnmi.Path objects.
A GNMIPath should be treated as an immutable object. You can access
the wrapped gnmi.Path protobuf class using the .path property.
GNMIPath objects are hashable, but you must be careful that you do not
mutate them via the underlying gnmi.Path.
You can construct a GNMIPath from a string:
path = GNMIPath("interfaces/interface[name=eth1]/state")
You can construct a GNMIPath object from a gnmi.Path directly.
path = GNMIPath(update.path)
You can create paths by using an existing path as a template, without
modifying the original path. Use the set method:
operStatus = GNMIPath("interfaces/interface/state/oper-status")
path = operStatus.set("interface", name="eth1")
Use [] to access the name/key of path elements:
path[1] == "interface"
path["interface", "name"] == "eth1"
path["name"] == "eth1"
61 def __init__( 62 self, 63 path: "gnmi.Path | str | GNMIPath" = "", 64 *, 65 origin: str = "", 66 target: str = "", 67 ): 68 "Construct a `GNMIPath` from a string or `gnmi.Path`." 69 if isinstance(path, str): 70 path = gnmistring.parse(path) 71 elif isinstance(path, GNMIPath): 72 path = _copy_path(path.path) 73 74 assert isinstance(path, gnmi.Path) 75 76 if origin: 77 path.origin = origin 78 79 if target: 80 path.target = target 81 82 self.path = path
Construct a GNMIPath from a string or gnmi.Path.
84 @property 85 def first(self) -> str: 86 "Return the first element of the path." 87 return self.path.elem[0].name
Return the first element of the path.
89 @property 90 def last(self) -> str: 91 "Return the last element of the path." 92 return self.path.elem[-1].name
Return the last element of the path.
99 @property 100 def target(self) -> str: 101 "Return the path's target." 102 return self.path.target
Return the path's target.
104 def set(self, __elem: str | int | None = None, **kwds: Any) -> "GNMIPath": 105 "Construct a new GNMIPath with keys set for the given elem." 106 if __elem is None: 107 return self._rekey(kwds) 108 109 if isinstance(__elem, str): 110 elem = _find_index(__elem, self.path) 111 else: 112 elem = __elem 113 114 result = self.copy() 115 keys = result.path.elem[elem].key 116 keys.clear() 117 keys.update({key: str(val) for key, val in kwds.items()}) 118 return result
Construct a new GNMIPath with keys set for the given elem.
120 def copy(self) -> "GNMIPath": 121 "Return a copy of the path." 122 return GNMIPath(_copy_path(self.path))
Return a copy of the path.
132 def __getitem__( 133 self, 134 key: int | str | tuple[int | str, str] | slice, 135 ) -> "str | GNMIPath": 136 "Return the specified element or key value." 137 match key: 138 case int(idx): 139 return self.path.elem[idx].name 140 case str(name): 141 return _retrieve_key(name, self.path) 142 case (int(idx), str(k)): 143 result = self.path.elem[idx].key.get(k) 144 if result is None: 145 raise KeyError(k) 146 return result 147 case (str(name), str(k)): 148 result = self.path.elem[_find_index(name, self.path)].key.get(k) 149 if result is None: 150 raise KeyError(k) 151 return result 152 case slice() as s: 153 return self._slice(s.start, s.stop, s.step) 154 case other: # pyright: ignore[reportUnnecessaryComparison] 155 raise TypeError(f"invalid key type: {other!r}")
Return the specified element or key value.
class GNMISubscription:
    """Represents a gNMI subscription stream.

    Returned from `GNMIClient.subscribe()`.
    """

    _client: GNMIClient
    _stream: _StreamTypeAlias | None

    def __init__(self, client: GNMIClient, prefix: GNMIPath | None = None):
        "Initialize a GNMISubscription."
        self._client = client
        self._stream = None
        self._sublist = gnmi.SubscriptionList(
            mode=gnmi.SubscriptionList.Mode.STREAM,
        )
        if prefix is not None:
            self._sublist.prefix.CopyFrom(prefix.path)

    def once(self, *paths: GNMIPath) -> None:
        "Subscribe in `ONCE` mode to given paths."
        self._sublist.mode = gnmi.SubscriptionList.Mode.ONCE

        for path in paths:
            sub = gnmi.Subscription(path=path.path)
            self._sublist.subscription.append(sub)

    def on_change(self, *paths: GNMIPath) -> None:
        "Subscribe in `ON_CHANGE` mode to given paths."
        assert self._sublist.mode == gnmi.SubscriptionList.Mode.STREAM

        for path in paths:
            sub = gnmi.Subscription(
                path=path.path,
                mode=gnmi.SubscriptionMode.ON_CHANGE,
            )
            self._sublist.subscription.append(sub)

    def sample(
        self,
        *paths: GNMIPath,
        sample_interval: int,
        suppress_redundant: bool = False,
        heartbeat_interval: int = 0,
    ) -> None:
        "Subscribe in `SAMPLE` mode to given paths."
        assert self._sublist.mode == gnmi.SubscriptionList.Mode.STREAM

        for path in paths:
            sub = gnmi.Subscription(
                path=path.path,
                mode=gnmi.SubscriptionMode.SAMPLE,
                sample_interval=sample_interval,
                suppress_redundant=suppress_redundant,
                heartbeat_interval=heartbeat_interval,
            )
            self._sublist.subscription.append(sub)

    async def synchronize(self) -> AsyncIterator[GNMIUpdate]:
        """Async iterator for initial subscription updates.

        Retrieve all updates up to the `sync_response` message.
        """
        if self._stream is None:
            await self._subscribe()

        try:
            async for result in self._read(True):
                yield result
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

    async def updates(self) -> AsyncIterator[GNMIUpdate]:
        "Async iterator for all remaining subscription updates."
        if self._stream is None:
            await self._subscribe()

        try:
            async for result in self._read(False):
                yield result
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

    def cancel(self) -> None:
        "Cancel the subscription."
        if self._stream is not None:
            self._stream.cancel()
            self._stream = None

    async def _subscribe(self):
        assert self._stream is None
        assert self._client._stub is not None

        self._stream = cast(
            _StreamTypeAlias,
            self._client._stub.Subscribe(wait_for_ready=True),  # type: ignore
        )

        request = gnmi.SubscribeRequest(subscribe=self._sublist)
        self._client._log_msg(request)
        try:
            await self._stream.write(request)
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

    async def _read(self, stop_at_sync: bool) -> AsyncIterator[GNMIUpdate]:
        assert self._stream is not None

        while True:
            msg = cast(
                gnmi.SubscribeResponse,
                await self._stream.read(),
            )
            if msg is GRPC_EOF:
                LOGGER.warning("gNMI _read: unexpected EOF")
                return

            self._client._log_msg(msg)

            match msg.WhichOneof("response"):
                case "update":
                    for update in _read_updates(msg.update):
                        yield update
                case "sync_response":
                    if stop_at_sync:
                        if self._is_once():
                            # FIXME? I expected Stratum to issue an EOF after
                            # the sync_response in "ONCE" mode, but it doesn't.
                            # For now, cancel the stream explicitly.
                            self.cancel()
                            # Let grpc react to the cancellation immediately.
                            await asyncio.sleep(0)
                        return  # All done!
                    LOGGER.warning("gNMI _read: ignored sync_response")
                case other:
                    LOGGER.warning("gNMI _read: unexpected oneof: %s", other)

    def _is_once(self) -> bool:
        "Return true if the subscription is in ONCE mode."
        return self._sublist.mode == gnmi.SubscriptionList.Mode.ONCE
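Here is a sketch of how a subscription might be driven end to end. It assumes that GNMIClient is used as an async context manager and that GNMIClient.subscribe() returns a GNMISubscription, as the class docstring above states; the address and path are placeholders.

import finsy as fy

async def watch_oper_status():
    async with fy.GNMIClient("127.0.0.1:50001") as client:
        oper_status = fy.GNMIPath("interfaces/interface/state/oper-status")

        # Build a streaming subscription in ON_CHANGE mode.
        sub = client.subscribe()
        sub.on_change(oper_status)

        # Drain the initial snapshot (all updates up to sync_response).
        async for update in sub.synchronize():
            print("initial:", update.path, update.value)

        # Then stream subsequent change notifications until cancelled.
        async for update in sub.updates():
            print("changed:", update.path, update.value)

fy.run(watch_oper_status())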
@dataclass
class GNMIUpdate:
    "Represents a gNMI update returned from GNMIClient."

    timestamp: int
    path: GNMIPath
    typed_value: gnmi.TypedValue | None

    @property
    def value(self) -> Any:
        "Return the value as a Python value."
        if self.typed_value is None:
            return None

        attr = self.typed_value.WhichOneof("value")
        if attr is None:
            raise ValueError("typed_value is not set")
        return getattr(self.typed_value, attr)

    def __repr__(self) -> str:
        "Override repr to strip newline from end of `TypedValue`."
        value = repr(self.typed_value).rstrip()
        return f"GNMIUpdate(timestamp={self.timestamp!r}, path={self.path!r}, typed_value=`{value}`)"
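The value property simply returns whichever oneof field of the underlying TypedValue is set, or None when there is no value. A minimal sketch; the protobuf import path (finsy.proto) and the top-level exports are assumptions:

from finsy import GNMIPath, GNMIUpdate   # assumed top-level exports
from finsy.proto import gnmi             # assumed location of the gNMI protobufs

update = GNMIUpdate(
    timestamp=1_700_000_000,
    path=GNMIPath("interfaces/interface/state/oper-status"),
    typed_value=gnmi.TypedValue(string_val="UP"),
)

assert update.value == "UP"  # returns the oneof field that is set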
@dataclass(kw_only=True)
class GRPCCredentialsTLS:
    "GRPC channel credentials for Transport Layer Security (TLS)."

    cacert: Path | bytes | None
    """Certificate authority used to authenticate the certificate at the other
    end of the connection."""

    cert: Path | bytes | None
    "Certificate that identifies this side of the connection."

    private_key: Path | bytes | None
    "Private key associated with this side's certificate identity."

    target_name_override: str = ""
    "Override the target name used for TLS host name checking (useful for testing)."

    call_credentials: grpc.AuthMetadataPlugin | None = None
    """Optional GRPC call credentials for the client channel. Be aware that the
    auth plugin's callback takes place in a different thread."""

    def to_client_credentials(self) -> grpc.ChannelCredentials:
        "Create native SSL client credentials object."
        root_certificates = _coerce_tls_path(self.cacert)
        certificate_chain = _coerce_tls_path(self.cert)
        private_key = _coerce_tls_path(self.private_key)

        return self._compose_credentials(
            grpc.ssl_channel_credentials(
                root_certificates=root_certificates,
                private_key=private_key,
                certificate_chain=certificate_chain,
            )
        )

    def to_server_credentials(self) -> grpc.ServerCredentials:
        """Create native SSL server credentials object.

        On the server side, we ignore the `call_credentials`.
        """
        root_certificates = _coerce_tls_path(self.cacert)
        certificate_chain = _coerce_tls_path(self.cert)
        private_key = _coerce_tls_path(self.private_key)

        if not private_key:
            raise ValueError("Empty private key in server credentials")

        if not certificate_chain:
            raise ValueError("Empty certificate chain in server credentials")

        return grpc.ssl_server_credentials(
            private_key_certificate_chain_pairs=[(private_key, certificate_chain)],
            root_certificates=root_certificates,
            require_client_auth=True,
        )

    def _compose_credentials(
        self, channel_cred: grpc.ChannelCredentials
    ) -> grpc.ChannelCredentials:
        "Compose call credentials with channel credentials."
        if not self.call_credentials:
            return channel_cred

        call_cred = grpc.metadata_call_credentials(self.call_credentials)
        return grpc.composite_channel_credentials(channel_cred, call_cred)
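A typical client-side use is to fill in the certificate paths and convert the dataclass to native gRPC credentials. The file names below are placeholders, and it is assumed that GRPCCredentialsTLS is exported at the package top level.

from pathlib import Path

import finsy as fy

# Placeholder certificate paths; substitute your own files.
tls = fy.GRPCCredentialsTLS(
    cacert=Path("certs/ca.crt"),
    cert=Path("certs/client.crt"),
    private_key=Path("certs/client.key"),
)

# Produce a native grpc.ChannelCredentials object for use with a raw gRPC channel.
channel_credentials = tls.to_client_credentials()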
class GRPCStatusCode(_EnumBase):
    "IntEnum equivalent to `grpc.StatusCode`."

    OK = rpc_code.OK
    CANCELLED = rpc_code.CANCELLED
    UNKNOWN = rpc_code.UNKNOWN
    FAILED_PRECONDITION = rpc_code.FAILED_PRECONDITION
    INVALID_ARGUMENT = rpc_code.INVALID_ARGUMENT
    DEADLINE_EXCEEDED = rpc_code.DEADLINE_EXCEEDED
    NOT_FOUND = rpc_code.NOT_FOUND
    ALREADY_EXISTS = rpc_code.ALREADY_EXISTS
    PERMISSION_DENIED = rpc_code.PERMISSION_DENIED
    UNAUTHENTICATED = rpc_code.UNAUTHENTICATED
    RESOURCE_EXHAUSTED = rpc_code.RESOURCE_EXHAUSTED
    ABORTED = rpc_code.ABORTED
    OUT_OF_RANGE = rpc_code.OUT_OF_RANGE
    UNIMPLEMENTED = rpc_code.UNIMPLEMENTED
    INTERNAL = rpc_code.INTERNAL
    UNAVAILABLE = rpc_code.UNAVAILABLE
    DATA_LOSS = rpc_code.DATA_LOSS

    @classmethod
    def from_status_code(cls, val: grpc.StatusCode) -> "GRPCStatusCode":
        "Create corresponding GRPCStatusCode from a grpc.StatusCode object."
        n: Any = val.value[0]
        assert isinstance(n, int)
        return GRPCStatusCode(n)

    @staticmethod
    def _validate_enum() -> None:
        "Verify that GRPCStatusCode covers every possible grpc.StatusCode."
        for value in grpc.StatusCode:
            n: Any = value.value[0]
            assert isinstance(n, int)
            assert GRPCStatusCode[value.name].value == n, value.name
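Because grpc.StatusCode members are (int, name) tuples under the hood, they are awkward to compare against the integer codes carried in protobuf status messages; from_status_code converts one into a plain IntEnum member. A small sketch, assuming GRPCStatusCode is exported at the package top level:

import grpc

from finsy import GRPCStatusCode  # assumed top-level export

code = GRPCStatusCode.from_status_code(grpc.StatusCode.NOT_FOUND)

assert code is GRPCStatusCode.NOT_FOUND
assert int(code) == 5  # NOT_FOUND is status code 5 in the gRPC registry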