finsy
Finsy P4Runtime Controller Library
Finsy is a P4Runtime controller library written in Python using asyncio. Finsy includes support for gNMI.
Check out the examples directory for some demonstration programs.
Installation
Finsy requires Python 3.10 or later. To install the latest version, type `pip install finsy`.
P4Runtime Scripts
With Finsy, you can write a Python script that reads/writes P4Runtime entities for a single switch.
Here is a complete example that retrieves the P4Info from a switch:
```python
import finsy as fy

async def main():
    async with fy.Switch("sw1", "127.0.0.1:50001") as sw1:
        # Print out a description of the switch's P4Info, if one is configured.
        print(sw1.p4info)

fy.run(main())
```
Here is another example that prints out all non-default table entries.
```python
import finsy as fy

async def main():
    async with fy.Switch("sw1", "127.0.0.1:50001") as sw1:
        # Do a wildcard read for table entries.
        async for entry in sw1.read(fy.P4TableEntry()):
            print(entry)

fy.run(main())
```
P4Runtime Controller
You can also write a P4Runtime controller that manages multiple switches independently. Your controller can react to events from the Switch by changing the contents of P4 tables.
Each switch is managed by an async `ready_handler` function. Your `ready_handler` function can read or update various P4Runtime entities in the switch. It can also create tasks to listen for packets or digests.
When you write P4Runtime updates to the switch, you use a unary operator (`+`, `-`, `~`) to specify the operation: INSERT (`+`), DELETE (`-`) or MODIFY (`~`).
```python
import finsy as fy

async def ready_handler(sw: fy.Switch):
    await sw.delete_all()
    await sw.write(
        [
            # Insert (+) multicast group with ports 1, 2, 3 and CONTROLLER.
            +fy.P4MulticastGroupEntry(1, replicas=[1, 2, 3, 255]),
            # Modify (~) default table entry to flood all unmatched packets.
            ~fy.P4TableEntry(
                "ipv4",
                action=fy.Action("flood"),
                is_default_action=True,
            ),
        ]
    )

    async for packet in sw.read_packets():
        print(f"{sw.name}: {packet}")
```
Use the `SwitchOptions` class to specify each switch's settings, including the p4info/p4blob and `ready_handler`. Use the `Controller` class to drive multiple switch connections. Each switch will call back into your `ready_handler` function after the P4Runtime connection is established.
```python
from pathlib import Path

options = fy.SwitchOptions(
    p4info=Path("hello.p4info.txt"),
    p4blob=Path("hello.json"),
    ready_handler=ready_handler,
)

controller = fy.Controller([
    fy.Switch("sw1", "127.0.0.1:50001", options),
    fy.Switch("sw2", "127.0.0.1:50002", options),
    fy.Switch("sw3", "127.0.0.1:50003", options),
])

fy.run(controller.run())
```
Your `ready_handler` can spawn concurrent tasks with the `Switch.create_task` method. Tasks created this way will have their lifetimes managed by the switch object. If the switch disconnects or its role changes to backup, the task running your `ready_handler` (and any tasks it spawned) will be cancelled and the `ready_handler` will begin again.
For more examples, see the examples directory.
Switch Read/Write API
The `Switch` class provides the API for interacting with P4Runtime switches. You control a `Switch` object with a "ready handler" function. The ready handler is an async function that is called when the switch is ready to accept commands.
Your ready handler will typically write some control entities to the switch, then listen for incoming events and react to them with more writes. You may occasionally read entities from the switch.
When your ready handler is invoked, there is already a P4Runtime channel established, with client arbitration completed, and the pipeline configured as specified in `SwitchOptions`.
Here is an example skeleton program. The ready handler is named `ready()`.
```python
async def ready(switch: fy.Switch):
    # Check if switch is the primary. If not, we may want to proceed
    # in read-only mode. In this example, ignore switch if it's a backup.
    if not switch.is_primary:
        return

    # If we're reconnecting to a switch, it will already have runtime state.
    # In this example, we just delete all entities and start over.
    await switch.delete_all()

    # Provision the pipeline with one or more `write` transactions. Each
    # `write` is a single WriteRequest which may contain multiple updates.
    await switch.write(
        # [Next section will cover what goes here.]
    )

    # Listen for events and respond to them. This "infinite" loop will
    # continue until the Switch disconnects, changes primary/backup status,
    # or the controller is stopped.
    async for packet in switch.read_packets():
        await handle_packet(switch, packet)
```
The Switch class provides a `switch.create_task` method to start a managed task. Tasks allow you to perform concurrent operations on the same switch. We could have written the last stanza above, which reads packets in an infinite loop, as a separate task. It's okay for the ready handler function to return early; any tasks it created will still run.
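The managed-task lifetime described above can be sketched as a small asyncio pattern. This is purely illustrative: `ManagedTasks` is a hypothetical stand-in, not the real `Switch` internals.

```python
import asyncio

class ManagedTasks:
    "Hypothetical sketch: track spawned tasks and cancel them all on stop."

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task] = set()

    def create_task(self, coro) -> asyncio.Task:
        # Track the task; remove it from the set when it finishes.
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def stop(self) -> None:
        # Cancel every task we spawned and wait for them to finish.
        tasks = list(self._tasks)
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

async def demo() -> bool:
    mgr = ManagedTasks()
    task = mgr.create_task(asyncio.sleep(3600))
    await asyncio.sleep(0)  # let the task start
    await mgr.stop()
    return task.cancelled()
```

The real implementation ties this cancellation to disconnects and primary/backup role changes, as noted earlier.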
Writes
Use the `write()` method to write one or more P4Runtime updates and packets.
A P4Runtime update supports one of three operations: INSERT, MODIFY or DELETE. Some entities support all three operations; others support only MODIFY.
| Entity | Operations Permitted | Related Classes |
| --- | --- | --- |
| P4TableEntry | INSERT, MODIFY, DELETE | Match, Action, IndirectAction, P4MeterConfig, P4CounterData, P4MeterCounterData |
| P4ActionProfileMember | INSERT, MODIFY, DELETE | |
| P4ActionProfileGroup | INSERT, MODIFY, DELETE | P4Member |
| P4MulticastGroupEntry | INSERT, MODIFY, DELETE | |
| P4CloneSessionEntry | INSERT, MODIFY, DELETE | |
| P4DigestEntry | INSERT, MODIFY, DELETE | |
| P4ExternEntry | INSERT, MODIFY, DELETE | |
| P4RegisterEntry | MODIFY | |
| P4CounterEntry | MODIFY | P4CounterData |
| P4DirectCounterEntry | MODIFY | P4CounterData |
| P4MeterEntry | MODIFY | P4MeterConfig, P4MeterCounterData |
| P4DirectMeterEntry | MODIFY | |
| P4ValueSetEntry | MODIFY | P4ValueSetMember |
Insert/Modify/Delete Updates
To specify the operation, use a unary `+` (INSERT), `~` (MODIFY), or `-` (DELETE). If you do not specify the operation, `write` will raise a `ValueError` exception.
Here is an example showing how to insert and delete two different entities in the same WriteRequest.
```python
await switch.write([
    +fy.P4TableEntry(  # unary + means INSERT
        "ipv4",
        match=fy.Match(dest="192.168.1.0/24"),
        action=fy.Action("forward", port=1),
    ),
    -fy.P4TableEntry(  # unary - means DELETE
        "ipv4",
        match=fy.Match(dest="192.168.2.0/24"),
        action=fy.Action("forward", port=2),
    ),
])
```
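Conceptually, the unary-operator convention maps onto Python's `__pos__`, `__invert__`, and `__neg__` hooks. The sketch below is illustrative only; `Entity` and `Op` are hypothetical names, not finsy's actual classes.

```python
from dataclasses import dataclass, replace
from enum import Enum

class Op(Enum):
    UNSPECIFIED = 0
    INSERT = 1
    MODIFY = 2
    DELETE = 3

@dataclass
class Entity:
    "Hypothetical entity: the unary operators return a copy tagged with an Op."
    name: str
    op: Op = Op.UNSPECIFIED

    def __pos__(self):      # +entity -> INSERT
        return replace(self, op=Op.INSERT)

    def __invert__(self):   # ~entity -> MODIFY
        return replace(self, op=Op.MODIFY)

    def __neg__(self):      # -entity -> DELETE
        return replace(self, op=Op.DELETE)
```

With this shape, a `write` implementation can reject any entity whose operation is still `UNSPECIFIED`, which matches the `ValueError` behavior described above.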
You should not insert, modify or delete the same entry in the same WriteRequest.
If you are performing the same operation on all entities, you can use the Switch `insert`, `delete`, or `modify` methods.
```python
await switch.insert([
    fy.P4MulticastGroupEntry(1, replicas=[1, 2, 3]),
    fy.P4MulticastGroupEntry(2, replicas=[4, 5, 6]),
])
```
Modify-Only Updates
For entities that only support the modify operation, you do not need to specify the operation. (You can optionally use `~`.)
```python
await switch.write([
    fy.P4RegisterEntry("reg1", index=0, data=0),
    fy.P4RegisterEntry("reg1", index=1, data=1),
    fy.P4RegisterEntry("reg1", index=2, data=2),
])
```
You can also use the `modify` method:
```python
await switch.modify([
    fy.P4RegisterEntry("reg1", index=0, data=0),
    fy.P4RegisterEntry("reg1", index=1, data=1),
    fy.P4RegisterEntry("reg1", index=2, data=2),
])
```
If you pass a modify-only entity to the `insert` or `delete` methods, the P4Runtime server will return an error.
Sending Packets
Use the `write` method to send a packet.
```python
await switch.write([fy.P4PacketOut(b"a payload.....", port=3)])
```
You can include other entities in the same call. Any non-update objects (e.g. P4PacketOut, P4DigestListAck) will be sent before the WriteRequest.
Listening for Packets
To receive packets, use the async iterator `Switch.read_packets()`. In this example, `pkt` is a `P4PacketIn` object. `read_packets` can filter for a specific `eth_type`.
```python
# Read packets filtering only for ARP (eth_type == 0x0806).
async for pkt in switch.read_packets(eth_types={0x0806}):
    # You can access the packet payload `pkt.payload` or any metadata value,
    # e.g. `pkt['ingress_port']`.
    print(pkt.payload)
    print(pkt['ingress_port'])
```
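As background for the `eth_types` filter: in a raw Ethernet frame, the EtherType occupies bytes 12-13 (big-endian), right after the destination and source MAC addresses. A stand-alone helper (hypothetical, not part of finsy) shows where the value comes from:

```python
def eth_type_of(frame: bytes) -> int:
    "Return the EtherType field of a raw Ethernet frame."
    if len(frame) < 14:
        raise ValueError("truncated Ethernet header")
    # Bytes 0-5: dst MAC, 6-11: src MAC, 12-13: EtherType (big-endian).
    return int.from_bytes(frame[12:14], "big")

# A minimal ARP frame: zeroed MACs, EtherType 0x0806, zeroed ARP body.
arp_frame = bytes(6) + bytes(6) + (0x0806).to_bytes(2, "big") + bytes(28)
```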
Listening for Digests
To receive digests, use the async iterator `Switch.read_digests`. You must specify the name of the digest from your P4 program.
```python
async for digest in switch.read_digests("digest_t"):
    # You can access the digest metadata e.g. `digest['ingress_port']`.
    # Your code may need to update table entries based on the digest data.
    # To ack the digest, write `digest.ack()`.
    await switch.write([entry, ...])
    await switch.write([digest.ack()])
```
To acknowledge the digest entry, you can write `digest.ack()`.
Listening for Idle Timeouts
To receive idle timeout notifications, use the async iterator `Switch.read_idle_timeouts`. You will receive a `P4IdleTimeoutNotification`, which contains multiple table entries -- one for each entry that timed out.
```python
async for timeout in switch.read_idle_timeouts():
    for entry in timeout.table_entry:
        print(timeout.timestamp, entry)
```
Other Events
A P4 switch may report other events using the `EventEmitter` API. See the `SwitchEvent` class for the event types. Each switch has a `switch.ee` attribute that lets your code register for event callbacks.
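The emitter pattern behind `switch.ee` can be sketched with a minimal registry that supports persistent (`on`) and one-shot (`once`) handlers. This is illustrative only; `MiniEmitter` is a hypothetical miniature, not the actual `EventEmitter` class finsy exposes.

```python
from collections import defaultdict
from typing import Any, Callable

class MiniEmitter:
    "Hypothetical sketch of an event emitter with on/once/emit."

    def __init__(self) -> None:
        # Each handler is stored with a flag: True means fire only once.
        self._handlers: dict[str, list[tuple[Callable[..., Any], bool]]] = defaultdict(list)

    def on(self, event: str, handler: Callable[..., Any]) -> None:
        self._handlers[event].append((handler, False))

    def once(self, event: str, handler: Callable[..., Any]) -> None:
        self._handlers[event].append((handler, True))

    def emit(self, event: str, *args: Any) -> None:
        handlers = self._handlers[event]
        # Drop one-shot handlers before calling, so re-entrant emits
        # cannot fire them twice.
        self._handlers[event] = [h for h in handlers if not h[1]]
        for handler, _ in handlers:
            handler(*args)
```

A `once` registration mirrors the pattern in the `Controller.remove` source shown later, where a one-shot callback observes a switch-stopped event.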
Development and Testing
Perform these steps to set up your local environment for Finsy development, or try the codespace. Finsy requires Python 3.10 or later. If Poetry is not installed, follow the directions in the Poetry documentation to install it.
Clone and Prepare a Virtual Environment
The `poetry install` command installs all development dependencies into the virtual environment (venv).
```shell
$ git clone https://github.com/byllyfish/finsy.git
$ cd finsy
$ python3 -m venv .venv
$ poetry install
```
Run Unit Tests
When you run pytest from the top level of the repository, you will run the unit tests.
```shell
$ poetry run pytest
```
Run Integration Tests
When you run pytest from within the `examples` directory, you will run the integration tests instead of the unit tests. The integration tests run the example programs against a Mininet network. Docker or Podman is required.
```shell
$ cd examples
$ poetry run pytest
```
API Documentation
1""" 2.. include:: ../README.md 3 4# API Documentation 5""" 6 7# Copyright (c) 2022-2023 Bill Fisher 8# 9# Licensed under the Apache License, Version 2.0 (the "License"); 10# you may not use this file except in compliance with the License. 11# You may obtain a copy of the License at 12# 13# http://www.apache.org/licenses/LICENSE-2.0 14# 15# Unless required by applicable law or agreed to in writing, software 16# distributed under the License is distributed on an "AS IS" BASIS, 17# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 18# See the License for the specific language governing permissions and 19# limitations under the License. 20 21__version__ = "0.25.0" 22 23import sys 24 25if sys.version_info < (3, 10): # pragma: no cover 26 raise RuntimeError("Requires Python 3.10 or later.") 27 28from .controller import Controller 29from .gnmiclient import GNMIClient, GNMISubscription, GNMIUpdate 30from .gnmipath import GNMIPath 31from .grpcutil import GRPCCredentialsTLS, GRPCStatusCode 32from .log import LoggerAdapter 33from .macaddr import MACAddress 34from .p4client import P4Client, P4ClientError, P4Error 35from .p4entity import ( 36 P4ActionProfileGroup, 37 P4ActionProfileMember, 38 P4CloneSessionEntry, 39 P4CounterData, 40 P4CounterEntry, 41 P4DigestEntry, 42 P4DigestList, 43 P4DigestListAck, 44 P4DirectCounterEntry, 45 P4DirectMeterEntry, 46 P4ExternEntry, 47 P4IndirectAction, 48 P4Member, 49 P4MeterConfig, 50 P4MeterCounterData, 51 P4MeterEntry, 52 P4MulticastGroupEntry, 53 P4PacketIn, 54 P4PacketOut, 55 P4RegisterEntry, 56 P4TableAction, 57 P4TableEntry, 58 P4TableMatch, 59 P4ValueSetEntry, 60) 61from .p4schema import P4ConfigAction, P4CounterUnit, P4Schema 62from .ports import SwitchPort, SwitchPortList 63from .runner import run 64from .switch import Switch, SwitchEvent, SwitchOptions 65 66Match = P4TableMatch 67"`Match` is an alias for P4TableMatch." 68 69Action = P4TableAction 70"`Action` is an alias for P4TableAction." 
71 72IndirectAction = P4IndirectAction 73"`IndirectAction` is an alias for P4IndirectAction." 74 75__all__ = [ 76 "run", 77 "Controller", 78 "LoggerAdapter", 79 "MACAddress", 80 "P4ActionProfileGroup", 81 "P4ActionProfileMember", 82 "P4Client", 83 "P4ClientError", 84 "P4CloneSessionEntry", 85 "P4CounterData", 86 "P4CounterEntry", 87 "P4CounterUnit", 88 "P4DigestEntry", 89 "P4DigestList", 90 "P4DigestListAck", 91 "P4DirectCounterEntry", 92 "P4DirectMeterEntry", 93 "P4Error", 94 "P4ExternEntry", 95 "IndirectAction", # alias for P4IndirectAction 96 "P4IndirectAction", 97 "P4Member", 98 "P4MeterConfig", 99 "P4MeterCounterData", 100 "P4MeterEntry", 101 "P4MulticastGroupEntry", 102 "P4PacketIn", 103 "P4PacketOut", 104 "P4RegisterEntry", 105 "Action", # alias for P4TableAction 106 "P4TableAction", 107 "P4TableEntry", 108 "Match", # alias for P4TableMatch 109 "P4TableMatch", 110 "P4ValueSetEntry", 111 "P4ConfigAction", 112 "P4Schema", 113 "Switch", 114 "SwitchEvent", 115 "SwitchOptions", 116 "SwitchPort", 117 "SwitchPortList", 118 "GNMIClient", 119 "GNMIPath", 120 "GNMISubscription", 121 "GNMIUpdate", 122 "GRPCCredentialsTLS", 123 "GRPCStatusCode", 124]
````python
def run(coro: Coroutine[Any, Any, None]) -> None:
    """`finsy.run` provides a useful wrapper around `asyncio.run`.

    This function implements common boilerplate for running a Finsy application.

    - Set up basic logging to stderr at the INFO log level.
    - Set up a signal handler for SIGTERM that shuts down gracefully.
    - Set up caching of P4Info data so common definitions are re-used.

    Example:

    ```python
    import finsy as fy

    async def main():
        async with fy.Switch("sw", "127.0.0.1:50001") as sw:
            print(sw.p4info)

    if __name__ == "__main__":
        fy.run(main())
    ```

    If you choose to use `asyncio.run` instead, your P4Schema/P4Info objects
    will not be eligible for sharing. You can create your own `P4SchemaCache`
    context manager to implement this.
    """
    try:
        asyncio.run(_finsy_boilerplate(coro))
    except (KeyboardInterrupt, asyncio.CancelledError):
        pass
````
`finsy.run` provides a useful wrapper around `asyncio.run`.
This function implements common boilerplate for running a Finsy application.
- Set up basic logging to stderr at the INFO log level.
- Set up a signal handler for SIGTERM that shuts down gracefully.
- Set up caching of P4Info data so common definitions are re-used.
Example:
```python
import finsy as fy

async def main():
    async with fy.Switch("sw", "127.0.0.1:50001") as sw:
        print(sw.p4info)

if __name__ == "__main__":
    fy.run(main())
```
If you choose to use `asyncio.run` instead, your P4Schema/P4Info objects will not be eligible for sharing. You can create your own `P4SchemaCache` context manager to implement this.
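The SIGTERM part of this boilerplate can be sketched in plain asyncio. This is a stripped-down illustration: `run_sketch` and `_with_sigterm` are hypothetical names, and the real `fy.run` also configures logging and P4Info caching.

```python
import asyncio
import signal

async def _with_sigterm(coro):
    "Run coro as a task, cancelling it if the process receives SIGTERM."
    task = asyncio.ensure_future(coro)
    loop = asyncio.get_running_loop()
    # On SIGTERM, cancel the main task so cleanup code runs (Unix only).
    loop.add_signal_handler(signal.SIGTERM, task.cancel)
    try:
        await task
    finally:
        loop.remove_signal_handler(signal.SIGTERM)

def run_sketch(coro) -> None:
    "Run a coroutine, swallowing Ctrl-C and graceful-shutdown cancellation."
    try:
        asyncio.run(_with_sigterm(coro))
    except (KeyboardInterrupt, asyncio.CancelledError):
        pass
```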
````python
@final
class Controller:
    """Represents a collection of P4Runtime switches.

    Each `Switch` in the Controller is identified by its name. Each name must
    be unique.

    ```
    switches = [
        fy.Switch("sw1", "10.0.0.1:50000"),
        fy.Switch("sw2", "10.0.0.2:50000"),
    ]
    controller = fy.Controller(switches)
    await controller.run()
    ```

    A Controller can be running or stopped. There are two ways to run a
    Controller. You can use the `Controller.run()` method, or you can use
    a Controller as a context manager.

    ```
    controller = fy.Controller(switches)
    async with controller:
        # Let controller run for 60 seconds.
        await asyncio.sleep(60)
    ```

    You can `add` or `remove` switches regardless of whether a Controller is
    running or not. If the Controller is not running, adding or removing a Switch is
    instantaneous. If the Controller is running, adding a Switch will start
    it running asynchronously. Removing a Switch will schedule the Switch to stop,
    but defer actual removal until the Switch has stopped asynchronously.

    When a switch starts inside a Controller, it fires the `CONTROLLER_ENTER`
    event. When a switch stops inside a Controller, it fires the `CONTROLLER_LEAVE`
    event.

    A Controller supports these methods to access its contents:

    - len(controller): Return number of switches in the controller.
    - controller[name]: Return the switch with the given name.
    - controller.get(name): Return the switch with the given name, or None if not found.

    You can iterate over the switches in a Controller using a for loop:

    ```
    for switch in controller:
        print(switch.name)
    ```

    Any task or sub-task running inside a controller can retrieve its
    Controller object using the `Controller.current()` method.
    """

    _switches: dict[str, Switch]
    _pending_removal: set[Switch]
    _task_count: CountdownFuture

    _control_task: asyncio.Task[Any] | None = None
    "Keep track of controller's main task."

    def __init__(self, switches: Iterable[Switch] = ()):
        self._switches = {}
        self._pending_removal = set()
        self._task_count = CountdownFuture()

        for switch in switches:
            if switch.name in self._switches:
                raise ValueError(f"Switch named {switch.name!r} already exists")
            self._switches[switch.name] = switch

    @property
    def running(self) -> bool:
        "True if Controller is running."
        return self._control_task is not None

    async def run(self) -> None:
        "Run the controller."
        async with self:
            await wait_for_cancel()

    def stop(self) -> None:
        "Stop the controller if it is running."
        if self._control_task is not None:
            self._control_task.cancel()

    async def __aenter__(self) -> Self:
        "Run the controller as a context manager (see also run())."
        assert not self.running, "Controller.__aenter__ is not re-entrant"
        assert self._task_count.value() == 0
        assert not self._pending_removal

        self._control_task = asyncio.current_task()
        _CONTROLLER.set(self)

        try:
            # Start each switch running.
            for switch in self:
                self._start_switch(switch)
        except Exception:
            self._control_task = None
            _CONTROLLER.set(None)
            raise

        return self

    async def __aexit__(
        self,
        _exc_type: type[BaseException] | None,
        _exc_val: BaseException | None,
        _exc_tb: TracebackType | None,
    ) -> bool | None:
        "Run the controller as a context manager (see also run())."
        assert self.running

        try:
            # Stop all the switches.
            for switch in self:
                self._stop_switch(switch)

            # Wait for switch tasks to finish.
            await self._task_count.wait()

        finally:
            self._control_task = None
            _CONTROLLER.set(None)

    def add(self, switch: Switch) -> None:
        """Add a switch to the controller.

        If the controller is running, tell the switch to start.
        """
        if switch.name in self._switches:
            raise ValueError(f"Switch named {switch.name!r} already exists")

        self._switches[switch.name] = switch
        if self.running:
            self._start_switch(switch)

    def remove(self, switch: Switch) -> asyncio.Event:
        """Remove a switch from the controller.

        If the controller is running, tell the switch to stop and schedule it
        for removal when it fully stops.
        """
        name = switch.name
        if self._switches.get(name, None) is not switch:
            raise ValueError(f"Switch named {name!r} not found")

        del self._switches[name]

        event = asyncio.Event()
        if self.running:
            # When controller is running, event will complete when switch
            # is actually stopped.
            self._stop_switch(switch)
            self._pending_removal.add(switch)

            def _controller_leave(sw: Switch):
                self._pending_removal.discard(sw)
                event.set()

            switch.ee.once(SwitchEvent.CONTROLLER_LEAVE, _controller_leave)  # type: ignore
        else:
            # When controller is not running, event completes immediately.
            event.set()

        return event

    def _start_switch(self, switch: Switch):
        "Start the switch's control task."
        LOGGER.debug("Controller._start_switch: %r", switch)
        assert switch._control_task is None  # pyright: ignore[reportPrivateUsage]

        switch.ee.emit(SwitchEvent.CONTROLLER_ENTER, switch)

        task = asyncio.create_task(switch.run(), name=f"fy:{switch.name}")
        switch._control_task = task  # pyright: ignore[reportPrivateUsage]
        self._task_count.increment()

        def _switch_done(done: asyncio.Task[Any]):
            switch._control_task = None  # pyright: ignore[reportPrivateUsage]
            switch.ee.emit(SwitchEvent.CONTROLLER_LEAVE, switch)
            self._task_count.decrement()

            if not done.cancelled():
                ex = done.exception()
                if ex is not None:
                    if not isinstance(ex, SwitchFailFastError):
                        # The `fail_fast` error has already been logged. If
                        # it's any other error, log it. (There shouldn't be
                        # any other error.)
                        LOGGER.critical(
                            "Controller task %r failed",
                            done.get_name(),
                            exc_info=ex,
                        )
                    # Shutdown the program cleanly due to switch failure.
                    raise SystemExit(99)

        task.add_done_callback(_switch_done)

    def _stop_switch(self, switch: Switch):
        "Stop the switch's control task."
        LOGGER.debug("Controller._stop_switch: %r", switch)

        if switch._control_task is not None:  # pyright: ignore[reportPrivateUsage]
            switch._control_task.cancel()  # pyright: ignore[reportPrivateUsage]

    def __len__(self) -> int:
        "Return switch count."
        return len(self._switches)

    def __iter__(self) -> Iterator[Switch]:
        "Iterate over switches."
        return iter(self._switches.values())

    def __getitem__(self, name: str) -> Switch:
        "Retrieve switch by name."
        return self._switches[name]

    def get(self, name: str) -> Switch | None:
        "Retrieve switch by name, or return None if not found."
        return self._switches.get(name)

    @staticmethod
    def current() -> "Controller":
        "Return the current Controller object."
        result = _CONTROLLER.get()
        if result is None:
            raise RuntimeError("controller does not exist")
        return result
````
Represents a collection of P4Runtime switches.
Each `Switch` in the Controller is identified by its name. Each name must be unique.
```python
switches = [
    fy.Switch("sw1", "10.0.0.1:50000"),
    fy.Switch("sw2", "10.0.0.2:50000"),
]
controller = fy.Controller(switches)
await controller.run()
```
A Controller can be running or stopped. There are two ways to run a Controller. You can use the `Controller.run()` method, or you can use a Controller as a context manager.
```python
controller = fy.Controller(switches)
async with controller:
    # Let controller run for 60 seconds.
    await asyncio.sleep(60)
```
You can `add` or `remove` switches regardless of whether a Controller is running or not. If the Controller is not running, adding or removing a Switch is instantaneous. If the Controller is running, adding a Switch will start it running asynchronously. Removing a Switch will schedule the Switch to stop, but defer actual removal until the Switch has stopped asynchronously.
When a switch starts inside a Controller, it fires the `CONTROLLER_ENTER` event. When a switch stops inside a Controller, it fires the `CONTROLLER_LEAVE` event.
A Controller supports these methods to access its contents:
- `len(controller)`: Return number of switches in the controller.
- `controller[name]`: Return the switch with the given name.
- `controller.get(name)`: Return the switch with the given name, or None if not found.
You can iterate over the switches in a Controller using a for loop:
```python
for switch in controller:
    print(switch.name)
```
Any task or sub-task running inside a controller can retrieve its Controller object using the `Controller.current()` method.
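The `Controller.current()` lookup rests on the `contextvars` pattern visible in the source above (the `_CONTROLLER` variable). A miniature sketch of that pattern, with a hypothetical `Ctl` class standing in for the real Controller:

```python
import contextvars

# Module-level context variable; the running "controller" stores itself here.
_CURRENT = contextvars.ContextVar("ctl", default=None)

class Ctl:
    "Hypothetical miniature of the ContextVar pattern behind Controller.current()."

    def __enter__(self):
        self._token = _CURRENT.set(self)
        return self

    def __exit__(self, *exc):
        _CURRENT.reset(self._token)

    @staticmethod
    def current() -> "Ctl":
        result = _CURRENT.get()
        if result is None:
            raise RuntimeError("controller does not exist")
        return result
```

Because asyncio tasks inherit the context of the task that created them, any sub-task started while the controller is active sees the same value.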
```python
def __init__(self, switches: Iterable[Switch] = ()):
    self._switches = {}
    self._pending_removal = set()
    self._task_count = CountdownFuture()

    for switch in switches:
        if switch.name in self._switches:
            raise ValueError(f"Switch named {switch.name!r} already exists")
        self._switches[switch.name] = switch
```
```python
@property
def running(self) -> bool:
    "True if Controller is running."
    return self._control_task is not None
```
True if Controller is running.
```python
async def run(self) -> None:
    "Run the controller."
    async with self:
        await wait_for_cancel()
```
Run the controller.
```python
def stop(self) -> None:
    "Stop the controller if it is running."
    if self._control_task is not None:
        self._control_task.cancel()
```
Stop the controller if it is running.
```python
async def __aenter__(self) -> Self:
    "Run the controller as a context manager (see also run())."
    assert not self.running, "Controller.__aenter__ is not re-entrant"
    assert self._task_count.value() == 0
    assert not self._pending_removal

    self._control_task = asyncio.current_task()
    _CONTROLLER.set(self)

    try:
        # Start each switch running.
        for switch in self:
            self._start_switch(switch)
    except Exception:
        self._control_task = None
        _CONTROLLER.set(None)
        raise

    return self
```
Run the controller as a context manager (see also run()).
```python
def add(self, switch: Switch) -> None:
    """Add a switch to the controller.

    If the controller is running, tell the switch to start.
    """
    if switch.name in self._switches:
        raise ValueError(f"Switch named {switch.name!r} already exists")

    self._switches[switch.name] = switch
    if self.running:
        self._start_switch(switch)
```
Add a switch to the controller.
If the controller is running, tell the switch to start.
```python
def remove(self, switch: Switch) -> asyncio.Event:
    """Remove a switch from the controller.

    If the controller is running, tell the switch to stop and schedule it
    for removal when it fully stops.
    """
    name = switch.name
    if self._switches.get(name, None) is not switch:
        raise ValueError(f"Switch named {name!r} not found")

    del self._switches[name]

    event = asyncio.Event()
    if self.running:
        # When controller is running, event will complete when switch
        # is actually stopped.
        self._stop_switch(switch)
        self._pending_removal.add(switch)

        def _controller_leave(sw: Switch):
            self._pending_removal.discard(sw)
            event.set()

        switch.ee.once(SwitchEvent.CONTROLLER_LEAVE, _controller_leave)  # type: ignore
    else:
        # When controller is not running, event completes immediately.
        event.set()

    return event
```
Remove a switch from the controller.
If the controller is running, tell the switch to stop and schedule it for removal when it fully stops.
```python
def __iter__(self) -> Iterator[Switch]:
    "Iterate over switches."
    return iter(self._switches.values())
```
Iterate over switches.
```python
def __getitem__(self, name: str) -> Switch:
    "Retrieve switch by name."
    return self._switches[name]
```
Retrieve switch by name.
```python
def get(self, name: str) -> Switch | None:
    "Retrieve switch by name, or return None if not found."
    return self._switches.get(name)
```
Retrieve switch by name, or return None if not found.
```python
class LoggerAdapter(_BaseLoggerAdapter):
    """Custom log adapter to include the name of the current task."""

    def process(
        self,
        msg: Any,
        kwargs: MutableMapping[str, Any],
    ) -> tuple[Any, MutableMapping[str, Any]]:
        """Process the logging message and keyword arguments passed in to a
        logging call to insert contextual information.
        """
        task_name = _get_current_task_name()
        return f"[{task_name}] {msg}", kwargs

    def info(self, msg: Any, *args: Any, **kwargs: Any) -> None:
        """INFO level uses a concise task name representation for readability."""
        if self.logger.isEnabledFor(logging.INFO):
            task_name = _get_current_task_name(True)
            self.logger.info(f"[{task_name}] {msg}", *args, **kwargs)
```
Custom log adapter to include the name of the current task.
```python
def process(
    self,
    msg: Any,
    kwargs: MutableMapping[str, Any],
) -> tuple[Any, MutableMapping[str, Any]]:
    """Process the logging message and keyword arguments passed in to a
    logging call to insert contextual information.
    """
    task_name = _get_current_task_name()
    return f"[{task_name}] {msg}", kwargs
```
Process the logging message and keyword arguments passed in to a logging call to insert contextual information.
```python
def info(self, msg: Any, *args: Any, **kwargs: Any) -> None:
    """INFO level uses a concise task name representation for readability."""
    if self.logger.isEnabledFor(logging.INFO):
        task_name = _get_current_task_name(True)
        self.logger.info(f"[{task_name}] {msg}", *args, **kwargs)
```
INFO level uses a concise task name representation for readability.
```python
@functools.total_ordering
class MACAddress:
    """Concrete class for a MAC address."""

    __slots__ = ("_mac", "__weakref__")
    _mac: int

    def __init__(self, value: object) -> None:
        "Construct a MAC address from a string, integer or bytes object."
        match value:
            case int():
                self._mac = _from_int(value)
            case bytes():
                self._mac = _from_bytes(value)
            case MACAddress():
                self._mac = value._mac
            case _:
                # Input argument is a MAC string, or an object that is
                # formatted as MAC string (behave like ipaddress module).
                self._mac = _from_string(str(value))

    @property
    def packed(self) -> bytes:
        "Return the MAC address as a byte string."
        return _to_bytes(self._mac)

    @property
    def max_prefixlen(self) -> int:
        "Return the maximum prefix length (48 bits)."
        return _BIT_WIDTH

    @property
    def is_multicast(self) -> bool:
        "Return true if MAC address has the multicast bit set."
        return self._mac & (1 << 40) != 0

    @property
    def is_private(self) -> bool:
        "Return true if MAC address has the locally administered bit set."
        return self._mac & (1 << 41) != 0

    @property
    def is_global(self) -> bool:
        "Return true if the locally administered bit is not set."
        return not self.is_private

    @property
    def is_unspecified(self) -> bool:
        "Return true if MAC address is all zeros."
        return self._mac == 0

    @property
    def is_broadcast(self) -> bool:
        "Return true if MAC address is the broadcast address."
        return self._mac == (1 << _BIT_WIDTH) - 1

    def __int__(self) -> int:
        return self._mac

    def __eq__(self, rhs: object) -> bool:
        if not isinstance(rhs, MACAddress):
            return NotImplemented
        return self._mac == rhs._mac

    def __lt__(self, rhs: object) -> bool:
        if not isinstance(rhs, MACAddress):
            return NotImplemented
        return self._mac < rhs._mac

    def __repr__(self) -> str:
        return f"MACAddress({_to_string(self._mac)!r})"

    def __str__(self) -> str:
        return _to_string(self._mac)

    def __hash__(self) -> int:
        return hash(hex(self._mac))
```
Concrete class for a MAC address.
```python
def __init__(self, value: object) -> None:
    "Construct a MAC address from a string, integer or bytes object."
    match value:
        case int():
            self._mac = _from_int(value)
        case bytes():
            self._mac = _from_bytes(value)
        case MACAddress():
            self._mac = value._mac
        case _:
            # Input argument is a MAC string, or an object that is
            # formatted as MAC string (behave like ipaddress module).
            self._mac = _from_string(str(value))
```
Construct a MAC address from a string, integer or bytes object.
```python
@property
def packed(self) -> bytes:
    "Return the MAC address as a byte string."
    return _to_bytes(self._mac)
```
Return the MAC address as a byte string.
```python
@property
def max_prefixlen(self) -> int:
    "Return the maximum prefix length (48 bits)."
    return _BIT_WIDTH
```
Return the maximum prefix length (48 bits).
```python
@property
def is_multicast(self) -> bool:
    "Return true if MAC address has the multicast bit set."
    return self._mac & (1 << 40) != 0
```
Return true if MAC address has the multicast bit set.
```python
@property
def is_private(self) -> bool:
    "Return true if MAC address has the locally administered bit set."
    return self._mac & (1 << 41) != 0
```
Return true if MAC address has the locally administered bit set.
65 @property 66 def is_global(self) -> bool: 67 "Return true if the locally administered bit is not set." 68 return not self.is_private
Return true if the locally administered bit is not set.
@decodable("action_profile_group")
@dataclass(slots=True)
class P4ActionProfileGroup(_P4Writable):
    "Represents a P4Runtime ActionProfileGroup."

    action_profile_id: str = ""
    _: KW_ONLY
    group_id: int = 0
    max_size: int = 0
    members: Sequence[P4Member] | None = None

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4ActionProfileGroup as protobuf."
        if not self.action_profile_id:
            return p4r.Entity(action_profile_group=p4r.ActionProfileGroup())

        profile = schema.action_profiles[self.action_profile_id]

        if self.members is not None:
            members = [member.encode() for member in self.members]
        else:
            members = None

        entry = p4r.ActionProfileGroup(
            action_profile_id=profile.id,
            group_id=self.group_id,
            members=members,
            max_size=self.max_size,
        )
        return p4r.Entity(action_profile_group=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to ActionProfileGroup data."
        entry = msg.action_profile_group
        if entry.action_profile_id == 0:
            return cls()

        profile = schema.action_profiles[entry.action_profile_id]

        if entry.members:
            members = [P4Member.decode(member) for member in entry.members]
        else:
            members = None

        return cls(
            action_profile_id=profile.alias,
            group_id=entry.group_id,
            max_size=entry.max_size,
            members=members,
        )

    def action_str(self, _schema: P4Schema) -> str:
        "Return string representation of the weighted members."
        if not self.members:
            return ""

        return " ".join(
            [f"{member.weight}*{member.member_id:#x}" for member in self.members]
        )
Represents a P4Runtime ActionProfileGroup.
Inherited Members
- finsy.p4entity._P4Writable
- encode_update
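action_str renders each weighted member as weight*member_id with the id in hex. A standalone sketch of that formatting, using a hypothetical Member tuple in place of finsy's P4Member:

```python
from typing import NamedTuple

class Member(NamedTuple):
    # Stand-in for finsy's P4Member: an action-profile member id plus weight.
    member_id: int
    weight: int

def action_str(members: list[Member]) -> str:
    # Mirror P4ActionProfileGroup.action_str: "" when there are no members,
    # otherwise space-separated "weight*0xID" pairs.
    if not members:
        return ""
    return " ".join(f"{m.weight}*{m.member_id:#x}" for m in members)

print(action_str([Member(1, 2), Member(16, 1)]))  # → 2*0x1 1*0x10
```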
@decodable("action_profile_member")
@dataclass(slots=True)
class P4ActionProfileMember(_P4Writable):
    "Represents a P4Runtime ActionProfileMember."

    action_profile_id: str = ""
    _: KW_ONLY
    member_id: int = 0
    action: P4TableAction | None = None

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4ActionProfileMember as protobuf."
        if not self.action_profile_id:
            return p4r.Entity(action_profile_member=p4r.ActionProfileMember())

        profile = schema.action_profiles[self.action_profile_id]

        if self.action:
            action = self.action.encode_action(schema)
        else:
            action = None

        entry = p4r.ActionProfileMember(
            action_profile_id=profile.id,
            member_id=self.member_id,
            action=action,
        )
        return p4r.Entity(action_profile_member=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to ActionProfileMember data."
        entry = msg.action_profile_member
        if entry.action_profile_id == 0:
            return cls()

        profile = schema.action_profiles[entry.action_profile_id]

        if entry.HasField("action"):
            action = P4TableAction.decode_action(entry.action, schema)
        else:
            action = None

        return cls(
            action_profile_id=profile.alias,
            member_id=entry.member_id,
            action=action,
        )

    def action_str(self, schema: P4Schema) -> str:
        "Format the action as a human-readable, canonical string."
        if self.action is None:
            return NOACTION_STR
        return self.action.format_str(schema)
Represents a P4Runtime ActionProfileMember.
Inherited Members
- finsy.p4entity._P4Writable
- encode_update
class P4Client:
    "Implements a P4Runtime client."

    _address: str
    _credentials: GRPCCredentialsTLS | None
    _wait_for_ready: bool
    _channel: grpc.aio.Channel | None = None
    _stub: p4r_grpc.P4RuntimeStub | None = None
    _stream: _P4StreamTypeAlias | None = None
    _complete_request: Callable[[pbutil.PBMessage], None] | None = None

    _schema: P4Schema | None = None
    "Annotate log messages using this optional P4Info schema."

    def __init__(
        self,
        address: str,
        credentials: GRPCCredentialsTLS | None = None,
        *,
        wait_for_ready: bool = True,
    ) -> None:
        self._address = address
        self._credentials = credentials
        self._wait_for_ready = wait_for_ready

    @property
    def channel(self) -> grpc.aio.Channel | None:
        "Return the GRPC channel object, or None if the channel is not open."
        return self._channel

    async def __aenter__(self) -> Self:
        await self.open()
        return self

    async def __aexit__(self, *args: Any) -> bool | None:
        await self.close()

    async def open(
        self,
        *,
        schema: P4Schema | None = None,
        complete_request: Callable[[pbutil.PBMessage], None] | None = None,
    ) -> None:
        """Open the client channel.

        Note: This method is `async` for forward-compatible reasons.
        """
        assert self._channel is None
        assert self._stub is None

        # Increase max_metadata_size from 8 KB to 64 KB.
        options = GRPCOptions(
            max_metadata_size=64 * 1024,  # 64 kilobytes
            max_reconnect_backoff_ms=15000,  # 15.0 seconds
        )

        self._channel = grpc_channel(
            self._address,
            credentials=self._credentials,
            options=options,
            client_type="P4Client",
        )

        self._stub = p4r_grpc.P4RuntimeStub(self._channel)
        self._schema = schema
        self._complete_request = complete_request

    async def close(self) -> None:
        "Close the client channel."
        if self._channel is not None:
            LOGGER.debug("P4Client: close channel %r", self._address)

            if self._stream is not None:
                self._stream.cancel()
                self._stream = None

            await self._channel.close()
            self._channel = None
            self._stub = None
            self._schema = None
            self._complete_request = None

    async def send(self, msg: p4r.StreamMessageRequest) -> None:
        """Send a message to the stream."""
        assert self._stub is not None

        if not self._stream or self._stream.done():
            self._stream = cast(
                _P4StreamTypeAlias,
                self._stub.StreamChannel(wait_for_ready=self._wait_for_ready),  # type: ignore
            )

        self._log_msg(msg)

        try:
            await self._stream.write(msg)
        except grpc.RpcError as ex:
            raise P4ClientError(ex, "send") from None

    async def receive(self) -> p4r.StreamMessageResponse:
        """Read a message from the stream."""
        assert self._stream is not None

        try:
            msg = cast(
                p4r.StreamMessageResponse,
                await self._stream.read(),  # type: ignore
            )
            if msg is GRPC_EOF:
                # Treat EOF as a protocol violation.
                raise RuntimeError("P4Client.receive got EOF!")

        except grpc.RpcError as ex:
            raise P4ClientError(ex, "receive") from None

        self._log_msg(msg)
        return msg

    @overload
    async def request(
        self, msg: p4r.WriteRequest
    ) -> p4r.WriteResponse: ...  # pragma: no cover

    @overload
    async def request(
        self, msg: p4r.GetForwardingPipelineConfigRequest
    ) -> p4r.GetForwardingPipelineConfigResponse: ...  # pragma: no cover

    @overload
    async def request(
        self, msg: p4r.SetForwardingPipelineConfigRequest
    ) -> p4r.SetForwardingPipelineConfigResponse: ...  # pragma: no cover

    @overload
    async def request(
        self, msg: p4r.CapabilitiesRequest
    ) -> p4r.CapabilitiesResponse: ...  # pragma: no cover

    async def request(self, msg: pbutil.PBMessage) -> pbutil.PBMessage:
        "Send a unary-unary P4Runtime request and wait for the response."
        assert self._stub is not None

        if self._complete_request:
            self._complete_request(msg)

        msg_type = type(msg).__name__
        assert msg_type.endswith("Request")
        rpc_method = getattr(self._stub, msg_type[:-7])

        self._log_msg(msg)
        try:
            reply = await rpc_method(
                msg,
                timeout=_DEFAULT_RPC_TIMEOUT,
            )
        except grpc.RpcError as ex:
            raise P4ClientError(ex, msg_type, msg=msg, schema=self._schema) from None

        self._log_msg(reply)
        return reply

    async def request_iter(
        self, msg: p4r.ReadRequest
    ) -> AsyncIterator[p4r.ReadResponse]:
        "Send a unary-stream P4Runtime read request and wait for the responses."
        assert self._stub is not None

        if self._complete_request:
            self._complete_request(msg)

        msg_type = type(msg).__name__
        assert msg_type.endswith("Request")
        rpc_method = getattr(self._stub, msg_type[:-7])

        self._log_msg(msg)
        try:
            async for reply in rpc_method(
                msg,
                timeout=_DEFAULT_RPC_TIMEOUT,
            ):
                self._log_msg(reply)
                yield reply
        except grpc.RpcError as ex:
            raise P4ClientError(ex, msg_type) from None

    def _log_msg(self, msg: pbutil.PBMessage) -> None:
        "Log a P4Runtime request or response."
        pbutil.log_msg(self._channel, msg, self._schema)
Implements a P4Runtime client.
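Both request and request_iter pick the gRPC method dynamically: they take the message class name, assert it ends in "Request", and strip that suffix to get the stub attribute (msg_type[:-7]). That mapping can be sketched without gRPC at all:

```python
def rpc_method_name(msg_type_name: str) -> str:
    # Mirror P4Client.request's dispatch: "WriteRequest" -> stub.Write,
    # "ReadRequest" -> stub.Read, and so on.
    assert msg_type_name.endswith("Request")
    return msg_type_name[: -len("Request")]  # len("Request") == 7

assert rpc_method_name("WriteRequest") == "Write"
assert rpc_method_name("SetForwardingPipelineConfigRequest") == "SetForwardingPipelineConfig"
```

This is why request accepts any pbutil.PBMessage: the naming convention of the P4Runtime protobuf API is the dispatch table.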
class P4ClientError(Exception):
    "Wrap `grpc.RpcError`."

    _operation: str
    _status: P4RpcStatus
    _outer_code: GRPCStatusCode
    _outer_message: str
    _schema: P4Schema | None = None  # for annotating sub-value details

    def __init__(
        self,
        error: grpc.RpcError,
        operation: str,
        *,
        msg: pbutil.PBMessage | None = None,
        schema: P4Schema | None = None,
    ):
        super().__init__()
        assert isinstance(error, grpc.aio.AioRpcError)

        self._operation = operation
        self._status = P4RpcStatus.from_rpc_error(error)
        self._outer_code = GRPCStatusCode.from_status_code(error.code())
        self._outer_message = error.details() or ""

        if msg is not None and self.details:
            self._attach_details(msg)
        self._schema = schema

        LOGGER.debug("%s failed: %s", operation, self)

    @property
    def code(self) -> GRPCStatusCode:
        "GRPC status code."
        return self._status.code

    @property
    def message(self) -> str:
        "GRPC status message."
        return self._status.message

    @property
    def details(self) -> dict[int, P4Error]:
        "Optional details about P4Runtime Write updates that failed."
        return self._status.details

    @property
    def is_not_found_only(self) -> bool:
        """Return True if the only sub-errors are NOT_FOUND."""
        if self.code != GRPCStatusCode.UNKNOWN:
            return False

        for err in self.details.values():
            if err.canonical_code != GRPCStatusCode.NOT_FOUND:
                return False
        return True

    @property
    def is_election_id_used(self) -> bool:
        """Return true if error is that election ID is in use."""
        return (
            self.code == GRPCStatusCode.INVALID_ARGUMENT
            and _ELECTION_ID_EXISTS.search(self.message) is not None
        )

    @property
    def is_pipeline_missing(self) -> bool:
        "Return true if error is that no pipeline config is set."
        return (
            self.code == GRPCStatusCode.FAILED_PRECONDITION
            and _NO_PIPELINE_CONFIG.search(self.message) is not None
        )

    def _attach_details(self, msg: pbutil.PBMessage):
        "Attach the subvalue(s) from the message that caused the error."
        if isinstance(msg, p4r.WriteRequest):
            for key, value in self.details.items():
                value.subvalue = msg.updates[key]

    def __str__(self) -> str:
        "Return string representation of P4ClientError object."
        if self.details:

            def _show(value: P4Error):
                s = repr(value)
                if self._schema:
                    s = pbutil.log_annotate(s, self._schema)
                s = s.replace("\n}\n)", "\n})")  # tidy multiline repr
                return s.replace("\n", "\n" + " " * 6)  # indent 6 spaces

            items = [""] + [
                f"  [details.{key}] {_show(val)}" for key, val in self.details.items()
            ]
            details = "\n".join(items)
        else:
            details = ""

        if self.code == self._outer_code and self.message == self._outer_message:
            return (
                f"operation={self._operation} code={self.code!r} "
                f"message={self.message!r} {details}"
            )
        return (
            f"code={self.code!r} message={self.message!r} "
            f"details={self.details!r} operation={self._operation} "
            f"_outer_message={self._outer_message!r} _outer_code={self._outer_code!r}"
        )
Wrap grpc.RpcError.
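is_not_found_only captures a common controller pattern: a batched P4Runtime Write reports UNKNOWN at the top level, with a per-update detail code for each failed update, and a delete of entries that are already gone is often safe to ignore. The check can be sketched with a hypothetical minimal status enum standing in for GRPCStatusCode:

```python
from enum import Enum

class Code(Enum):
    # Minimal stand-in for finsy's GRPCStatusCode.
    OK = 0
    UNKNOWN = 2
    NOT_FOUND = 5

def is_not_found_only(code: Code, sub_codes: list[Code]) -> bool:
    # Mirror P4ClientError.is_not_found_only: the outer code must be
    # UNKNOWN, and every per-update sub-error must be NOT_FOUND.
    if code is not Code.UNKNOWN:
        return False
    return all(c is Code.NOT_FOUND for c in sub_codes)

assert is_not_found_only(Code.UNKNOWN, [Code.NOT_FOUND, Code.NOT_FOUND])
assert not is_not_found_only(Code.UNKNOWN, [Code.NOT_FOUND, Code.OK])
```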
@decodable("clone_session_entry")
@dataclass(slots=True)
class P4CloneSessionEntry(_P4Writable):
    "Represents a P4Runtime CloneSessionEntry."

    session_id: int = 0
    _: KW_ONLY
    class_of_service: int = 0
    packet_length_bytes: int = 0
    replicas: Sequence[_ReplicaType] = ()

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode CloneSessionEntry data as protobuf."
        entry = p4r.CloneSessionEntry(
            session_id=self.session_id,
            class_of_service=self.class_of_service,
            packet_length_bytes=self.packet_length_bytes,
            replicas=[encode_replica(replica) for replica in self.replicas],
        )
        return p4r.Entity(
            packet_replication_engine_entry=p4r.PacketReplicationEngineEntry(
                clone_session_entry=entry
            )
        )

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to CloneSessionEntry data."
        entry = msg.packet_replication_engine_entry.clone_session_entry
        return cls(
            session_id=entry.session_id,
            class_of_service=entry.class_of_service,
            packet_length_bytes=entry.packet_length_bytes,
            replicas=tuple(decode_replica(replica) for replica in entry.replicas),
        )

    def replicas_str(self) -> str:
        "Format the replicas as a human-readable, canonical string."
        return " ".join(format_replica(rep) for rep in self.replicas)
Represents a P4Runtime CloneSessionEntry.
Inherited Members
- finsy.p4entity._P4Writable
- encode_update
@dataclass(kw_only=True, slots=True)
class P4CounterData:
    """Represents a P4Runtime object that keeps statistics of bytes and packets.

    Attributes:
        byte_count (int): the number of octets
        packet_count (int): the number of packets

    See Also:
        - P4TableEntry
        - P4MeterCounterData
        - P4CounterEntry
        - P4DirectCounterEntry
    """

    byte_count: int = 0
    "The number of octets."
    packet_count: int = 0
    "The number of packets."

    def encode(self) -> p4r.CounterData:
        "Encode object as CounterData."
        return p4r.CounterData(
            byte_count=self.byte_count, packet_count=self.packet_count
        )

    @classmethod
    def decode(cls, msg: p4r.CounterData) -> Self:
        "Decode CounterData."
        return cls(byte_count=msg.byte_count, packet_count=msg.packet_count)
Represents a P4Runtime object that keeps statistics of bytes and packets.
Attributes:
- byte_count (int): the number of octets
- packet_count (int): the number of packets

See Also:
- P4TableEntry
- P4MeterCounterData
- P4CounterEntry
- P4DirectCounterEntry
@decodable("counter_entry")
@dataclass(slots=True)
class P4CounterEntry(_P4ModifyOnly):
    "Represents a P4Runtime CounterEntry."

    counter_id: str = ""
    _: KW_ONLY
    index: int | None = None
    data: P4CounterData | None = None

    @property
    def packet_count(self) -> int:
        "Packet count from counter data (or 0 if there is no data)."
        if self.data is not None:
            return self.data.packet_count
        return 0

    @property
    def byte_count(self) -> int:
        "Byte count from counter data (or 0 if there is no data)."
        if self.data is not None:
            return self.data.byte_count
        return 0

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4CounterEntry as protobuf."
        if not self.counter_id:
            return p4r.Entity(counter_entry=p4r.CounterEntry())

        counter = schema.counters[self.counter_id]

        if self.index is not None:
            index = p4r.Index(index=self.index)
        else:
            index = None

        if self.data is not None:
            data = self.data.encode()
        else:
            data = None

        entry = p4r.CounterEntry(
            counter_id=counter.id,
            index=index,
            data=data,
        )
        return p4r.Entity(counter_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4CounterEntry."
        entry = msg.counter_entry
        if not entry.counter_id:
            return cls()

        counter = schema.counters[entry.counter_id]

        if entry.HasField("index"):
            index = entry.index.index
        else:
            index = None

        if entry.HasField("data"):
            data = P4CounterData.decode(entry.data)
        else:
            data = None

        return cls(counter_id=counter.alias, index=index, data=data)
Represents a P4Runtime CounterEntry.
Inherited Members
- finsy.p4entity._P4ModifyOnly
- encode_update
class P4CounterUnit(_EnumBase):
    "IntEnum equivalent to `p4i.CounterSpec.Unit`."

    UNSPECIFIED = p4i.CounterSpec.Unit.UNSPECIFIED
    BYTES = p4i.CounterSpec.Unit.BYTES
    PACKETS = p4i.CounterSpec.Unit.PACKETS
    BOTH = p4i.CounterSpec.Unit.BOTH
IntEnum equivalent to p4i.CounterSpec.Unit.
@decodable("digest_entry")
@dataclass(slots=True)
class P4DigestEntry(_P4Writable):
    "Represents a P4Runtime DigestEntry."

    digest_id: str = ""
    _: KW_ONLY
    max_list_size: int = 0
    max_timeout_ns: int = 0
    ack_timeout_ns: int = 0

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode DigestEntry data as protobuf."
        if not self.digest_id:
            return p4r.Entity(digest_entry=p4r.DigestEntry())

        digest = schema.digests[self.digest_id]

        if self.max_list_size == self.max_timeout_ns == self.ack_timeout_ns == 0:
            config = None
        else:
            config = p4r.DigestEntry.Config(
                max_timeout_ns=self.max_timeout_ns,
                max_list_size=self.max_list_size,
                ack_timeout_ns=self.ack_timeout_ns,
            )

        entry = p4r.DigestEntry(digest_id=digest.id, config=config)
        return p4r.Entity(digest_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to DigestEntry data."
        entry = msg.digest_entry
        if entry.digest_id == 0:
            return cls()

        digest = schema.digests[entry.digest_id]

        config = entry.config
        return cls(
            digest.alias,
            max_list_size=config.max_list_size,
            max_timeout_ns=config.max_timeout_ns,
            ack_timeout_ns=config.ack_timeout_ns,
        )
Represents a P4Runtime DigestEntry.
Inherited Members
- finsy.p4entity._P4Writable
- encode_update
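As a usage sketch, a digest can be configured by inserting a P4DigestEntry with the unary-operator convention shown above. This assumes the finsy package, a P4Runtime target at 127.0.0.1:50001, and a P4 program declaring a digest named "digest_t" (the digest name and address are illustrative):

```python
import finsy as fy

async def main():
    async with fy.Switch("sw1", "127.0.0.1:50001") as sw:
        # Insert (+) a configuration for the hypothetical "digest_t" digest.
        await sw.write(
            [
                +fy.P4DigestEntry(
                    "digest_t",
                    max_list_size=10,          # batch up to 10 digest messages
                    max_timeout_ns=1_000_000,  # flush at least once per 1 ms
                    ack_timeout_ns=0,
                ),
            ]
        )

fy.run(main())
```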
```python
@decodable("digest")
@dataclass(slots=True)
class P4DigestList:
    "Represents a P4Runtime DigestList."

    digest_id: str
    _: KW_ONLY
    list_id: int
    timestamp: int
    data: list[_DataValueType]

    @classmethod
    def decode(cls, msg: p4r.StreamMessageResponse, schema: P4Schema) -> Self:
        "Decode protobuf to DigestList data."
        digest_list = msg.digest
        digest = schema.digests[digest_list.digest_id]

        type_spec = digest.type_spec
        return cls(
            digest_id=digest.alias,
            list_id=digest_list.list_id,
            timestamp=digest_list.timestamp,
            data=[type_spec.decode_data(item) for item in digest_list.data],
        )

    def __len__(self) -> int:
        "Return number of values in digest list."
        return len(self.data)

    def __getitem__(self, key: int) -> _DataValueType:
        "Retrieve value at given index from digest list."
        return self.data[key]

    def __iter__(self) -> Iterator[_DataValueType]:
        "Iterate over values in digest list."
        return iter(self.data)

    def ack(self) -> "P4DigestListAck":
        "Return the corresponding DigestListAck message."
        return P4DigestListAck(self.digest_id, self.list_id)
```
Represents a P4Runtime DigestList.
```python
    @classmethod
    def decode(cls, msg: p4r.StreamMessageResponse, schema: P4Schema) -> Self:
        "Decode protobuf to DigestList data."
        digest_list = msg.digest
        digest = schema.digests[digest_list.digest_id]

        type_spec = digest.type_spec
        return cls(
            digest_id=digest.alias,
            list_id=digest_list.list_id,
            timestamp=digest_list.timestamp,
            data=[type_spec.decode_data(item) for item in digest_list.data],
        )
```
Decode protobuf to DigestList data.
```python
    def __len__(self) -> int:
        "Return number of values in digest list."
        return len(self.data)
```
Return number of values in digest list.
```python
    def __getitem__(self, key: int) -> _DataValueType:
        "Retrieve value at given index from digest list."
        return self.data[key]
```
Retrieve value at given index from digest list.
```python
    def __iter__(self) -> Iterator[_DataValueType]:
        "Iterate over values in digest list."
        return iter(self.data)
```
Iterate over values in digest list.
```python
@dataclass(slots=True)
class P4DigestListAck:
    "Represents a P4Runtime DigestListAck."

    digest_id: str
    list_id: int

    def encode_update(self, schema: P4Schema) -> p4r.StreamMessageRequest:
        "Encode DigestListAck data as protobuf."
        digest = schema.digests[self.digest_id]

        return p4r.StreamMessageRequest(
            digest_ack=p4r.DigestListAck(
                digest_id=digest.id,
                list_id=self.list_id,
            )
        )
```
Represents a P4Runtime DigestListAck.
```python
    def encode_update(self, schema: P4Schema) -> p4r.StreamMessageRequest:
        "Encode DigestListAck data as protobuf."
        digest = schema.digests[self.digest_id]

        return p4r.StreamMessageRequest(
            digest_ack=p4r.DigestListAck(
                digest_id=digest.id,
                list_id=self.list_id,
            )
        )
```
Encode DigestListAck data as protobuf.
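P4DigestList and P4DigestListAck pair naturally in a receive loop: each received list is acknowledged via its `ack()` helper so the switch will send more. This is a sketch only; it assumes a live switch, a digest named "digest_t", and that `Switch` exposes a `read_digests` iterator (the README only says you can "listen for packets or digests", so check your finsy version's Switch API):

```python
import finsy as fy

async def ready_handler(sw: fy.Switch):
    # Listen for digest lists from the (illustrative) "digest_t" digest
    # and acknowledge each one with its corresponding DigestListAck.
    async for digest in sw.read_digests("digest_t"):
        for item in digest:
            print(sw.name, item)
        await sw.write([digest.ack()])
```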
```python
@decodable("direct_counter_entry")
@dataclass(slots=True)
class P4DirectCounterEntry(_P4ModifyOnly):
    "Represents a P4Runtime DirectCounterEntry."

    counter_id: str = ""
    _: KW_ONLY
    table_entry: P4TableEntry | None = None
    data: P4CounterData | None = None

    @property
    def table_id(self) -> str:
        "Return table_id of related table."
        if self.table_entry is None:
            return ""
        return self.table_entry.table_id

    @property
    def packet_count(self) -> int:
        "Packet count from counter data (or 0 if there is no data)."
        if self.data is not None:
            return self.data.packet_count
        return 0

    @property
    def byte_count(self) -> int:
        "Byte count from counter data (or 0 if there is no data)."
        if self.data is not None:
            return self.data.byte_count
        return 0

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4DirectCounterEntry as protobuf."
        if self.table_entry is None:
            # Use `counter_id` to construct a `P4TableEntry` with the proper
            # table name.
            if self.counter_id:
                tb_name = schema.direct_counters[self.counter_id].direct_table_name
                table_entry = P4TableEntry(tb_name)
            else:
                table_entry = P4TableEntry()
        else:
            table_entry = self.table_entry

        if self.data is not None:
            data = self.data.encode()
        else:
            data = None

        entry = p4r.DirectCounterEntry(
            table_entry=table_entry.encode_entry(schema),
            data=data,
        )
        return p4r.Entity(direct_counter_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4DirectCounterEntry."
        entry = msg.direct_counter_entry

        if entry.HasField("table_entry"):
            table_entry = P4TableEntry.decode_entry(entry.table_entry, schema)
        else:
            table_entry = None

        if entry.HasField("data"):
            data = P4CounterData.decode(entry.data)
        else:
            data = None

        # Determine `counter_id` from table_entry.
        counter_id = ""
        if table_entry is not None and table_entry.table_id:
            direct_counter = schema.tables[table_entry.table_id].direct_counter
            assert direct_counter is not None
            counter_id = direct_counter.alias

        return cls(counter_id, table_entry=table_entry, data=data)
```
Represents a P4Runtime DirectCounterEntry.
```python
    @property
    def table_id(self) -> str:
        "Return table_id of related table."
        if self.table_entry is None:
            return ""
        return self.table_entry.table_id
```
Return table_id of related table.
```python
    @property
    def packet_count(self) -> int:
        "Packet count from counter data (or 0 if there is no data)."
        if self.data is not None:
            return self.data.packet_count
        return 0
```
Packet count from counter data (or 0 if there is no data).
```python
    @property
    def byte_count(self) -> int:
        "Byte count from counter data (or 0 if there is no data)."
        if self.data is not None:
            return self.data.byte_count
        return 0
```
Byte count from counter data (or 0 if there is no data).
```python
    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4DirectCounterEntry as protobuf."
        if self.table_entry is None:
            # Use `counter_id` to construct a `P4TableEntry` with the proper
            # table name.
            if self.counter_id:
                tb_name = schema.direct_counters[self.counter_id].direct_table_name
                table_entry = P4TableEntry(tb_name)
            else:
                table_entry = P4TableEntry()
        else:
            table_entry = self.table_entry

        if self.data is not None:
            data = self.data.encode()
        else:
            data = None

        entry = p4r.DirectCounterEntry(
            table_entry=table_entry.encode_entry(schema),
            data=data,
        )
        return p4r.Entity(direct_counter_entry=entry)
```
Encode P4DirectCounterEntry as protobuf.
```python
    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4DirectCounterEntry."
        entry = msg.direct_counter_entry

        if entry.HasField("table_entry"):
            table_entry = P4TableEntry.decode_entry(entry.table_entry, schema)
        else:
            table_entry = None

        if entry.HasField("data"):
            data = P4CounterData.decode(entry.data)
        else:
            data = None

        # Determine `counter_id` from table_entry.
        counter_id = ""
        if table_entry is not None and table_entry.table_id:
            direct_counter = schema.tables[table_entry.table_id].direct_counter
            assert direct_counter is not None
            counter_id = direct_counter.alias

        return cls(counter_id, table_entry=table_entry, data=data)
```
Decode protobuf to P4DirectCounterEntry.
Inherited Members
- finsy.p4entity._P4ModifyOnly
- encode_update
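Because `encode` can derive the table from `counter_id` alone, a wildcard read over all cells of a direct counter only needs the counter's name. A sketch, assuming a live switch and a P4 program with a direct counter named "ipv4_counter" (both illustrative):

```python
import finsy as fy

async def main():
    async with fy.Switch("sw1", "127.0.0.1:50001") as sw:
        # Wildcard read: one P4DirectCounterEntry per table entry that has
        # the (illustrative) "ipv4_counter" direct counter attached.
        async for counter in sw.read(fy.P4DirectCounterEntry("ipv4_counter")):
            print(counter.table_id, counter.packet_count, counter.byte_count)

fy.run(main())
```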
```python
@decodable("direct_meter_entry")
@dataclass(kw_only=True, slots=True)
class P4DirectMeterEntry(_P4ModifyOnly):
    "Represents a P4Runtime DirectMeterEntry."

    table_entry: P4TableEntry | None = None
    config: P4MeterConfig | None = None
    counter_data: P4MeterCounterData | None = None

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4DirectMeterEntry as protobuf."
        if self.table_entry is not None:
            table_entry = self.table_entry.encode_entry(schema)
        else:
            table_entry = None

        if self.config is not None:
            config = self.config.encode()
        else:
            config = None

        if self.counter_data is not None:
            counter_data = self.counter_data.encode()
        else:
            counter_data = None

        entry = p4r.DirectMeterEntry(
            table_entry=table_entry,
            config=config,
            counter_data=counter_data,
        )
        return p4r.Entity(direct_meter_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4DirectMeterEntry."
        entry = msg.direct_meter_entry

        if entry.HasField("table_entry"):
            table_entry = P4TableEntry.decode_entry(entry.table_entry, schema)
        else:
            table_entry = None

        if entry.HasField("config"):
            config = P4MeterConfig.decode(entry.config)
        else:
            config = None

        if entry.HasField("counter_data"):
            counter_data = P4MeterCounterData.decode(entry.counter_data)
        else:
            counter_data = None

        return cls(
            table_entry=table_entry,
            config=config,
            counter_data=counter_data,
        )
```
Represents a P4Runtime DirectMeterEntry.
```python
    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4DirectMeterEntry as protobuf."
        if self.table_entry is not None:
            table_entry = self.table_entry.encode_entry(schema)
        else:
            table_entry = None

        if self.config is not None:
            config = self.config.encode()
        else:
            config = None

        if self.counter_data is not None:
            counter_data = self.counter_data.encode()
        else:
            counter_data = None

        entry = p4r.DirectMeterEntry(
            table_entry=table_entry,
            config=config,
            counter_data=counter_data,
        )
        return p4r.Entity(direct_meter_entry=entry)
```
Encode P4DirectMeterEntry as protobuf.
```python
    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4DirectMeterEntry."
        entry = msg.direct_meter_entry

        if entry.HasField("table_entry"):
            table_entry = P4TableEntry.decode_entry(entry.table_entry, schema)
        else:
            table_entry = None

        if entry.HasField("config"):
            config = P4MeterConfig.decode(entry.config)
        else:
            config = None

        if entry.HasField("counter_data"):
            counter_data = P4MeterCounterData.decode(entry.counter_data)
        else:
            counter_data = None

        return cls(
            table_entry=table_entry,
            config=config,
            counter_data=counter_data,
        )
```
Decode protobuf to P4DirectMeterEntry.
Inherited Members
- finsy.p4entity._P4ModifyOnly
- encode_update
```python
@dataclass
class P4Error:
    "P4Runtime Error message used to report a single P4-entity error."

    canonical_code: GRPCStatusCode
    message: str
    space: str
    code: int
    subvalue: pbutil.PBMessage | None = None
```
P4Runtime Error message used to report a single P4-entity error.
```python
@decodable("extern_entry")
@dataclass(kw_only=True, slots=True)
class P4ExternEntry(_P4Writable):
    "Represents a P4Runtime ExternEntry."

    extern_type_id: str
    extern_id: str
    entry: pbutil.PBAny

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode ExternEntry data as protobuf."
        extern = schema.externs[self.extern_type_id, self.extern_id]
        entry = p4r.ExternEntry(
            extern_type_id=extern.extern_type_id,
            extern_id=extern.id,
            entry=self.entry,
        )
        return p4r.Entity(extern_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to ExternEntry data."
        entry = msg.extern_entry
        extern = schema.externs[entry.extern_type_id, entry.extern_id]
        return cls(
            extern_type_id=extern.extern_type_name,
            extern_id=extern.name,
            entry=entry.entry,
        )
```
Represents a P4Runtime ExternEntry.
```python
    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode ExternEntry data as protobuf."
        extern = schema.externs[self.extern_type_id, self.extern_id]
        entry = p4r.ExternEntry(
            extern_type_id=extern.extern_type_id,
            extern_id=extern.id,
            entry=self.entry,
        )
        return p4r.Entity(extern_entry=entry)
```
Encode ExternEntry data as protobuf.
```python
    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to ExternEntry data."
        entry = msg.extern_entry
        extern = schema.externs[entry.extern_type_id, entry.extern_id]
        return cls(
            extern_type_id=extern.extern_type_name,
            extern_id=extern.name,
            entry=entry.entry,
        )
```
Decode protobuf to ExternEntry data.
Inherited Members
- finsy.p4entity._P4Writable
- encode_update
IndirectAction is an alias for P4IndirectAction.
````python
@dataclass(slots=True)
class P4IndirectAction:
    """Represents a P4Runtime Action reference for an indirect table.

    An indirect action can be either:

    1. a "one-shot" action (action_set)
    2. a reference to an action profile member (member_id)
    3. a reference to an action profile group (group_id)

    Only one of action_set, member_id or group_id may be configured. The other
    values must be None.

    Attributes:
        action_set: sequence of weighted actions for one-shot
        member_id: ID of action profile member
        group_id: ID of action profile group

    Examples:

    ```python
    # Construct a one-shot action profile.
    one_shot = P4IndirectAction(
        2 * P4TableAction("forward", port=1),
        1 * P4TableAction("forward", port=2),
    )

    # Refer to an action profile member by ID.
    member_action = P4IndirectAction(member_id=1)

    # Refer to an action profile group by ID.
    group_action = P4IndirectAction(group_id=2)
    ```

    References:
        - "9.1.2. Action Specification",
        - "9.2.3. One Shot Action Selector Programming"

    TODO: Refactor into three classes? P4OneShotAction, P4MemberAction, and
    P4GroupAction.

    See Also:
        - P4TableEntry
    """

    action_set: Sequence[P4WeightedAction] | None = None
    "Sequence of weighted actions defining one-shot action profile."
    _: KW_ONLY
    member_id: int | None = None
    "ID of action profile member."
    group_id: int | None = None
    "ID of action profile group."

    def __post_init__(self) -> None:
        if not self._check_invariant():
            raise ValueError(
                "exactly one of action_set, member_id, or group_id must be set"
            )

    def _check_invariant(self) -> bool:
        "Return true if instance satisfies class invariant."
        if self.action_set is not None:
            return self.member_id is None and self.group_id is None
        if self.member_id is not None:
            return self.group_id is None
        return self.group_id is not None

    def encode_table_action(self, table: P4Table) -> p4r.TableAction:
        "Encode object as a TableAction."
        if self.action_set is not None:
            return p4r.TableAction(
                action_profile_action_set=self.encode_action_set(table)
            )

        if self.member_id is not None:
            return p4r.TableAction(action_profile_member_id=self.member_id)

        assert self.group_id is not None
        return p4r.TableAction(action_profile_group_id=self.group_id)

    def encode_action_set(self, table: P4Table) -> p4r.ActionProfileActionSet:
        "Encode object as an ActionProfileActionSet."
        assert self.action_set is not None

        profile_actions: list[p4r.ActionProfileAction] = []
        for weight, table_action in self.action_set:
            action = table_action.encode_action(table)

            match weight:
                case int(weight_value):
                    watch_port = None
                case (weight_value, int(watch)):
                    watch_port = encode_watch_port(watch)
                case _:  # pyright: ignore[reportUnnecessaryComparison]
                    raise ValueError(f"unexpected action weight: {weight!r}")

            profile = p4r.ActionProfileAction(action=action, weight=weight_value)
            if watch_port is not None:
                profile.watch_port = watch_port
            profile_actions.append(profile)

        return p4r.ActionProfileActionSet(action_profile_actions=profile_actions)

    @classmethod
    def decode_action_set(cls, msg: p4r.ActionProfileActionSet, table: P4Table) -> Self:
        "Decode ActionProfileActionSet."
        action_set = list[P4WeightedAction]()

        for action in msg.action_profile_actions:
            match action.WhichOneof("watch_kind"):
                case "watch_port":
                    weight = (action.weight, decode_watch_port(action.watch_port))
                case None:
                    weight = action.weight
                case other:
                    # "watch" (deprecated) is not supported
                    raise ValueError(f"unexpected oneof: {other!r}")

            table_action = P4TableAction.decode_action(action.action, table)
            action_set.append((weight, table_action))

        return cls(action_set)

    def format_str(self, table: P4Table) -> str:
        """Format the indirect table action as a human-readable string."""
        if self.action_set is not None:
            weighted_actions = [
                f"{weight}*{action.format_str(table)}"
                for weight, action in self.action_set
            ]
            return " ".join(weighted_actions)

        # Use the name of the action_profile, if we can get it. If not, just
        # use the placeholder "__indirect".
        if table.action_profile is not None:
            profile_name = f"@{table.action_profile.alias}"
        else:
            profile_name = "__indirect"

        if self.member_id is not None:
            return f"{profile_name}[[{self.member_id:#x}]]"

        return f"{profile_name}[{self.group_id:#x}]"

    def __repr__(self) -> str:
        "Customize representation to make it more concise."
        if self.action_set is not None:
            return f"P4IndirectAction(action_set={self.action_set!r})"
        if self.member_id is not None:
            return f"P4IndirectAction(member_id={self.member_id!r})"
        return f"P4IndirectAction(group_id={self.group_id!r})"
````
Represents a P4Runtime Action reference for an indirect table.
An indirect action can be either:
- a "one-shot" action (action_set)
- a reference to an action profile member (member_id)
- a reference to an action profile group (group_id)
Only one of action_set, member_id or group_id may be configured. The other values must be None.
Attributes:
- action_set: sequence of weighted actions for one-shot
- member_id: ID of action profile member
- group_id: ID of action profile group
Examples:

```python
# Construct a one-shot action profile.
one_shot = P4IndirectAction(
    2 * P4TableAction("forward", port=1),
    1 * P4TableAction("forward", port=2),
)

# Refer to an action profile member by ID.
member_action = P4IndirectAction(member_id=1)

# Refer to an action profile group by ID.
group_action = P4IndirectAction(group_id=2)
```
References:
- "9.1.2. Action Specification"
- "9.2.3. One Shot Action Selector Programming"
TODO: Refactor into three classes? P4OneShotAction, P4MemberAction, and P4GroupAction.
See Also:
- P4TableEntry
Sequence of weighted actions defining one-shot action profile.
```python
    def encode_table_action(self, table: P4Table) -> p4r.TableAction:
        "Encode object as a TableAction."
        if self.action_set is not None:
            return p4r.TableAction(
                action_profile_action_set=self.encode_action_set(table)
            )

        if self.member_id is not None:
            return p4r.TableAction(action_profile_member_id=self.member_id)

        assert self.group_id is not None
        return p4r.TableAction(action_profile_group_id=self.group_id)
```
Encode object as a TableAction.
```python
    def encode_action_set(self, table: P4Table) -> p4r.ActionProfileActionSet:
        "Encode object as an ActionProfileActionSet."
        assert self.action_set is not None

        profile_actions: list[p4r.ActionProfileAction] = []
        for weight, table_action in self.action_set:
            action = table_action.encode_action(table)

            match weight:
                case int(weight_value):
                    watch_port = None
                case (weight_value, int(watch)):
                    watch_port = encode_watch_port(watch)
                case _:  # pyright: ignore[reportUnnecessaryComparison]
                    raise ValueError(f"unexpected action weight: {weight!r}")

            profile = p4r.ActionProfileAction(action=action, weight=weight_value)
            if watch_port is not None:
                profile.watch_port = watch_port
            profile_actions.append(profile)

        return p4r.ActionProfileActionSet(action_profile_actions=profile_actions)
```
Encode object as an ActionProfileActionSet.
```python
    @classmethod
    def decode_action_set(cls, msg: p4r.ActionProfileActionSet, table: P4Table) -> Self:
        "Decode ActionProfileActionSet."
        action_set = list[P4WeightedAction]()

        for action in msg.action_profile_actions:
            match action.WhichOneof("watch_kind"):
                case "watch_port":
                    weight = (action.weight, decode_watch_port(action.watch_port))
                case None:
                    weight = action.weight
                case other:
                    # "watch" (deprecated) is not supported
                    raise ValueError(f"unexpected oneof: {other!r}")

            table_action = P4TableAction.decode_action(action.action, table)
            action_set.append((weight, table_action))

        return cls(action_set)
```
Decode ActionProfileActionSet.
```python
    def format_str(self, table: P4Table) -> str:
        """Format the indirect table action as a human-readable string."""
        if self.action_set is not None:
            weighted_actions = [
                f"{weight}*{action.format_str(table)}"
                for weight, action in self.action_set
            ]
            return " ".join(weighted_actions)

        # Use the name of the action_profile, if we can get it. If not, just
        # use the placeholder "__indirect".
        if table.action_profile is not None:
            profile_name = f"@{table.action_profile.alias}"
        else:
            profile_name = "__indirect"

        if self.member_id is not None:
            return f"{profile_name}[[{self.member_id:#x}]]"

        return f"{profile_name}[{self.group_id:#x}]"
```
Format the indirect table action as a human-readable string.
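In practice an indirect action is assigned to a P4TableEntry's `action` field. A sketch of a one-shot entry splitting traffic 2:1 between two ports; the table name "nexthop", the match field `ipv4_dst`, and the "forward" action are illustrative names from a hypothetical P4 program:

```python
import finsy as fy

# One-shot selector programming: the action set is embedded directly in
# the table entry, so no separate member/group bookkeeping is needed.
entry = fy.P4TableEntry(
    "nexthop",
    match=fy.Match(ipv4_dst="10.0.0.0/8"),
    action=fy.P4IndirectAction(
        2 * fy.Action("forward", port=1),
        1 * fy.Action("forward", port=2),
    ),
)
```

Writing `+entry` to a switch then inserts the entry together with its one-shot action set.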
```python
@dataclass(slots=True)
class P4Member:
    """Represents an ActionProfileGroup Member.

    See Also:
        - P4ActionProfileGroup
    """

    member_id: int
    _: KW_ONLY
    weight: P4Weight

    def encode(self) -> p4r.ActionProfileGroup.Member:
        "Encode P4Member as protobuf."
        match self.weight:
            case int(weight):
                watch_port = None
            case (int(weight), int(watch)):
                watch_port = encode_watch_port(watch)
            case other:  # pyright: ignore[reportUnnecessaryComparison]
                raise ValueError(f"unexpected weight: {other!r}")

        member = p4r.ActionProfileGroup.Member(
            member_id=self.member_id,
            weight=weight,
        )

        if watch_port is not None:
            member.watch_port = watch_port
        return member

    @classmethod
    def decode(cls, msg: p4r.ActionProfileGroup.Member) -> Self:
        "Decode protobuf to P4Member."
        match msg.WhichOneof("watch_kind"):
            case "watch_port":
                weight = (msg.weight, decode_watch_port(msg.watch_port))
            case None:
                weight = msg.weight
            case other:
                # "watch" (deprecated) is not supported
                raise ValueError(f"unknown oneof: {other!r}")

        return cls(member_id=msg.member_id, weight=weight)
```
Represents an ActionProfileGroup Member.
See Also:
- P4ActionProfileGroup
```python
    def encode(self) -> p4r.ActionProfileGroup.Member:
        "Encode P4Member as protobuf."
        match self.weight:
            case int(weight):
                watch_port = None
            case (int(weight), int(watch)):
                watch_port = encode_watch_port(watch)
            case other:  # pyright: ignore[reportUnnecessaryComparison]
                raise ValueError(f"unexpected weight: {other!r}")

        member = p4r.ActionProfileGroup.Member(
            member_id=self.member_id,
            weight=weight,
        )

        if watch_port is not None:
            member.watch_port = watch_port
        return member
```
Encode P4Member as protobuf.
```python
    @classmethod
    def decode(cls, msg: p4r.ActionProfileGroup.Member) -> Self:
        "Decode protobuf to P4Member."
        match msg.WhichOneof("watch_kind"):
            case "watch_port":
                weight = (msg.weight, decode_watch_port(msg.watch_port))
            case None:
                weight = msg.weight
            case other:
                # "watch" (deprecated) is not supported
                raise ValueError(f"unknown oneof: {other!r}")

        return cls(member_id=msg.member_id, weight=weight)
```
Decode protobuf to P4Member.
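As `encode`/`decode` show, a member's weight is either a bare int or a `(weight, watch_port)` tuple when the member should track a port's liveness. A construction sketch (assuming `P4Member` is exported at the top level of finsy, as the other P4 entity classes are):

```python
import finsy as fy

# Plain weight.
m1 = fy.P4Member(1, weight=2)

# Weight 1, deactivated automatically if port 3 goes down (watch_port).
m2 = fy.P4Member(2, weight=(1, 3))
```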
````python
@dataclass(kw_only=True, slots=True)
class P4MeterConfig:
    """Represents a P4Runtime MeterConfig.

    Attributes:
        cir (int): Committed information rate (units/sec).
        cburst (int): Committed burst size.
        pir (int): Peak information rate (units/sec).
        pburst (int): Peak burst size.

    Example:
    ```
    config = P4MeterConfig(cir=10, cburst=20, pir=10, pburst=20)
    ```

    See Also:
        - P4TableEntry
        - P4MeterEntry
        - P4DirectMeterEntry
    """

    cir: int
    "Committed information rate (units/sec)."
    cburst: int
    "Committed burst size."
    pir: int
    "Peak information rate (units/sec)."
    pburst: int
    "Peak burst size."

    def encode(self) -> p4r.MeterConfig:
        "Encode object as MeterConfig."
        return p4r.MeterConfig(
            cir=self.cir, cburst=self.cburst, pir=self.pir, pburst=self.pburst
        )

    @classmethod
    def decode(cls, msg: p4r.MeterConfig) -> Self:
        "Decode MeterConfig."
        return cls(cir=msg.cir, cburst=msg.cburst, pir=msg.pir, pburst=msg.pburst)
````
Represents a P4Runtime MeterConfig.
Attributes:
- cir (int): Committed information rate (units/sec).
- cburst (int): Committed burst size.
- pir (int): Peak information rate (units/sec).
- pburst (int): Peak burst size.
Example:

```python
config = P4MeterConfig(cir=10, cburst=20, pir=10, pburst=20)
```
See Also:
- P4TableEntry
- P4MeterEntry
- P4DirectMeterEntry
```python
@dataclass(kw_only=True, slots=True)
class P4MeterCounterData:
    """Represents a P4Runtime MeterCounterData that stores per-color counters.

    Attributes:
        green (CounterData): counter data for packets marked GREEN.
        yellow (CounterData): counter data for packets marked YELLOW.
        red (CounterData): counter data for packets marked RED.

    See Also:
        - P4TableEntry
        - P4MeterEntry
        - P4DirectMeterEntry
    """

    green: P4CounterData
    "Counter of packets marked GREEN."
    yellow: P4CounterData
    "Counter of packets marked YELLOW."
    red: P4CounterData
    "Counter of packets marked RED."

    def encode(self) -> p4r.MeterCounterData:
        "Encode object as MeterCounterData."
        return p4r.MeterCounterData(
            green=self.green.encode(),
            yellow=self.yellow.encode(),
            red=self.red.encode(),
        )

    @classmethod
    def decode(cls, msg: p4r.MeterCounterData) -> Self:
        "Decode MeterCounterData."
        return cls(
            green=P4CounterData.decode(msg.green),
            yellow=P4CounterData.decode(msg.yellow),
            red=P4CounterData.decode(msg.red),
        )
```
Represents a P4Runtime MeterCounterData that stores per-color counters.
Attributes:
- green (CounterData): counter data for packets marked GREEN.
- yellow (CounterData): counter data for packets marked YELLOW.
- red (CounterData): counter data for packets marked RED.
See Also:
- P4TableEntry
- P4MeterEntry
- P4DirectMeterEntry
```python
    def encode(self) -> p4r.MeterCounterData:
        "Encode object as MeterCounterData."
        return p4r.MeterCounterData(
            green=self.green.encode(),
            yellow=self.yellow.encode(),
            red=self.red.encode(),
        )
```
Encode object as MeterCounterData.
```python
    @classmethod
    def decode(cls, msg: p4r.MeterCounterData) -> Self:
        "Decode MeterCounterData."
        return cls(
            green=P4CounterData.decode(msg.green),
            yellow=P4CounterData.decode(msg.yellow),
            red=P4CounterData.decode(msg.red),
        )
```
Decode MeterCounterData.
```python
@decodable("meter_entry")
@dataclass(slots=True)
class P4MeterEntry(_P4ModifyOnly):
    "Represents a P4Runtime MeterEntry."

    meter_id: str = ""
    _: KW_ONLY
    index: int | None = None
    config: P4MeterConfig | None = None
    counter_data: P4MeterCounterData | None = None

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4MeterEntry to protobuf."
        if not self.meter_id:
            return p4r.Entity(meter_entry=p4r.MeterEntry())

        meter = schema.meters[self.meter_id]

        if self.index is not None:
            index = p4r.Index(index=self.index)
        else:
            index = None

        if self.config is not None:
            config = self.config.encode()
        else:
            config = None

        if self.counter_data is not None:
            counter_data = self.counter_data.encode()
        else:
            counter_data = None

        entry = p4r.MeterEntry(
            meter_id=meter.id,
            index=index,
            config=config,
            counter_data=counter_data,
        )
        return p4r.Entity(meter_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4MeterEntry."
        entry = msg.meter_entry
        if not entry.meter_id:
            return cls()

        meter = schema.meters[entry.meter_id]

        if entry.HasField("index"):
            index = entry.index.index
        else:
            index = None

        if entry.HasField("config"):
            config = P4MeterConfig.decode(entry.config)
        else:
            config = None

        if entry.HasField("counter_data"):
            counter_data = P4MeterCounterData.decode(entry.counter_data)
        else:
            counter_data = None

        return cls(
            meter_id=meter.alias,
            index=index,
            config=config,
            counter_data=counter_data,
        )
```
Represents a P4Runtime MeterEntry.
```python
    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4MeterEntry to protobuf."
        if not self.meter_id:
            return p4r.Entity(meter_entry=p4r.MeterEntry())

        meter = schema.meters[self.meter_id]

        if self.index is not None:
            index = p4r.Index(index=self.index)
        else:
            index = None

        if self.config is not None:
            config = self.config.encode()
        else:
            config = None

        if self.counter_data is not None:
            counter_data = self.counter_data.encode()
        else:
            counter_data = None

        entry = p4r.MeterEntry(
            meter_id=meter.id,
            index=index,
            config=config,
            counter_data=counter_data,
        )
        return p4r.Entity(meter_entry=entry)
```
Encode P4MeterEntry to protobuf.
```python
    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4MeterEntry."
        entry = msg.meter_entry
        if not entry.meter_id:
            return cls()

        meter = schema.meters[entry.meter_id]

        if entry.HasField("index"):
            index = entry.index.index
        else:
            index = None

        if entry.HasField("config"):
            config = P4MeterConfig.decode(entry.config)
        else:
            config = None

        if entry.HasField("counter_data"):
            counter_data = P4MeterCounterData.decode(entry.counter_data)
        else:
            counter_data = None

        return cls(
            meter_id=meter.alias,
            index=index,
            config=config,
            counter_data=counter_data,
        )
```
Decode protobuf to P4MeterEntry.
Inherited Members
- finsy.p4entity._P4ModifyOnly
- encode_update
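Since P4MeterEntry is modify-only, it is written with the modify (~) operator rather than insert/delete. A sketch that reconfigures one cell of an indirect meter; the meter name "port_meter" is illustrative, and the switch must already have a P4 program with that meter:

```python
import finsy as fy

async def main():
    async with fy.Switch("sw1", "127.0.0.1:50001") as sw:
        # Modify (~) the config of cell 0 of the hypothetical "port_meter".
        await sw.write(
            [
                ~fy.P4MeterEntry(
                    "port_meter",
                    index=0,
                    config=fy.P4MeterConfig(cir=10, cburst=20, pir=10, pburst=20),
                ),
            ]
        )

fy.run(main())
```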
```python
@decodable("multicast_group_entry")
@dataclass(slots=True)
class P4MulticastGroupEntry(_P4Writable):
    "Represents a P4Runtime MulticastGroupEntry."

    multicast_group_id: int = 0
    _: KW_ONLY
    replicas: Sequence[_ReplicaType] = ()
    metadata: bytes = b""

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode MulticastGroupEntry data as protobuf."
        entry = p4r.MulticastGroupEntry(
            multicast_group_id=self.multicast_group_id,
            replicas=[encode_replica(replica) for replica in self.replicas],
            metadata=self.metadata,
        )
        return p4r.Entity(
            packet_replication_engine_entry=p4r.PacketReplicationEngineEntry(
                multicast_group_entry=entry
            )
        )

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to MulticastGroupEntry data."
        entry = msg.packet_replication_engine_entry.multicast_group_entry
        return cls(
            multicast_group_id=entry.multicast_group_id,
            replicas=tuple(decode_replica(replica) for replica in entry.replicas),
            metadata=entry.metadata,
        )

    def replicas_str(self) -> str:
        "Format the replicas as a human-readable, canonical string."
        return " ".join(format_replica(rep) for rep in self.replicas)
```
Represents a P4Runtime MulticastGroupEntry.
def encode(self, schema: P4Schema) -> p4r.Entity:
Encode MulticastGroupEntry data as protobuf.
@classmethod
def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
Decode protobuf to MulticastGroupEntry data.
def replicas_str(self) -> str:
Format the replicas as a human-readable, canonical string.
Inherited Members
- finsy.p4entity._P4Writable
- encode_update
@decodable("packet")
@dataclass(slots=True)
class P4PacketIn:
    "Represents a P4Runtime PacketIn."

    payload: bytes
    _: KW_ONLY
    metadata: _MetadataDictType

    @classmethod
    def decode(cls, msg: p4r.StreamMessageResponse, schema: P4Schema) -> Self:
        "Decode protobuf to PacketIn data."
        packet = msg.packet
        cpm = schema.controller_packet_metadata.get("packet_in")
        if cpm is None:
            # There is no controller metadata. Warn if message has any.
            pkt_meta = packet.metadata
            if pkt_meta:
                LOGGER.warning("P4PacketIn unexpected metadata: %r", pkt_meta)
            return cls(packet.payload, metadata={})

        return cls(
            packet.payload,
            metadata=cpm.decode(packet.metadata),
        )

    def __getitem__(self, key: str) -> Any:
        "Retrieve metadata value."
        return self.metadata[key]

    def __repr__(self) -> str:
        "Return friendlier hexadecimal description of packet."
        if self.metadata:
            return f"P4PacketIn(metadata={self.metadata!r}, payload=h'{self.payload.hex()}')"
        return f"P4PacketIn(payload=h'{self.payload.hex()}')"
Represents a P4Runtime PacketIn.
@classmethod
def decode(cls, msg: p4r.StreamMessageResponse, schema: P4Schema) -> Self:
Decode protobuf to PacketIn data.
@dataclass(slots=True)
class P4PacketOut:
    "Represents a P4Runtime PacketOut."

    payload: bytes
    _: KW_ONLY
    metadata: _MetadataDictType

    def __init__(self, __payload: bytes, /, **metadata: Any):
        self.payload = __payload
        self.metadata = metadata

    def encode_update(self, schema: P4Schema) -> p4r.StreamMessageRequest:
        "Encode PacketOut data as protobuf."
        cpm = schema.controller_packet_metadata["packet_out"]
        return p4r.StreamMessageRequest(
            packet=p4r.PacketOut(
                payload=self.payload,
                metadata=cpm.encode(self.metadata),
            )
        )

    def __getitem__(self, key: str) -> Any:
        "Retrieve metadata value."
        return self.metadata[key]

    def __repr__(self) -> str:
        "Return friendlier hexadecimal description of packet."
        if self.metadata:
            return f"P4PacketOut(metadata={self.metadata!r}, payload=h'{self.payload.hex()}')"
        return f"P4PacketOut(payload=h'{self.payload.hex()}')"
Represents a P4Runtime PacketOut.
def encode_update(self, schema: P4Schema) -> p4r.StreamMessageRequest:
Encode PacketOut data as protobuf.
@decodable("register_entry")
@dataclass(slots=True)
class P4RegisterEntry(_P4ModifyOnly):
    "Represents a P4Runtime RegisterEntry."

    register_id: str = ""
    _: KW_ONLY
    index: int | None = None
    data: _DataValueType | None = None

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode RegisterEntry data as protobuf."
        if not self.register_id:
            return p4r.Entity(register_entry=p4r.RegisterEntry())

        register = schema.registers[self.register_id]

        if self.index is not None:
            index = p4r.Index(index=self.index)
        else:
            index = None

        if self.data is not None:
            data = register.type_spec.encode_data(self.data)
        else:
            data = None

        entry = p4r.RegisterEntry(
            register_id=register.id,
            index=index,
            data=data,
        )
        return p4r.Entity(register_entry=entry)

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to RegisterEntry data."
        entry = msg.register_entry
        if entry.register_id == 0:
            return cls()

        register = schema.registers[entry.register_id]

        if entry.HasField("index"):
            index = entry.index.index
        else:
            index = None

        if entry.HasField("data"):
            data = register.type_spec.decode_data(entry.data)
        else:
            data = None

        return cls(
            register.alias,
            index=index,
            data=data,
        )
Represents a P4Runtime RegisterEntry.
def encode(self, schema: P4Schema) -> p4r.Entity:
Encode RegisterEntry data as protobuf.
@classmethod
def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
Decode protobuf to RegisterEntry data.
Inherited Members
- finsy.p4entity._P4ModifyOnly
- encode_update
Action is an alias for P4TableAction.
@dataclass(init=False, slots=True)
class P4TableAction:
    """Represents a P4Runtime Action reference for a direct table.

    Attributes:
        name (str): the name of the action.
        args (dict[str, Any]): the action's arguments as a dictionary.

    Example:
        If the name of the action is "ipv4_forward" and it takes a single
        "port" parameter, you can construct the action as:

        ```
        action = P4TableAction("ipv4_forward", port=1)
        ```

    Reference "9.1.2 Action Specification":
        The Action Protobuf has fields: (action_id, params). Finsy translates
        `name` to the appropriate `action_id` as determined by P4Info. It also
        translates each named argument in `args` to the appropriate `param_id`.

    See Also:
        To specify an action for an indirect table, use `P4IndirectAction`.
        Note that P4TableAction will automatically be promoted to an "indirect"
        action if needed.

    Operators:
        A `P4TableAction` supports the multiplication operator (*) for
        constructing "weighted actions". A weighted action is used in specifying
        indirect actions. Here is an action with a weight of 3:

        ```
        weighted_action = 3 * P4TableAction("ipv4_forward", port=1)
        ```

        To specify a weight with a `watch_port`, use a tuple `(weight, port)`.
        The weight is always a positive integer.

    See Also:
        - P4TableEntry
    """

    name: str
    "The name of the action."
    args: dict[str, Any]
    "The action's arguments as a dictionary."

    def __init__(self, __name: str, /, **args: Any):
        self.name = __name
        self.args = args

    def encode_table_action(self, table: P4Table) -> p4r.TableAction:
        """Encode TableAction data as protobuf.

        If the table is indirect, promote the action to a "one-shot" indirect
        action.
        """
        try:
            action = table.actions[self.name]
        except Exception as ex:
            raise ValueError(f"{table.name!r}: {ex}") from ex

        action_p4 = self._encode_action(action)

        if table.action_profile is not None:
            # Promote action to ActionProfileActionSet entry with weight=1.
            return p4r.TableAction(
                action_profile_action_set=p4r.ActionProfileActionSet(
                    action_profile_actions=[
                        p4r.ActionProfileAction(action=action_p4, weight=1)
                    ]
                )
            )

        return p4r.TableAction(action=action_p4)

    def _fail_missing_params(self, action: P4ActionRef | P4Action) -> NoReturn:
        "Report missing parameters."
        seen = {param.name for param in action.params}
        for name in self.args:
            param = action.params[name]
            seen.remove(param.name)

        raise ValueError(f"Action {action.alias!r}: missing parameters {seen}")

    def encode_action(self, schema: P4Schema | P4Table) -> p4r.Action:
        "Encode Action data as protobuf."
        action = schema.actions[self.name]
        return self._encode_action(action)

    def _encode_action(self, action: P4ActionRef | P4Action) -> p4r.Action:
        "Helper to encode an action."
        aps = action.params
        try:
            params = [
                aps[name].encode_param(value) for name, value in self.args.items()
            ]
        except ValueError as ex:
            raise ValueError(f"{action.alias!r}: {ex}") from ex

        # Check for missing action parameters. We always accept an action with
        # no parameters (for wildcard ReadRequests).
        param_count = len(params)
        if param_count > 0 and param_count != len(aps):
            self._fail_missing_params(action)

        return p4r.Action(action_id=action.id, params=params)

    @classmethod
    def decode_table_action(
        cls, msg: p4r.TableAction, table: P4Table
    ) -> Self | "P4IndirectAction":
        "Decode protobuf to TableAction data."
        match msg.WhichOneof("type"):
            case "action":
                return cls.decode_action(msg.action, table)
            case "action_profile_member_id":
                return P4IndirectAction(member_id=msg.action_profile_member_id)
            case "action_profile_group_id":
                return P4IndirectAction(group_id=msg.action_profile_group_id)
            case "action_profile_action_set":
                return P4IndirectAction.decode_action_set(
                    msg.action_profile_action_set, table
                )
            case other:
                raise ValueError(f"unknown oneof: {other!r}")

    @classmethod
    def decode_action(cls, msg: p4r.Action, parent: P4Schema | P4Table) -> Self:
        "Decode protobuf to Action data."
        action = parent.actions[msg.action_id]
        args = {}
        for param in msg.params:
            action_param = action.params[param.param_id]
            value = action_param.decode_param(param)
            args[action_param.name] = value

        return cls(action.alias, **args)

    def format_str(self, schema: P4Schema | P4Table) -> str:
        """Format the table action as a human-readable string.

        The result is formatted to look like a function call:

        ```
        name(param1=value1, ...)
        ```

        Where `name` is the action name, and `(param<N>, value<N>)` are the
        action parameters. The format of `value<N>` is schema-dependent.
        """
        aps = schema.actions[self.name].params
        args = [
            f"{key}={aps[key].format_param(value)}" for key, value in self.args.items()
        ]

        return f"{self.name}({', '.join(args)})"

    def __mul__(self, weight: P4Weight) -> P4WeightedAction:
        "Make a weighted action."
        if not isinstance(
            weight, (int, tuple)
        ):  # pyright: ignore[reportUnnecessaryIsInstance]
            raise NotImplementedError("expected P4Weight")
        return (weight, self)

    def __rmul__(self, weight: P4Weight) -> P4WeightedAction:
        "Make a weighted action."
        if not isinstance(
            weight, (int, tuple)
        ):  # pyright: ignore[reportUnnecessaryIsInstance]
            raise NotImplementedError("expected P4Weight")
        return (weight, self)

    def __call__(self, **params: Any) -> Self:
        "Return a new action with the updated parameters."
        return self.__class__(self.name, **(self.args | params))
Represents a P4Runtime Action reference for a direct table.
Attributes:
- name (str): the name of the action.
- args (dict[str, Any]): the action's arguments as a dictionary.
Example: If the name of the action is "ipv4_forward" and it takes a single "port" parameter, you can construct the action as:

action = P4TableAction("ipv4_forward", port=1)
Reference "9.1.2 Action Specification": The Action Protobuf has fields: (action_id, params). Finsy translates `name` to the appropriate `action_id` as determined by P4Info. It also translates each named argument in `args` to the appropriate `param_id`.
See Also: To specify an action for an indirect table, use `P4IndirectAction`. Note that P4TableAction will automatically be promoted to an "indirect" action if needed.
Operators: A `P4TableAction` supports the multiplication operator (*) for constructing "weighted actions". A weighted action is used in specifying indirect actions. Here is an action with a weight of 3:

weighted_action = 3 * P4TableAction("ipv4_forward", port=1)

To specify a weight with a `watch_port`, use a tuple `(weight, port)`. The weight is always a positive integer.
See Also: - P4TableEntry
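The weighted-action semantics above can be sketched with a minimal stand-in class (an illustration only, not the real finsy implementation): multiplying a weight by an action simply yields a `(weight, action)` tuple, and a `(weight, watch_port)` tuple is itself a valid weight.

```python
class _Action:
    """Stand-in mimicking P4TableAction's weight operators (hypothetical)."""

    def __init__(self, name, **args):
        self.name = name
        self.args = args

    def __rmul__(self, weight):
        # A weighted action is simply a (weight, action) tuple.
        if not isinstance(weight, (int, tuple)):
            raise NotImplementedError("expected P4Weight")
        return (weight, self)


# Integer weight: 3 * action -> (3, action).
weighted = 3 * _Action("ipv4_forward", port=1)

# (weight, watch_port) tuple as the weight: (2, 255) * action.
weighted_watched = (2, 255) * _Action("ipv4_forward", port=2)
```

The tuple form is what makes weighted actions easy to pass into indirect action sets, since the weight travels alongside the action object.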
def encode_table_action(self, table: P4Table) -> p4r.TableAction:
Encode TableAction data as protobuf.
If the table is indirect, promote the action to a "one-shot" indirect action.
def encode_action(self, schema: P4Schema | P4Table) -> p4r.Action:
Encode Action data as protobuf.
@classmethod
def decode_table_action(cls, msg: p4r.TableAction, table: P4Table) -> Self | "P4IndirectAction":
Decode protobuf to TableAction data.
@classmethod
def decode_action(cls, msg: p4r.Action, parent: P4Schema | P4Table) -> Self:
Decode protobuf to Action data.
def format_str(self, schema: P4Schema | P4Table) -> str:
Format the table action as a human-readable string.
The result is formatted to look like a function call:

name(param1=value1, ...)

Where `name` is the action name, and `(param<N>, value<N>)` are the action parameters. The format of `value<N>` is schema-dependent.
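The "function call" shape can be sketched with a tiny hypothetical formatter (the real `format_str` additionally formats each value through the P4Info schema):

```python
def format_action(name, **args):
    # Hypothetical helper mirroring the "name(param=value, ...)" output
    # shape of P4TableAction.format_str; real values are schema-formatted.
    body = ", ".join(f"{key}={value}" for key, value in args.items())
    return f"{name}({body})"


example = format_action("ipv4_forward", port=1)
```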
@decodable("table_entry")
@dataclass(slots=True)
class P4TableEntry(_P4Writable):
    """Represents a P4Runtime table entry.

    Attributes:
        table_id (str): Name of the table.
        match (P4TableMatch | None): Entry's match fields.
        action (P4TableAction | P4IndirectAction | None): Entry's action.
        is_default_action (bool): True if entry is the default table entry.
        priority (int): Priority of a table entry when match implies TCAM lookup.
        metadata (bytes): Arbitrary controller cookie (1.2.0).
        controller_metadata (int): Deprecated controller cookie (< 1.2.0).
        meter_config (P4MeterConfig | None): Meter configuration.
        counter_data (P4CounterData | None): Counter data for table entry.
        meter_counter_data (P4MeterCounterData | None): Meter counter data (1.4.0).
        idle_timeout_ns (int): Idle timeout in nanoseconds.
        time_since_last_hit (int | None): Nanoseconds since entry last matched.
        is_const (bool): True if entry is constant (1.4.0).

    The most commonly used fields are table_id, match, action, is_default_action,
    and priority. See the P4Runtime Spec for usage examples regarding the other
    attributes.

    When writing a P4TableEntry, you can specify the type of update using '+',
    '-', and '~'.

    Examples:
        ```
        # Specify all tables when using "read".
        entry = fy.P4TableEntry()

        # Specify the table named "ipv4" when using "read".
        entry = fy.P4TableEntry("ipv4")

        # Specify the default entry in the "ipv4" table when using "read".
        entry = fy.P4TableEntry("ipv4", is_default_action=True)

        # Insert an entry into the "ipv4" table.
        update = +fy.P4TableEntry(
            "ipv4",
            match=fy.Match(ipv4_dst="10.0.0.0/8"),
            action=fy.Action("forward", port=1),
        )

        # Modify the default action in the "ipv4" table.
        update = ~fy.P4TableEntry(
            "ipv4",
            action=fy.Action("forward", port=5),
            is_default_action=True
        )
        ```

    Operators:
        You can retrieve a match field from a table entry using `[]`. For
        example, `entry["ipv4_dst"]` is the same as `entry.match["ipv4_dst"]`.

    Formatting Helpers:
        The `match_str` and `action_str` methods provide P4Info-aware formatting
        of the match and action attributes.
    """

    table_id: str = ""
    "Name of the table."
    _: KW_ONLY
    match: P4TableMatch | None = None
    "Entry's match fields."
    action: P4TableAction | P4IndirectAction | None = None
    "Entry's action."
    is_default_action: bool = False
    "True if entry is the default table entry."
    priority: int = 0
    "Priority of a table entry when match implies TCAM lookup."
    metadata: bytes = b""
    "Arbitrary controller cookie. (1.2.0)."
    controller_metadata: int = 0
    "Deprecated controller cookie (< 1.2.0)."
    meter_config: P4MeterConfig | None = None
    "Meter configuration."
    counter_data: P4CounterData | None = None
    "Counter data for table entry."
    meter_counter_data: P4MeterCounterData | None = None
    "Meter counter data (1.4.0)."
    idle_timeout_ns: int = 0
    "Idle timeout in nanoseconds."
    time_since_last_hit: int | None = None
    "Nanoseconds since entry last matched."
    is_const: bool = False
    "True if entry is constant (1.4.0)."

    def __getitem__(self, key: str) -> Any:
        "Convenience accessor to retrieve a value from the `match` property."
        if self.match is not None:
            return self.match[key]
        raise KeyError(key)

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode TableEntry data as protobuf."
        return p4r.Entity(table_entry=self.encode_entry(schema))

    def encode_entry(self, schema: P4Schema) -> p4r.TableEntry:
        "Encode TableEntry data as protobuf."
        if not self.table_id:
            return self._encode_empty()

        table = schema.tables[self.table_id]

        if self.match:
            match = self.match.encode(table)
        else:
            match = None

        if self.action:
            action = self.action.encode_table_action(table)
        else:
            action = None

        if self.meter_config:
            meter_config = self.meter_config.encode()
        else:
            meter_config = None

        if self.counter_data:
            counter_data = self.counter_data.encode()
        else:
            counter_data = None

        if self.meter_counter_data:
            meter_counter_data = self.meter_counter_data.encode()
        else:
            meter_counter_data = None

        if self.time_since_last_hit is not None:
            time_since_last_hit = p4r.TableEntry.IdleTimeout(
                elapsed_ns=self.time_since_last_hit
            )
        else:
            time_since_last_hit = None

        return p4r.TableEntry(
            table_id=table.id,
            match=match,
            action=action,
            priority=self.priority,
            controller_metadata=self.controller_metadata,
            meter_config=meter_config,
            counter_data=counter_data,
            meter_counter_data=meter_counter_data,
            is_default_action=self.is_default_action,
            idle_timeout_ns=self.idle_timeout_ns,
            time_since_last_hit=time_since_last_hit,
            metadata=self.metadata,
            is_const=self.is_const,
        )

    def _encode_empty(self) -> p4r.TableEntry:
        "Encode an empty wildcard request."
        if self.counter_data is not None:
            counter_data = self.counter_data.encode()
        else:
            counter_data = None

        # FIXME: time_since_last_hit not supported for wildcard reads?
        if self.time_since_last_hit is not None:
            time_since_last_hit = p4r.TableEntry.IdleTimeout(
                elapsed_ns=self.time_since_last_hit
            )
        else:
            time_since_last_hit = None

        return p4r.TableEntry(
            counter_data=counter_data,
            time_since_last_hit=time_since_last_hit,
        )

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to TableEntry data."
        return cls.decode_entry(msg.table_entry, schema)

    @classmethod
    def decode_entry(cls, entry: p4r.TableEntry, schema: P4Schema) -> Self:
        "Decode protobuf to TableEntry data."
        if entry.table_id == 0:
            return cls("")

        table = schema.tables[entry.table_id]

        if entry.match:
            match = P4TableMatch.decode(entry.match, table)
        else:
            match = None

        if entry.HasField("action"):
            action = P4TableAction.decode_table_action(entry.action, table)
        else:
            action = None

        if entry.HasField("time_since_last_hit"):
            last_hit = entry.time_since_last_hit.elapsed_ns
        else:
            last_hit = None

        if entry.HasField("meter_config"):
            meter_config = P4MeterConfig.decode(entry.meter_config)
        else:
            meter_config = None

        if entry.HasField("counter_data"):
            counter_data = P4CounterData.decode(entry.counter_data)
        else:
            counter_data = None

        if entry.HasField("meter_counter_data"):
            meter_counter_data = P4MeterCounterData.decode(entry.meter_counter_data)
        else:
            meter_counter_data = None

        return cls(
            table_id=table.alias,
            match=match,
            action=action,
            priority=entry.priority,
            controller_metadata=entry.controller_metadata,
            meter_config=meter_config,
            counter_data=counter_data,
            meter_counter_data=meter_counter_data,
            is_default_action=entry.is_default_action,
            idle_timeout_ns=entry.idle_timeout_ns,
            time_since_last_hit=last_hit,
            metadata=entry.metadata,
            is_const=entry.is_const,
        )

    def match_dict(
        self,
        schema: P4Schema,
        *,
        wildcard: str | None = None,
    ) -> dict[str, str]:
        """Format the match fields as a dictionary of strings.

        If `wildcard` is None, only include match fields that have values. If
        `wildcard` is set, include all field names but replace unset values with
        the given wildcard value (e.g. "*").
        """
        table = schema.tables[self.table_id]
        if self.match is not None:
            return self.match.format_dict(table, wildcard=wildcard)
        return P4TableMatch().format_dict(table, wildcard=wildcard)

    def match_str(
        self,
        schema: P4Schema,
        *,
        wildcard: str | None = None,
    ) -> str:
        "Format the match fields as a human-readable, canonical string."
        table = schema.tables[self.table_id]
        if self.match is not None:
            return self.match.format_str(table, wildcard=wildcard)
        return P4TableMatch().format_str(table, wildcard=wildcard)

    def action_str(self, schema: P4Schema) -> str:
        "Format the actions as a human-readable, canonical string."
        table = schema.tables[self.table_id]
        if self.action is None:
            return NOACTION_STR
        return self.action.format_str(table)
Represents a P4Runtime table entry.
Attributes:
- table_id (str): Name of the table.
- match (P4TableMatch | None): Entry's match fields.
- action (P4TableAction | P4IndirectAction | None): Entry's action.
- is_default_action (bool): True if entry is the default table entry.
- priority (int): Priority of a table entry when match implies TCAM lookup.
- metadata (bytes): Arbitrary controller cookie (1.2.0).
- controller_metadata (int): Deprecated controller cookie (< 1.2.0).
- meter_config (P4MeterConfig | None): Meter configuration.
- counter_data (P4CounterData | None): Counter data for table entry.
- meter_counter_data (P4MeterCounterData | None): Meter counter data (1.4.0).
- idle_timeout_ns (int): Idle timeout in nanoseconds.
- time_since_last_hit (int | None): Nanoseconds since entry last matched.
- is_const (bool): True if entry is constant (1.4.0).
The most commonly used fields are table_id, match, action, is_default_action, and priority. See the P4Runtime Spec for usage examples regarding the other attributes.
When writing a P4TableEntry, you can specify the type of update using '+', '-', and '~'.
Examples:
# Specify all tables when using "read".
entry = fy.P4TableEntry()

# Specify the table named "ipv4" when using "read".
entry = fy.P4TableEntry("ipv4")

# Specify the default entry in the "ipv4" table when using "read".
entry = fy.P4TableEntry("ipv4", is_default_action=True)

# Insert an entry into the "ipv4" table.
update = +fy.P4TableEntry(
    "ipv4",
    match=fy.Match(ipv4_dst="10.0.0.0/8"),
    action=fy.Action("forward", port=1),
)

# Modify the default action in the "ipv4" table.
update = ~fy.P4TableEntry(
    "ipv4",
    action=fy.Action("forward", port=5),
    is_default_action=True,
)
Operators: You can retrieve a match field from a table entry using `[]`. For example, `entry["ipv4_dst"]` is the same as `entry.match["ipv4_dst"]`.
Formatting Helpers: The `match_str` and `action_str` methods provide P4Info-aware formatting of the match and action attributes.
def __getitem__(self, key: str) -> Any:
Convenience accessor to retrieve a value from the `match` property.
def encode(self, schema: P4Schema) -> p4r.Entity:
Encode TableEntry data as protobuf.
def encode_entry(self, schema: P4Schema) -> p4r.TableEntry:
Encode TableEntry data as protobuf.
@classmethod
def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
Decode protobuf to TableEntry data.
@classmethod
def decode_entry(cls, entry: p4r.TableEntry, schema: P4Schema) -> Self:
Decode protobuf to TableEntry data.
def match_dict(self, schema: P4Schema, *, wildcard: str | None = None) -> dict[str, str]:
Format the match fields as a dictionary of strings.
If `wildcard` is None, only include match fields that have values. If `wildcard` is set, include all field names but replace unset values with the given wildcard value (e.g. "*").
def match_str(
    self,
    schema: P4Schema,
    *,
    wildcard: str | None = None,
) -> str:
    "Format the match fields as a human-readable, canonical string."
    table = schema.tables[self.table_id]
    if self.match is not None:
        return self.match.format_str(table, wildcard=wildcard)
    return P4TableMatch().format_str(table, wildcard=wildcard)
Format the match fields as a human-readable, canonical string.
def action_str(self, schema: P4Schema) -> str:
    "Format the actions as a human-readable, canonical string."
    table = schema.tables[self.table_id]
    if self.action is None:
        return NOACTION_STR
    return self.action.format_str(table)
Format the actions as a human-readable, canonical string.
Inherited Members
- finsy.p4entity._P4Writable
- encode_update
`Match` is an alias for `P4TableMatch`.
class P4TableMatch(dict[str, Any]):
    """Represents a set of P4Runtime field matches.

    Each match field is stored as a dictionary key, where the key is the name
    of the match field. The field's value should be appropriate for the type
    of match (EXACT, LPM, TERNARY, etc.)

    Construct a match similar to a dictionary.

    Example:
    ```
    # Keyword arguments:
    match = P4TableMatch(ipv4_dst="10.0.0.1")

    # Dictionary argument:
    match = P4TableMatch({"ipv4_dst": "10.0.0.1"})

    # List of 2-tuples:
    match = P4TableMatch([("ipv4_dst", "10.0.0.1")])
    ```

    P4TableMatch is implemented as a subclass of `dict`. It supports all of the
    standard dictionary methods:
    ```
    match = P4TableMatch()
    match["ipv4_dst"] = "10.0.0.1"
    assert len(match) == 1
    ```

    Reference "9.1.1 Match Format":
        Each match field is translated to a FieldMatch Protobuf message by
        translating the entry key to a `field_id`. The type of match (EXACT,
        LPM, TERNARY, OPTIONAL, or RANGE) is determined by the P4Info, and the
        value is converted to the Protobuf representation.

    Supported String Values:
    ```
    EXACT: "255", "0xFF", "10.0.0.1", "2000::1", "00:00:00:00:00:01"

    LPM: "255/8", "0xFF/8", "10.0.0.1/32", "2000::1/128",
        "00:00:00:00:00:01/48"
        (+ all exact formats are promoted to all-1 masks)

    TERNARY: "255/&255", "0xFF/&0xFF", "10.0.0.1/&255.255.255.255",
        "2000::1/&128", "00:00:00:00:00:01/&48"
        (+ all exact formats are promoted to all-1 masks)
        (+ all lpm formats are promoted to the specified contiguous mask)

    RANGE: "0...255", "0x00...0xFF", "10.0.0.1...10.0.0.9",
        "2000::1...2001::9", "00:00:00:00:00:01...00:00:00:00:00:09"
        (+ all exact formats are promoted to single-value ranges)

    OPTIONAL: Same as exact format.
    ```

    See the `p4values.py` module for all supported value classes.

    TODO:
    - Change range delimiter to '-' (and drop '-' delimited MAC's).
    - Consider supporting ternary values with just '/' (and drop support
      for decimal masks; mask must be hexadecimal number).

    See Also:
    - P4TableEntry
    """

    def encode(self, table: P4Table) -> list[p4r.FieldMatch]:
        "Encode TableMatch data as a list of Protobuf fields."
        result: list[p4r.FieldMatch] = []
        match_fields = table.match_fields

        for key, value in self.items():
            try:
                field = match_fields[key].encode_field(value)
                if field is not None:
                    result.append(field)
            except Exception as ex:
                raise ValueError(f"{table.name!r}: Match field {key!r}: {ex}") from ex

        return result

    @classmethod
    def decode(cls, msgs: Iterable[p4r.FieldMatch], table: P4Table) -> Self:
        "Decode Protobuf fields as TableMatch data."
        result: dict[str, Any] = {}
        match_fields = table.match_fields

        for field in msgs:
            fld = match_fields[field.field_id]
            result[fld.alias] = fld.decode_field(field)

        return cls(result)

    def format_dict(
        self,
        table: P4Table,
        *,
        wildcard: str | None = None,
    ) -> dict[str, str]:
        """Format the table match fields as a human-readable dictionary.

        The result is a dictionary showing the TableMatch data for fields
        included in the match. If `wildcard` is specified, all fields defined
        in P4Info will be included with their value set to the wildcard string.

        Values are formatted using the format/type specified in P4Info.
        """
        result: dict[str, str] = {}

        for fld in table.match_fields:
            value = self.get(fld.alias, None)
            if value is not None:
                result[fld.alias] = fld.format_field(value)
            elif wildcard is not None:
                result[fld.alias] = wildcard

        return result

    def format_str(
        self,
        table: P4Table,
        *,
        wildcard: str | None = None,
    ) -> str:
        """Format the table match fields as a human-readable string.

        The result is a string showing the TableMatch data for fields included
        in the match. If `wildcard` is specified, all fields defined in P4Info
        will be included with their value set to the wildcard string.

        All fields are formatted as "name=value" and they are delimited by
        spaces.

        Values are formatted using the format/type specified in P4Info.
        """
        result: list[str] = []

        for fld in table.match_fields:
            value = self.get(fld.alias, None)
            if value is not None:
                result.append(f"{fld.alias}={fld.format_field(value)}")
            elif wildcard is not None:
                result.append(f"{fld.alias}={wildcard}")

        return " ".join(result)
Represents a set of P4Runtime field matches.
Each match field is stored as a dictionary key, where the key is the name of the match field. The field's value should be appropriate for the type of match (EXACT, LPM, TERNARY, etc.)
Construct a match the same way you would construct a dictionary.
Example:
# Keyword arguments:
match = P4TableMatch(ipv4_dst="10.0.0.1")
# Dictionary argument:
match = P4TableMatch({"ipv4_dst": "10.0.0.1"})
# List of 2-tuples:
match = P4TableMatch([("ipv4_dst", "10.0.0.1")])
P4TableMatch is implemented as a subclass of `dict`. It supports all of the standard dictionary methods:
match = P4TableMatch()
match["ipv4_dst"] = "10.0.0.1"
assert len(match) == 1
Reference "9.1.1 Match Format":
Each match field is translated to a FieldMatch Protobuf message by translating the entry key to a `field_id`. The type of match (EXACT, LPM, TERNARY, OPTIONAL, or RANGE) is determined by the P4Info, and the value is converted to the Protobuf representation.
Supported String Values:
EXACT: "255", "0xFF", "10.0.0.1", "2000::1", "00:00:00:00:00:01"
LPM: "255/8", "0xFF/8", "10.0.0.1/32", "2000::1/128",
"00:00:00:00:00:01/48"
(+ all exact formats are promoted to all-1 masks)
TERNARY: "255/&255", "0xFF/&0xFF", "10.0.0.1/&255.255.255.255",
"2000::1/&128", "00:00:00:00:00:01/&48"
(+ all exact formats are promoted to all-1 masks)
(+ all lpm formats are promoted to the specified contiguous mask)
RANGE: "0...255", "0x00...0xFF", "10.0.0.1...10.0.0.9",
"2000::1...2001::9", "00:00:00:00:00:01...00:00:00:00:00:09"
(+ all exact formats are promoted to single-value ranges)
OPTIONAL: Same as exact format.
See the `p4values.py` module for all supported value classes.
TODO:
- Change range delimiter to '-' (and drop '-'-delimited MACs).
- Consider supporting ternary values with just '/' (and drop support for decimal masks; mask must be a hexadecimal number).
See Also:
- P4TableEntry
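The promotion rules above (a bare exact value becomes an all-1s mask) can be illustrated with a small standalone parser. This is a hedged sketch of the idea only, restricted to IPv4 LPM strings; it is not finsy's `p4values` parser, and `parse_lpm` is a hypothetical helper name.

```python
from ipaddress import IPv4Address

def parse_lpm(value: str, bitwidth: int) -> tuple[int, int]:
    # "value/prefix" splits into (int value, prefix length);
    # a bare exact value is promoted to a full-length (all-1s) mask.
    if "/" in value:
        addr, prefix = value.split("/", 1)
        return int(IPv4Address(addr)), int(prefix)
    return int(IPv4Address(value)), bitwidth


assert parse_lpm("10.0.0.0/8", 32) == (0x0A000000, 8)
assert parse_lpm("10.0.0.1", 32) == (0x0A000001, 32)  # promoted to /32
```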
def encode(self, table: P4Table) -> list[p4r.FieldMatch]:
    "Encode TableMatch data as a list of Protobuf fields."
    result: list[p4r.FieldMatch] = []
    match_fields = table.match_fields

    for key, value in self.items():
        try:
            field = match_fields[key].encode_field(value)
            if field is not None:
                result.append(field)
        except Exception as ex:
            raise ValueError(f"{table.name!r}: Match field {key!r}: {ex}") from ex

    return result
Encode TableMatch data as a list of Protobuf fields.
@classmethod
def decode(cls, msgs: Iterable[p4r.FieldMatch], table: P4Table) -> Self:
    "Decode Protobuf fields as TableMatch data."
    result: dict[str, Any] = {}
    match_fields = table.match_fields

    for field in msgs:
        fld = match_fields[field.field_id]
        result[fld.alias] = fld.decode_field(field)

    return cls(result)
Decode Protobuf fields as TableMatch data.
def format_dict(
    self,
    table: P4Table,
    *,
    wildcard: str | None = None,
) -> dict[str, str]:
    """Format the table match fields as a human-readable dictionary.

    The result is a dictionary showing the TableMatch data for fields
    included in the match. If `wildcard` is specified, all fields defined
    in P4Info will be included with their value set to the wildcard string.

    Values are formatted using the format/type specified in P4Info.
    """
    result: dict[str, str] = {}

    for fld in table.match_fields:
        value = self.get(fld.alias, None)
        if value is not None:
            result[fld.alias] = fld.format_field(value)
        elif wildcard is not None:
            result[fld.alias] = wildcard

    return result
Format the table match fields as a human-readable dictionary.
The result is a dictionary showing the TableMatch data for fields included in the match. If `wildcard` is specified, all fields defined in P4Info will be included with their value set to the wildcard string.
Values are formatted using the format/type specified in P4Info.
def format_str(
    self,
    table: P4Table,
    *,
    wildcard: str | None = None,
) -> str:
    """Format the table match fields as a human-readable string.

    The result is a string showing the TableMatch data for fields included
    in the match. If `wildcard` is specified, all fields defined in P4Info
    will be included with their value set to the wildcard string.

    All fields are formatted as "name=value" and they are delimited by
    spaces.

    Values are formatted using the format/type specified in P4Info.
    """
    result: list[str] = []

    for fld in table.match_fields:
        value = self.get(fld.alias, None)
        if value is not None:
            result.append(f"{fld.alias}={fld.format_field(value)}")
        elif wildcard is not None:
            result.append(f"{fld.alias}={wildcard}")

    return " ".join(result)
Format the table match fields as a human-readable string.
The result is a string showing the TableMatch data for fields included in the match. If `wildcard` is specified, all fields defined in P4Info will be included with their value set to the wildcard string.
All fields are formatted as "name=value" and they are delimited by spaces.
Values are formatted using the format/type specified in P4Info.
Inherited Members
- builtins.dict
- __iter__
- __len__
- __getitem__
- __contains__
- get
- setdefault
- pop
- popitem
- keys
- items
- values
- update
- fromkeys
- clear
- copy
@decodable("value_set_entry")
@dataclass(slots=True)
class P4ValueSetEntry(_P4ModifyOnly):
    "Represents a P4Runtime ValueSetEntry."

    value_set_id: str
    _: KW_ONLY
    members: list[P4ValueSetMember]

    def encode(self, schema: P4Schema) -> p4r.Entity:
        "Encode P4ValueSetEntry as protobuf."
        value_set = schema.value_sets[self.value_set_id]
        members = [
            p4r.ValueSetMember(match=member.encode(value_set))
            for member in self.members
        ]

        return p4r.Entity(
            value_set_entry=p4r.ValueSetEntry(
                value_set_id=value_set.id, members=members
            )
        )

    @classmethod
    def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
        "Decode protobuf to P4ValueSetEntry."
        entry = msg.value_set_entry
        value_set = schema.value_sets[entry.value_set_id]

        members = [
            P4ValueSetMember.decode(member.match, value_set) for member in entry.members
        ]

        return cls(value_set.alias, members=members)
Represents a P4Runtime ValueSetEntry.
def encode(self, schema: P4Schema) -> p4r.Entity:
    "Encode P4ValueSetEntry as protobuf."
    value_set = schema.value_sets[self.value_set_id]
    members = [
        p4r.ValueSetMember(match=member.encode(value_set))
        for member in self.members
    ]

    return p4r.Entity(
        value_set_entry=p4r.ValueSetEntry(
            value_set_id=value_set.id, members=members
        )
    )
Encode P4ValueSetEntry as protobuf.
@classmethod
def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self:
    "Decode protobuf to P4ValueSetEntry."
    entry = msg.value_set_entry
    value_set = schema.value_sets[entry.value_set_id]

    members = [
        P4ValueSetMember.decode(member.match, value_set) for member in entry.members
    ]

    return cls(value_set.alias, members=members)
Decode protobuf to P4ValueSetEntry.
Inherited Members
- finsy.p4entity._P4ModifyOnly
- encode_update
class P4ConfigAction(_EnumBase):
    "IntEnum equivalent to `p4r.SetForwardingPipelineConfigRequest.Action`."
    UNSPECIFIED = p4r.SetForwardingPipelineConfigRequest.Action.UNSPECIFIED
    VERIFY = p4r.SetForwardingPipelineConfigRequest.Action.VERIFY
    VERIFY_AND_SAVE = p4r.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_SAVE
    VERIFY_AND_COMMIT = p4r.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT
    COMMIT = p4r.SetForwardingPipelineConfigRequest.Action.COMMIT
    RECONCILE_AND_COMMIT = (
        p4r.SetForwardingPipelineConfigRequest.Action.RECONCILE_AND_COMMIT
    )

    def vt(self) -> p4r.SetForwardingPipelineConfigRequest.Action.ValueType:
        "Cast `self` to `ValueType`."
        return cast(p4r.SetForwardingPipelineConfigRequest.Action.ValueType, self)
IntEnum equivalent to `p4r.SetForwardingPipelineConfigRequest.Action`.
def vt(self) -> p4r.SetForwardingPipelineConfigRequest.Action.ValueType:
    "Cast `self` to `ValueType`."
    return cast(p4r.SetForwardingPipelineConfigRequest.Action.ValueType, self)
Cast `self` to `ValueType`.
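The general pattern here — an `IntEnum` that mirrors a generated protobuf enum so its members compare equal to the raw integer values — can be sketched without the protobuf stubs. The numeric values below follow the `SetForwardingPipelineConfigRequest.Action` enum in the P4Runtime specification; the class name `ConfigAction` is a stand-in for illustration.

```python
from enum import IntEnum

class ConfigAction(IntEnum):
    # Values mirror p4runtime's SetForwardingPipelineConfigRequest.Action.
    UNSPECIFIED = 0
    VERIFY = 1
    VERIFY_AND_SAVE = 2
    VERIFY_AND_COMMIT = 3
    COMMIT = 4
    RECONCILE_AND_COMMIT = 5


# IntEnum members interoperate with plain ints, so they can be passed
# wherever the protobuf ValueType (an int at runtime) is expected.
assert ConfigAction.VERIFY_AND_COMMIT == 3
assert int(ConfigAction.COMMIT) == 4
```

This is why `vt()` is only a static-typing cast: at runtime the member already *is* the integer the protobuf API wants.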
Inherited Members
- enum.Enum
- name
- value
- builtins.int
- conjugate
- bit_length
- bit_count
- to_bytes
- from_bytes
- as_integer_ratio
- real
- imag
- numerator
- denominator
class P4Schema(_ReprMixin):
    """Represents a P4Info file and its associated P4 blob (optional).

    ```
    p4 = P4Schema(Path("basic.p4info.txtpb"))
    ```

    This class parses the P4Info contents to produce an in-memory representation
    of the tables, actions, types, etc. inside. This in-memory graph of the
    contents of the P4Info file may be shared when we parse identical
    P4Info files. The sharing of P4Info data is controlled by the
    `P4SchemaCache` class.
    """

    _p4info: p4i.P4Info | None
    _p4blob: Path | bytes | SupportsBytes | None
    _p4defs: _P4Defs  # possibly shared in-memory representation
    _p4cookie: int = 0

    def __init__(
        self,
        p4info: p4i.P4Info | Path | None = None,
        p4blob: Path | bytes | SupportsBytes | None = None,
    ):
        self._p4blob = p4blob
        self._p4info, self._p4defs, self._p4cookie = P4SchemaCache.load_p4info(
            p4info,
            self._p4blob,
        )

    @property
    def exists(self) -> bool:
        "True if p4info is configured."
        return self._p4info is not None

    @property
    def is_authoritative(self) -> bool:
        "True if both p4info and p4blob are configured."
        return self._p4info is not None and self._p4blob is not None

    @property
    def p4info(self) -> p4i.P4Info:
        "P4Info value."
        if self._p4info is None:
            raise ValueError("No P4Info configured.")
        return self._p4info

    def set_p4info(self, p4info: p4i.P4Info) -> None:
        "Set P4Info using value returned from switch."
        self._p4info, self._p4defs, self._p4cookie = P4SchemaCache.load_p4info(
            p4info,
            self._p4blob,
        )

    def has_p4info(self, p4info: p4i.P4Info) -> bool:
        "Return true if the current P4Info equals the given P4Info."
        if self._p4info is None:
            return False
        return self._p4info.SerializeToString(
            deterministic=True
        ) == p4info.SerializeToString(deterministic=True)

    @property
    def p4blob(self) -> bytes:
        "P4Blob value a.k.a p4_device_config."
        return _blob_bytes(self._p4blob)

    @property
    def p4cookie(self) -> int:
        """Cookie value for p4info and p4blob."""
        return self._p4cookie

    def get_pipeline_config(self) -> p4r.ForwardingPipelineConfig:
        """The forwarding pipeline configuration."""
        return p4r.ForwardingPipelineConfig(
            p4info=self.p4info,
            p4_device_config=self.p4blob,
            cookie=p4r.ForwardingPipelineConfig.Cookie(cookie=self.p4cookie),
        )

    def get_pipeline_info(self) -> str:
        "Concise string description of the pipeline (suitable for logging)."
        if self.exists:
            pipeline = self.name
            version = self.version
            arch = self.arch
            return f"{pipeline=} {version=} {arch=}"

        return "<No pipeline exists>"

    @property
    def name(self) -> str:
        "Name from pkg_info."
        if self._p4info is None:
            return ""
        return self._p4info.pkg_info.name

    @property
    def version(self) -> str:
        "Version from pkg_info."
        if self._p4info is None:
            return ""
        return self._p4info.pkg_info.version

    @property
    def arch(self) -> str:
        "Arch from pkg_info."
        if self._p4info is None:
            return ""
        return self._p4info.pkg_info.arch

    @property
    def pkg_info(self) -> p4i.PkgInfo:
        """Protobuf message containing original `PkgInfo` header.

        Use this to access less frequently used fields like `contact`, `url`,
        and `platform_properties`.
        """
        if self._p4info is None:
            raise ValueError("P4Info: No pipeline configured")
        return self._p4info.pkg_info

    @property
    def tables(self) -> P4EntityMap["P4Table"]:
        "Collection of P4 tables."
        return self._p4defs.tables

    @property
    def actions(self) -> P4EntityMap["P4Action"]:
        "Collection of P4 actions."
        return self._p4defs.actions

    @property
    def action_profiles(self) -> P4EntityMap["P4ActionProfile"]:
        "Collection of P4 action profiles."
        return self._p4defs.action_profiles

    @property
    def controller_packet_metadata(self) -> P4EntityMap["P4ControllerPacketMetadata"]:
        "Collection of P4 controller packet metadata."
        return self._p4defs.controller_packet_metadata

    @property
    def direct_counters(self) -> P4EntityMap["P4DirectCounter"]:
        "Collection of P4 direct counters."
        return self._p4defs.direct_counters

    @property
    def direct_meters(self) -> P4EntityMap["P4DirectMeter"]:
        "Collection of P4 direct meters."
        return self._p4defs.direct_meters

    @property
    def counters(self) -> P4EntityMap["P4Counter"]:
        "Collection of P4 counters."
        return self._p4defs.counters

    @property
    def meters(self) -> P4EntityMap["P4Meter"]:
        "Collection of P4 meters."
        return self._p4defs.meters

    @property
    def registers(self) -> P4EntityMap["P4Register"]:
        "Collection of P4 registers."
        return self._p4defs.registers

    @property
    def digests(self) -> P4EntityMap["P4Digest"]:
        "Collection of P4 digests."
        return self._p4defs.digests

    @property
    def value_sets(self) -> P4EntityMap["P4ValueSet"]:
        "Collection of P4 value sets."
        return self._p4defs.value_sets

    @property
    def type_info(self) -> "P4TypeInfo":
        "Type Info object."
        return self._p4defs.type_info

    @property
    def externs(self) -> "P4ExternMap":
        "Collection of P4 extern instances."
        return self._p4defs.externs

    def __str__(self) -> str:
        if self._p4info is None:
            return "<P4Info: No pipeline configured>"
        return str(P4SchemaDescription(self))
Represents a P4Info file and its associated P4 blob (optional).
p4 = P4Schema(Path("basic.p4info.txtpb"))
This class parses the P4Info contents to produce an in-memory representation of the tables, actions, types, etc. inside. This in-memory graph of the contents of the P4Info file may be shared when identical P4Info files are parsed. The sharing of P4Info data is controlled by the `P4SchemaCache` class.
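The sharing idea can be sketched without protobuf: cache the parsed representation keyed by the serialized input bytes, so identical inputs map to one shared object. This is an illustrative stand-in for what `P4SchemaCache` does, not finsy's actual implementation; `load_parsed` and the stand-in dict are hypothetical.

```python
# Cache keyed by serialized input bytes (a deterministic serialization of
# the P4Info would play this role in the real library).
_cache: dict[bytes, dict] = {}

def load_parsed(serialized: bytes) -> dict:
    parsed = _cache.get(serialized)
    if parsed is None:
        # Stand-in for the expensive parse of tables, actions, types, etc.
        parsed = {"tables": [], "actions": []}
        _cache[serialized] = parsed
    return parsed


a = load_parsed(b"p4info-bytes")
b = load_parsed(b"p4info-bytes")
assert a is b  # identical inputs share one in-memory representation
```

This also explains why `has_p4info` compares `SerializeToString(deterministic=True)` outputs: deterministic serialization makes byte equality a reliable proxy for message equality.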
@property
def exists(self) -> bool:
    "True if p4info is configured."
    return self._p4info is not None
True if p4info is configured.
@property
def p4info(self) -> p4i.P4Info:
    "P4Info value."
    if self._p4info is None:
        raise ValueError("No P4Info configured.")
    return self._p4info
P4Info value.
def set_p4info(self, p4info: p4i.P4Info) -> None:
    "Set P4Info using value returned from switch."
    self._p4info, self._p4defs, self._p4cookie = P4SchemaCache.load_p4info(
        p4info,
        self._p4blob,
    )
Set P4Info using value returned from switch.
def has_p4info(self, p4info: p4i.P4Info) -> bool:
    "Return true if the current P4Info equals the given P4Info."
    if self._p4info is None:
        return False
    return self._p4info.SerializeToString(
        deterministic=True
    ) == p4info.SerializeToString(deterministic=True)
Return true if the current P4Info equals the given P4Info.
@property
def p4blob(self) -> bytes:
    "P4Blob value a.k.a p4_device_config."
    return _blob_bytes(self._p4blob)
P4Blob value, a.k.a. `p4_device_config`.
def get_pipeline_config(self) -> p4r.ForwardingPipelineConfig:
    """The forwarding pipeline configuration."""
    return p4r.ForwardingPipelineConfig(
        p4info=self.p4info,
        p4_device_config=self.p4blob,
        cookie=p4r.ForwardingPipelineConfig.Cookie(cookie=self.p4cookie),
    )
The forwarding pipeline configuration.
def get_pipeline_info(self) -> str:
    "Concise string description of the pipeline (suitable for logging)."
    if self.exists:
        pipeline = self.name
        version = self.version
        arch = self.arch
        return f"{pipeline=} {version=} {arch=}"

    return "<No pipeline exists>"
Concise string description of the pipeline (suitable for logging).
@property
def name(self) -> str:
    "Name from pkg_info."
    if self._p4info is None:
        return ""
    return self._p4info.pkg_info.name
Name from pkg_info.
@property
def version(self) -> str:
    "Version from pkg_info."
    if self._p4info is None:
        return ""
    return self._p4info.pkg_info.version
Version from pkg_info.
@property
def arch(self) -> str:
    "Arch from pkg_info."
    if self._p4info is None:
        return ""
    return self._p4info.pkg_info.arch
Arch from pkg_info.
@property
def pkg_info(self) -> p4i.PkgInfo:
    """Protobuf message containing original `PkgInfo` header.

    Use this to access less frequently used fields like `contact`, `url`,
    and `platform_properties`.
    """
    if self._p4info is None:
        raise ValueError("P4Info: No pipeline configured")
    return self._p4info.pkg_info
Protobuf message containing the original `PkgInfo` header.

Use this to access less frequently used fields like `contact`, `url`, and `platform_properties`.
@property
def tables(self) -> P4EntityMap["P4Table"]:
    "Collection of P4 tables."
    return self._p4defs.tables
Collection of P4 tables.
@property
def actions(self) -> P4EntityMap["P4Action"]:
    "Collection of P4 actions."
    return self._p4defs.actions
Collection of P4 actions.
@property
def action_profiles(self) -> P4EntityMap["P4ActionProfile"]:
    "Collection of P4 action profiles."
    return self._p4defs.action_profiles
Collection of P4 action profiles.
@property
def controller_packet_metadata(self) -> P4EntityMap["P4ControllerPacketMetadata"]:
    "Collection of P4 controller packet metadata."
    return self._p4defs.controller_packet_metadata
Collection of P4 controller packet metadata.
@property
def direct_counters(self) -> P4EntityMap["P4DirectCounter"]:
    "Collection of P4 direct counters."
    return self._p4defs.direct_counters
Collection of P4 direct counters.
@property
def direct_meters(self) -> P4EntityMap["P4DirectMeter"]:
    "Collection of P4 direct meters."
    return self._p4defs.direct_meters
Collection of P4 direct meters.
@property
def counters(self) -> P4EntityMap["P4Counter"]:
    "Collection of P4 counters."
    return self._p4defs.counters
Collection of P4 counters.
@property
def meters(self) -> P4EntityMap["P4Meter"]:
    "Collection of P4 meters."
    return self._p4defs.meters
Collection of P4 meters.
@property
def registers(self) -> P4EntityMap["P4Register"]:
    "Collection of P4 registers."
    return self._p4defs.registers
Collection of P4 registers.
@property
def digests(self) -> P4EntityMap["P4Digest"]:
    "Collection of P4 digests."
    return self._p4defs.digests
Collection of P4 digests.
@property
def value_sets(self) -> P4EntityMap["P4ValueSet"]:
    "Collection of P4 value sets."
    return self._p4defs.value_sets
Collection of P4 value sets.
@final
class Switch:
    """Represents a P4Runtime Switch.

    A `Switch` is constructed with a `name`, `address` and an optional
    `SwitchOptions` configuration.

    The `name` is up to the user but should uniquely identify the switch.

    The `address` identifies the target endpoint of the GRPC channel. It should
    have the format "<address>:<port>" where <address> can be a domain name,
    IPv4 address, or IPv6 address in square brackets.

    The `options` is a `SwitchOptions` object that specifies how the `Switch`
    will behave.

    ```
    opts = SwitchOptions(p4info=..., p4blob=...)
    sw1 = Switch('sw1', '10.0.0.1:50000', opts)
    ```

    Each switch object has an event emitter `ee`. Use the EventEmitter to listen
    for port change events like PORT_UP and PORT_DOWN. See the `SwitchEvent`
    class for a list of supported switch events.
    """

    _name: str
    _address: str
    _options: SwitchOptions
    _stash: dict[str, Any]
    _ee: "SwitchEmitter"
    _p4client: P4Client | None
    _p4schema: P4Schema
    _tasks: "SwitchTasks | None"
    _packet_queues: list[tuple[Callable[[bytes], bool], Queue[p4entity.P4PacketIn]]]
    _digest_queues: dict[str, Queue[p4entity.P4DigestList]]
    _timeout_queue: Queue[p4entity.P4IdleTimeoutNotification] | None
    _arbitrator: "Arbitrator"
    _gnmi_client: GNMIClient | None
    _ports: SwitchPortList
    _is_channel_up: bool = False
    _api_version: ApiVersion = ApiVersion(1, 0, 0, "")
    _control_task: asyncio.Task[Any] | None = None

    def __init__(
        self,
        name: str,
        address: str,
        options: SwitchOptions | None = None,
    ) -> None:
        if options is None:
            options = SwitchOptions()

        self._name = name
        self._address = address
        self._options = options
        self._stash = {}
        self._ee = SwitchEmitter(self)
        self._p4client = None
        self._p4schema = P4Schema(options.p4info, options.p4blob)
        self._tasks = None
        self._packet_queues = []
        self._digest_queues = {}
        self._timeout_queue = None
        self._arbitrator = Arbitrator(
            options.initial_election_id, options.role_name, options.role_config
        )
        self._gnmi_client = None
        self._ports = SwitchPortList()

    @property
    def name(self) -> str:
        "Name of the switch."
        return self._name

    @property
    def address(self) -> str:
        "Address of the switch."
        return self._address

    @property
    def options(self) -> SwitchOptions:
        "Switch options."
        return self._options

    @options.setter
    def options(self, opts: SwitchOptions) -> None:
        "Set switch options to a new value."
        if self._p4client is not None:
            raise RuntimeError("Cannot change switch options while client is open.")

        self._options = opts
        self._p4schema = P4Schema(opts.p4info, opts.p4blob)
        self._arbitrator = Arbitrator(
            opts.initial_election_id, opts.role_name, opts.role_config
        )

    @property
    def stash(self) -> dict[str, Any]:
        "Switch stash, may be used to store per-switch data for any purpose."
        return self._stash

    @property
    def ee(self) -> "SwitchEmitter":
        "Switch event emitter. See `SwitchEvent` for more details on events."
        return self._ee

    @property
    def device_id(self) -> int:
        "Switch's device ID."
        return self._options.device_id

    @property
    def is_up(self) -> bool:
        "True if switch is UP."
        return self._is_channel_up

    @property
    def is_primary(self) -> bool:
        "True if switch is primary."
        return self._arbitrator.is_primary

    @property
    def primary_id(self) -> int:
        "Election ID of switch that is currently primary."
        return self._arbitrator.primary_id

    @property
    def election_id(self) -> int:
        "Switch's current election ID."
        return self._arbitrator.election_id

    @property
    def role_name(self) -> str:
        "Switch's current role name."
        return self._arbitrator.role_name

    @property
    def p4info(self) -> P4Schema:
        "Switch's P4 schema."
        return self._p4schema

    @property
    def gnmi_client(self) -> GNMIClient | None:
        "Switch's gNMI client."
        return self._gnmi_client

    @property
    def ports(self) -> SwitchPortList:
        "Switch's list of interfaces."
        return self._ports

    @property
    def api_version(self) -> ApiVersion:
        "P4Runtime protocol version."
        return self._api_version

    @overload
    async def read(
        self,
        entities: _ET,
    ) -> AsyncGenerator[_ET, None]:
        "Overload for read of a single P4Entity subtype."
        ...  # pragma: no cover

    @overload
    async def read(
        self,
        entities: Iterable[_ET],
    ) -> AsyncGenerator[_ET, None]:
        "Overload for read of an iterable of the same P4Entity subtype."
        ...  # pragma: no cover

    @overload
    async def read(
        self,
        entities: Iterable[p4entity.P4EntityList],
    ) -> AsyncGenerator[p4entity.P4Entity, None]:
        "Most general overload: we can't determine the return type exactly."
        ...  # pragma: no cover

    async def read(
        self,
        entities: Iterable[p4entity.P4EntityList] | p4entity.P4Entity,
    ) -> AsyncGenerator[p4entity.P4Entity, None]:
        "Async iterator that reads entities from the switch."
        assert self._p4client is not None

        if not entities:
            return

        if isinstance(entities, p4entity.P4Entity):
            entities = [entities]

        request = p4r.ReadRequest(
            device_id=self.device_id,
            entities=p4entity.encode_entities(entities, self.p4info),
        )

        async for reply in self._p4client.request_iter(request):
            for ent in reply.entities:
                yield p4entity.decode_entity(ent, self.p4info)

    async def read_packets(
        self,
        *,
        queue_size: int = _DEFAULT_QUEUE_SIZE,
        eth_types: Iterable[int] | None = None,
    ) -> AsyncIterator["p4entity.P4PacketIn"]:
        "Async iterator for incoming packets (P4PacketIn)."
        LOGGER.debug("read_packets: opening queue: eth_types=%r", eth_types)

        if eth_types is None:

            def _pkt_filter(_payload: bytes) -> bool:
                return True

        else:
            _filter = {eth.to_bytes(2, "big") for eth in eth_types}

            def _pkt_filter(_payload: bytes) -> bool:
                return _payload[12:14] in _filter

        queue = Queue[p4entity.P4PacketIn](queue_size)
        queue_filter = (_pkt_filter, queue)
        self._packet_queues.append(queue_filter)

        try:
            while True:
                yield await queue.get()
        finally:
            LOGGER.debug("read_packets: closing queue: eth_types=%r", eth_types)
            self._packet_queues.remove(queue_filter)

    async def read_digests(
        self,
        digest_id: str,
        *,
        queue_size: int = _DEFAULT_QUEUE_SIZE,
    ) -> AsyncIterator["p4entity.P4DigestList"]:
        "Async iterator for incoming digest lists (P4DigestList)."
        LOGGER.debug("read_digests: opening queue: digest_id=%r", digest_id)

        if digest_id in self._digest_queues:
            raise ValueError(f"queue for digest_id {digest_id!r} already open")

        queue = Queue[p4entity.P4DigestList](queue_size)
        self._digest_queues[digest_id] = queue
        try:
            while True:
                yield await queue.get()
        finally:
            LOGGER.debug("read_digests: closing queue: digest_id=%r", digest_id)
            del self._digest_queues[digest_id]

    async def read_idle_timeouts(
        self,
        *,
        queue_size: int = _DEFAULT_QUEUE_SIZE,
    ) -> AsyncIterator["p4entity.P4IdleTimeoutNotification"]:
        "Async iterator for incoming idle timeouts (P4IdleTimeoutNotification)."
        LOGGER.debug("read_idle_timeouts: opening queue")

        if self._timeout_queue is not None:
            raise ValueError("timeout queue already open")

        queue = Queue[p4entity.P4IdleTimeoutNotification](queue_size)
        self._timeout_queue = queue
        try:
            while True:
                yield await queue.get()
        finally:
            LOGGER.debug("read_idle_timeouts: closing queue")
            self._timeout_queue = None

    async def write(
        self,
        entities: Iterable[p4entity.P4UpdateList],
        *,
        strict: bool = True,
        warn_only: bool = False,
    ) -> None:
        """Write updates and stream messages to the switch.

        If `strict` is False, MODIFY and DELETE operations will NOT raise an
        error if the entity does not exist (NOT_FOUND).

        If `warn_only` is True, no operations will raise an error. Instead,
        the exception will be logged as a WARNING and the method will return
        normally.
        """
        assert self._p4client is not None

        if not entities:
            return

        msgs = p4entity.encode_updates(entities, self.p4info)

        updates: list[p4r.Update] = []
        for msg in msgs:
            if isinstance(msg, p4r.StreamMessageRequest):
                # StreamMessageRequests are transmitted immediately.
                # TODO: Understand what happens with backpressure?
                await self._p4client.send(msg)
            else:
                updates.append(msg)

        if updates:
            await self._write_request(updates, strict, warn_only)

    async def insert(
        self,
        entities: Iterable[p4entity.P4EntityList],
        *,
        warn_only: bool = False,
    ) -> None:
        """Insert the specified entities.

        If `warn_only` is True, errors will be logged as warnings instead of
        raising an exception.
        """
        if entities:
            await self._write_request(
                [
                    p4r.Update(type=p4r.Update.INSERT, entity=ent)
                    for ent in p4entity.encode_entities(entities, self.p4info)
                ],
                True,
                warn_only,
            )

    async def modify(
        self,
        entities: Iterable[p4entity.P4EntityList],
        *,
        strict: bool = True,
        warn_only: bool = False,
    ) -> None:
        """Modify the specified entities.

        If `strict` is False, NOT_FOUND errors will be ignored.

        If `warn_only` is True, errors will be logged as warnings instead of
        raising an exception.
        """
        if entities:
            await self._write_request(
                [
                    p4r.Update(type=p4r.Update.MODIFY, entity=ent)
                    for ent in p4entity.encode_entities(entities, self.p4info)
                ],
                strict,
                warn_only,
            )

    async def delete(
        self,
        entities: Iterable[p4entity.P4EntityList],
        *,
        strict: bool = True,
        warn_only: bool = False,
    ) -> None:
        """Delete the specified entities.

        If `strict` is False, NOT_FOUND errors will be ignored.

        If `warn_only` is True, errors will be logged as warnings instead of
        raising an exception.
        """
        if entities:
            await self._write_request(
                [
                    p4r.Update(type=p4r.Update.DELETE, entity=ent)
                    for ent in p4entity.encode_entities(entities, self.p4info)
                ],
                strict,
                warn_only,
            )

    async def delete_all(self) -> None:
        """Delete all entities if no parameter is passed. Otherwise, delete
        items that match `entities`.

        This method does not attempt to delete entries in const tables.

        TODO: This method does not affect indirect counters, meters or
        value_sets.
        """
        await self.delete_many(
            [
                p4entity.P4TableEntry(),
                p4entity.P4MulticastGroupEntry(),
                p4entity.P4CloneSessionEntry(),
            ]
        )

        # Reset all default table entries.
        default_entries = [
            p4entity.P4TableEntry(table.alias, is_default_action=True)
            for table in self.p4info.tables
            if table.const_default_action is None and table.action_profile is None
        ]
        if default_entries:
            await self.modify(default_entries)

        # Delete all P4ActionProfileGroup's and P4ActionProfileMember's.
        # We do this after deleting the P4TableEntry's in case a client is using
        # "one-shot" references; these are incompatible with separate
        # action profiles.
        await self.delete_many(
            [
                p4entity.P4ActionProfileGroup(),
                p4entity.P4ActionProfileMember(),
            ]
        )

        # Delete DigestEntry separately. Wildcard reads are not supported.
        digest_entries = [
            p4entity.P4DigestEntry(digest.alias) for digest in self.p4info.digests
        ]
        if digest_entries:
            await self.delete(digest_entries, strict=False)

    async def delete_many(self, entities: Iterable[p4entity.P4EntityList]) -> None:
        """Delete entities that match a wildcard read.

        This method always skips over entries in const tables. It is an error
        to attempt to delete those.
        """
        assert self._p4client is not None

        request = p4r.ReadRequest(
            device_id=self.device_id,
            entities=p4entity.encode_entities(entities, self.p4info),
        )

        # Compute set of all const table ID's (may be empty).
594 to_skip = {table.id for table in self.p4info.tables if table.is_const} 595 596 async for reply in self._p4client.request_iter(request): 597 if reply.entities: 598 if to_skip: 599 await self.delete( 600 reply 601 for reply in reply.entities 602 if reply.HasField("table_entry") 603 and reply.table_entry.table_id not in to_skip 604 ) 605 else: 606 await self.delete(reply.entities) 607 608 async def run(self) -> None: 609 "Run the switch's lifecycle repeatedly." 610 assert self._p4client is None 611 assert self._tasks is None 612 613 self._tasks = SwitchTasks(self._options.fail_fast) 614 self._p4client = P4Client(self._address, self._options.channel_credentials) 615 self._switch_start() 616 617 try: 618 while True: 619 # If the switch fails and restarts too quickly, slow it down. 620 async with _throttle_failure(): 621 self.create_task(self._run(), background=True) 622 await self._tasks.wait() 623 self._arbitrator.reset() 624 625 finally: 626 self._p4client = None 627 self._tasks = None 628 self._switch_stop() 629 630 def create_task( 631 self, 632 coro: Coroutine[Any, Any, _T], 633 *, 634 background: bool = False, 635 name: str | None = None, 636 ) -> asyncio.Task[_T]: 637 "Create an asyncio task tied to the Switch's lifecycle." 638 assert self._tasks is not None 639 640 return self._tasks.create_task( 641 coro, 642 switch=self, 643 background=background, 644 name=name, 645 ) 646 647 async def _run(self): 648 "Main Switch task runs the stream." 649 assert not self._is_channel_up 650 assert self._p4client is not None 651 652 try: 653 await self._p4client.open( 654 schema=self.p4info, 655 complete_request=self._arbitrator.complete_request, 656 ) 657 await self._arbitrator.handshake(self) 658 await self._fetch_capabilities() 659 await self._start_gnmi() 660 self._channel_up() 661 662 # Receive messages from the stream until it closes. 
663 await self._receive_until_closed() 664 665 finally: 666 await self._stop_gnmi() 667 await self._p4client.close() 668 self._channel_down() 669 670 async def _receive_until_closed(self): 671 "Receive messages from stream until EOF." 672 assert self._p4client is not None 673 674 client = self._p4client 675 676 while True: 677 try: 678 msg = await client.receive() 679 except P4ClientError as ex: 680 if not ex.is_election_id_used: 681 raise 682 # Handle "election ID in use" error. 683 await self._arbitrator.handshake(self, conflict=True) 684 else: 685 await self._handle_stream_message(msg) 686 687 async def _handle_stream_message(self, msg: p4r.StreamMessageResponse): 688 "Handle a P4Runtime StreamMessageResponse." 689 match msg.WhichOneof("update"): 690 case "packet": 691 self._stream_packet_message(msg) 692 case "digest": 693 self._stream_digest_message(msg) 694 case "idle_timeout_notification": 695 self._stream_timeout_message(msg) 696 case "arbitration": 697 await self._arbitrator.update(self, msg.arbitration) 698 case "error": 699 self._stream_error_message(msg) 700 case other: 701 LOGGER.error("_handle_stream_message: unknown update %r", other) 702 703 async def __aenter__(self) -> Self: 704 "Similar to run() but provides a one-time context manager interface." 705 assert self._p4client is None 706 assert self._tasks is None 707 708 self._tasks = SwitchTasks(self._options.fail_fast) 709 self._p4client = P4Client( 710 self._address, 711 self._options.channel_credentials, 712 wait_for_ready=False, 713 ) 714 self._switch_start() 715 716 try: 717 # Start the switch's `_run` task in the background. Then, wait for 718 # `_run` task to fire the CHANNEL_READY event. If the `_run` task 719 # cannot connect or fails in some other way, it will finish before 720 # the `ready` future. We need to handle the error in this case. 
721 722 run = self.create_task(self._run(), background=True) 723 ready = self.ee.event_future(SwitchEvent.CHANNEL_READY) 724 done, _ = await asyncio.wait( 725 [run, ready], return_when=asyncio.FIRST_COMPLETED 726 ) 727 if run in done: 728 await run 729 730 except BaseException: 731 await self.__aexit__(None, None, None) 732 raise 733 734 return self 735 736 async def __aexit__( 737 self, 738 _exc_type: type[BaseException] | None, 739 _exc_val: BaseException | None, 740 _exc_tb: TracebackType | None, 741 ) -> bool | None: 742 "Similar to run() but provides a one-time context manager interface." 743 assert self._p4client is not None 744 assert self._tasks is not None 745 746 self._tasks.cancel_all() 747 await self._tasks.wait() 748 self._arbitrator.reset() 749 self._p4client = None 750 self._tasks = None 751 self._switch_stop() 752 753 def _switch_start(self): 754 "Called when switch starts its run() cycle." 755 assert not self._is_channel_up 756 757 LOGGER.info( 758 "Switch start (name=%r, address=%r, device_id=%r, role_name=%r, initial_election_id=%r)", 759 self.name, 760 self.address, 761 self.device_id, 762 self.role_name, 763 self.options.initial_election_id, 764 ) 765 self.ee.emit(SwitchEvent.SWITCH_START) 766 767 def _switch_stop(self): 768 "Called when switch stops its run() cycle." 769 assert not self._is_channel_up 770 771 LOGGER.info( 772 "Switch stop (name=%r, address=%r, device_id=%r, role_name=%r)", 773 self.name, 774 self.address, 775 self.device_id, 776 self.role_name, 777 ) 778 self.ee.emit(SwitchEvent.SWITCH_STOP) 779 780 def _channel_up(self): 781 "Called when P4Runtime channel is UP." 
782 assert not self._is_channel_up 783 784 ports = " ".join(f"({port.id}){port.name}" for port in self.ports) 785 LOGGER.info( 786 "Channel up (is_primary=%r, role_name=%r, p4r=%s): %s", 787 self.is_primary, 788 self.role_name, 789 self.api_version, 790 ports, 791 ) 792 self._is_channel_up = True 793 self.create_task(self._ready()) 794 795 self.ee.emit(SwitchEvent.CHANNEL_UP, self) 796 797 def _channel_down(self): 798 "Called when P4Runtime channel is DOWN." 799 if not self._is_channel_up: 800 return # do nothing! 801 802 LOGGER.info( 803 "Channel down (is_primary=%r, role_name=%r)", 804 self.is_primary, 805 self.role_name, 806 ) 807 self._is_channel_up = False 808 809 self.ee.emit(SwitchEvent.CHANNEL_DOWN, self) 810 811 def _become_primary(self): 812 "Called when a P4Runtime backup channel becomes the primary." 813 assert self._tasks is not None 814 815 LOGGER.info( 816 "Become primary (is_primary=%r, role_name=%r)", 817 self.is_primary, 818 self.role_name, 819 ) 820 821 self._tasks.cancel_primary() 822 self.create_task(self._ready()) 823 824 self.ee.emit(SwitchEvent.BECOME_PRIMARY, self) 825 826 def _become_backup(self): 827 "Called when a P4Runtime primary channel becomes a backup." 828 assert self._tasks is not None 829 830 LOGGER.info( 831 "Become backup (is_primary=%r, role_name=%r)", 832 self.is_primary, 833 self.role_name, 834 ) 835 836 self._tasks.cancel_primary() 837 self.create_task(self._ready()) 838 839 self.ee.emit(SwitchEvent.BECOME_BACKUP, self) 840 841 def _channel_ready(self): 842 "Called when a P4Runtime channel is READY." 
843 LOGGER.info( 844 "Channel ready (is_primary=%r, role_name=%r): %s", 845 self.is_primary, 846 self.role_name, 847 self.p4info.get_pipeline_info(), 848 ) 849 850 if self._options.ready_handler: 851 self.create_task(self._options.ready_handler(self)) 852 853 self.ee.emit(SwitchEvent.CHANNEL_READY, self) 854 855 def _stream_packet_message(self, msg: p4r.StreamMessageResponse): 856 "Called when a P4Runtime packet-in response is received." 857 packet = p4entity.decode_stream(msg, self.p4info) 858 859 was_queued = False 860 for pkt_filter, queue in self._packet_queues: 861 if not queue.full() and pkt_filter(packet.payload): 862 queue.put_nowait(packet) 863 was_queued = True 864 865 if not was_queued: 866 LOGGER.warning("packet ignored: %r", packet) 867 868 def _stream_digest_message(self, msg: p4r.StreamMessageResponse): 869 "Called when a P4Runtime digest response is received." 870 try: 871 # Decode the digest list message. 872 digest: p4entity.P4DigestList = p4entity.decode_stream(msg, self.p4info) 873 except ValueError as ex: 874 # It's possible to receive a digest for a different P4Info file, or 875 # even before a P4Info is fetched from the switch. 876 LOGGER.warning("digest decode failed: %s", ex) 877 else: 878 # Place the decoded digest list in a queue, if one is waiting. 879 queue = self._digest_queues.get(digest.digest_id) 880 if queue is not None and not queue.full(): 881 queue.put_nowait(digest) 882 else: 883 LOGGER.warning("digest ignored: %r", digest) 884 885 def _stream_timeout_message(self, msg: p4r.StreamMessageResponse): 886 "Called when a P4Runtime timeout notification is received." 
887 timeout: p4entity.P4IdleTimeoutNotification = p4entity.decode_stream( 888 msg, self.p4info 889 ) 890 queue = self._timeout_queue 891 892 if queue is not None and not queue.full(): 893 queue.put_nowait(timeout) 894 else: 895 LOGGER.warning("timeout ignored: %r", timeout) 896 897 def _stream_error_message(self, msg: p4r.StreamMessageResponse): 898 "Called when a P4Runtime stream error response is received." 899 assert self._p4client is not None 900 901 # Log the message at the ERROR level. 902 pbutil.log_msg(self._p4client.channel, msg, self.p4info, level=logging.ERROR) 903 904 self.ee.emit(SwitchEvent.STREAM_ERROR, self, msg) 905 906 async def _ready(self): 907 "Prepare the pipeline." 908 if self.p4info.is_authoritative and self.is_primary: 909 await self._set_pipeline() 910 else: 911 await self._get_pipeline() 912 913 self._channel_ready() 914 915 async def _get_pipeline(self): 916 "Get the switch's P4Info." 917 has_pipeline = False 918 919 try: 920 reply = await self._get_pipeline_config_request( 921 response_type=P4ConfigResponseType.P4INFO_AND_COOKIE 922 ) 923 924 if reply.config.HasField("p4info"): 925 has_pipeline = True 926 p4info = reply.config.p4info 927 if not self.p4info.exists: 928 # If we don't have P4Info yet, set it. 929 self.p4info.set_p4info(p4info) 930 elif not self.p4info.has_p4info(p4info): 931 # If P4Info is not identical, log a warning message. 932 LOGGER.warning("Retrieved P4Info is different than expected!") 933 934 except P4ClientError as ex: 935 if not ex.is_pipeline_missing: 936 raise 937 938 if not has_pipeline and self.p4info.exists: 939 LOGGER.warning("Forwarding pipeline is not configured") 940 941 async def _set_pipeline(self): 942 """Set up the pipeline. 943 944 If `p4force` is false (the default), we first retrieve the cookie for 945 the current pipeline and see if it matches the new pipeline's cookie. 946 If the cookies match, we are done; there is no need to set the pipeline. 
947 948 If `p4force` is true, we always load the new pipeline. 949 """ 950 cookie = -1 951 try: 952 if not self.options.p4force: 953 reply = await self._get_pipeline_config_request() 954 if reply.config.HasField("cookie"): 955 cookie = reply.config.cookie.cookie 956 957 except P4ClientError as ex: 958 if not ex.is_pipeline_missing: 959 raise 960 961 if cookie != self.p4info.p4cookie: 962 LOGGER.debug( 963 "cookie %#x does not match expected %#x", cookie, self.p4info.p4cookie 964 ) 965 await self._set_pipeline_config_request( 966 config=self.p4info.get_pipeline_config() 967 ) 968 LOGGER.info("Pipeline installed: %s", self.p4info.get_pipeline_info()) 969 970 async def _get_pipeline_config_request( 971 self, 972 *, 973 response_type: P4ConfigResponseType = P4ConfigResponseType.COOKIE_ONLY, 974 ) -> p4r.GetForwardingPipelineConfigResponse: 975 "Send a GetForwardingPipelineConfigRequest and await the response." 976 assert self._p4client is not None 977 978 return await self._p4client.request( 979 p4r.GetForwardingPipelineConfigRequest( 980 device_id=self.device_id, 981 response_type=response_type.vt(), 982 ) 983 ) 984 985 async def _set_pipeline_config_request( 986 self, 987 *, 988 action: P4ConfigAction = P4ConfigAction.VERIFY_AND_COMMIT, 989 config: p4r.ForwardingPipelineConfig, 990 ) -> p4r.SetForwardingPipelineConfigResponse: 991 "Send a SetForwardingPipelineConfigRequest and await the response." 992 assert self._p4client is not None 993 994 return await self._p4client.request( 995 p4r.SetForwardingPipelineConfigRequest( 996 device_id=self.device_id, 997 action=action.vt(), 998 config=config, 999 ) 1000 ) 1001 1002 async def _write_request( 1003 self, 1004 updates: list[p4r.Update], 1005 strict: bool, 1006 warn_only: bool, 1007 ): 1008 "Send a P4Runtime WriteRequest." 
1009 assert self._p4client is not None 1010 1011 try: 1012 await self._p4client.request( 1013 p4r.WriteRequest( 1014 device_id=self.device_id, 1015 updates=updates, 1016 ) 1017 ) 1018 except P4ClientError as ex: 1019 if strict or not ex.is_not_found_only: 1020 if warn_only: 1021 LOGGER.warning( 1022 "WriteRequest with `warn_only=True` failed", 1023 exc_info=True, 1024 ) 1025 else: 1026 raise 1027 1028 assert (not strict and ex.is_not_found_only) or warn_only 1029 1030 async def _fetch_capabilities(self): 1031 "Check the P4Runtime protocol version supported by the other end." 1032 assert self._p4client is not None 1033 1034 try: 1035 reply = await self._p4client.request(p4r.CapabilitiesRequest()) 1036 self._api_version = ApiVersion.parse(reply.p4runtime_api_version) 1037 1038 except P4ClientError as ex: 1039 if ex.code != GRPCStatusCode.UNIMPLEMENTED: 1040 raise 1041 LOGGER.warning("CapabilitiesRequest is not implemented") 1042 1043 async def _start_gnmi(self): 1044 "Start the associated gNMI client." 1045 assert self._gnmi_client is None 1046 assert self._p4client is not None 1047 1048 self._gnmi_client = GNMIClient(self._address, self._options.channel_credentials) 1049 await self._gnmi_client.open(channel=self._p4client.channel) 1050 1051 try: 1052 await self._ports.subscribe(self._gnmi_client) 1053 if self._ports: 1054 self.create_task(self._ports.listen(), background=True, name="_ports") 1055 1056 except GNMIClientError as ex: 1057 if ex.code != GRPCStatusCode.UNIMPLEMENTED: 1058 raise 1059 LOGGER.warning("gNMI is not implemented") 1060 await self._gnmi_client.close() 1061 self._gnmi_client = None 1062 1063 async def _stop_gnmi(self): 1064 "Stop the associated gNMI client." 1065 if self._gnmi_client is not None: 1066 self._ports.close() 1067 await self._gnmi_client.close() 1068 self._gnmi_client = None 1069 1070 def __repr__(self) -> str: 1071 "Return string representation of switch." 1072 return f"Switch(name={self._name!r}, address={self._address!r})"
Represents a P4Runtime Switch.

A `Switch` is constructed with a `name`, an `address`, and an optional `SwitchOptions` configuration. The `name` is chosen by the user, but it should uniquely identify the switch. The `address` identifies the target endpoint of the gRPC channel. It should have the format "address:port".

The `options` argument is a `SwitchOptions` object that specifies how the `Switch` will behave.

```python
opts = SwitchOptions(p4info=..., p4blob=...)
sw1 = Switch('sw1', '10.0.0.1:50000', opts)
```

Each switch object has an event emitter `ee`. Use the EventEmitter to listen for port change events like PORT_UP and PORT_DOWN. See the `SwitchEvent` class for a list of supported switch events.
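For instance, a controller can register port-change listeners on the emitter before running the switch. This is a sketch, not a definitive recipe: it assumes `SwitchEvent` is exported at the top level, that the emitter follows the familiar pyee-style `add_listener` API, and that PORT_UP/PORT_DOWN handlers receive `(switch, port)` arguments as noted in the `SwitchEvent` listing.

```python
import finsy as fy

# Hedged sketch: react to port status changes via the event emitter.
def on_port_up(sw: fy.Switch, port) -> None:
    print(f"{sw.name}: port {port.id} UP")

def on_port_down(sw: fy.Switch, port) -> None:
    print(f"{sw.name}: port {port.id} DOWN")

async def main():
    sw1 = fy.Switch("sw1", "127.0.0.1:50001")
    sw1.ee.add_listener(fy.SwitchEvent.PORT_UP, on_port_up)
    sw1.ee.add_listener(fy.SwitchEvent.PORT_DOWN, on_port_down)
    await sw1.run()  # run the switch lifecycle until cancelled

fy.run(main())
```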
```python
def __init__(
    self,
    name: str,
    address: str,
    options: SwitchOptions | None = None,
) -> None:
    if options is None:
        options = SwitchOptions()

    self._name = name
    self._address = address
    self._options = options
    self._stash = {}
    self._ee = SwitchEmitter(self)
    self._p4client = None
    self._p4schema = P4Schema(options.p4info, options.p4blob)
    self._tasks = None
    self._packet_queues = []
    self._digest_queues = {}
    self._timeout_queue = None
    self._arbitrator = Arbitrator(
        options.initial_election_id, options.role_name, options.role_config
    )
    self._gnmi_client = None
    self._ports = SwitchPortList()
```
```python
@property
def options(self) -> SwitchOptions:
    "Switch options."
    return self._options
```
Switch options.
```python
@property
def stash(self) -> dict[str, Any]:
    "Switch stash, may be used to store per-switch data for any purpose."
    return self._stash
```
Switch stash, may be used to store per-switch data for any purpose.
```python
@property
def ee(self) -> "SwitchEmitter":
    "Switch event emitter. See `SwitchEvent` for more details on events."
    return self._ee
```
Switch event emitter. See `SwitchEvent` for more details on events.
```python
@property
def device_id(self) -> int:
    "Switch's device ID."
    return self._options.device_id
```
Switch's device ID.
```python
@property
def is_up(self) -> bool:
    "True if switch is UP."
    return self._is_channel_up
```
True if switch is UP.
```python
@property
def is_primary(self) -> bool:
    "True if switch is primary."
    return self._arbitrator.is_primary
```
True if switch is primary.
```python
@property
def primary_id(self) -> int:
    "Election ID of switch that is currently primary."
    return self._arbitrator.primary_id
```
Election ID of switch that is currently primary.
```python
@property
def election_id(self) -> int:
    "Switch's current election ID."
    return self._arbitrator.election_id
```
Switch's current election ID.
```python
@property
def role_name(self) -> str:
    "Switch's current role name."
    return self._arbitrator.role_name
```
Switch's current role name.
```python
@property
def gnmi_client(self) -> GNMIClient | None:
    "Switch's gNMI client."
    return self._gnmi_client
```
Switch's gNMI client.
```python
@property
def ports(self) -> SwitchPortList:
    "Switch's list of interfaces."
    return self._ports
```
Switch's list of interfaces.
```python
@property
def api_version(self) -> ApiVersion:
    "P4Runtime protocol version."
    return self._api_version
```
P4Runtime protocol version.
```python
async def read(
    self,
    entities: Iterable[p4entity.P4EntityList] | p4entity.P4Entity,
) -> AsyncGenerator[p4entity.P4Entity, None]:
    "Async iterator that reads entities from the switch."
    assert self._p4client is not None

    if not entities:
        return

    if isinstance(entities, p4entity.P4Entity):
        entities = [entities]

    request = p4r.ReadRequest(
        device_id=self.device_id,
        entities=p4entity.encode_entities(entities, self.p4info),
    )

    async for reply in self._p4client.request_iter(request):
        for ent in reply.entities:
            yield p4entity.decode_entity(ent, self.p4info)
```
Async iterator that reads entities from the switch.
```python
async def read_packets(
    self,
    *,
    queue_size: int = _DEFAULT_QUEUE_SIZE,
    eth_types: Iterable[int] | None = None,
) -> AsyncIterator["p4entity.P4PacketIn"]:
    "Async iterator for incoming packets (P4PacketIn)."
    LOGGER.debug("read_packets: opening queue: eth_types=%r", eth_types)

    if eth_types is None:

        def _pkt_filter(_payload: bytes) -> bool:
            return True

    else:
        _filter = {eth.to_bytes(2, "big") for eth in eth_types}

        def _pkt_filter(_payload: bytes) -> bool:
            return _payload[12:14] in _filter

    queue = Queue[p4entity.P4PacketIn](queue_size)
    queue_filter = (_pkt_filter, queue)
    self._packet_queues.append(queue_filter)

    try:
        while True:
            yield await queue.get()
    finally:
        LOGGER.debug("read_packets: closing queue: eth_types=%r", eth_types)
        self._packet_queues.remove(queue_filter)
```
Async iterator for incoming packets (P4PacketIn).
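The `eth_types` filter compares bytes 12–14 of the packet payload (the EtherType field of an Ethernet frame) against the set of requested types. The filtering logic can be sketched standalone; `make_pkt_filter` is an illustrative helper, not part of the finsy API:

```python
from typing import Iterable

def make_pkt_filter(eth_types: Iterable[int]):
    """Build a payload filter like the one read_packets installs."""
    accept = {eth.to_bytes(2, "big") for eth in eth_types}

    def pkt_filter(payload: bytes) -> bool:
        # Bytes 12-14 of an Ethernet frame hold the EtherType.
        return payload[12:14] in accept

    return pkt_filter

arp_only = make_pkt_filter([0x0806])  # ARP
arp_frame = bytes(12) + (0x0806).to_bytes(2, "big") + bytes(46)
ipv4_frame = bytes(12) + (0x0800).to_bytes(2, "big") + bytes(46)
print(arp_only(arp_frame), arp_only(ipv4_frame))  # True False
```

Packets that match no open queue's filter are dropped with a warning, so open the queue before the packets you care about arrive.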
```python
async def read_digests(
    self,
    digest_id: str,
    *,
    queue_size: int = _DEFAULT_QUEUE_SIZE,
) -> AsyncIterator["p4entity.P4DigestList"]:
    "Async iterator for incoming digest lists (P4DigestList)."
    LOGGER.debug("read_digests: opening queue: digest_id=%r", digest_id)

    if digest_id in self._digest_queues:
        raise ValueError(f"queue for digest_id {digest_id!r} already open")

    queue = Queue[p4entity.P4DigestList](queue_size)
    self._digest_queues[digest_id] = queue
    try:
        while True:
            yield await queue.get()
    finally:
        LOGGER.debug("read_digests: closing queue: digest_id=%r", digest_id)
        del self._digest_queues[digest_id]
```
Async iterator for incoming digest lists (P4DigestList).
```python
async def read_idle_timeouts(
    self,
    *,
    queue_size: int = _DEFAULT_QUEUE_SIZE,
) -> AsyncIterator["p4entity.P4IdleTimeoutNotification"]:
    "Async iterator for incoming idle timeouts (P4IdleTimeoutNotification)."
    LOGGER.debug("read_idle_timeouts: opening queue")

    if self._timeout_queue is not None:
        raise ValueError("timeout queue already open")

    queue = Queue[p4entity.P4IdleTimeoutNotification](queue_size)
    self._timeout_queue = queue
    try:
        while True:
            yield await queue.get()
    finally:
        LOGGER.debug("read_idle_timeouts: closing queue")
        self._timeout_queue = None
```
Async iterator for incoming idle timeouts (P4IdleTimeoutNotification).
```python
async def write(
    self,
    entities: Iterable[p4entity.P4UpdateList],
    *,
    strict: bool = True,
    warn_only: bool = False,
) -> None:
    """Write updates and stream messages to the switch.

    If `strict` is False, MODIFY and DELETE operations will NOT raise an
    error if the entity does not exist (NOT_FOUND).

    If `warn_only` is True, no operations will raise an error. Instead,
    the exception will be logged as a WARNING and the method will return
    normally.
    """
    assert self._p4client is not None

    if not entities:
        return

    msgs = p4entity.encode_updates(entities, self.p4info)

    updates: list[p4r.Update] = []
    for msg in msgs:
        if isinstance(msg, p4r.StreamMessageRequest):
            # StreamMessageRequests are transmitted immediately.
            # TODO: Understand what happens with backpressure?
            await self._p4client.send(msg)
        else:
            updates.append(msg)

    if updates:
        await self._write_request(updates, strict, warn_only)
```
Write updates and stream messages to the switch.
If `strict` is False, MODIFY and DELETE operations will NOT raise an error if the entity does not exist (NOT_FOUND).

If `warn_only` is True, no operations will raise an error. Instead, the exception will be logged as a WARNING and the method will return normally.
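The interaction of `strict` and `warn_only` amounts to a small decision table: an error propagates only when it is not an ignorable NOT_FOUND and `warn_only` is off. This sketch models it with a plain function (`write_error_raises` is a hypothetical helper for illustration, not finsy API):

```python
def write_error_raises(strict: bool, warn_only: bool, not_found_only: bool) -> bool:
    """Model of how a failed WriteRequest is handled.

    not_found_only: True when every error in the reply is NOT_FOUND.
    """
    if not strict and not_found_only:
        return False  # NOT_FOUND is ignored in non-strict mode
    return not warn_only  # warn_only downgrades the error to a log message

# strict default: a NOT_FOUND error raises
print(write_error_raises(strict=True, warn_only=False, not_found_only=True))   # True
# non-strict: NOT_FOUND is ignored
print(write_error_raises(strict=False, warn_only=False, not_found_only=True))  # False
# warn_only: logged as a WARNING, never raised
print(write_error_raises(strict=True, warn_only=True, not_found_only=False))   # False
```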
```python
async def insert(
    self,
    entities: Iterable[p4entity.P4EntityList],
    *,
    warn_only: bool = False,
) -> None:
    """Insert the specified entities.

    If `warn_only` is True, errors will be logged as warnings instead of
    raising an exception.
    """
    if entities:
        await self._write_request(
            [
                p4r.Update(type=p4r.Update.INSERT, entity=ent)
                for ent in p4entity.encode_entities(entities, self.p4info)
            ],
            True,
            warn_only,
        )
```
Insert the specified entities.
If `warn_only` is True, errors will be logged as warnings instead of raising an exception.
```python
async def modify(
    self,
    entities: Iterable[p4entity.P4EntityList],
    *,
    strict: bool = True,
    warn_only: bool = False,
) -> None:
    """Modify the specified entities.

    If `strict` is False, NOT_FOUND errors will be ignored.

    If `warn_only` is True, errors will be logged as warnings instead of
    raising an exception.
    """
    if entities:
        await self._write_request(
            [
                p4r.Update(type=p4r.Update.MODIFY, entity=ent)
                for ent in p4entity.encode_entities(entities, self.p4info)
            ],
            strict,
            warn_only,
        )
```
Modify the specified entities.
If `strict` is False, NOT_FOUND errors will be ignored.

If `warn_only` is True, errors will be logged as warnings instead of raising an exception.
```python
async def delete(
    self,
    entities: Iterable[p4entity.P4EntityList],
    *,
    strict: bool = True,
    warn_only: bool = False,
) -> None:
    """Delete the specified entities.

    If `strict` is False, NOT_FOUND errors will be ignored.

    If `warn_only` is True, errors will be logged as warnings instead of
    raising an exception.
    """
    if entities:
        await self._write_request(
            [
                p4r.Update(type=p4r.Update.DELETE, entity=ent)
                for ent in p4entity.encode_entities(entities, self.p4info)
            ],
            strict,
            warn_only,
        )
```
Delete the specified entities.
If `strict` is False, NOT_FOUND errors will be ignored.

If `warn_only` is True, errors will be logged as warnings instead of raising an exception.
```python
async def delete_all(self) -> None:
    """Delete all entities if no parameter is passed. Otherwise, delete
    items that match `entities`.

    This method does not attempt to delete entries in const tables.

    TODO: This method does not affect indirect counters, meters or
    value_sets.
    """
    await self.delete_many(
        [
            p4entity.P4TableEntry(),
            p4entity.P4MulticastGroupEntry(),
            p4entity.P4CloneSessionEntry(),
        ]
    )

    # Reset all default table entries.
    default_entries = [
        p4entity.P4TableEntry(table.alias, is_default_action=True)
        for table in self.p4info.tables
        if table.const_default_action is None and table.action_profile is None
    ]
    if default_entries:
        await self.modify(default_entries)

    # Delete all P4ActionProfileGroup's and P4ActionProfileMember's.
    # We do this after deleting the P4TableEntry's in case a client is using
    # "one-shot" references; these are incompatible with separate
    # action profiles.
    await self.delete_many(
        [
            p4entity.P4ActionProfileGroup(),
            p4entity.P4ActionProfileMember(),
        ]
    )

    # Delete DigestEntry separately. Wildcard reads are not supported.
    digest_entries = [
        p4entity.P4DigestEntry(digest.alias) for digest in self.p4info.digests
    ]
    if digest_entries:
        await self.delete(digest_entries, strict=False)
```
Delete all P4Runtime entities from the switch.
This method does not attempt to delete entries in const tables.
TODO: This method does not affect indirect counters, meters or value_sets.
```python
async def delete_many(self, entities: Iterable[p4entity.P4EntityList]) -> None:
    """Delete entities that match a wildcard read.

    This method always skips over entries in const tables. It is an error
    to attempt to delete those.
    """
    assert self._p4client is not None

    request = p4r.ReadRequest(
        device_id=self.device_id,
        entities=p4entity.encode_entities(entities, self.p4info),
    )

    # Compute set of all const table ID's (may be empty).
    to_skip = {table.id for table in self.p4info.tables if table.is_const}

    async for reply in self._p4client.request_iter(request):
        if reply.entities:
            if to_skip:
                await self.delete(
                    reply
                    for reply in reply.entities
                    if reply.HasField("table_entry")
                    and reply.table_entry.table_id not in to_skip
                )
            else:
                await self.delete(reply.entities)
```
Delete entities that match a wildcard read.
This method always skips over entries in const tables. It is an error to attempt to delete those.
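For example, a controller might clear every entry in a single table while leaving the rest of the pipeline intact. This sketch assumes a connected switch and a table named "ipv4" in the loaded P4 program:

```python
import finsy as fy

async def clear_ipv4(sw: fy.Switch) -> None:
    # Wildcard-delete all entries in the "ipv4" table only; entries in
    # other tables, multicast groups, etc. are untouched.
    await sw.delete_many([fy.P4TableEntry("ipv4")])
```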
```python
async def run(self) -> None:
    "Run the switch's lifecycle repeatedly."
    assert self._p4client is None
    assert self._tasks is None

    self._tasks = SwitchTasks(self._options.fail_fast)
    self._p4client = P4Client(self._address, self._options.channel_credentials)
    self._switch_start()

    try:
        while True:
            # If the switch fails and restarts too quickly, slow it down.
            async with _throttle_failure():
                self.create_task(self._run(), background=True)
                await self._tasks.wait()
                self._arbitrator.reset()

    finally:
        self._p4client = None
        self._tasks = None
        self._switch_stop()
```
Run the switch's lifecycle repeatedly.
```python
def create_task(
    self,
    coro: Coroutine[Any, Any, _T],
    *,
    background: bool = False,
    name: str | None = None,
) -> asyncio.Task[_T]:
    "Create an asyncio task tied to the Switch's lifecycle."
    assert self._tasks is not None

    return self._tasks.create_task(
        coro,
        switch=self,
        background=background,
        name=name,
    )
```
Create an asyncio task tied to the Switch's lifecycle.
```python
async def __aenter__(self) -> Self:
    "Similar to run() but provides a one-time context manager interface."
    assert self._p4client is None
    assert self._tasks is None

    self._tasks = SwitchTasks(self._options.fail_fast)
    self._p4client = P4Client(
        self._address,
        self._options.channel_credentials,
        wait_for_ready=False,
    )
    self._switch_start()

    try:
        # Start the switch's `_run` task in the background. Then, wait for
        # `_run` task to fire the CHANNEL_READY event. If the `_run` task
        # cannot connect or fails in some other way, it will finish before
        # the `ready` future. We need to handle the error in this case.

        run = self.create_task(self._run(), background=True)
        ready = self.ee.event_future(SwitchEvent.CHANNEL_READY)
        done, _ = await asyncio.wait(
            [run, ready], return_when=asyncio.FIRST_COMPLETED
        )
        if run in done:
            await run

    except BaseException:
        await self.__aexit__(None, None, None)
        raise

    return self
```
Similar to run() but provides a one-time context manager interface.
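The pattern above, racing the background `_run` task against a readiness future with `asyncio.wait`, is worth isolating. Here is a minimal standalone sketch of the same idea (the names `main`, `run`, and `ready` are illustrative, not part of Finsy's API):

```python
import asyncio

async def main() -> str:
    # A future that plays the role of the CHANNEL_READY event.
    ready: asyncio.Future[None] = asyncio.get_running_loop().create_future()

    async def run() -> None:
        await asyncio.sleep(0)     # pretend to connect
        ready.set_result(None)     # signal readiness
        await asyncio.sleep(3600)  # then keep serving

    task = asyncio.create_task(run())
    # Wait for whichever finishes first: readiness, or the task dying early.
    done, _ = await asyncio.wait([task, ready], return_when=asyncio.FIRST_COMPLETED)
    if task in done:
        await task  # re-raise any startup error from the task
        return "failed-early"
    task.cancel()
    return "ready"

result = asyncio.run(main())
assert result == "ready"
```

If the background task fails before readiness, it appears in `done` first and awaiting it propagates the startup exception, which mirrors how `__aenter__` handles a connection failure.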
```python
class SwitchEvent(str, enum.Enum):
    "Events for Switch class."

    CONTROLLER_ENTER = "controller_enter"  # (switch)
    CONTROLLER_LEAVE = "controller_leave"  # (switch)
    SWITCH_START = "switch_start"  # (switch)
    SWITCH_STOP = "switch_stop"  # (switch)
    CHANNEL_UP = "channel_up"  # (switch)
    CHANNEL_DOWN = "channel_down"  # (switch)
    CHANNEL_READY = "channel_ready"  # (switch)
    BECOME_PRIMARY = "become_primary"  # (switch)
    BECOME_BACKUP = "become_backup"  # (switch)
    PORT_UP = "port_up"  # (switch, port)
    PORT_DOWN = "port_down"  # (switch, port)
    STREAM_ERROR = "stream_error"  # (switch, p4r.StreamMessageResponse)
```
Events for Switch class.
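Because `SwitchEvent` subclasses both `str` and `enum.Enum`, members compare equal to their string values, which is convenient for logging and dispatch. A minimal standalone sketch with two representative members copied from the enum above:

```python
import enum

class SwitchEvent(str, enum.Enum):
    # Two representative members from the full enum.
    PORT_UP = "port_up"
    PORT_DOWN = "port_down"

# str-enum members compare equal to plain strings, and a member can be
# reconstructed from a raw string value.
assert SwitchEvent.PORT_UP == "port_up"
assert SwitchEvent("port_down") is SwitchEvent.PORT_DOWN
```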
````python
@final
@dataclasses.dataclass(frozen=True)
class SwitchOptions:
    """Represents the configuration options for a `Switch`.

    ```
    opts = SwitchOptions(
        p4info=Path("basic.p4info.txtpb"),
        p4blob=Path("basic.json"),
        ready_handler=on_ready,
    )
    ```

    Each `SwitchOptions` object is immutable and may be shared by multiple
    switches. You should treat all values as read-only.

    You can use function call syntax to return a copy of a `SwitchOptions` with
    one or more properties altered.

    ```
    new_opts = opts(device_id=6)
    ```
    """

    p4info: Path | None = None
    "Path to P4Info protobuf text file."

    p4blob: Path | SupportsBytes | None = None
    "Path to P4Blob file, or an object that can provide the bytes value."

    p4force: bool = False
    "If true, always load the P4 program after initial handshake."

    device_id: int = 1
    "Default P4Runtime device ID."

    initial_election_id: int = 10
    "Initial P4Runtime election ID."

    channel_credentials: GRPCCredentialsTLS | None = None
    "P4Runtime channel credentials. Used for TLS support."

    role_name: str = ""
    "P4Runtime role name."

    role_config: pbutil.PBMessage | None = None
    "P4Runtime role configuration."

    ready_handler: Callable[["Switch"], Coroutine[Any, Any, None]] | None = None
    "Ready handler async function callback."

    fail_fast: bool = False
    """If true, log switch errors as CRITICAL and immediately abort when the
    switch is running in a Controller."""

    configuration: Any = None
    "Store your app's configuration information here."

    def __call__(self, **kwds: Any) -> Self:
        return dataclasses.replace(self, **kwds)
````
Represents the configuration options for a `Switch`.

```
opts = SwitchOptions(
    p4info=Path("basic.p4info.txtpb"),
    p4blob=Path("basic.json"),
    ready_handler=on_ready,
)
```

Each `SwitchOptions` object is immutable and may be shared by multiple switches. You should treat all values as read-only.

You can use function call syntax to return a copy of a `SwitchOptions` with one or more properties altered.

```
new_opts = opts(device_id=6)
```
Path to P4Blob file, or an object that can provide the bytes value.
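The function-call copy syntax works because `SwitchOptions` is a frozen dataclass whose `__call__` delegates to `dataclasses.replace`. A self-contained sketch of the same pattern (the `Options` class here is a stand-in, not Finsy's):

```python
import dataclasses
from typing import Any

@dataclasses.dataclass(frozen=True)
class Options:
    device_id: int = 1
    initial_election_id: int = 10

    def __call__(self, **kwds: Any) -> "Options":
        # Return a modified copy; the frozen original is untouched.
        return dataclasses.replace(self, **kwds)

opts = Options()
new_opts = opts(device_id=6)
assert (opts.device_id, new_opts.device_id) == (1, 6)
assert new_opts.initial_election_id == 10  # unchanged fields carry over
```

Because the instances are immutable, a single options object can safely be shared across many switches while per-switch variants are derived with the call syntax.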
```python
@dataclass
class SwitchPort:
    "Represents a switch port."

    id: int
    name: str
    oper_status: OperStatus = OperStatus.UNKNOWN

    @property
    def up(self) -> bool:
        "Return true if port is basically up."
        return self.oper_status == OperStatus.UP
```
Represents a switch port.
```python
class SwitchPortList:
    "Represents a list of switch ports."

    _ports: dict[str, SwitchPort]
    _subscription: GNMISubscription | None = None

    def __init__(self):
        self._ports = {}

    def __getitem__(self, key: str) -> SwitchPort:
        "Retrieve interface by ID."
        return self._ports[key]

    def __len__(self) -> int:
        "Return number of switch ports."
        return len(self._ports)

    def __iter__(self) -> Iterator[SwitchPort]:
        "Iterate over switch ports."
        return iter(self._ports.values())

    async def subscribe(self, client: GNMIClient) -> None:
        """Obtain the initial list of ports and subscribe to switch port status
        updates using GNMI."""
        assert self._subscription is None

        self._ports = await self._get_ports(client)
        if self._ports:
            self._subscription = await self._get_subscription(client)
        else:
            LOGGER.warning("No switch ports exist")

    async def listen(self, switch: "_sw.Switch | None" = None) -> None:
        "Listen for switch port updates."
        assert self._subscription is not None

        async for update in self._subscription.updates():
            self._update(update, switch)

    def close(self) -> None:
        "Close the switch port subscription."
        if self._subscription is not None:
            self._subscription.cancel()
            self._subscription = None
            self._ports = {}

    async def _get_ports(self, client: GNMIClient) -> dict[str, SwitchPort]:
        "Retrieve ID and name of each port."
        ports: dict[str, SwitchPort] = {}

        result = await client.get(_ifIndex)
        for update in result:
            path = update.path
            assert path.last == _ifIndex.last

            port = SwitchPort(update.value, path["name"])
            ports[port.name] = port

        return ports

    async def _get_subscription(self, client: GNMIClient) -> GNMISubscription:
        sub = client.subscribe()

        # Subscribe to change notifications.
        for port in self._ports.values():
            sub.on_change(_ifOperStatus.set(name=port.name))

        # Synchronize initial settings for ports.
        async for update in sub.synchronize():
            self._update(update, None)

        return sub

    def _update(self, update: GNMIUpdate, switch: "_sw.Switch | None"):
        path = update.path
        if path.last == _ifOperStatus.last:
            status = OperStatus(update.value)
            self._update_port(path["name"], status, switch)
        else:
            LOGGER.warning(f"PortList: unknown gNMI path: {path}")

    def _update_port(self, name: str, status: OperStatus, switch: "_sw.Switch | None"):
        port = self._ports[name]

        prev_up = port.up
        port.oper_status = status
        curr_up = port.up

        if switch is not None and curr_up != prev_up:
            if curr_up:
                switch.ee.emit(_sw.SwitchEvent.PORT_UP, switch, port)
            else:
                switch.ee.emit(_sw.SwitchEvent.PORT_DOWN, switch, port)
```
Represents a list of switch ports.
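Note that `_update_port` emits `PORT_UP`/`PORT_DOWN` only on an up/down *transition*, not on every status update. A standalone sketch of that edge detection (the `OperStatus` values and the event list here are assumptions for illustration):

```python
import enum

class OperStatus(str, enum.Enum):
    # Assumed values mirroring gNMI oper-status strings.
    UP = "UP"
    DOWN = "DOWN"
    UNKNOWN = "UNKNOWN"

events: list[str] = []

class Port:
    oper_status = OperStatus.UNKNOWN

    @property
    def up(self) -> bool:
        return self.oper_status == OperStatus.UP

def update_port(port: Port, status: OperStatus) -> None:
    prev_up = port.up
    port.oper_status = status
    if port.up != prev_up:  # emit only when the up/down state flips
        events.append("PORT_UP" if port.up else "PORT_DOWN")

p = Port()
update_port(p, OperStatus.UP)    # UNKNOWN -> UP: emits PORT_UP
update_port(p, OperStatus.UP)    # UP -> UP: no event
update_port(p, OperStatus.DOWN)  # UP -> DOWN: emits PORT_DOWN
assert events == ["PORT_UP", "PORT_DOWN"]
```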
```python
def __getitem__(self, key: str) -> SwitchPort:
    "Retrieve interface by ID."
    return self._ports[key]
```
Retrieve interface by ID.
```python
def __iter__(self) -> Iterator[SwitchPort]:
    "Iterate over switch ports."
    return iter(self._ports.values())
```
Iterate over switch ports.
```python
async def subscribe(self, client: GNMIClient) -> None:
    """Obtain the initial list of ports and subscribe to switch port status
    updates using GNMI."""
    assert self._subscription is None

    self._ports = await self._get_ports(client)
    if self._ports:
        self._subscription = await self._get_subscription(client)
    else:
        LOGGER.warning("No switch ports exist")
```
Obtain the initial list of ports and subscribe to switch port status updates using GNMI.
````python
class GNMIClient:
    """Async GNMI client.

    This client implements `get`, `set`, `subscribe` and `capabilities`.

    The API depends on the protobuf definition of `gnmi.TypedValue`.

    Get usage:
    ```
    client = GNMIClient('127.0.0.1:9339')
    await client.open()

    path = GNMIPath("interfaces/interface")
    for update in await client.get(path):
        print(update)
    ```

    Subscribe usage:
    ```
    path = GNMIPath("interfaces/interface[name=eth1]/state/oper-status")
    sub = client.subscribe()
    sub.on_change(path)

    async for initial_state in sub.synchronize():
        print(initial_state)

    async for update in sub.updates():
        print(update)
    ```

    Set usage:
    ```
    enabled = GNMIPath("interfaces/interface[name=eth1]/config/enabled")

    await client.set(update=[
        (enabled, gnmi.TypedValue(boolValue=True)),
    ])
    ```
    """

    _address: str
    _credentials: GRPCCredentialsTLS | None
    _channel: grpc.aio.Channel | None = None
    _stub: gnmi_grpc.gNMIStub | None = None
    _channel_reused: bool = False

    def __init__(
        self,
        address: str,
        credentials: GRPCCredentialsTLS | None = None,
    ):
        self._address = address
        self._credentials = credentials

    async def __aenter__(self) -> Self:
        await self.open()
        return self

    async def __aexit__(self, *_args: Any) -> bool | None:
        await self.close()

    async def open(
        self,
        *,
        channel: grpc.aio.Channel | None = None,
    ) -> None:
        """Open the client channel.

        Note: This method is `async` for forward-compatible reasons.
        """
        if self._channel is not None:
            raise RuntimeError("GNMIClient: client is already open")

        assert self._stub is None

        if channel is not None:
            self._channel = channel
            self._channel_reused = True
        else:
            self._channel = grpc_channel(
                self._address,
                credentials=self._credentials,
                client_type="GNMIClient",
            )

        self._stub = gnmi_grpc.gNMIStub(self._channel)

    async def close(self) -> None:
        "Close the client channel."
        if self._channel is not None:
            if not self._channel_reused:
                LOGGER.debug("GNMIClient: close channel %r", self._address)
                await self._channel.close()

            self._channel = None
            self._stub = None
            self._channel_reused = False

    async def get(
        self,
        *path: GNMIPath,
        prefix: GNMIPath | None = None,
        config: bool = False,
    ) -> Sequence[GNMIUpdate]:
        "Retrieve value(s) using a GetRequest."
        if self._stub is None:
            raise RuntimeError("GNMIClient: client is not open")

        request = gnmi.GetRequest(
            path=(i.path for i in path),
            encoding=gnmi.Encoding.PROTO,
        )

        if prefix is not None:
            request.prefix.CopyFrom(prefix.path)

        if config:
            request.type = gnmi.GetRequest.CONFIG

        self._log_msg(request)
        try:
            reply = cast(
                gnmi.GetResponse,
                await self._stub.Get(request),
            )
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

        self._log_msg(reply)

        result: list[GNMIUpdate] = []
        for notification in reply.notification:
            for update in _read_updates(notification):
                result.append(update)

        return result

    def subscribe(
        self,
        *,
        prefix: GNMIPath | None = None,
    ) -> "GNMISubscription":
        """Subscribe to gNMI change notifications.

        Usage:
        ```
        sub = client.subscribe()
        sub.on_change(path1, ...)
        sub.sample(path3, path4, sample_interval=1000000000)

        async for update in sub.synchronize():
            # do something with initial state

        async for update in sub.updates():
            # do something with updates
        ```

        You can also subscribe in "ONCE" mode:
        ```
        sub = client.subscribe()
        sub.once(path1, path2, ...)

        async for info in sub.synchronize():
            # do something with info
        ```

        The subscription object is not re-entrant, but a fully consumed
        subscription may be reused.
        """
        if self._stub is None:
            raise RuntimeError("GNMIClient: client is not open")

        return GNMISubscription(self, prefix)

    async def capabilities(self) -> gnmi.CapabilityResponse:
        "Issue a CapabilitiesRequest."
        if self._stub is None:
            raise RuntimeError("GNMIClient: client is not open")

        request = gnmi.CapabilityRequest()

        self._log_msg(request)
        try:
            reply = cast(
                gnmi.CapabilityResponse,
                await self._stub.Capabilities(request),
            )
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

        self._log_msg(reply)
        return reply

    async def set(
        self,
        *,
        update: Sequence[tuple[GNMIPath, GNMISetValueType]] | None = None,
        replace: Sequence[tuple[GNMIPath, GNMISetValueType]] | None = None,
        delete: Sequence[GNMIPath] | None = None,
        prefix: GNMIPath | None = None,
    ) -> int:
        """Set value(s) using SetRequest.

        Returns the timestamp from the successful `SetResponse`.
        """
        if self._stub is None:
            raise RuntimeError("GNMIClient: client is not open")

        if update is not None:
            updates = [gnmi_update(path, value) for path, value in update]
        else:
            updates = None

        if replace is not None:
            replaces = [gnmi_update(path, value) for path, value in replace]
        else:
            replaces = None

        if delete is not None:
            deletes = [path.path for path in delete]
        else:
            deletes = None

        request = gnmi.SetRequest(
            update=updates,
            replace=replaces,
            delete=deletes,
        )

        if prefix is not None:
            request.prefix.CopyFrom(prefix.path)

        self._log_msg(request)
        try:
            reply = cast(
                gnmi.SetResponse,
                await self._stub.Set(request),
            )
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

        self._log_msg(reply)

        # According to the comments in the current protobuf, I expect all error
        # results to be raised as an exception. The only useful value in a
        # successful response is the timestamp.

        if reply.HasField("message"):
            raise NotImplementedError("SetResponse error not supported")

        for result in reply.response:
            if result.HasField("message"):
                raise NotImplementedError("SetResponse suberror not supported")

        return reply.timestamp

    def _log_msg(self, msg: "pbutil.PBMessage"):
        "Log a gNMI message."
        pbutil.log_msg(self._channel, msg, None)
````
Async GNMI client.

This client implements `get`, `set`, `subscribe` and `capabilities`.

The API depends on the protobuf definition of `gnmi.TypedValue`.

Get usage (note that `get` is a coroutine returning a sequence, so iterate over its awaited result):

```
client = GNMIClient('127.0.0.1:9339')
await client.open()

path = GNMIPath("interfaces/interface")
for update in await client.get(path):
    print(update)
```

Subscribe usage:

```
path = GNMIPath("interfaces/interface[name=eth1]/state/oper-status")
sub = client.subscribe()
sub.on_change(path)

async for initial_state in sub.synchronize():
    print(initial_state)

async for update in sub.updates():
    print(update)
```

Set usage (`update` takes a sequence of `(path, value)` tuples, matching the declared signature):

```
enabled = GNMIPath("interfaces/interface[name=eth1]/config/enabled")

await client.set(update=[
    (enabled, gnmi.TypedValue(boolValue=True)),
])
```
```python
async def open(
    self,
    *,
    channel: grpc.aio.Channel | None = None,
) -> None:
    """Open the client channel.

    Note: This method is `async` for forward-compatible reasons.
    """
    if self._channel is not None:
        raise RuntimeError("GNMIClient: client is already open")

    assert self._stub is None

    if channel is not None:
        self._channel = channel
        self._channel_reused = True
    else:
        self._channel = grpc_channel(
            self._address,
            credentials=self._credentials,
            client_type="GNMIClient",
        )

    self._stub = gnmi_grpc.gNMIStub(self._channel)
```
Open the client channel.

Note: This method is `async` for forward-compatible reasons.
```python
async def close(self) -> None:
    "Close the client channel."
    if self._channel is not None:
        if not self._channel_reused:
            LOGGER.debug("GNMIClient: close channel %r", self._address)
            await self._channel.close()

        self._channel = None
        self._stub = None
        self._channel_reused = False
```
Close the client channel.
```python
async def get(
    self,
    *path: GNMIPath,
    prefix: GNMIPath | None = None,
    config: bool = False,
) -> Sequence[GNMIUpdate]:
    "Retrieve value(s) using a GetRequest."
    if self._stub is None:
        raise RuntimeError("GNMIClient: client is not open")

    request = gnmi.GetRequest(
        path=(i.path for i in path),
        encoding=gnmi.Encoding.PROTO,
    )

    if prefix is not None:
        request.prefix.CopyFrom(prefix.path)

    if config:
        request.type = gnmi.GetRequest.CONFIG

    self._log_msg(request)
    try:
        reply = cast(
            gnmi.GetResponse,
            await self._stub.Get(request),
        )
    except grpc.RpcError as ex:
        raise GNMIClientError(ex) from None

    self._log_msg(reply)

    result: list[GNMIUpdate] = []
    for notification in reply.notification:
        for update in _read_updates(notification):
            result.append(update)

    return result
```
Retrieve value(s) using a GetRequest.
````python
def subscribe(
    self,
    *,
    prefix: GNMIPath | None = None,
) -> "GNMISubscription":
    """Subscribe to gNMI change notifications.

    Usage:
    ```
    sub = client.subscribe()
    sub.on_change(path1, ...)
    sub.sample(path3, path4, sample_interval=1000000000)

    async for update in sub.synchronize():
        # do something with initial state

    async for update in sub.updates():
        # do something with updates
    ```

    You can also subscribe in "ONCE" mode:
    ```
    sub = client.subscribe()
    sub.once(path1, path2, ...)

    async for info in sub.synchronize():
        # do something with info
    ```

    The subscription object is not re-entrant, but a fully consumed
    subscription may be reused.
    """
    if self._stub is None:
        raise RuntimeError("GNMIClient: client is not open")

    return GNMISubscription(self, prefix)
````
Subscribe to gNMI change notifications.

Usage:

```
sub = client.subscribe()
sub.on_change(path1, ...)
sub.sample(path3, path4, sample_interval=1000000000)

async for update in sub.synchronize():
    # do something with initial state

async for update in sub.updates():
    # do something with updates
```

You can also subscribe in "ONCE" mode:

```
sub = client.subscribe()
sub.once(path1, path2, ...)

async for info in sub.synchronize():
    # do something with info
```

The subscription object is not re-entrant, but a fully consumed subscription may be reused.
```python
async def capabilities(self) -> gnmi.CapabilityResponse:
    "Issue a CapabilitiesRequest."
    if self._stub is None:
        raise RuntimeError("GNMIClient: client is not open")

    request = gnmi.CapabilityRequest()

    self._log_msg(request)
    try:
        reply = cast(
            gnmi.CapabilityResponse,
            await self._stub.Capabilities(request),
        )
    except grpc.RpcError as ex:
        raise GNMIClientError(ex) from None

    self._log_msg(reply)
    return reply
```
Issue a CapabilitiesRequest.
```python
async def set(
    self,
    *,
    update: Sequence[tuple[GNMIPath, GNMISetValueType]] | None = None,
    replace: Sequence[tuple[GNMIPath, GNMISetValueType]] | None = None,
    delete: Sequence[GNMIPath] | None = None,
    prefix: GNMIPath | None = None,
) -> int:
    """Set value(s) using SetRequest.

    Returns the timestamp from the successful `SetResponse`.
    """
    if self._stub is None:
        raise RuntimeError("GNMIClient: client is not open")

    if update is not None:
        updates = [gnmi_update(path, value) for path, value in update]
    else:
        updates = None

    if replace is not None:
        replaces = [gnmi_update(path, value) for path, value in replace]
    else:
        replaces = None

    if delete is not None:
        deletes = [path.path for path in delete]
    else:
        deletes = None

    request = gnmi.SetRequest(
        update=updates,
        replace=replaces,
        delete=deletes,
    )

    if prefix is not None:
        request.prefix.CopyFrom(prefix.path)

    self._log_msg(request)
    try:
        reply = cast(
            gnmi.SetResponse,
            await self._stub.Set(request),
        )
    except grpc.RpcError as ex:
        raise GNMIClientError(ex) from None

    self._log_msg(reply)

    # According to the comments in the current protobuf, I expect all error
    # results to be raised as an exception. The only useful value in a
    # successful response is the timestamp.

    if reply.HasField("message"):
        raise NotImplementedError("SetResponse error not supported")

    for result in reply.response:
        if result.HasField("message"):
            raise NotImplementedError("SetResponse suberror not supported")

    return reply.timestamp
```
Set value(s) using SetRequest.

Returns the timestamp from the successful `SetResponse`.
````python
class GNMIPath:
    """Concrete class for working with `gnmi.Path` objects.

    A `GNMIPath` should be treated as an immutable object. You can access
    the wrapped `gnmi.Path` protobuf class using the `.path` property.
    `GNMIPath` objects are hashable, but you must be careful that you do not
    mutate them via the underlying `gnmi.Path`.

    You can construct a GNMIPath from a string:
    ```
    path = GNMIPath("interfaces/interface[name=eth1]/state")
    ```

    You can construct a GNMIPath object from a `gnmi.Path` directly.
    ```
    path = GNMIPath(update.path)
    ```

    You can create paths by using an existing path as a template, without
    modifying the original path. Use the `set` method:
    ```
    operStatus = GNMIPath("interfaces/interface/state/oper-status")
    path = operStatus.set("interface", name="eth1")
    ```

    Use [] to access the name/key of path elements:
    ```
    path[1] == "interface"
    path["interface", "name"] == "eth1"
    path["name"] == "eth1"
    ```
    """

    path: gnmi.Path
    "The underlying `gnmi.Path` protobuf representation."

    def __init__(
        self,
        path: "gnmi.Path | str | GNMIPath" = "",
        *,
        origin: str = "",
        target: str = "",
    ):
        "Construct a `GNMIPath` from a string or `gnmi.Path`."
        if isinstance(path, str):
            path = gnmistring.parse(path)
        elif isinstance(path, GNMIPath):
            path = _copy_path(path.path)

        assert isinstance(path, gnmi.Path)

        if origin:
            path.origin = origin

        if target:
            path.target = target

        self.path = path

    @property
    def first(self) -> str:
        "Return the first element of the path."
        return self.path.elem[0].name

    @property
    def last(self) -> str:
        "Return the last element of the path."
        return self.path.elem[-1].name

    @property
    def origin(self) -> str:
        "Return the path's origin."
        return self.path.origin

    @property
    def target(self) -> str:
        "Return the path's target."
        return self.path.target

    def set(self, __elem: str | int | None = None, **kwds: Any) -> "GNMIPath":
        "Construct a new GNMIPath with keys set for the given elem."
        if __elem is None:
            return self._rekey(kwds)

        if isinstance(__elem, str):
            elem = _find_index(__elem, self.path)
        else:
            elem = __elem

        result = self.copy()
        keys = result.path.elem[elem].key
        keys.clear()
        keys.update({key: str(val) for key, val in kwds.items()})
        return result

    def copy(self) -> "GNMIPath":
        "Return a copy of the path."
        return GNMIPath(_copy_path(self.path))

    @overload
    def __getitem__(
        self, key: int | str | tuple[int | str, str]
    ) -> str: ...  # pragma: no cover

    @overload
    def __getitem__(self, key: slice) -> "GNMIPath": ...  # pragma: no cover

    def __getitem__(
        self,
        key: int | str | tuple[int | str, str] | slice,
    ) -> "str | GNMIPath":
        "Return the specified element or key value."
        match key:
            case int(idx):
                return self.path.elem[idx].name
            case str(name):
                return _retrieve_key(name, self.path)
            case (int(idx), str(k)):
                result = self.path.elem[idx].key.get(k)
                if result is None:
                    raise KeyError(k)
                return result
            case (str(name), str(k)):
                result = self.path.elem[_find_index(name, self.path)].key.get(k)
                if result is None:
                    raise KeyError(k)
                return result
            case slice() as s:
                return self._slice(s.start, s.stop, s.step)
            case other:  # pyright: ignore[reportUnnecessaryComparison]
                raise TypeError(f"invalid key type: {other!r}")

    def __eq__(self, rhs: Any) -> bool:
        "Return True if path's are equal."
        if not isinstance(rhs, GNMIPath):
            return False
        return self.path == rhs.path  # pyright: ignore[reportUnknownVariableType]

    def __hash__(self) -> int:
        "Return hash of path."
        return hash(tuple(_to_tuple(elem) for elem in self.path.elem))

    def __contains__(self, name: str) -> bool:
        "Return True if element name is in path."
        for elem in self.path.elem:
            if elem.name == name:
                return True
        return False

    def __repr__(self) -> str:
        "Return string representation of path."
        path = gnmistring.to_str(self.path)
        if self.target or self.origin:
            return f"GNMIPath({path!r}, origin={self.origin!r}, target={self.target!r})"
        return f"GNMIPath({path!r})"

    def __str__(self) -> str:
        "Return path as string."
        return gnmistring.to_str(self.path)

    def __len__(self) -> int:
        "Return the length of the path."
        return len(self.path.elem)

    def __truediv__(self, rhs: "GNMIPath | str") -> "GNMIPath":
        "Append values to end of path."
        if not isinstance(rhs, GNMIPath):
            rhs = GNMIPath(rhs)

        result = gnmi.Path(
            elem=itertools.chain(self.path.elem, rhs.path.elem),
            origin=self.origin,
            target=self.target,
        )
        return GNMIPath(result)

    def __rtruediv__(self, lhs: str) -> "GNMIPath":
        "Prepend values to the beginning of the path."
        path = GNMIPath(lhs)

        result = gnmi.Path(elem=itertools.chain(path.path.elem, self.path.elem))
        return GNMIPath(result)

    def _slice(
        self, start: int | None, stop: int | None, step: int | None
    ) -> "GNMIPath":
        "Return specified slice of GNMIPath."
        if start is None:
            start = 0

        if stop is None:
            stop = len(self)
        elif stop < 0:
            stop = len(self) + stop

        if step is None:
            step = 1

        path = gnmi.Path()
        for i in range(start, stop, step):
            elem = self.path.elem[i]
            path.elem.append(gnmi.PathElem(name=elem.name, key=elem.key))

        return GNMIPath(path)

    def _rekey(self, keys: dict[str, Any]) -> "GNMIPath":
        """Construct a new path with specified keys replaced."""
        if not keys:
            raise ValueError("empty keys")

        result = self.copy()

        found = False
        for elem in result.path.elem:
            for key, value in keys.items():
                if key in elem.key:
                    elem.key[key] = str(value)
                    found = True

        if not found:
            raise ValueError(f"no keys found in path: {keys!r}")

        return result
````
Concrete class for working with `gnmi.Path` objects.

A `GNMIPath` should be treated as an immutable object. You can access the wrapped `gnmi.Path` protobuf class using the `.path` property. `GNMIPath` objects are hashable, but you must be careful that you do not mutate them via the underlying `gnmi.Path`.

You can construct a `GNMIPath` from a string:

```
path = GNMIPath("interfaces/interface[name=eth1]/state")
```

You can construct a `GNMIPath` object from a `gnmi.Path` directly:

```
path = GNMIPath(update.path)
```

You can create paths by using an existing path as a template, without modifying the original path. Use the `set` method:

```
operStatus = GNMIPath("interfaces/interface/state/oper-status")
path = operStatus.set("interface", name="eth1")
```

Use `[]` to access the name/key of path elements:

```
path[1] == "interface"
path["interface", "name"] == "eth1"
path["name"] == "eth1"
```
```python
def __init__(
    self,
    path: "gnmi.Path | str | GNMIPath" = "",
    *,
    origin: str = "",
    target: str = "",
):
    "Construct a `GNMIPath` from a string or `gnmi.Path`."
    if isinstance(path, str):
        path = gnmistring.parse(path)
    elif isinstance(path, GNMIPath):
        path = _copy_path(path.path)

    assert isinstance(path, gnmi.Path)

    if origin:
        path.origin = origin

    if target:
        path.target = target

    self.path = path
```
Construct a `GNMIPath` from a string or `gnmi.Path`.
```python
@property
def first(self) -> str:
    "Return the first element of the path."
    return self.path.elem[0].name
```
Return the first element of the path.
```python
@property
def last(self) -> str:
    "Return the last element of the path."
    return self.path.elem[-1].name
```
Return the last element of the path.
```python
@property
def target(self) -> str:
    "Return the path's target."
    return self.path.target
```
Return the path's target.
```python
def set(self, __elem: str | int | None = None, **kwds: Any) -> "GNMIPath":
    "Construct a new GNMIPath with keys set for the given elem."
    if __elem is None:
        return self._rekey(kwds)

    if isinstance(__elem, str):
        elem = _find_index(__elem, self.path)
    else:
        elem = __elem

    result = self.copy()
    keys = result.path.elem[elem].key
    keys.clear()
    keys.update({key: str(val) for key, val in kwds.items()})
    return result
```
Construct a new GNMIPath with keys set for the given elem.
```python
def copy(self) -> "GNMIPath":
    "Return a copy of the path."
    return GNMIPath(_copy_path(self.path))
```
Return a copy of the path.
```python
def __getitem__(
    self,
    key: int | str | tuple[int | str, str] | slice,
) -> "str | GNMIPath":
    "Return the specified element or key value."
    match key:
        case int(idx):
            return self.path.elem[idx].name
        case str(name):
            return _retrieve_key(name, self.path)
        case (int(idx), str(k)):
            result = self.path.elem[idx].key.get(k)
            if result is None:
                raise KeyError(k)
            return result
        case (str(name), str(k)):
            result = self.path.elem[_find_index(name, self.path)].key.get(k)
            if result is None:
                raise KeyError(k)
            return result
        case slice() as s:
            return self._slice(s.start, s.stop, s.step)
        case other:  # pyright: ignore[reportUnnecessaryComparison]
            raise TypeError(f"invalid key type: {other!r}")
```
Return the specified element or key value.
```python
class GNMISubscription:
    """Represents a gNMI subscription stream.

    Returned from `GNMIClient.subscribe()`.
    """

    _client: GNMIClient
    _stream: _StreamTypeAlias | None

    def __init__(self, client: GNMIClient, prefix: GNMIPath | None = None):
        "Initialize a GNMISubscription."
        self._client = client
        self._stream = None
        self._sublist = gnmi.SubscriptionList(
            mode=gnmi.SubscriptionList.Mode.STREAM,
        )
        if prefix is not None:
            self._sublist.prefix.CopyFrom(prefix.path)

    def once(self, *paths: GNMIPath) -> None:
        "Subscribe in `ONCE` mode to given paths."
        self._sublist.mode = gnmi.SubscriptionList.Mode.ONCE

        for path in paths:
            sub = gnmi.Subscription(path=path.path)
            self._sublist.subscription.append(sub)

    def on_change(self, *paths: GNMIPath) -> None:
        "Subscribe in `ON_CHANGE` mode to given paths."
        assert self._sublist.mode == gnmi.SubscriptionList.Mode.STREAM

        for path in paths:
            sub = gnmi.Subscription(
                path=path.path,
                mode=gnmi.SubscriptionMode.ON_CHANGE,
            )
            self._sublist.subscription.append(sub)

    def sample(
        self,
        *paths: GNMIPath,
        sample_interval: int,
        suppress_redundant: bool = False,
        heartbeat_interval: int = 0,
    ) -> None:
        "Subscribe in `SAMPLE` mode to given paths."
        assert self._sublist.mode == gnmi.SubscriptionList.Mode.STREAM

        for path in paths:
            sub = gnmi.Subscription(
                path=path.path,
                mode=gnmi.SubscriptionMode.SAMPLE,
                sample_interval=sample_interval,
                suppress_redundant=suppress_redundant,
                heartbeat_interval=heartbeat_interval,
            )
            self._sublist.subscription.append(sub)

    async def synchronize(self) -> AsyncIterator[GNMIUpdate]:
        """Async iterator for initial subscription updates.

        Retrieve all updates up to `sync_response` message.
        """
        if self._stream is None:
            await self._subscribe()

        try:
            async for result in self._read(True):
                yield result
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

    async def updates(self) -> AsyncIterator[GNMIUpdate]:
        "Async iterator for all remaining subscription updates."
        if self._stream is None:
            await self._subscribe()

        try:
            async for result in self._read(False):
                yield result
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

    def cancel(self) -> None:
        "Cancel the subscription."
        if self._stream is not None:
            self._stream.cancel()
            self._stream = None

    async def _subscribe(self):
        assert self._stream is None
        assert self._client._stub is not None

        self._stream = cast(
            _StreamTypeAlias,
            self._client._stub.Subscribe(wait_for_ready=True),  # type: ignore
        )

        request = gnmi.SubscribeRequest(subscribe=self._sublist)
        self._client._log_msg(request)
        try:
            await self._stream.write(request)
        except grpc.RpcError as ex:
            raise GNMIClientError(ex) from None

    async def _read(self, stop_at_sync: bool) -> AsyncIterator[GNMIUpdate]:
        assert self._stream is not None

        while True:
            msg = cast(
                gnmi.SubscribeResponse,
                await self._stream.read(),  # type: ignore
            )
            if msg is GRPC_EOF:
                LOGGER.warning("gNMI _read: unexpected EOF")
                return

            self._client._log_msg(msg)

            match msg.WhichOneof("response"):
                case "update":
                    for update in _read_updates(msg.update):
                        yield update
                case "sync_response":
                    if stop_at_sync:
                        if self._is_once():
                            # FIXME? I expected Stratum to issue an EOF after
                            # the sync_response in "ONCE" mode, but it doesn't.
                            # For now, cancel the stream explicitly.
                            self.cancel()
                            # Let grpc react to the cancellation immediately.
                            await asyncio.sleep(0)
                        return  # All done!

                    LOGGER.warning("gNMI _read: ignored sync_response")
                case other:
                    LOGGER.warning("gNMI _read: unexpected oneof: %s", other)

    def _is_once(self) -> bool:
        "Return true if the subscription is in ONCE mode."
        return self._sublist.mode == gnmi.SubscriptionList.Mode.ONCE
```
Represents a gNMI subscription stream. Returned from `GNMIClient.subscribe()`.
`__init__(client: GNMIClient, prefix: GNMIPath | None = None)`: Initialize a GNMISubscription.
`once(*paths: GNMIPath) -> None`: Subscribe in `ONCE` mode to given paths.
`on_change(*paths: GNMIPath) -> None`: Subscribe in `ON_CHANGE` mode to given paths.
`sample(*paths: GNMIPath, sample_interval: int, suppress_redundant: bool = False, heartbeat_interval: int = 0) -> None`: Subscribe in `SAMPLE` mode to given paths.
`synchronize() -> AsyncIterator[GNMIUpdate]`: Async iterator for the initial subscription updates, up to the `sync_response` message.
`updates() -> AsyncIterator[GNMIUpdate]`: Async iterator for all remaining subscription updates.
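The split between `synchronize()` (stop at `sync_response`) and `updates()` (keep streaming) can be pictured with a plain asyncio sketch, no gNMI required; the queue and `SYNC` sentinel below are stand-ins for the gRPC stream and the `sync_response` message:

```python
import asyncio

SYNC = object()  # stand-in for the gNMI sync_response message

async def read(queue: asyncio.Queue, stop_at_sync: bool):
    # Mirrors GNMISubscription._read: yield updates; stop at the sync
    # marker only when stop_at_sync is True.
    while True:
        msg = await queue.get()
        if msg is SYNC:
            if stop_at_sync:
                return
            continue  # a later sync_response is ignored
        yield msg
        if queue.empty():
            return  # the real gRPC stream would block here instead

async def demo():
    q = asyncio.Queue()
    for item in ["up1", "up2", SYNC, "up3"]:
        q.put_nowait(item)
    initial = [m async for m in read(q, stop_at_sync=True)]     # like synchronize()
    remaining = [m async for m in read(q, stop_at_sync=False)]  # like updates()
    return initial, remaining

initial, remaining = asyncio.run(demo())
print(initial, remaining)  # ['up1', 'up2'] ['up3']
```

This is why a typical caller iterates `synchronize()` first to learn the initial state, then `updates()` for change notifications on the same subscription.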
```python
@dataclass
class GNMIUpdate:
    "Represents a gNMI update returned from GNMIClient."

    timestamp: int
    path: GNMIPath
    typed_value: gnmi.TypedValue | None

    @property
    def value(self) -> Any:
        "Return the value as a Python value."
        if self.typed_value is None:
            return None

        attr = self.typed_value.WhichOneof("value")
        if attr is None:
            raise ValueError("typed_value is not set")
        return getattr(self.typed_value, attr)

    def __repr__(self) -> str:
        "Override repr to strip newline from end of `TypedValue`."
        value = repr(self.typed_value).rstrip()
        return f"GNMIUpdate(timestamp={self.timestamp!r}, path={self.path!r}, typed_value=`{value}`)"
```
Represents a gNMI update returned from GNMIClient.
`value: Any` (property): Return the value as a Python value.
```python
@dataclass(kw_only=True)
class GRPCCredentialsTLS:
    "GRPC channel credentials for Transport Layer Security (TLS)."

    cacert: Path | bytes | None
    """Certificate authority used to authenticate the certificate at the other
    end of the connection."""

    cert: Path | bytes | None
    "Certificate that identifies this side of the connection."

    private_key: Path | bytes | None
    "Private key associated with this side's certificate identity."

    target_name_override: str = ""
    "Override the target name used for TLS host name checking (useful for testing)."

    call_credentials: grpc.AuthMetadataPlugin | None = None
    """Optional GRPC call credentials for the client channel. Be aware that the
    auth plugin's callback takes place in a different thread."""

    def to_client_credentials(self) -> grpc.ChannelCredentials:
        "Create native SSL client credentials object."
        root_certificates = _coerce_tls_path(self.cacert)
        certificate_chain = _coerce_tls_path(self.cert)
        private_key = _coerce_tls_path(self.private_key)

        return self._compose_credentials(
            grpc.ssl_channel_credentials(  # pyright: ignore[reportUnknownMemberType]
                root_certificates=root_certificates,
                private_key=private_key,
                certificate_chain=certificate_chain,
            )
        )

    def to_server_credentials(self) -> grpc.ServerCredentials:
        """Create native SSL server credentials object.

        On the server side, we ignore the `call_credentials`.
        """
        root_certificates = _coerce_tls_path(self.cacert)
        certificate_chain = _coerce_tls_path(self.cert)
        private_key = _coerce_tls_path(self.private_key)

        return grpc.ssl_server_credentials(  # pyright: ignore[reportUnknownMemberType]
            private_key_certificate_chain_pairs=[(private_key, certificate_chain)],
            root_certificates=root_certificates,
            require_client_auth=True,
        )

    def _compose_credentials(
        self, channel_cred: grpc.ChannelCredentials
    ) -> grpc.ChannelCredentials:
        "Compose call credentials with channel credentials."
        if not self.call_credentials:
            return channel_cred

        call_cred = grpc.metadata_call_credentials(  # pyright: ignore[reportUnknownMemberType]
            self.call_credentials
        )
        return grpc.composite_channel_credentials(  # pyright: ignore[reportUnknownMemberType]
            channel_cred,
            call_cred,
        )
```
GRPC channel credentials for Transport Layer Security (TLS).

- `cacert`: Certificate authority used to authenticate the certificate at the other end of the connection.
- `cert`: Certificate that identifies this side of the connection.
- `private_key`: Private key associated with this side's certificate identity.
- `target_name_override`: Override the target name used for TLS host name checking (useful for testing).
- `call_credentials`: Optional GRPC call credentials for the client channel. Be aware that the auth plugin's callback takes place in a different thread.
`to_client_credentials() -> grpc.ChannelCredentials`: Create native SSL client credentials object.
`to_server_credentials() -> grpc.ServerCredentials`: Create native SSL server credentials object. On the server side, the `call_credentials` are ignored.
```python
class GRPCStatusCode(_EnumBase):
    "IntEnum equivalent to `grpc.StatusCode`."

    OK = rpc_code.OK
    CANCELLED = rpc_code.CANCELLED
    UNKNOWN = rpc_code.UNKNOWN
    FAILED_PRECONDITION = rpc_code.FAILED_PRECONDITION
    INVALID_ARGUMENT = rpc_code.INVALID_ARGUMENT
    DEADLINE_EXCEEDED = rpc_code.DEADLINE_EXCEEDED
    NOT_FOUND = rpc_code.NOT_FOUND
    ALREADY_EXISTS = rpc_code.ALREADY_EXISTS
    PERMISSION_DENIED = rpc_code.PERMISSION_DENIED
    UNAUTHENTICATED = rpc_code.UNAUTHENTICATED
    RESOURCE_EXHAUSTED = rpc_code.RESOURCE_EXHAUSTED
    ABORTED = rpc_code.ABORTED
    OUT_OF_RANGE = rpc_code.OUT_OF_RANGE
    UNIMPLEMENTED = rpc_code.UNIMPLEMENTED
    INTERNAL = rpc_code.INTERNAL
    UNAVAILABLE = rpc_code.UNAVAILABLE
    DATA_LOSS = rpc_code.DATA_LOSS

    @classmethod
    def from_status_code(cls, val: grpc.StatusCode) -> "GRPCStatusCode":
        "Create corresponding GRPCStatusCode from a grpc.StatusCode object."
        n: Any = val.value[0]  # pyright: ignore[reportUnknownMemberType]
        assert isinstance(n, int)
        return GRPCStatusCode(n)

    @staticmethod
    def _validate_enum() -> None:
        "Verify that GRPCStatusCode covers every possible grpc.StatusCode."
        for value in grpc.StatusCode:
            n: Any = value.value[0]  # pyright: ignore[reportUnknownMemberType]
            assert isinstance(n, int)
            assert GRPCStatusCode[value.name].value == n, value.name
```
IntEnum equivalent to `grpc.StatusCode`.
`from_status_code(val: grpc.StatusCode) -> GRPCStatusCode` (classmethod): Create corresponding GRPCStatusCode from a grpc.StatusCode object.
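`grpc.StatusCode` members hold `(int, str)` value pairs, which is why `from_status_code` indexes `val.value[0]`. A stdlib sketch of the same extraction, using a hypothetical stand-in enum rather than the real `grpc.StatusCode`:

```python
import enum

class FakeStatusCode(enum.Enum):
    # Stand-in for grpc.StatusCode: values are (numeric code, string) pairs.
    OK = (0, "ok")
    NOT_FOUND = (5, "not found")

class StatusInt(enum.IntEnum):
    # Plain-int mirror, in the spirit of GRPCStatusCode.
    OK = 0
    NOT_FOUND = 5

    @classmethod
    def from_status_code(cls, val: FakeStatusCode) -> "StatusInt":
        # Same extraction trick as GRPCStatusCode.from_status_code.
        n = val.value[0]
        assert isinstance(n, int)
        return cls(n)

assert StatusInt.from_status_code(FakeStatusCode.NOT_FOUND) is StatusInt.NOT_FOUND
```

Because `StatusInt` is an `IntEnum`, the result also compares equal to the bare integer code (`StatusInt.NOT_FOUND == 5`), which is convenient when matching against wire-level status values.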