finsy
Finsy P4Runtime Controller Library
Finsy is a P4Runtime controller library written in Python using asyncio. Finsy includes support for gNMI.
Check out the examples directory for some demonstration programs.
Installation
Finsy requires Python 3.10 or later. To install the latest version, type pip install finsy
.
P4Runtime Scripts
With Finsy, you can write a Python script that reads/writes P4Runtime entities for a single switch.
Here is a complete example that retrieves the P4Info from a switch:
import finsy as fy
async def main():
async with fy.Switch("sw1", "127.0.0.1:50001") as sw1:
# Print out a description of the switch's P4Info, if one is configured.
print(sw1.p4info)
fy.run(main())
Here is another example that prints out all non-default table entries.
import finsy as fy
async def main():
async with fy.Switch("sw1", "127.0.0.1:50001") as sw1:
# Do a wildcard read for table entries.
async for entry in sw1.read(fy.P4TableEntry()):
print(entry)
fy.run(main())
P4Runtime Controller
You can also write a P4Runtime controller that manages multiple switches independently. Your controller can react to events from the Switch by changing the contents of P4 tables.
Each switch is managed by an async ready_handler
function. Your ready_handler
function can read or
update various P4Runtime entities in the switch. It can also create tasks to listen for
packets or digests.
When you write P4Runtime updates to the switch, you use a unary operator (+, -, \~) to specify the operation: INSERT (+), DELETE (-) or MODIFY (\~).
import finsy as fy
async def ready_handler(sw: fy.Switch):
await sw.delete_all()
await sw.write(
[
# Insert (+) multicast group with ports 1, 2, 3 and CONTROLLER.
+fy.P4MulticastGroupEntry(1, replicas=[1, 2, 3, 255]),
# Modify (~) default table entry to flood all unmatched packets.
~fy.P4TableEntry(
"ipv4",
action=fy.Action("flood"),
is_default_action=True,
),
]
)
async for packet in sw.read_packets():
print(f"{sw.name}: {packet}")
Use the SwitchOptions
class to specify each switch's settings, including the p4info/p4blob and
ready_handler
. Use the Controller
class to drive multiple switch connections. Each switch will call back
into your ready_handler
function after the P4Runtime connection is established.
from pathlib import Path
options = fy.SwitchOptions(
p4info=Path("hello.p4info.txt"),
p4blob=Path("hello.json"),
ready_handler=ready_handler,
)
controller = fy.Controller([
fy.Switch("sw1", "127.0.0.1:50001", options),
fy.Switch("sw2", "127.0.0.1:50002", options),
fy.Switch("sw3", "127.0.0.1:50003", options),
])
fy.run(controller.run())
Your ready_handler
can spawn concurrent tasks with the Switch.create_task
method. Tasks
created this way will have their lifetimes managed by the switch object.
If the switch disconnects or its role changes to backup, the task running your ready_handler
(and any tasks it spawned) will be cancelled and the ready_handler
will begin again.
For more examples, see the examples directory.
Switch Read/Write API
The Switch
class provides the API for interacting with P4Runtime switches. You will control
a Switch object with a "ready handler" function. The ready handler
is an
async function that is called when the switch is ready to accept commands.
Your ready handler
will typically write some control entities to the switch, then
listen for incoming events and react to them with more writes. You may occasionally read entities
from the switch.
When your ready handler
is invoked, there is already a P4Runtime channel established, with client
arbitration completed, and pipeline configured as specified in SwitchOptions
.
Here is an example skeleton program. The ready handler
is named ready()
.
async def ready(switch: fy.Switch):
# Check if switch is the primary. If not, we may want to proceed
# in read-only mode. In this example, ignore switch if it's a backup.
if not switch.is_primary:
return
# If we're reconnecting to a switch, it will already have runtime state.
# In this example, we just delete all entities and start over.
await switch.delete_all()
# Provision the pipeline with one or more `write` transactions. Each
# `write` is a single WriteRequest which may contain multiple updates.
await switch.write(
# [Next section will cover what goes here.]
)
# Listen for events and respond to them. This "infinite" loop will
# continue until the Switch disconnects, changes primary/backup status,
# or the controller is stopped.
async for packet in switch.read_packets():
await handle_packet(switch, packet)
The Switch class provides a switch.create_task
method to start a managed task.
Tasks allow you to perform concurrent operations on the same switch. We could have
written the last stanza above that reads packets in an infinite loop as a separate
task. It's okay for the ready handler function to return early; any tasks it
created will still run.
Writes
Use the write()
method to write one or more P4Runtime updates and packets.
A P4Runtime update supports one of three operations: INSERT, MODIFY or DELETE. Some entities support all three operations. Other entities only support MODIFY.
Entity | Operations Permitted | Related Classes |
---|---|---|
P4TableEntry |
INSERT, MODIFY, DELETE | Match , Action , IndirectAction , P4MeterConfig , P4CounterData , P4MeterCounterData |
P4ActionProfileMember |
INSERT, MODIFY, DELETE | |
P4ActionProfileGroup |
INSERT, MODIFY, DELETE | P4Member |
P4MulticastGroupEntry |
INSERT, MODIFY, DELETE | |
P4CloneSessionEntry |
INSERT, MODIFY, DELETE | |
P4DigestEntry |
INSERT, MODIFY, DELETE | |
P4ExternEntry |
INSERT, MODIFY, DELETE | |
P4RegisterEntry |
MODIFY | |
P4CounterEntry |
MODIFY | P4CounterData |
P4DirectCounterEntry |
MODIFY | P4CounterData |
P4MeterEntry |
MODIFY | P4MeterConfig , P4MeterCounterData |
P4DirectMeterEntry |
MODIFY | |
P4ValueSetEntry |
MODIFY | P4ValueSetMember |
Insert/Modify/Delete Updates
To specify the operation, use a unary +
(INSERT), ~
(MODIFY), or -
(DELETE). If you
do not specify the operation, write
will raise a ValueError
exception.
Here is an example showing how to insert and delete two different entities in the same WriteRequest.
await switch.write([
+fy.P4TableEntry( # unary + means INSERT
"ipv4",
match=fy.Match(dest="192.168.1.0/24"),
action=fy.Action("forward", port=1),
),
-fy.P4TableEntry( # unary - means DELETE
"ipv4",
match=fy.Match(dest="192.168.2.0/24"),
action=fy.Action("forward", port=2),
),
])
You should not insert, modify or delete the same entry in the same WriteRequest.
If you are performing the same operation on all entities, you can use the Switch
insert
, delete
, or modify
methods.
await switch.insert([
fy.P4MulticastGroupEntry(1, replicas=[1, 2, 3]),
fy.P4MulticastGroupEntry(2, replicas=[4, 5, 6]),
])
Modify-Only Updates
For entities that only support the modify operation, you do not need to specify the operation. (You can
optionally use ~
.)
await switch.write([
fy.P4RegisterEntry("reg1", index=0, data=0),
fy.P4RegisterEntry("reg1", index=1, data=1),
fy.P4RegisterEntry("reg1", index=2, data=2),
])
You can also use the modify
method:
await switch.modify([
fy.P4RegisterEntry("reg1", index=0, data=0),
fy.P4RegisterEntry("reg1", index=1, data=1),
fy.P4RegisterEntry("reg1", index=2, data=2),
])
If you pass a modify-only entity to the insert
or delete
methods, the P4Runtime server will
return an error.
Sending Packets
Use the write
method to send a packet.
await switch.write([fy.P4PacketOut(b"a payload.....", port=3)])
You can include other entities in the same call. Any non-update objects (e.g. P4PacketOut, P4DigestListAck) will be sent before the WriteRequest.
Listening for Packets
To receive packets, use the async iterator Switch.read_packets()
.
In this example, pkt
is a P4PacketIn
object.
read_packets
can filter for a specific eth_type
.
# Read packets filtering only for ARP (eth_type == 0x0806).
async for pkt in switch.read_packets(eth_types={0x0806}):
# You can access the packet payload `pkt.payload` or any metadata value,
# e.g. `pkt['ingress_port']`
print(pkt.payload)
print(pkt['ingress_port'])
Listening for Digests
To receive digests, use the async iterator Switch.read_digests
. You must specify
the name of the digest from your P4 program.
async for digest in switch.read_digests("digest_t"):
# You can access the digest metadata e.g. `digest['ingress_port']`
# Your code may need to update table entries based on the digest data.
# To ack the digest, write `digest.ack()`.
await switch.write([entry, ...])
await switch.write([digest.ack()])
To acknowledge the digest entry, you can write digest.ack()
.
Listening for Idle Timeouts
To receive idle timeout notifications, use the async iterator Switch.read_idle_timeouts
.
You will receive a P4IdleTimeoutNotification
which contains multiple table
entries -- one for each entry that timed out.
async for timeout in switch.read_idle_timeouts():
for entry in timeout.table_entry:
print(timeout.timestamp, entry)
Other Events
A P4 switch may report other events using the EventEmitter
API. See
the SwitchEvent
class for the event types. Each switch has a switch.ee
attribute that lets your code register for event callbacks.
Development and Testing
Perform these steps to set up your local environment for Finsy development, or try the codespace. Finsy requires Python 3.10 or later. If poetry is not installed, follow these directions to install it.
Clone and Prepare a Virtual Environment
The poetry install
command installs all development dependencies into the
virtual environment (venv).
$ git clone https://github.com/byllyfish/finsy.git
$ cd finsy
$ python3 -m venv .venv
$ poetry install
Run Unit Tests
When you run pytest from the top level of the repository, you will run the unit tests.
$ poetry run pytest
Run Integration Tests
When you run pytest from within the examples
directory, you will run the integration
tests instead of the unit tests. The integration tests run the example programs against a
Mininet network. Docker or podman are required.
$ cd examples
$ poetry run pytest
API Documentation
1""" 2.. include:: ../README.md 3 4# API Documentation 5""" 6 7# Copyright (c) 2022-2023 Bill Fisher 8# 9# Licensed under the Apache License, Version 2.0 (the "License"); 10# you may not use this file except in compliance with the License. 11# You may obtain a copy of the License at 12# 13# http://www.apache.org/licenses/LICENSE-2.0 14# 15# Unless required by applicable law or agreed to in writing, software 16# distributed under the License is distributed on an "AS IS" BASIS, 17# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 18# See the License for the specific language governing permissions and 19# limitations under the License. 20 21__version__ = "0.27.0" 22 23import sys 24 25if sys.version_info < (3, 10): # pragma: no cover 26 raise RuntimeError("Requires Python 3.10 or later.") 27 28from .controller import Controller 29from .gnmiclient import GNMIClient, GNMISubscription, GNMIUpdate 30from .gnmipath import GNMIPath 31from .grpcutil import GRPCCredentialsTLS, GRPCStatusCode 32from .log import LoggerAdapter 33from .macaddr import MACAddress 34from .p4client import P4Client, P4ClientError, P4Error 35from .p4entity import ( 36 P4ActionProfileGroup, 37 P4ActionProfileMember, 38 P4CloneSessionEntry, 39 P4CounterData, 40 P4CounterEntry, 41 P4DigestEntry, 42 P4DigestList, 43 P4DigestListAck, 44 P4DirectCounterEntry, 45 P4DirectMeterEntry, 46 P4ExternEntry, 47 P4IndirectAction, 48 P4Member, 49 P4MeterConfig, 50 P4MeterCounterData, 51 P4MeterEntry, 52 P4MulticastGroupEntry, 53 P4PacketIn, 54 P4PacketOut, 55 P4RegisterEntry, 56 P4TableAction, 57 P4TableEntry, 58 P4TableMatch, 59 P4ValueSetEntry, 60) 61from .p4schema import P4ConfigAction, P4CounterUnit, P4Schema 62from .ports import SwitchPort, SwitchPortList 63from .runner import run 64from .switch import Switch, SwitchEvent, SwitchOptions 65 66Match = P4TableMatch 67"`Match` is an alias for P4TableMatch." 68 69Action = P4TableAction 70"`Action` is an alias for P4TableAction." 71 72IndirectAction = P4IndirectAction 73"`IndirectAction` is an alias for P4IndirectAction." 74 75__all__ = [ 76 "run", 77 "Controller", 78 "LoggerAdapter", 79 "MACAddress", 80 "P4ActionProfileGroup", 81 "P4ActionProfileMember", 82 "P4Client", 83 "P4ClientError", 84 "P4CloneSessionEntry", 85 "P4CounterData", 86 "P4CounterEntry", 87 "P4CounterUnit", 88 "P4DigestEntry", 89 "P4DigestList", 90 "P4DigestListAck", 91 "P4DirectCounterEntry", 92 "P4DirectMeterEntry", 93 "P4Error", 94 "P4ExternEntry", 95 "IndirectAction", # alias for P4IndirectAction 96 "P4IndirectAction", 97 "P4Member", 98 "P4MeterConfig", 99 "P4MeterCounterData", 100 "P4MeterEntry", 101 "P4MulticastGroupEntry", 102 "P4PacketIn", 103 "P4PacketOut", 104 "P4RegisterEntry", 105 "Action", # alias for P4TableAction 106 "P4TableAction", 107 "P4TableEntry", 108 "Match", # alias for P4TableMatch 109 "P4TableMatch", 110 "P4ValueSetEntry", 111 "P4ConfigAction", 112 "P4Schema", 113 "Switch", 114 "SwitchEvent", 115 "SwitchOptions", 116 "SwitchPort", 117 "SwitchPortList", 118 "GNMIClient", 119 "GNMIPath", 120 "GNMISubscription", 121 "GNMIUpdate", 122 "GRPCCredentialsTLS", 123 "GRPCStatusCode", 124]
53def run(coro: Coroutine[Any, Any, None]) -> None: 54 """`finsy.run` provides a useful wrapper around `asyncio.run`. 55 56 This function implements common boilerplate for running a Finsy application. 57 58 - Set up basic logging to stderr at the INFO log level. 59 - Set up a signal handler for SIGTERM that shuts down gracefully. 60 - Set up caching of P4Info data so common definitions are re-used. 61 62 Example: 63 64 ```python 65 import finsy as fy 66 67 async def main(): 68 async with fy.Switch("sw", "127.0.0.1:50001") as sw: 69 print(sw.p4info) 70 71 if __name__ == "__main__": 72 fy.run(main()) 73 ``` 74 75 If you choose to use `asyncio.run` instead, your P4Schema/P4Info objects 76 will not be eligible for sharing. You can create your own `P4SchemaCache` 77 context manager to implement this. 78 """ 79 try: 80 asyncio.run(_finsy_boilerplate(coro)) 81 except (KeyboardInterrupt, asyncio.CancelledError): 82 pass
finsy.run
provides a useful wrapper around asyncio.run
.
This function implements common boilerplate for running a Finsy application.
- Set up basic logging to stderr at the INFO log level.
- Set up a signal handler for SIGTERM that shuts down gracefully.
- Set up caching of P4Info data so common definitions are re-used.
Example:
import finsy as fy
async def main():
async with fy.Switch("sw", "127.0.0.1:50001") as sw:
print(sw.p4info)
if __name__ == "__main__":
fy.run(main())
If you choose to use asyncio.run
instead, your P4Schema/P4Info objects
will not be eligible for sharing. You can create your own P4SchemaCache
context manager to implement this.
30@final 31class Controller: 32 """Represents a collection of P4Runtime switches. 33 34 Each `Switch` in the Controller is identified by its name. Each name must 35 be unique. 36 37 ``` 38 switches = [ 39 fy.Switch("sw1", "10.0.0.1:50000"), 40 fy.Switch("sw2", "10.0.0.2:50000"), 41 ] 42 controller = fy.Controller(switches) 43 await controller.run() 44 ``` 45 46 A Controller can be running or stopped. There are two ways to run a 47 Controller. You can use the `Controller.run()` method, or you can use 48 a Controller as a context manager. 49 50 ``` 51 controller = fy.Controller(switches) 52 async with controller: 53 # Let controller run for 60 seconds. 54 await asyncio.sleep(60) 55 ``` 56 57 You can `add` or `remove` switches regardless of whether a Controller is 58 running or not. If the Controller is not running, adding or removing a Switch is 59 instantaneous. If the Controller is running, adding a Switch will start 60 it running asynchronously. Removing a Switch will schedule the Switch to stop, 61 but defer actual removal until the Switch has stopped asynchronously. 62 63 When a switch starts inside a Controller, it fires the `CONTROLLER_ENTER` 64 event. When a switch stops inside a Controller, it fires the `CONTROLLER_LEAVE` 65 event. 66 67 A Controller supports these methods to access its contents: 68 69 - len(controller): Return number of switches in the controller. 70 - controller[name]: Return the switch with the given name. 71 - controller.get(name): Return the switch with the given name, or None if not found. 72 73 You can iterate over the switches in a Controller using a for loop: 74 75 ``` 76 for switch in controller: 77 print(switch.name) 78 ``` 79 80 Any task or sub-task running inside a controller can retrieve its 81 Controller object using the `Controller.current()` method. 82 """ 83 84 _switches: dict[str, Switch] 85 _pending_removal: set[Switch] 86 _task_count: CountdownFuture 87 88 _control_task: asyncio.Task[Any] | None = None 89 "Keep track of controller's main task." 90 91 def __init__(self, switches: Iterable[Switch] = ()): 92 """Initialize Controller object with an initial list of switches. 93 94 Args: 95 switches: a collection or other iterable of Switch objects. 96 """ 97 self._switches = {} 98 self._pending_removal = set() 99 self._task_count = CountdownFuture() 100 101 for switch in switches: 102 if switch.name in self._switches: 103 raise ValueError(f"Switch named {switch.name!r} already exists") 104 self._switches[switch.name] = switch 105 106 @property 107 def running(self) -> bool: 108 "True if Controller is running." 109 return self._control_task is not None 110 111 async def run(self) -> None: 112 "Run the controller." 113 async with self: 114 await wait_for_cancel() 115 116 def stop(self) -> None: 117 "Stop the controller if it is running." 118 if self._control_task is not None: 119 self._control_task.cancel() 120 121 async def __aenter__(self) -> Self: 122 "Run the controller as a context manager (see also run())." 123 assert not self.running, "Controller.__aenter__ is not re-entrant" 124 assert self._task_count.value() == 0 125 assert not self._pending_removal 126 127 self._control_task = asyncio.current_task() 128 _CONTROLLER.set(self) 129 130 try: 131 # Start each switch running. 132 for switch in self: 133 self._start_switch(switch) 134 except Exception: 135 self._control_task = None 136 _CONTROLLER.set(None) 137 raise 138 139 return self 140 141 async def __aexit__( 142 self, 143 _exc_type: type[BaseException] | None, 144 _exc_val: BaseException | None, 145 _exc_tb: TracebackType | None, 146 ) -> bool | None: 147 "Run the controller as a context manager (see also run())." 148 assert self.running 149 150 try: 151 # Stop all the switches. 152 for switch in self: 153 self._stop_switch(switch) 154 155 # Wait for switch tasks to finish. 156 await self._task_count.wait() 157 158 finally: 159 self._control_task = None 160 _CONTROLLER.set(None) 161 162 def add(self, switch: Switch) -> None: 163 """Add a switch to the controller. 164 165 If the controller is running, tell the switch to start. 166 167 Args: 168 switch: the Switch object. 169 """ 170 if switch.name in self._switches: 171 raise ValueError(f"Switch named {switch.name!r} already exists") 172 173 self._switches[switch.name] = switch 174 if self.running: 175 self._start_switch(switch) 176 177 def remove(self, switch: Switch) -> asyncio.Event: 178 """Remove a switch from the controller. 179 180 If the controller is running, tell the switch to stop and schedule it 181 for removal when it fully stops. 182 183 Args: 184 switch: the Switch object. 185 """ 186 name = switch.name 187 if self._switches.get(name, None) is not switch: 188 raise ValueError(f"Switch named {name!r} not found") 189 190 del self._switches[name] 191 192 event = asyncio.Event() 193 if self.running: 194 # When controller is running, event will complete when switch 195 # is actually stopped. 196 self._stop_switch(switch) 197 self._pending_removal.add(switch) 198 199 def _controller_leave(sw: Switch): 200 self._pending_removal.discard(sw) 201 event.set() 202 203 switch.ee.once(SwitchEvent.CONTROLLER_LEAVE, _controller_leave) # type: ignore 204 else: 205 # When controller is not running, event completes immediately. 206 event.set() 207 208 return event 209 210 def _start_switch(self, switch: Switch): 211 "Start the switch's control task." 212 LOGGER.debug("Controller._start_switch: %r", switch) 213 assert switch._control_task is None # pyright: ignore[reportPrivateUsage] 214 215 switch.ee.emit(SwitchEvent.CONTROLLER_ENTER, switch) 216 217 task = asyncio.create_task(switch.run(), name=f"fy:{switch.name}") 218 switch._control_task = task # pyright: ignore[reportPrivateUsage] 219 self._task_count.increment() 220 221 def _switch_done(done: asyncio.Task[Any]): 222 switch._control_task = None # pyright: ignore[reportPrivateUsage] 223 switch.ee.emit(SwitchEvent.CONTROLLER_LEAVE, switch) 224 self._task_count.decrement() 225 226 if not done.cancelled(): 227 ex = done.exception() 228 if ex is not None: 229 if not isinstance(ex, SwitchFailFastError): 230 # The `fail_fast` error has already been logged. If 231 # it's any other error, log it. (There shouldn't be 232 # any other error.) 233 LOGGER.critical( 234 "Controller task %r failed", 235 done.get_name(), 236 exc_info=ex, 237 ) 238 # Shutdown the program cleanly due to switch failure. 239 raise SystemExit(99) 240 241 task.add_done_callback(_switch_done) 242 243 def _stop_switch(self, switch: Switch): 244 "Stop the switch's control task." 245 LOGGER.debug("Controller._stop_switch: %r", switch) 246 247 if switch._control_task is not None: # pyright: ignore[reportPrivateUsage] 248 switch._control_task.cancel() # pyright: ignore[reportPrivateUsage] 249 250 def __len__(self) -> int: 251 "Return the number of switches." 252 return len(self._switches) 253 254 def __iter__(self) -> Iterator[Switch]: 255 "Iterate over the switches." 256 return iter(self._switches.values()) 257 258 def __getitem__(self, name: str) -> Switch: 259 "Retrieve a switch by name." 260 return self._switches[name] 261 262 def get(self, name: str) -> Switch | None: 263 "Retrieve a switch by name, or return None if not found." 264 return self._switches.get(name) 265 266 @staticmethod 267 def current() -> "Controller": 268 "Return the current Controller object." 269 result = _CONTROLLER.get() 270 if result is None: 271 raise RuntimeError("controller does not exist") 272 return result
Represents a collection of P4Runtime switches.
Each Switch
in the Controller is identified by its name. Each name must
be unique.
switches = [
fy.Switch("sw1", "10.0.0.1:50000"),
fy.Switch("sw2", "10.0.0.2:50000"),
]
controller = fy.Controller(switches)
await controller.run()
A Controller can be running or stopped. There are two ways to run a
Controller. You can use the Controller.run()
method, or you can use
a Controller as a context manager.
controller = fy.Controller(switches)
async with controller:
# Let controller run for 60 seconds.
await asyncio.sleep(60)
You can add
or remove
switches regardless of whether a Controller is
running or not. If the Controller is not running, adding or removing a Switch is
instantaneous. If the Controller is running, adding a Switch will start
it running asynchronously. Removing a Switch will schedule the Switch to stop,
but defer actual removal until the Switch has stopped asynchronously.
When a switch starts inside a Controller, it fires the CONTROLLER_ENTER
event. When a switch stops inside a Controller, it fires the CONTROLLER_LEAVE
event.
A Controller supports these methods to access its contents:
- len(controller): Return number of switches in the controller.
- controller[name]: Return the switch with the given name.
- controller.get(name): Return the switch with the given name, or None if not found.
You can iterate over the switches in a Controller using a for loop:
for switch in controller:
print(switch.name)
Any task or sub-task running inside a controller can retrieve its
Controller object using the Controller.current()
method.
91 def __init__(self, switches: Iterable[Switch] = ()): 92 """Initialize Controller object with an initial list of switches. 93 94 Args: 95 switches: a collection or other iterable of Switch objects. 96 """ 97 self._switches = {} 98 self._pending_removal = set() 99 self._task_count = CountdownFuture() 100 101 for switch in switches: 102 if switch.name in self._switches: 103 raise ValueError(f"Switch named {switch.name!r} already exists") 104 self._switches[switch.name] = switch
Initialize Controller object with an initial list of switches.
Arguments:
- switches: a collection or other iterable of Switch objects.
106 @property 107 def running(self) -> bool: 108 "True if Controller is running." 109 return self._control_task is not None
True if Controller is running.
111 async def run(self) -> None: 112 "Run the controller." 113 async with self: 114 await wait_for_cancel()
Run the controller.
116 def stop(self) -> None: 117 "Stop the controller if it is running." 118 if self._control_task is not None: 119 self._control_task.cancel()
Stop the controller if it is running.
121 async def __aenter__(self) -> Self: 122 "Run the controller as a context manager (see also run())." 123 assert not self.running, "Controller.__aenter__ is not re-entrant" 124 assert self._task_count.value() == 0 125 assert not self._pending_removal 126 127 self._control_task = asyncio.current_task() 128 _CONTROLLER.set(self) 129 130 try: 131 # Start each switch running. 132 for switch in self: 133 self._start_switch(switch) 134 except Exception: 135 self._control_task = None 136 _CONTROLLER.set(None) 137 raise 138 139 return self
Run the controller as a context manager (see also run()).
162 def add(self, switch: Switch) -> None: 163 """Add a switch to the controller. 164 165 If the controller is running, tell the switch to start. 166 167 Args: 168 switch: the Switch object. 169 """ 170 if switch.name in self._switches: 171 raise ValueError(f"Switch named {switch.name!r} already exists") 172 173 self._switches[switch.name] = switch 174 if self.running: 175 self._start_switch(switch)
Add a switch to the controller.
If the controller is running, tell the switch to start.
Arguments:
- switch: the Switch object.
177 def remove(self, switch: Switch) -> asyncio.Event: 178 """Remove a switch from the controller. 179 180 If the controller is running, tell the switch to stop and schedule it 181 for removal when it fully stops. 182 183 Args: 184 switch: the Switch object. 185 """ 186 name = switch.name 187 if self._switches.get(name, None) is not switch: 188 raise ValueError(f"Switch named {name!r} not found") 189 190 del self._switches[name] 191 192 event = asyncio.Event() 193 if self.running: 194 # When controller is running, event will complete when switch 195 # is actually stopped. 196 self._stop_switch(switch) 197 self._pending_removal.add(switch) 198 199 def _controller_leave(sw: Switch): 200 self._pending_removal.discard(sw) 201 event.set() 202 203 switch.ee.once(SwitchEvent.CONTROLLER_LEAVE, _controller_leave) # type: ignore 204 else: 205 # When controller is not running, event completes immediately. 206 event.set() 207 208 return event
Remove a switch from the controller.
If the controller is running, tell the switch to stop and schedule it for removal when it fully stops.
Arguments:
- switch: the Switch object.
254 def __iter__(self) -> Iterator[Switch]: 255 "Iterate over the switches." 256 return iter(self._switches.values())
Iterate over the switches.
258 def __getitem__(self, name: str) -> Switch: 259 "Retrieve a switch by name." 260 return self._switches[name]
Retrieve a switch by name.
262 def get(self, name: str) -> Switch | None: 263 "Retrieve a switch by name, or return None if not found." 264 return self._switches.get(name)
Retrieve a switch by name, or return None if not found.
68class LoggerAdapter(_BaseLoggerAdapter): 69 """Custom log adapter to include the name of the current task.""" 70 71 def process( 72 self, 73 msg: Any, 74 kwargs: MutableMapping[str, Any], 75 ) -> tuple[Any, MutableMapping[str, Any]]: 76 """Process the logging message and keyword arguments passed in to a 77 logging call to insert contextual information. 78 """ 79 task_name = _get_current_task_name() 80 return f"[{task_name}] {msg}", kwargs 81 82 def info(self, msg: Any, *args: Any, **kwargs: Any) -> None: 83 """INFO level uses a concise task name representation for readability.""" 84 if self.logger.isEnabledFor(logging.INFO): 85 task_name = _get_current_task_name(True) 86 self.logger.info(f"[{task_name}] {msg}", *args, **kwargs)
Custom log adapter to include the name of the current task.
71 def process( 72 self, 73 msg: Any, 74 kwargs: MutableMapping[str, Any], 75 ) -> tuple[Any, MutableMapping[str, Any]]: 76 """Process the logging message and keyword arguments passed in to a 77 logging call to insert contextual information. 78 """ 79 task_name = _get_current_task_name() 80 return f"[{task_name}] {msg}", kwargs
Process the logging message and keyword arguments passed in to a logging call to insert contextual information.
82 def info(self, msg: Any, *args: Any, **kwargs: Any) -> None: 83 """INFO level uses a concise task name representation for readability.""" 84 if self.logger.isEnabledFor(logging.INFO): 85 task_name = _get_current_task_name(True) 86 self.logger.info(f"[{task_name}] {msg}", *args, **kwargs)
INFO level uses a concise task name representation for readability.
24@functools.total_ordering 25class MACAddress: 26 """Concrete class for a MAC address.""" 27 28 __slots__ = ("_mac", "__weakref__") 29 _mac: int 30 31 def __init__(self, value: object) -> None: 32 "Construct a MAC address from a string, integer or bytes object." 33 match value: 34 case int(): 35 self._mac = _from_int(value) 36 case bytes(): 37 self._mac = _from_bytes(value) 38 case MACAddress(): 39 self._mac = value._mac 40 case _: 41 # Input argument is a MAC string, or an object that is 42 # formatted as MAC string (behave like ipaddress module). 43 self._mac = _from_string(str(value)) 44 45 @property 46 def packed(self) -> bytes: 47 "Return the MAC address a byte string." 48 return _to_bytes(self._mac) 49 50 @property 51 def max_prefixlen(self) -> int: 52 "Return the maximum prefix length (48 bits)." 53 return _BIT_WIDTH 54 55 @property 56 def is_multicast(self) -> bool: 57 "Return true if MAC address has the multicast bit set." 58 return self._mac & (1 << 40) != 0 59 60 @property 61 def is_private(self) -> bool: 62 "Return true if MAC address has the locally administered bit set." 63 return self._mac & (1 << 41) != 0 64 65 @property 66 def is_global(self) -> bool: 67 "Return true if the locally administered bit is not set." 68 return not self.is_private 69 70 @property 71 def is_unspecified(self) -> bool: 72 "Return true if MAC address is all zeros." 73 return self._mac == 0 74 75 @property 76 def is_broadcast(self) -> bool: 77 "Return true if MAC address is the broadcast address." 78 return self._mac == (1 << _BIT_WIDTH) - 1 79 80 def __int__(self) -> int: 81 return self._mac 82 83 def __eq__(self, rhs: object) -> bool: 84 if not isinstance(rhs, MACAddress): 85 return NotImplemented 86 return self._mac == rhs._mac 87 88 def __lt__(self, rhs: object) -> bool: 89 if not isinstance(rhs, MACAddress): 90 return NotImplemented 91 return self._mac < rhs._mac 92 93 def __repr__(self) -> str: 94 return f"MACAddress({_to_string(self._mac)!r})" 95 96 def __str__(self) -> str: 97 return _to_string(self._mac) 98 99 def __hash__(self) -> int: 100 return hash(hex(self._mac))
Concrete class for a MAC address.
31 def __init__(self, value: object) -> None: 32 "Construct a MAC address from a string, integer or bytes object." 33 match value: 34 case int(): 35 self._mac = _from_int(value) 36 case bytes(): 37 self._mac = _from_bytes(value) 38 case MACAddress(): 39 self._mac = value._mac 40 case _: 41 # Input argument is a MAC string, or an object that is 42 # formatted as MAC string (behave like ipaddress module). 43 self._mac = _from_string(str(value))
Construct a MAC address from a string, integer or bytes object.
45 @property 46 def packed(self) -> bytes: 47 "Return the MAC address a byte string." 48 return _to_bytes(self._mac)
Return the MAC address a byte string.
50 @property 51 def max_prefixlen(self) -> int: 52 "Return the maximum prefix length (48 bits)." 53 return _BIT_WIDTH
Return the maximum prefix length (48 bits).
55 @property 56 def is_multicast(self) -> bool: 57 "Return true if MAC address has the multicast bit set." 58 return self._mac & (1 << 40) != 0
Return true if MAC address has the multicast bit set.
60 @property 61 def is_private(self) -> bool: 62 "Return true if MAC address has the locally administered bit set." 63 return self._mac & (1 << 41) != 0
Return true if MAC address has the locally administered bit set.
65 @property 66 def is_global(self) -> bool: 67 "Return true if the locally administered bit is not set." 68 return not self.is_private
Return true if the locally administered bit is not set.
1458@decodable("action_profile_group") 1459@dataclass(slots=True) 1460class P4ActionProfileGroup(_P4Writable): 1461 "Represents a P4Runtime ActionProfileGroup." 1462 1463 action_profile_id: str = "" 1464 _: KW_ONLY 1465 group_id: int = 0 1466 max_size: int = 0 1467 members: Sequence[P4Member] | None = None 1468 1469 def encode(self, schema: P4Schema) -> p4r.Entity: 1470 "Encode P4ActionProfileGroup as protobuf." 1471 if not self.action_profile_id: 1472 return p4r.Entity(action_profile_group=p4r.ActionProfileGroup()) 1473 1474 profile = schema.action_profiles[self.action_profile_id] 1475 1476 if self.members is not None: 1477 members = [member.encode() for member in self.members] 1478 else: 1479 members = None 1480 1481 entry = p4r.ActionProfileGroup( 1482 action_profile_id=profile.id, 1483 group_id=self.group_id, 1484 members=members, 1485 max_size=self.max_size, 1486 ) 1487 return p4r.Entity(action_profile_group=entry) 1488 1489 @classmethod 1490 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1491 "Decode protobuf to ActionProfileGroup data." 1492 entry = msg.action_profile_group 1493 if entry.action_profile_id == 0: 1494 return cls() 1495 1496 profile = schema.action_profiles[entry.action_profile_id] 1497 1498 if entry.members: 1499 members = [P4Member.decode(member) for member in entry.members] 1500 else: 1501 members = None 1502 1503 return cls( 1504 action_profile_id=profile.alias, 1505 group_id=entry.group_id, 1506 max_size=entry.max_size, 1507 members=members, 1508 ) 1509 1510 def action_str(self, _schema: P4Schema) -> str: 1511 "Return string representation of the weighted members." 1512 if not self.members: 1513 return "" 1514 1515 return " ".join( 1516 [f"{member.weight}*{member.member_id:#x}" for member in self.members] 1517 )
Represents a P4Runtime ActionProfileGroup.
1469 def encode(self, schema: P4Schema) -> p4r.Entity: 1470 "Encode P4ActionProfileGroup as protobuf." 1471 if not self.action_profile_id: 1472 return p4r.Entity(action_profile_group=p4r.ActionProfileGroup()) 1473 1474 profile = schema.action_profiles[self.action_profile_id] 1475 1476 if self.members is not None: 1477 members = [member.encode() for member in self.members] 1478 else: 1479 members = None 1480 1481 entry = p4r.ActionProfileGroup( 1482 action_profile_id=profile.id, 1483 group_id=self.group_id, 1484 members=members, 1485 max_size=self.max_size, 1486 ) 1487 return p4r.Entity(action_profile_group=entry)
Encode P4ActionProfileGroup as protobuf.
1489 @classmethod 1490 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1491 "Decode protobuf to ActionProfileGroup data." 1492 entry = msg.action_profile_group 1493 if entry.action_profile_id == 0: 1494 return cls() 1495 1496 profile = schema.action_profiles[entry.action_profile_id] 1497 1498 if entry.members: 1499 members = [P4Member.decode(member) for member in entry.members] 1500 else: 1501 members = None 1502 1503 return cls( 1504 action_profile_id=profile.alias, 1505 group_id=entry.group_id, 1506 max_size=entry.max_size, 1507 members=members, 1508 )
Decode protobuf to ActionProfileGroup data.
1510 def action_str(self, _schema: P4Schema) -> str: 1511 "Return string representation of the weighted members." 1512 if not self.members: 1513 return "" 1514 1515 return " ".join( 1516 [f"{member.weight}*{member.member_id:#x}" for member in self.members] 1517 )
Return string representation of the weighted members.
1356@decodable("action_profile_member") 1357@dataclass(slots=True) 1358class P4ActionProfileMember(_P4Writable): 1359 "Represents a P4Runtime ActionProfileMember." 1360 1361 action_profile_id: str = "" 1362 _: KW_ONLY 1363 member_id: int = 0 1364 action: P4TableAction | None = None 1365 1366 def encode(self, schema: P4Schema) -> p4r.Entity: 1367 "Encode P4ActionProfileMember as protobuf." 1368 if not self.action_profile_id: 1369 return p4r.Entity(action_profile_member=p4r.ActionProfileMember()) 1370 1371 profile = schema.action_profiles[self.action_profile_id] 1372 1373 if self.action: 1374 action = self.action.encode_action(schema) 1375 else: 1376 action = None 1377 1378 entry = p4r.ActionProfileMember( 1379 action_profile_id=profile.id, 1380 member_id=self.member_id, 1381 action=action, 1382 ) 1383 return p4r.Entity(action_profile_member=entry) 1384 1385 @classmethod 1386 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1387 "Decode protobuf to ActionProfileMember data." 1388 entry = msg.action_profile_member 1389 if entry.action_profile_id == 0: 1390 return cls() 1391 1392 profile = schema.action_profiles[entry.action_profile_id] 1393 1394 if entry.HasField("action"): 1395 action = P4TableAction.decode_action(entry.action, schema) 1396 else: 1397 action = None 1398 1399 return cls( 1400 action_profile_id=profile.alias, 1401 member_id=entry.member_id, 1402 action=action, 1403 ) 1404 1405 def action_str(self, schema: P4Schema) -> str: 1406 "Format the action as a human-readable, canonical string." 1407 if self.action is None: 1408 return NOACTION_STR 1409 return self.action.format_str(schema)
Represents a P4Runtime ActionProfileMember.
1366 def encode(self, schema: P4Schema) -> p4r.Entity: 1367 "Encode P4ActionProfileMember as protobuf." 1368 if not self.action_profile_id: 1369 return p4r.Entity(action_profile_member=p4r.ActionProfileMember()) 1370 1371 profile = schema.action_profiles[self.action_profile_id] 1372 1373 if self.action: 1374 action = self.action.encode_action(schema) 1375 else: 1376 action = None 1377 1378 entry = p4r.ActionProfileMember( 1379 action_profile_id=profile.id, 1380 member_id=self.member_id, 1381 action=action, 1382 ) 1383 return p4r.Entity(action_profile_member=entry)
Encode P4ActionProfileMember as protobuf.
1385 @classmethod 1386 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1387 "Decode protobuf to ActionProfileMember data." 1388 entry = msg.action_profile_member 1389 if entry.action_profile_id == 0: 1390 return cls() 1391 1392 profile = schema.action_profiles[entry.action_profile_id] 1393 1394 if entry.HasField("action"): 1395 action = P4TableAction.decode_action(entry.action, schema) 1396 else: 1397 action = None 1398 1399 return cls( 1400 action_profile_id=profile.alias, 1401 member_id=entry.member_id, 1402 action=action, 1403 )
Decode protobuf to ActionProfileMember data.
239class P4Client: 240 "Implements a P4Runtime client." 241 242 _address: str 243 _credentials: GRPCCredentialsTLS | None 244 _wait_for_ready: bool 245 _channel: grpc.aio.Channel | None = None 246 _stub: p4r_grpc.P4RuntimeStub | None = None 247 _stream: _P4StreamTypeAlias | None = None 248 _complete_request: Callable[[pbutil.PBMessage], None] | None = None 249 250 _schema: P4Schema | None = None 251 "Annotate log messages using this optional P4Info schema." 252 253 def __init__( 254 self, 255 address: str, 256 credentials: GRPCCredentialsTLS | None = None, 257 *, 258 wait_for_ready: bool = True, 259 ) -> None: 260 self._address = address 261 self._credentials = credentials 262 self._wait_for_ready = wait_for_ready 263 264 @property 265 def channel(self) -> grpc.aio.Channel | None: 266 "Return the GRPC channel object, or None if the channel is not open." 267 return self._channel 268 269 async def __aenter__(self) -> Self: 270 await self.open() 271 return self 272 273 async def __aexit__(self, *args: Any) -> bool | None: 274 await self.close() 275 276 async def open( 277 self, 278 *, 279 schema: P4Schema | None = None, 280 complete_request: Callable[[pbutil.PBMessage], None] | None = None, 281 ) -> None: 282 """Open the client channel. 283 284 Note: This method is `async` for forward-compatible reasons. 285 """ 286 assert self._channel is None 287 assert self._stub is None 288 289 # Increase max_metadata_size from 8 KB to 64 KB. 290 options = GRPCOptions( 291 max_metadata_size=64 * 1024, # 64 kilobytes 292 max_reconnect_backoff_ms=15000, # 15.0 seconds 293 ) 294 295 self._channel = grpc_channel( 296 self._address, 297 credentials=self._credentials, 298 options=options, 299 client_type="P4Client", 300 ) 301 302 self._stub = p4r_grpc.P4RuntimeStub(self._channel) 303 self._schema = schema 304 self._complete_request = complete_request 305 306 async def close(self) -> None: 307 "Close the client channel." 308 if self._channel is not None: 309 LOGGER.debug("P4Client: close channel %r", self._address) 310 311 if self._stream is not None: 312 self._stream.cancel() 313 self._stream = None 314 315 await self._channel.close() 316 self._channel = None 317 self._stub = None 318 self._schema = None 319 self._complete_request = None 320 321 async def send(self, msg: p4r.StreamMessageRequest) -> None: 322 """Send a message to the stream.""" 323 assert self._stub is not None 324 325 if not self._stream or self._stream.done(): 326 self._stream = cast( 327 _P4StreamTypeAlias, 328 self._stub.StreamChannel(wait_for_ready=self._wait_for_ready), # type: ignore 329 ) 330 331 self._log_msg(msg) 332 333 try: 334 await self._stream.write(msg) 335 except grpc.RpcError as ex: 336 raise P4ClientError(ex, "send") from None 337 338 async def receive(self) -> p4r.StreamMessageResponse: 339 """Read a message from the stream.""" 340 assert self._stream is not None 341 342 try: 343 msg = cast( 344 p4r.StreamMessageResponse, 345 await self._stream.read(), # type: ignore 346 ) 347 if msg is GRPC_EOF: 348 # Treat EOF as a protocol violation. 349 raise RuntimeError("P4Client.receive got EOF!") 350 351 except grpc.RpcError as ex: 352 raise P4ClientError(ex, "receive") from None 353 354 self._log_msg(msg) 355 return msg 356 357 @overload 358 async def request( 359 self, msg: p4r.WriteRequest 360 ) -> p4r.WriteResponse: ... # pragma: no cover 361 362 @overload 363 async def request( 364 self, msg: p4r.GetForwardingPipelineConfigRequest 365 ) -> p4r.GetForwardingPipelineConfigResponse: ... # pragma: no cover 366 367 @overload 368 async def request( 369 self, msg: p4r.SetForwardingPipelineConfigRequest 370 ) -> p4r.SetForwardingPipelineConfigResponse: ... # pragma: no cover 371 372 @overload 373 async def request( 374 self, msg: p4r.CapabilitiesRequest 375 ) -> p4r.CapabilitiesResponse: ... # pragma: no cover 376 377 async def request(self, msg: pbutil.PBMessage) -> pbutil.PBMessage: 378 "Send a unary-unary P4Runtime request and wait for the response." 379 assert self._stub is not None 380 381 if self._complete_request: 382 self._complete_request(msg) 383 384 msg_type = type(msg).__name__ 385 assert msg_type.endswith("Request") 386 rpc_method = getattr(self._stub, msg_type[:-7]) 387 388 self._log_msg(msg) 389 try: 390 reply = await rpc_method( 391 msg, 392 timeout=_DEFAULT_RPC_TIMEOUT, 393 ) 394 except grpc.RpcError as ex: 395 raise P4ClientError(ex, msg_type, msg=msg, schema=self._schema) from None 396 397 self._log_msg(reply) 398 return reply 399 400 async def request_iter( 401 self, msg: p4r.ReadRequest 402 ) -> AsyncIterator[p4r.ReadResponse]: 403 "Send a unary-stream P4Runtime read request and wait for the responses." 404 assert self._stub is not None 405 406 if self._complete_request: 407 self._complete_request(msg) 408 409 msg_type = type(msg).__name__ 410 assert msg_type.endswith("Request") 411 rpc_method = getattr(self._stub, msg_type[:-7]) 412 413 self._log_msg(msg) 414 try: 415 async for reply in rpc_method( 416 msg, 417 timeout=_DEFAULT_RPC_TIMEOUT, 418 ): 419 self._log_msg(reply) 420 yield reply 421 except grpc.RpcError as ex: 422 raise P4ClientError(ex, msg_type) from None 423 424 def _log_msg(self, msg: pbutil.PBMessage) -> None: 425 "Log a P4Runtime request or response." 426 pbutil.log_msg(self._channel, msg, self._schema)
Implements a P4Runtime client.
264 @property 265 def channel(self) -> grpc.aio.Channel | None: 266 "Return the GRPC channel object, or None if the channel is not open." 267 return self._channel
Return the GRPC channel object, or None if the channel is not open.
276 async def open( 277 self, 278 *, 279 schema: P4Schema | None = None, 280 complete_request: Callable[[pbutil.PBMessage], None] | None = None, 281 ) -> None: 282 """Open the client channel. 283 284 Note: This method is `async` for forward-compatible reasons. 285 """ 286 assert self._channel is None 287 assert self._stub is None 288 289 # Increase max_metadata_size from 8 KB to 64 KB. 290 options = GRPCOptions( 291 max_metadata_size=64 * 1024, # 64 kilobytes 292 max_reconnect_backoff_ms=15000, # 15.0 seconds 293 ) 294 295 self._channel = grpc_channel( 296 self._address, 297 credentials=self._credentials, 298 options=options, 299 client_type="P4Client", 300 ) 301 302 self._stub = p4r_grpc.P4RuntimeStub(self._channel) 303 self._schema = schema 304 self._complete_request = complete_request
Open the client channel.
Note: This method is async
for forward-compatible reasons.
306 async def close(self) -> None: 307 "Close the client channel." 308 if self._channel is not None: 309 LOGGER.debug("P4Client: close channel %r", self._address) 310 311 if self._stream is not None: 312 self._stream.cancel() 313 self._stream = None 314 315 await self._channel.close() 316 self._channel = None 317 self._stub = None 318 self._schema = None 319 self._complete_request = None
Close the client channel.
321 async def send(self, msg: p4r.StreamMessageRequest) -> None: 322 """Send a message to the stream.""" 323 assert self._stub is not None 324 325 if not self._stream or self._stream.done(): 326 self._stream = cast( 327 _P4StreamTypeAlias, 328 self._stub.StreamChannel(wait_for_ready=self._wait_for_ready), # type: ignore 329 ) 330 331 self._log_msg(msg) 332 333 try: 334 await self._stream.write(msg) 335 except grpc.RpcError as ex: 336 raise P4ClientError(ex, "send") from None
Send a message to the stream.
338 async def receive(self) -> p4r.StreamMessageResponse: 339 """Read a message from the stream.""" 340 assert self._stream is not None 341 342 try: 343 msg = cast( 344 p4r.StreamMessageResponse, 345 await self._stream.read(), # type: ignore 346 ) 347 if msg is GRPC_EOF: 348 # Treat EOF as a protocol violation. 349 raise RuntimeError("P4Client.receive got EOF!") 350 351 except grpc.RpcError as ex: 352 raise P4ClientError(ex, "receive") from None 353 354 self._log_msg(msg) 355 return msg
Read a message from the stream.
377 async def request(self, msg: pbutil.PBMessage) -> pbutil.PBMessage: 378 "Send a unary-unary P4Runtime request and wait for the response." 379 assert self._stub is not None 380 381 if self._complete_request: 382 self._complete_request(msg) 383 384 msg_type = type(msg).__name__ 385 assert msg_type.endswith("Request") 386 rpc_method = getattr(self._stub, msg_type[:-7]) 387 388 self._log_msg(msg) 389 try: 390 reply = await rpc_method( 391 msg, 392 timeout=_DEFAULT_RPC_TIMEOUT, 393 ) 394 except grpc.RpcError as ex: 395 raise P4ClientError(ex, msg_type, msg=msg, schema=self._schema) from None 396 397 self._log_msg(reply) 398 return reply
Send a unary-unary P4Runtime request and wait for the response.
400 async def request_iter( 401 self, msg: p4r.ReadRequest 402 ) -> AsyncIterator[p4r.ReadResponse]: 403 "Send a unary-stream P4Runtime read request and wait for the responses." 404 assert self._stub is not None 405 406 if self._complete_request: 407 self._complete_request(msg) 408 409 msg_type = type(msg).__name__ 410 assert msg_type.endswith("Request") 411 rpc_method = getattr(self._stub, msg_type[:-7]) 412 413 self._log_msg(msg) 414 try: 415 async for reply in rpc_method( 416 msg, 417 timeout=_DEFAULT_RPC_TIMEOUT, 418 ): 419 self._log_msg(reply) 420 yield reply 421 except grpc.RpcError as ex: 422 raise P4ClientError(ex, msg_type) from None
Send a unary-stream P4Runtime read request and wait for the responses.
125class P4ClientError(Exception): 126 "Wrap `grpc.RpcError`." 127 128 _operation: str 129 _status: P4RpcStatus 130 _outer_code: GRPCStatusCode 131 _outer_message: str 132 _schema: P4Schema | None = None # for annotating sub-value details 133 134 def __init__( 135 self, 136 error: grpc.RpcError, 137 operation: str, 138 *, 139 msg: pbutil.PBMessage | None = None, 140 schema: P4Schema | None = None, 141 ): 142 super().__init__() 143 assert isinstance(error, grpc.aio.AioRpcError) 144 145 self._operation = operation 146 self._status = P4RpcStatus.from_rpc_error(error) 147 self._outer_code = GRPCStatusCode.from_status_code(error.code()) 148 self._outer_message = error.details() or "" 149 150 if msg is not None and self.details: 151 self._attach_details(msg) 152 self._schema = schema 153 154 LOGGER.debug("%s failed: %s", operation, self) 155 156 @property 157 def code(self) -> GRPCStatusCode: 158 "GRPC status code." 159 return self._status.code 160 161 @property 162 def message(self) -> str: 163 "GRPC status message." 164 return self._status.message 165 166 @property 167 def details(self) -> dict[int, P4Error]: 168 "Optional details about P4Runtime Write updates that failed." 169 return self._status.details 170 171 @property 172 def is_not_found_only(self) -> bool: 173 """Return True if the only sub-errors are NOT_FOUND.""" 174 if self.code != GRPCStatusCode.UNKNOWN: 175 return False 176 177 for err in self.details.values(): 178 if err.canonical_code != GRPCStatusCode.NOT_FOUND: 179 return False 180 return True 181 182 @property 183 def is_election_id_used(self) -> bool: 184 """Return true if error is that election ID is in use.""" 185 return ( 186 self.code == GRPCStatusCode.INVALID_ARGUMENT 187 and _ELECTION_ID_EXISTS.search(self.message) is not None 188 ) 189 190 @property 191 def is_pipeline_missing(self) -> bool: 192 "Return true if error is that no pipeline config is set." 193 return ( 194 self.code == GRPCStatusCode.FAILED_PRECONDITION 195 and _NO_PIPELINE_CONFIG.search(self.message) is not None 196 ) 197 198 def _attach_details(self, msg: pbutil.PBMessage): 199 "Attach the subvalue(s) from the message that caused the error." 200 if isinstance(msg, p4r.WriteRequest): 201 for key, value in self.details.items(): 202 value.subvalue = msg.updates[key] 203 204 def __str__(self) -> str: 205 "Return string representation of P4ClientError object." 206 if self.details: 207 208 def _show(value: P4Error): 209 s = repr(value) 210 if self._schema: 211 s = pbutil.log_annotate(s, self._schema) 212 s = s.replace("\n}\n)", "\n})") # tidy multiline repr 213 return s.replace("\n", "\n" + " " * 6) # indent 6 spaces 214 215 items = [""] + [ 216 f" [details.{key}] {_show(val)}" for key, val in self.details.items() 217 ] 218 details = "\n".join(items) 219 else: 220 details = "" 221 222 if self.code == self._outer_code and self.message == self._outer_message: 223 return ( 224 f"operation={self._operation} code={self.code!r} " 225 f"message={self.message!r} {details}" 226 ) 227 return ( 228 f"code={self.code!r} message={self.message!r} " 229 f"details={self.details!r} operation={self._operation} " 230 f"_outer_message={self._outer_message!r} _outer_code={self._outer_code!r}" 231 )
Wrap grpc.RpcError
.
134 def __init__( 135 self, 136 error: grpc.RpcError, 137 operation: str, 138 *, 139 msg: pbutil.PBMessage | None = None, 140 schema: P4Schema | None = None, 141 ): 142 super().__init__() 143 assert isinstance(error, grpc.aio.AioRpcError) 144 145 self._operation = operation 146 self._status = P4RpcStatus.from_rpc_error(error) 147 self._outer_code = GRPCStatusCode.from_status_code(error.code()) 148 self._outer_message = error.details() or "" 149 150 if msg is not None and self.details: 151 self._attach_details(msg) 152 self._schema = schema 153 154 LOGGER.debug("%s failed: %s", operation, self)
156 @property 157 def code(self) -> GRPCStatusCode: 158 "GRPC status code." 159 return self._status.code
GRPC status code.
161 @property 162 def message(self) -> str: 163 "GRPC status message." 164 return self._status.message
GRPC status message.
166 @property 167 def details(self) -> dict[int, P4Error]: 168 "Optional details about P4Runtime Write updates that failed." 169 return self._status.details
Optional details about P4Runtime Write updates that failed.
171 @property 172 def is_not_found_only(self) -> bool: 173 """Return True if the only sub-errors are NOT_FOUND.""" 174 if self.code != GRPCStatusCode.UNKNOWN: 175 return False 176 177 for err in self.details.values(): 178 if err.canonical_code != GRPCStatusCode.NOT_FOUND: 179 return False 180 return True
Return True if the only sub-errors are NOT_FOUND.
182 @property 183 def is_election_id_used(self) -> bool: 184 """Return true if error is that election ID is in use.""" 185 return ( 186 self.code == GRPCStatusCode.INVALID_ARGUMENT 187 and _ELECTION_ID_EXISTS.search(self.message) is not None 188 )
Return true if error is that election ID is in use.
190 @property 191 def is_pipeline_missing(self) -> bool: 192 "Return true if error is that no pipeline config is set." 193 return ( 194 self.code == GRPCStatusCode.FAILED_PRECONDITION 195 and _NO_PIPELINE_CONFIG.search(self.message) is not None 196 )
Return true if error is that no pipeline config is set.
1267@decodable("clone_session_entry") 1268@dataclass(slots=True) 1269class P4CloneSessionEntry(_P4Writable): 1270 "Represents a P4Runtime CloneSessionEntry." 1271 1272 session_id: int = 0 1273 _: KW_ONLY 1274 class_of_service: int = 0 1275 packet_length_bytes: int = 0 1276 replicas: Sequence[_ReplicaType] = () 1277 1278 def encode(self, schema: P4Schema) -> p4r.Entity: 1279 "Encode CloneSessionEntry data as protobuf." 1280 entry = p4r.CloneSessionEntry( 1281 session_id=self.session_id, 1282 class_of_service=self.class_of_service, 1283 packet_length_bytes=self.packet_length_bytes, 1284 replicas=[encode_replica(replica) for replica in self.replicas], 1285 ) 1286 return p4r.Entity( 1287 packet_replication_engine_entry=p4r.PacketReplicationEngineEntry( 1288 clone_session_entry=entry 1289 ) 1290 ) 1291 1292 @classmethod 1293 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1294 "Decode protobuf to CloneSessionEntry data." 1295 entry = msg.packet_replication_engine_entry.clone_session_entry 1296 return cls( 1297 session_id=entry.session_id, 1298 class_of_service=entry.class_of_service, 1299 packet_length_bytes=entry.packet_length_bytes, 1300 replicas=tuple(decode_replica(replica) for replica in entry.replicas), 1301 ) 1302 1303 def replicas_str(self) -> str: 1304 "Format the replicas as a human-readable, canonical string." 1305 return " ".join(format_replica(rep) for rep in self.replicas)
Represents a P4Runtime CloneSessionEntry.
1278 def encode(self, schema: P4Schema) -> p4r.Entity: 1279 "Encode CloneSessionEntry data as protobuf." 1280 entry = p4r.CloneSessionEntry( 1281 session_id=self.session_id, 1282 class_of_service=self.class_of_service, 1283 packet_length_bytes=self.packet_length_bytes, 1284 replicas=[encode_replica(replica) for replica in self.replicas], 1285 ) 1286 return p4r.Entity( 1287 packet_replication_engine_entry=p4r.PacketReplicationEngineEntry( 1288 clone_session_entry=entry 1289 ) 1290 )
Encode CloneSessionEntry data as protobuf.
1292 @classmethod 1293 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1294 "Decode protobuf to CloneSessionEntry data." 1295 entry = msg.packet_replication_engine_entry.clone_session_entry 1296 return cls( 1297 session_id=entry.session_id, 1298 class_of_service=entry.class_of_service, 1299 packet_length_bytes=entry.packet_length_bytes, 1300 replicas=tuple(decode_replica(replica) for replica in entry.replicas), 1301 )
Decode protobuf to CloneSessionEntry data.
826@dataclass(kw_only=True, slots=True) 827class P4CounterData: 828 """Represents a P4Runtime object that keeps statistics of bytes and packets. 829 830 Attributes: 831 byte_count (int): the number of octets 832 packet_count (int): the number of packets 833 834 See Also: 835 - P4TableEntry 836 - P4MeterCounterData 837 - P4CounterEntry 838 - P4DirectCounterEntry 839 """ 840 841 byte_count: int = 0 842 "The number of octets." 843 packet_count: int = 0 844 "The number of packets." 845 846 def encode(self) -> p4r.CounterData: 847 "Encode object as CounterData." 848 return p4r.CounterData( 849 byte_count=self.byte_count, packet_count=self.packet_count 850 ) 851 852 @classmethod 853 def decode(cls, msg: p4r.CounterData) -> Self: 854 "Decode CounterData." 855 return cls(byte_count=msg.byte_count, packet_count=msg.packet_count)
Represents a P4Runtime object that keeps statistics of bytes and packets.
Attributes:
- byte_count (int): the number of octets
- packet_count (int): the number of packets
See Also:
- P4TableEntry
- P4MeterCounterData
- P4CounterEntry
- P4DirectCounterEntry
1653@decodable("counter_entry") 1654@dataclass(slots=True) 1655class P4CounterEntry(_P4ModifyOnly): 1656 "Represents a P4Runtime CounterEntry." 1657 1658 counter_id: str = "" 1659 _: KW_ONLY 1660 index: int | None = None 1661 data: P4CounterData | None = None 1662 1663 @property 1664 def packet_count(self) -> int: 1665 "Packet count from counter data (or 0 if there is no data)." 1666 if self.data is not None: 1667 return self.data.packet_count 1668 return 0 1669 1670 @property 1671 def byte_count(self) -> int: 1672 "Byte count from counter data (or 0 if there is no data)." 1673 if self.data is not None: 1674 return self.data.byte_count 1675 return 0 1676 1677 def encode(self, schema: P4Schema) -> p4r.Entity: 1678 "Encode P4CounterEntry as protobuf." 1679 if not self.counter_id: 1680 return p4r.Entity(counter_entry=p4r.CounterEntry()) 1681 1682 counter = schema.counters[self.counter_id] 1683 1684 if self.index is not None: 1685 index = p4r.Index(index=self.index) 1686 else: 1687 index = None 1688 1689 if self.data is not None: 1690 data = self.data.encode() 1691 else: 1692 data = None 1693 1694 entry = p4r.CounterEntry( 1695 counter_id=counter.id, 1696 index=index, 1697 data=data, 1698 ) 1699 return p4r.Entity(counter_entry=entry) 1700 1701 @classmethod 1702 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1703 "Decode protobuf to P4CounterEntry." 1704 entry = msg.counter_entry 1705 if not entry.counter_id: 1706 return cls() 1707 1708 counter = schema.counters[entry.counter_id] 1709 1710 if entry.HasField("index"): 1711 index = entry.index.index 1712 else: 1713 index = None 1714 1715 if entry.HasField("data"): 1716 data = P4CounterData.decode(entry.data) 1717 else: 1718 data = None 1719 1720 return cls(counter_id=counter.alias, index=index, data=data)
Represents a P4Runtime CounterEntry.
1663 @property 1664 def packet_count(self) -> int: 1665 "Packet count from counter data (or 0 if there is no data)." 1666 if self.data is not None: 1667 return self.data.packet_count 1668 return 0
Packet count from counter data (or 0 if there is no data).
1670 @property 1671 def byte_count(self) -> int: 1672 "Byte count from counter data (or 0 if there is no data)." 1673 if self.data is not None: 1674 return self.data.byte_count 1675 return 0
Byte count from counter data (or 0 if there is no data).
1677 def encode(self, schema: P4Schema) -> p4r.Entity: 1678 "Encode P4CounterEntry as protobuf." 1679 if not self.counter_id: 1680 return p4r.Entity(counter_entry=p4r.CounterEntry()) 1681 1682 counter = schema.counters[self.counter_id] 1683 1684 if self.index is not None: 1685 index = p4r.Index(index=self.index) 1686 else: 1687 index = None 1688 1689 if self.data is not None: 1690 data = self.data.encode() 1691 else: 1692 data = None 1693 1694 entry = p4r.CounterEntry( 1695 counter_id=counter.id, 1696 index=index, 1697 data=data, 1698 ) 1699 return p4r.Entity(counter_entry=entry)
Encode P4CounterEntry as protobuf.
1701 @classmethod 1702 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1703 "Decode protobuf to P4CounterEntry." 1704 entry = msg.counter_entry 1705 if not entry.counter_id: 1706 return cls() 1707 1708 counter = schema.counters[entry.counter_id] 1709 1710 if entry.HasField("index"): 1711 index = entry.index.index 1712 else: 1713 index = None 1714 1715 if entry.HasField("data"): 1716 data = P4CounterData.decode(entry.data) 1717 else: 1718 data = None 1719 1720 return cls(counter_id=counter.alias, index=index, data=data)
Decode protobuf to P4CounterEntry.
79class P4CounterUnit(_EnumBase): 80 "IntEnum equivalent to `p4i.CounterSpec.Unit`." 81 UNSPECIFIED = p4i.CounterSpec.Unit.UNSPECIFIED 82 BYTES = p4i.CounterSpec.Unit.BYTES 83 PACKETS = p4i.CounterSpec.Unit.PACKETS 84 BOTH = p4i.CounterSpec.Unit.BOTH
IntEnum equivalent to p4i.CounterSpec.Unit
.
1308@decodable("digest_entry") 1309@dataclass(slots=True) 1310class P4DigestEntry(_P4Writable): 1311 "Represents a P4Runtime DigestEntry." 1312 1313 digest_id: str = "" 1314 _: KW_ONLY 1315 max_list_size: int = 0 1316 max_timeout_ns: int = 0 1317 ack_timeout_ns: int = 0 1318 1319 def encode(self, schema: P4Schema) -> p4r.Entity: 1320 "Encode DigestEntry data as protobuf." 1321 if not self.digest_id: 1322 return p4r.Entity(digest_entry=p4r.DigestEntry()) 1323 1324 digest = schema.digests[self.digest_id] 1325 1326 if self.max_list_size == self.max_timeout_ns == self.ack_timeout_ns == 0: 1327 config = None 1328 else: 1329 config = p4r.DigestEntry.Config( 1330 max_timeout_ns=self.max_timeout_ns, 1331 max_list_size=self.max_list_size, 1332 ack_timeout_ns=self.ack_timeout_ns, 1333 ) 1334 1335 entry = p4r.DigestEntry(digest_id=digest.id, config=config) 1336 return p4r.Entity(digest_entry=entry) 1337 1338 @classmethod 1339 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1340 "Decode protobuf to DigestEntry data." 1341 entry = msg.digest_entry 1342 if entry.digest_id == 0: 1343 return cls() 1344 1345 digest = schema.digests[entry.digest_id] 1346 1347 config = entry.config 1348 return cls( 1349 digest.alias, 1350 max_list_size=config.max_list_size, 1351 max_timeout_ns=config.max_timeout_ns, 1352 ack_timeout_ns=config.ack_timeout_ns, 1353 )
Represents a P4Runtime DigestEntry.
1319 def encode(self, schema: P4Schema) -> p4r.Entity: 1320 "Encode DigestEntry data as protobuf." 1321 if not self.digest_id: 1322 return p4r.Entity(digest_entry=p4r.DigestEntry()) 1323 1324 digest = schema.digests[self.digest_id] 1325 1326 if self.max_list_size == self.max_timeout_ns == self.ack_timeout_ns == 0: 1327 config = None 1328 else: 1329 config = p4r.DigestEntry.Config( 1330 max_timeout_ns=self.max_timeout_ns, 1331 max_list_size=self.max_list_size, 1332 ack_timeout_ns=self.ack_timeout_ns, 1333 ) 1334 1335 entry = p4r.DigestEntry(digest_id=digest.id, config=config) 1336 return p4r.Entity(digest_entry=entry)
Encode DigestEntry data as protobuf.
1338 @classmethod 1339 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1340 "Decode protobuf to DigestEntry data." 1341 entry = msg.digest_entry 1342 if entry.digest_id == 0: 1343 return cls() 1344 1345 digest = schema.digests[entry.digest_id] 1346 1347 config = entry.config 1348 return cls( 1349 digest.alias, 1350 max_list_size=config.max_list_size, 1351 max_timeout_ns=config.max_timeout_ns, 1352 ack_timeout_ns=config.ack_timeout_ns, 1353 )
Decode protobuf to DigestEntry data.
1967@decodable("digest") 1968@dataclass(slots=True) 1969class P4DigestList: 1970 "Represents a P4Runtime DigestList." 1971 1972 digest_id: str 1973 _: KW_ONLY 1974 list_id: int 1975 timestamp: int 1976 data: list[_DataValueType] 1977 1978 @classmethod 1979 def decode(cls, msg: p4r.StreamMessageResponse, schema: P4Schema) -> Self: 1980 "Decode protobuf to DigestList data." 1981 digest_list = msg.digest 1982 digest = schema.digests[digest_list.digest_id] 1983 1984 type_spec = digest.type_spec 1985 return cls( 1986 digest_id=digest.alias, 1987 list_id=digest_list.list_id, 1988 timestamp=digest_list.timestamp, 1989 data=[type_spec.decode_data(item) for item in digest_list.data], 1990 ) 1991 1992 def __len__(self) -> int: 1993 "Return number of values in digest list." 1994 return len(self.data) 1995 1996 def __getitem__(self, key: int) -> _DataValueType: 1997 "Retrieve value at given index from digest list." 1998 return self.data[key] 1999 2000 def __iter__(self) -> Iterator[_DataValueType]: 2001 "Iterate over values in digest list." 2002 return iter(self.data) 2003 2004 def ack(self) -> "P4DigestListAck": 2005 "Return the corresponding DigestListAck message." 2006 return P4DigestListAck(self.digest_id, self.list_id)
Represents a P4Runtime DigestList.
1978 @classmethod 1979 def decode(cls, msg: p4r.StreamMessageResponse, schema: P4Schema) -> Self: 1980 "Decode protobuf to DigestList data." 1981 digest_list = msg.digest 1982 digest = schema.digests[digest_list.digest_id] 1983 1984 type_spec = digest.type_spec 1985 return cls( 1986 digest_id=digest.alias, 1987 list_id=digest_list.list_id, 1988 timestamp=digest_list.timestamp, 1989 data=[type_spec.decode_data(item) for item in digest_list.data], 1990 )
Decode protobuf to DigestList data.
1992 def __len__(self) -> int: 1993 "Return number of values in digest list." 1994 return len(self.data)
Return number of values in digest list.
1996 def __getitem__(self, key: int) -> _DataValueType: 1997 "Retrieve value at given index from digest list." 1998 return self.data[key]
Retrieve value at given index from digest list.
2000 def __iter__(self) -> Iterator[_DataValueType]: 2001 "Iterate over values in digest list." 2002 return iter(self.data)
Iterate over values in digest list.
2009@dataclass(slots=True) 2010class P4DigestListAck: 2011 "Represents a P4Runtime DigestListAck." 2012 2013 digest_id: str 2014 list_id: int 2015 2016 def encode_update(self, schema: P4Schema) -> p4r.StreamMessageRequest: 2017 "Encode DigestListAck data as protobuf." 2018 digest = schema.digests[self.digest_id] 2019 2020 return p4r.StreamMessageRequest( 2021 digest_ack=p4r.DigestListAck( 2022 digest_id=digest.id, 2023 list_id=self.list_id, 2024 ) 2025 )
Represents a P4Runtime DigestListAck.
2016 def encode_update(self, schema: P4Schema) -> p4r.StreamMessageRequest: 2017 "Encode DigestListAck data as protobuf." 2018 digest = schema.digests[self.digest_id] 2019 2020 return p4r.StreamMessageRequest( 2021 digest_ack=p4r.DigestListAck( 2022 digest_id=digest.id, 2023 list_id=self.list_id, 2024 ) 2025 )
Encode DigestListAck data as protobuf.
1723@decodable("direct_counter_entry") 1724@dataclass(slots=True) 1725class P4DirectCounterEntry(_P4ModifyOnly): 1726 "Represents a P4Runtime DirectCounterEntry." 1727 1728 counter_id: str = "" 1729 _: KW_ONLY 1730 table_entry: P4TableEntry | None = None 1731 data: P4CounterData | None = None 1732 1733 @property 1734 def table_id(self) -> str: 1735 "Return table_id of related table." 1736 if self.table_entry is None: 1737 return "" 1738 return self.table_entry.table_id 1739 1740 @property 1741 def packet_count(self) -> int: 1742 "Packet count from counter data (or 0 if there is no data)." 1743 if self.data is not None: 1744 return self.data.packet_count 1745 return 0 1746 1747 @property 1748 def byte_count(self) -> int: 1749 "Byte count from counter data (or 0 if there is no data)." 1750 if self.data is not None: 1751 return self.data.byte_count 1752 return 0 1753 1754 def encode(self, schema: P4Schema) -> p4r.Entity: 1755 "Encode P4DirectCounterEntry as protobuf." 1756 if self.table_entry is None: 1757 # Use `counter_id` to construct a `P4TableEntry` with the proper 1758 # table name. 1759 if self.counter_id: 1760 tb_name = schema.direct_counters[self.counter_id].direct_table_name 1761 table_entry = P4TableEntry(tb_name) 1762 else: 1763 table_entry = P4TableEntry() 1764 else: 1765 table_entry = self.table_entry 1766 1767 if self.data is not None: 1768 data = self.data.encode() 1769 else: 1770 data = None 1771 1772 entry = p4r.DirectCounterEntry( 1773 table_entry=table_entry.encode_entry(schema), 1774 data=data, 1775 ) 1776 return p4r.Entity(direct_counter_entry=entry) 1777 1778 @classmethod 1779 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1780 "Decode protobuf to P4DirectCounterEntry." 1781 entry = msg.direct_counter_entry 1782 1783 if entry.HasField("table_entry"): 1784 table_entry = P4TableEntry.decode_entry(entry.table_entry, schema) 1785 else: 1786 table_entry = None 1787 1788 if entry.HasField("data"): 1789 data = P4CounterData.decode(entry.data) 1790 else: 1791 data = None 1792 1793 # Determine `counter_id` from table_entry. 1794 counter_id = "" 1795 if table_entry is not None and table_entry.table_id: 1796 direct_counter = schema.tables[table_entry.table_id].direct_counter 1797 assert direct_counter is not None 1798 counter_id = direct_counter.alias 1799 1800 return cls(counter_id, table_entry=table_entry, data=data)
Represents a P4Runtime DirectCounterEntry.
1733 @property 1734 def table_id(self) -> str: 1735 "Return table_id of related table." 1736 if self.table_entry is None: 1737 return "" 1738 return self.table_entry.table_id
Return table_id of related table.
1740 @property 1741 def packet_count(self) -> int: 1742 "Packet count from counter data (or 0 if there is no data)." 1743 if self.data is not None: 1744 return self.data.packet_count 1745 return 0
Packet count from counter data (or 0 if there is no data).
1747 @property 1748 def byte_count(self) -> int: 1749 "Byte count from counter data (or 0 if there is no data)." 1750 if self.data is not None: 1751 return self.data.byte_count 1752 return 0
Byte count from counter data (or 0 if there is no data).
1754 def encode(self, schema: P4Schema) -> p4r.Entity: 1755 "Encode P4DirectCounterEntry as protobuf." 1756 if self.table_entry is None: 1757 # Use `counter_id` to construct a `P4TableEntry` with the proper 1758 # table name. 1759 if self.counter_id: 1760 tb_name = schema.direct_counters[self.counter_id].direct_table_name 1761 table_entry = P4TableEntry(tb_name) 1762 else: 1763 table_entry = P4TableEntry() 1764 else: 1765 table_entry = self.table_entry 1766 1767 if self.data is not None: 1768 data = self.data.encode() 1769 else: 1770 data = None 1771 1772 entry = p4r.DirectCounterEntry( 1773 table_entry=table_entry.encode_entry(schema), 1774 data=data, 1775 ) 1776 return p4r.Entity(direct_counter_entry=entry)
Encode P4DirectCounterEntry as protobuf.
1778 @classmethod 1779 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1780 "Decode protobuf to P4DirectCounterEntry." 1781 entry = msg.direct_counter_entry 1782 1783 if entry.HasField("table_entry"): 1784 table_entry = P4TableEntry.decode_entry(entry.table_entry, schema) 1785 else: 1786 table_entry = None 1787 1788 if entry.HasField("data"): 1789 data = P4CounterData.decode(entry.data) 1790 else: 1791 data = None 1792 1793 # Determine `counter_id` from table_entry. 1794 counter_id = "" 1795 if table_entry is not None and table_entry.table_id: 1796 direct_counter = schema.tables[table_entry.table_id].direct_counter 1797 assert direct_counter is not None 1798 counter_id = direct_counter.alias 1799 1800 return cls(counter_id, table_entry=table_entry, data=data)
Decode protobuf to P4DirectCounterEntry.
1593@decodable("direct_meter_entry") 1594@dataclass(kw_only=True, slots=True) 1595class P4DirectMeterEntry(_P4ModifyOnly): 1596 "Represents a P4Runtime DirectMeterEntry." 1597 1598 table_entry: P4TableEntry | None = None 1599 config: P4MeterConfig | None = None 1600 counter_data: P4MeterCounterData | None = None 1601 1602 def encode(self, schema: P4Schema) -> p4r.Entity: 1603 "Encode P4DirectMeterEntry as protobuf." 1604 if self.table_entry is not None: 1605 table_entry = self.table_entry.encode_entry(schema) 1606 else: 1607 table_entry = None 1608 1609 if self.config is not None: 1610 config = self.config.encode() 1611 else: 1612 config = None 1613 1614 if self.counter_data is not None: 1615 counter_data = self.counter_data.encode() 1616 else: 1617 counter_data = None 1618 1619 entry = p4r.DirectMeterEntry( 1620 table_entry=table_entry, 1621 config=config, 1622 counter_data=counter_data, 1623 ) 1624 return p4r.Entity(direct_meter_entry=entry) 1625 1626 @classmethod 1627 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1628 "Decode protobuf to P4DirectMeterEntry." 1629 entry = msg.direct_meter_entry 1630 1631 if entry.HasField("table_entry"): 1632 table_entry = P4TableEntry.decode_entry(entry.table_entry, schema) 1633 else: 1634 table_entry = None 1635 1636 if entry.HasField("config"): 1637 config = P4MeterConfig.decode(entry.config) 1638 else: 1639 config = None 1640 1641 if entry.HasField("counter_data"): 1642 counter_data = P4MeterCounterData.decode(entry.counter_data) 1643 else: 1644 counter_data = None 1645 1646 return cls( 1647 table_entry=table_entry, 1648 config=config, 1649 counter_data=counter_data, 1650 )
Represents a P4Runtime DirectMeterEntry.
1602 def encode(self, schema: P4Schema) -> p4r.Entity: 1603 "Encode P4DirectMeterEntry as protobuf." 1604 if self.table_entry is not None: 1605 table_entry = self.table_entry.encode_entry(schema) 1606 else: 1607 table_entry = None 1608 1609 if self.config is not None: 1610 config = self.config.encode() 1611 else: 1612 config = None 1613 1614 if self.counter_data is not None: 1615 counter_data = self.counter_data.encode() 1616 else: 1617 counter_data = None 1618 1619 entry = p4r.DirectMeterEntry( 1620 table_entry=table_entry, 1621 config=config, 1622 counter_data=counter_data, 1623 ) 1624 return p4r.Entity(direct_meter_entry=entry)
Encode P4DirectMeterEntry as protobuf.
1626 @classmethod 1627 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1628 "Decode protobuf to P4DirectMeterEntry." 1629 entry = msg.direct_meter_entry 1630 1631 if entry.HasField("table_entry"): 1632 table_entry = P4TableEntry.decode_entry(entry.table_entry, schema) 1633 else: 1634 table_entry = None 1635 1636 if entry.HasField("config"): 1637 config = P4MeterConfig.decode(entry.config) 1638 else: 1639 config = None 1640 1641 if entry.HasField("counter_data"): 1642 counter_data = P4MeterCounterData.decode(entry.counter_data) 1643 else: 1644 counter_data = None 1645 1646 return cls( 1647 table_entry=table_entry, 1648 config=config, 1649 counter_data=counter_data, 1650 )
Decode protobuf to P4DirectMeterEntry.
58@dataclass 59class P4Error: 60 "P4Runtime Error message used to report a single P4-entity error." 61 62 canonical_code: GRPCStatusCode 63 message: str 64 space: str 65 code: int 66 subvalue: pbutil.PBMessage | None = None
P4Runtime Error message used to report a single P4-entity error.
2060@decodable("extern_entry") 2061@dataclass(kw_only=True, slots=True) 2062class P4ExternEntry(_P4Writable): 2063 "Represents a P4Runtime ExternEntry." 2064 2065 extern_type_id: str 2066 extern_id: str 2067 entry: pbutil.PBAny 2068 2069 def encode(self, schema: P4Schema) -> p4r.Entity: 2070 "Encode ExternEntry data as protobuf." 2071 extern = schema.externs[self.extern_type_id, self.extern_id] 2072 entry = p4r.ExternEntry( 2073 extern_type_id=extern.extern_type_id, 2074 extern_id=extern.id, 2075 entry=self.entry, 2076 ) 2077 return p4r.Entity(extern_entry=entry) 2078 2079 @classmethod 2080 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 2081 "Decode protobuf to ExternEntry data." 2082 entry = msg.extern_entry 2083 extern = schema.externs[entry.extern_type_id, entry.extern_id] 2084 return cls( 2085 extern_type_id=extern.extern_type_name, 2086 extern_id=extern.name, 2087 entry=entry.entry, 2088 )
Represents a P4Runtime ExternEntry.
2069 def encode(self, schema: P4Schema) -> p4r.Entity: 2070 "Encode ExternEntry data as protobuf." 2071 extern = schema.externs[self.extern_type_id, self.extern_id] 2072 entry = p4r.ExternEntry( 2073 extern_type_id=extern.extern_type_id, 2074 extern_id=extern.id, 2075 entry=self.entry, 2076 ) 2077 return p4r.Entity(extern_entry=entry)
Encode ExternEntry data as protobuf.
2079 @classmethod 2080 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 2081 "Decode protobuf to ExternEntry data." 2082 entry = msg.extern_entry 2083 extern = schema.externs[entry.extern_type_id, entry.extern_id] 2084 return cls( 2085 extern_type_id=extern.extern_type_name, 2086 extern_id=extern.name, 2087 entry=entry.entry, 2088 )
Decode protobuf to ExternEntry data.
IndirectAction
is an alias for P4IndirectAction.
618@dataclass(slots=True) 619class P4IndirectAction: 620 """Represents a P4Runtime Action reference for an indirect table. 621 622 An indirect action can be either: 623 624 1. a "one-shot" action (action_set) 625 2. a reference to an action profile member (member_id) 626 3. a reference to an action profile group (group_id) 627 628 Only one of action_set, member_id or group_id may be configured. The other 629 values must be None. 630 631 Attributes: 632 action_set: sequence of weighted actions for one-shot 633 member_id: ID of action profile member 634 group_id: ID of action profile group 635 636 Examples: 637 638 ```python 639 # Construct a one-shot action profile. 640 one_shot = P4IndirectAction( 641 2 * P4TableAction("forward", port=1), 642 1 * P4TableAction("forward", port=2), 643 ) 644 645 # Refer to an action profile member by ID. 646 member_action = P4IndirectAction(member_id=1) 647 648 # Refer to an action profile group by ID. 649 group_action = P4IndirectAction(group_id=2) 650 ``` 651 652 References: 653 - "9.1.2. Action Specification", 654 - "9.2.3. One Shot Action Selector Programming" 655 656 TODO: Refactor into three classes? P4OneShotAction, P4MemberAction, and 657 P4GroupAction. 658 659 See Also: 660 - P4TableEntry 661 """ 662 663 action_set: Sequence[P4WeightedAction] | None = None 664 "Sequence of weighted actions defining one-shot action profile." 665 _: KW_ONLY 666 member_id: int | None = None 667 "ID of action profile member." 668 group_id: int | None = None 669 "ID of action profile group." 670 671 def __post_init__(self) -> None: 672 if not self._check_invariant(): 673 raise ValueError( 674 "exactly one of action_set, member_id, or group_id must be set" 675 ) 676 677 def _check_invariant(self) -> bool: 678 "Return true if instance satisfies class invariant." 679 if self.action_set is not None: 680 return self.member_id is None and self.group_id is None 681 if self.member_id is not None: 682 return self.group_id is None 683 return self.group_id is not None 684 685 def encode_table_action(self, table: P4Table) -> p4r.TableAction: 686 "Encode object as a TableAction." 687 if self.action_set is not None: 688 return p4r.TableAction( 689 action_profile_action_set=self.encode_action_set(table) 690 ) 691 692 if self.member_id is not None: 693 return p4r.TableAction(action_profile_member_id=self.member_id) 694 695 assert self.group_id is not None 696 return p4r.TableAction(action_profile_group_id=self.group_id) 697 698 def encode_action_set(self, table: P4Table) -> p4r.ActionProfileActionSet: 699 "Encode object as an ActionProfileActionSet." 700 assert self.action_set is not None 701 702 profile_actions: list[p4r.ActionProfileAction] = [] 703 for weight, table_action in self.action_set: 704 action = table_action.encode_action(table) 705 706 match weight: 707 case int(weight_value): 708 watch_port = None 709 case (weight_value, int(watch)): 710 watch_port = encode_watch_port(watch) 711 case _: # pyright: ignore[reportUnnecessaryComparison] 712 raise ValueError(f"unexpected action weight: {weight!r}") 713 714 profile = p4r.ActionProfileAction(action=action, weight=weight_value) 715 if watch_port is not None: 716 profile.watch_port = watch_port 717 profile_actions.append(profile) 718 719 return p4r.ActionProfileActionSet(action_profile_actions=profile_actions) 720 721 @classmethod 722 def decode_action_set(cls, msg: p4r.ActionProfileActionSet, table: P4Table) -> Self: 723 "Decode ActionProfileActionSet." 724 action_set = list[P4WeightedAction]() 725 726 for action in msg.action_profile_actions: 727 match action.WhichOneof("watch_kind"): 728 case "watch_port": 729 weight = (action.weight, decode_watch_port(action.watch_port)) 730 case None: 731 weight = action.weight 732 case other: 733 # "watch" (deprecated) is not supported 734 raise ValueError(f"unexpected oneof: {other!r}") 735 736 table_action = P4TableAction.decode_action(action.action, table) 737 action_set.append((weight, table_action)) 738 739 return cls(action_set) 740 741 def format_str(self, table: P4Table) -> str: 742 """Format the indirect table action as a human-readable string.""" 743 if self.action_set is not None: 744 weighted_actions = [ 745 f"{weight}*{action.format_str(table)}" 746 for weight, action in self.action_set 747 ] 748 return " ".join(weighted_actions) 749 750 # Use the name of the action_profile, if we can get it. If not, just 751 # use the placeholder "__indirect". 752 if table.action_profile is not None: 753 profile_name = f"@{table.action_profile.alias}" 754 else: 755 profile_name = "__indirect" 756 757 if self.member_id is not None: 758 return f"{profile_name}[[{self.member_id:#x}]]" 759 760 return f"{profile_name}[{self.group_id:#x}]" 761 762 def __repr__(self) -> str: 763 "Customize representation to make it more concise." 764 if self.action_set is not None: 765 return f"P4IndirectAction(action_set={self.action_set!r})" 766 if self.member_id is not None: 767 return f"P4IndirectAction(member_id={self.member_id!r})" 768 return f"P4IndirectAction(group_id={self.group_id!r})"
Represents a P4Runtime Action reference for an indirect table.
An indirect action can be either:
- a "one-shot" action (action_set)
- a reference to an action profile member (member_id)
- a reference to an action profile group (group_id)
Only one of action_set, member_id or group_id may be configured. The other values must be None.
Attributes:
- action_set: sequence of weighted actions for one-shot
- member_id: ID of action profile member
- group_id: ID of action profile group
Examples:
# Construct a one-shot action profile.
one_shot = P4IndirectAction(
2 * P4TableAction("forward", port=1),
1 * P4TableAction("forward", port=2),
)
# Refer to an action profile member by ID.
member_action = P4IndirectAction(member_id=1)
# Refer to an action profile group by ID.
group_action = P4IndirectAction(group_id=2)
References:
- "9.1.2. Action Specification",
- "9.2.3. One Shot Action Selector Programming"
TODO: Refactor into three classes? P4OneShotAction, P4MemberAction, and P4GroupAction.
See Also:
- P4TableEntry
Sequence of weighted actions defining one-shot action profile.
685 def encode_table_action(self, table: P4Table) -> p4r.TableAction: 686 "Encode object as a TableAction." 687 if self.action_set is not None: 688 return p4r.TableAction( 689 action_profile_action_set=self.encode_action_set(table) 690 ) 691 692 if self.member_id is not None: 693 return p4r.TableAction(action_profile_member_id=self.member_id) 694 695 assert self.group_id is not None 696 return p4r.TableAction(action_profile_group_id=self.group_id)
Encode object as a TableAction.
698 def encode_action_set(self, table: P4Table) -> p4r.ActionProfileActionSet: 699 "Encode object as an ActionProfileActionSet." 700 assert self.action_set is not None 701 702 profile_actions: list[p4r.ActionProfileAction] = [] 703 for weight, table_action in self.action_set: 704 action = table_action.encode_action(table) 705 706 match weight: 707 case int(weight_value): 708 watch_port = None 709 case (weight_value, int(watch)): 710 watch_port = encode_watch_port(watch) 711 case _: # pyright: ignore[reportUnnecessaryComparison] 712 raise ValueError(f"unexpected action weight: {weight!r}") 713 714 profile = p4r.ActionProfileAction(action=action, weight=weight_value) 715 if watch_port is not None: 716 profile.watch_port = watch_port 717 profile_actions.append(profile) 718 719 return p4r.ActionProfileActionSet(action_profile_actions=profile_actions)
Encode object as an ActionProfileActionSet.
721 @classmethod 722 def decode_action_set(cls, msg: p4r.ActionProfileActionSet, table: P4Table) -> Self: 723 "Decode ActionProfileActionSet." 724 action_set = list[P4WeightedAction]() 725 726 for action in msg.action_profile_actions: 727 match action.WhichOneof("watch_kind"): 728 case "watch_port": 729 weight = (action.weight, decode_watch_port(action.watch_port)) 730 case None: 731 weight = action.weight 732 case other: 733 # "watch" (deprecated) is not supported 734 raise ValueError(f"unexpected oneof: {other!r}") 735 736 table_action = P4TableAction.decode_action(action.action, table) 737 action_set.append((weight, table_action)) 738 739 return cls(action_set)
Decode ActionProfileActionSet.
741 def format_str(self, table: P4Table) -> str: 742 """Format the indirect table action as a human-readable string.""" 743 if self.action_set is not None: 744 weighted_actions = [ 745 f"{weight}*{action.format_str(table)}" 746 for weight, action in self.action_set 747 ] 748 return " ".join(weighted_actions) 749 750 # Use the name of the action_profile, if we can get it. If not, just 751 # use the placeholder "__indirect". 752 if table.action_profile is not None: 753 profile_name = f"@{table.action_profile.alias}" 754 else: 755 profile_name = "__indirect" 756 757 if self.member_id is not None: 758 return f"{profile_name}[[{self.member_id:#x}]]" 759 760 return f"{profile_name}[{self.group_id:#x}]"
Format the indirect table action as a human-readable string.
1412@dataclass(slots=True) 1413class P4Member: 1414 """Represents an ActionProfileGroup Member. 1415 1416 See Also: 1417 - P4ActionProfileGroup 1418 """ 1419 1420 member_id: int 1421 _: KW_ONLY 1422 weight: P4Weight 1423 1424 def encode(self) -> p4r.ActionProfileGroup.Member: 1425 "Encode P4Member as protobuf." 1426 match self.weight: 1427 case int(weight): 1428 watch_port = None 1429 case (int(weight), int(watch)): 1430 watch_port = encode_watch_port(watch) 1431 case other: # pyright: ignore[reportUnnecessaryComparison] 1432 raise ValueError(f"unexpected weight: {other!r}") 1433 1434 member = p4r.ActionProfileGroup.Member( 1435 member_id=self.member_id, 1436 weight=weight, 1437 ) 1438 1439 if watch_port is not None: 1440 member.watch_port = watch_port 1441 return member 1442 1443 @classmethod 1444 def decode(cls, msg: p4r.ActionProfileGroup.Member) -> Self: 1445 "Decode protobuf to P4Member." 1446 match msg.WhichOneof("watch_kind"): 1447 case "watch_port": 1448 weight = (msg.weight, decode_watch_port(msg.watch_port)) 1449 case None: 1450 weight = msg.weight 1451 case other: 1452 # "watch" (deprecated) is not supported 1453 raise ValueError(f"unknown oneof: {other!r}") 1454 1455 return cls(member_id=msg.member_id, weight=weight)
Represents an ActionProfileGroup Member.
See Also:
- P4ActionProfileGroup
1424 def encode(self) -> p4r.ActionProfileGroup.Member: 1425 "Encode P4Member as protobuf." 1426 match self.weight: 1427 case int(weight): 1428 watch_port = None 1429 case (int(weight), int(watch)): 1430 watch_port = encode_watch_port(watch) 1431 case other: # pyright: ignore[reportUnnecessaryComparison] 1432 raise ValueError(f"unexpected weight: {other!r}") 1433 1434 member = p4r.ActionProfileGroup.Member( 1435 member_id=self.member_id, 1436 weight=weight, 1437 ) 1438 1439 if watch_port is not None: 1440 member.watch_port = watch_port 1441 return member
Encode P4Member as protobuf.
1443 @classmethod 1444 def decode(cls, msg: p4r.ActionProfileGroup.Member) -> Self: 1445 "Decode protobuf to P4Member." 1446 match msg.WhichOneof("watch_kind"): 1447 case "watch_port": 1448 weight = (msg.weight, decode_watch_port(msg.watch_port)) 1449 case None: 1450 weight = msg.weight 1451 case other: 1452 # "watch" (deprecated) is not supported 1453 raise ValueError(f"unknown oneof: {other!r}") 1454 1455 return cls(member_id=msg.member_id, weight=weight)
Decode protobuf to P4Member.
771@dataclass(kw_only=True, slots=True) 772class P4MeterConfig: 773 """Represents a P4Runtime MeterConfig. 774 775 Attributes: 776 cir (int): Committed information rate (units/sec). 777 cburst (int): Committed burst size. 778 pir (int): Peak information rate (units/sec). 779 pburst (int): Peak burst size. 780 eburst (int): Excess burst size (only used by srTCM). [default=0] 781 782 Example: 783 ``` 784 config = P4MeterConfig(cir=10, cburst=20, pir=10, pburst=20) 785 ``` 786 787 See Also: 788 - P4TableEntry 789 - P4MeterEntry 790 - P4DirectMeterEntry 791 """ 792 793 cir: int 794 "Committed information rate (units/sec)." 795 cburst: int 796 "Committed burst size." 797 pir: int 798 "Peak information rate (units/sec)." 799 pburst: int 800 "Peak burst size." 801 eburst: int = 0 802 "Excess burst size (only used by srTCM)." 803 804 def encode(self) -> p4r.MeterConfig: 805 "Encode object as MeterConfig." 806 return p4r.MeterConfig( 807 cir=self.cir, 808 cburst=self.cburst, 809 pir=self.pir, 810 pburst=self.pburst, 811 eburst=self.eburst, 812 ) 813 814 @classmethod 815 def decode(cls, msg: p4r.MeterConfig) -> Self: 816 "Decode MeterConfig." 817 return cls( 818 cir=msg.cir, 819 cburst=msg.cburst, 820 pir=msg.pir, 821 pburst=msg.pburst, 822 eburst=msg.eburst, 823 )
Represents a P4Runtime MeterConfig.
Attributes:
- cir (int): Committed information rate (units/sec).
- cburst (int): Committed burst size.
- pir (int): Peak information rate (units/sec).
- pburst (int): Peak burst size.
- eburst (int): Excess burst size (only used by srTCM). [default=0]
Example:
config = P4MeterConfig(cir=10, cburst=20, pir=10, pburst=20)
See Also:
- P4TableEntry
- P4MeterEntry
- P4DirectMeterEntry
804 def encode(self) -> p4r.MeterConfig: 805 "Encode object as MeterConfig." 806 return p4r.MeterConfig( 807 cir=self.cir, 808 cburst=self.cburst, 809 pir=self.pir, 810 pburst=self.pburst, 811 eburst=self.eburst, 812 )
Encode object as MeterConfig.
814 @classmethod 815 def decode(cls, msg: p4r.MeterConfig) -> Self: 816 "Decode MeterConfig." 817 return cls( 818 cir=msg.cir, 819 cburst=msg.cburst, 820 pir=msg.pir, 821 pburst=msg.pburst, 822 eburst=msg.eburst, 823 )
Decode MeterConfig.
858@dataclass(kw_only=True, slots=True) 859class P4MeterCounterData: 860 """Represents a P4Runtime MeterCounterData that stores per-color counters. 861 862 Attributes: 863 green (CounterData): counter data for packets marked GREEN. 864 yellow (CounterData): counter data for packets marked YELLOW. 865 red (CounterData): counter data for packets marked RED. 866 867 See Also: 868 - P4TableEntry 869 - P4MeterEntry 870 - P4DirectMeterEntry 871 """ 872 873 green: P4CounterData 874 "Counter of packets marked GREEN." 875 yellow: P4CounterData 876 "Counter of packets marked YELLOW." 877 red: P4CounterData 878 "Counter of packets marked RED." 879 880 def encode(self) -> p4r.MeterCounterData: 881 "Encode object as MeterCounterData." 882 return p4r.MeterCounterData( 883 green=self.green.encode(), 884 yellow=self.yellow.encode(), 885 red=self.red.encode(), 886 ) 887 888 @classmethod 889 def decode(cls, msg: p4r.MeterCounterData) -> Self: 890 "Decode MeterCounterData." 891 return cls( 892 green=P4CounterData.decode(msg.green), 893 yellow=P4CounterData.decode(msg.yellow), 894 red=P4CounterData.decode(msg.red), 895 )
Represents a P4Runtime MeterCounterData that stores per-color counters.
Attributes:
- green (CounterData): counter data for packets marked GREEN.
- yellow (CounterData): counter data for packets marked YELLOW.
- red (CounterData): counter data for packets marked RED.
See Also:
- P4TableEntry
- P4MeterEntry
- P4DirectMeterEntry
880 def encode(self) -> p4r.MeterCounterData: 881 "Encode object as MeterCounterData." 882 return p4r.MeterCounterData( 883 green=self.green.encode(), 884 yellow=self.yellow.encode(), 885 red=self.red.encode(), 886 )
Encode object as MeterCounterData.
888 @classmethod 889 def decode(cls, msg: p4r.MeterCounterData) -> Self: 890 "Decode MeterCounterData." 891 return cls( 892 green=P4CounterData.decode(msg.green), 893 yellow=P4CounterData.decode(msg.yellow), 894 red=P4CounterData.decode(msg.red), 895 )
Decode MeterCounterData.
1520@decodable("meter_entry") 1521@dataclass(slots=True) 1522class P4MeterEntry(_P4ModifyOnly): 1523 "Represents a P4Runtime MeterEntry." 1524 1525 meter_id: str = "" 1526 _: KW_ONLY 1527 index: int | None = None 1528 config: P4MeterConfig | None = None 1529 counter_data: P4MeterCounterData | None = None 1530 1531 def encode(self, schema: P4Schema) -> p4r.Entity: 1532 "Encode P4MeterEntry to protobuf." 1533 if not self.meter_id: 1534 return p4r.Entity(meter_entry=p4r.MeterEntry()) 1535 1536 meter = schema.meters[self.meter_id] 1537 1538 if self.index is not None: 1539 index = p4r.Index(index=self.index) 1540 else: 1541 index = None 1542 1543 if self.config is not None: 1544 config = self.config.encode() 1545 else: 1546 config = None 1547 1548 if self.counter_data is not None: 1549 counter_data = self.counter_data.encode() 1550 else: 1551 counter_data = None 1552 1553 entry = p4r.MeterEntry( 1554 meter_id=meter.id, 1555 index=index, 1556 config=config, 1557 counter_data=counter_data, 1558 ) 1559 return p4r.Entity(meter_entry=entry) 1560 1561 @classmethod 1562 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1563 "Decode protobuf to P4MeterEntry." 1564 entry = msg.meter_entry 1565 if not entry.meter_id: 1566 return cls() 1567 1568 meter = schema.meters[entry.meter_id] 1569 1570 if entry.HasField("index"): 1571 index = entry.index.index 1572 else: 1573 index = None 1574 1575 if entry.HasField("config"): 1576 config = P4MeterConfig.decode(entry.config) 1577 else: 1578 config = None 1579 1580 if entry.HasField("counter_data"): 1581 counter_data = P4MeterCounterData.decode(entry.counter_data) 1582 else: 1583 counter_data = None 1584 1585 return cls( 1586 meter_id=meter.alias, 1587 index=index, 1588 config=config, 1589 counter_data=counter_data, 1590 )
Represents a P4Runtime MeterEntry.
1531 def encode(self, schema: P4Schema) -> p4r.Entity: 1532 "Encode P4MeterEntry to protobuf." 1533 if not self.meter_id: 1534 return p4r.Entity(meter_entry=p4r.MeterEntry()) 1535 1536 meter = schema.meters[self.meter_id] 1537 1538 if self.index is not None: 1539 index = p4r.Index(index=self.index) 1540 else: 1541 index = None 1542 1543 if self.config is not None: 1544 config = self.config.encode() 1545 else: 1546 config = None 1547 1548 if self.counter_data is not None: 1549 counter_data = self.counter_data.encode() 1550 else: 1551 counter_data = None 1552 1553 entry = p4r.MeterEntry( 1554 meter_id=meter.id, 1555 index=index, 1556 config=config, 1557 counter_data=counter_data, 1558 ) 1559 return p4r.Entity(meter_entry=entry)
Encode P4MeterEntry to protobuf.
1561 @classmethod 1562 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1563 "Decode protobuf to P4MeterEntry." 1564 entry = msg.meter_entry 1565 if not entry.meter_id: 1566 return cls() 1567 1568 meter = schema.meters[entry.meter_id] 1569 1570 if entry.HasField("index"): 1571 index = entry.index.index 1572 else: 1573 index = None 1574 1575 if entry.HasField("config"): 1576 config = P4MeterConfig.decode(entry.config) 1577 else: 1578 config = None 1579 1580 if entry.HasField("counter_data"): 1581 counter_data = P4MeterCounterData.decode(entry.counter_data) 1582 else: 1583 counter_data = None 1584 1585 return cls( 1586 meter_id=meter.alias, 1587 index=index, 1588 config=config, 1589 counter_data=counter_data, 1590 )
Decode protobuf to P4MeterEntry.
1229@decodable("multicast_group_entry") 1230@dataclass(slots=True) 1231class P4MulticastGroupEntry(_P4Writable): 1232 "Represents a P4Runtime MulticastGroupEntry." 1233 1234 multicast_group_id: int = 0 1235 _: KW_ONLY 1236 replicas: Sequence[_ReplicaType] = () 1237 metadata: bytes = b"" 1238 1239 def encode(self, schema: P4Schema) -> p4r.Entity: 1240 "Encode MulticastGroupEntry data as protobuf." 1241 entry = p4r.MulticastGroupEntry( 1242 multicast_group_id=self.multicast_group_id, 1243 replicas=[encode_replica(replica) for replica in self.replicas], 1244 metadata=self.metadata, 1245 ) 1246 return p4r.Entity( 1247 packet_replication_engine_entry=p4r.PacketReplicationEngineEntry( 1248 multicast_group_entry=entry 1249 ) 1250 ) 1251 1252 @classmethod 1253 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1254 "Decode protobuf to MulticastGroupEntry data." 1255 entry = msg.packet_replication_engine_entry.multicast_group_entry 1256 return cls( 1257 multicast_group_id=entry.multicast_group_id, 1258 replicas=tuple(decode_replica(replica) for replica in entry.replicas), 1259 metadata=entry.metadata, 1260 ) 1261 1262 def replicas_str(self) -> str: 1263 "Format the replicas as a human-readable, canonical string." 1264 return " ".join(format_replica(rep) for rep in self.replicas)
Represents a P4Runtime MulticastGroupEntry.
1239 def encode(self, schema: P4Schema) -> p4r.Entity: 1240 "Encode MulticastGroupEntry data as protobuf." 1241 entry = p4r.MulticastGroupEntry( 1242 multicast_group_id=self.multicast_group_id, 1243 replicas=[encode_replica(replica) for replica in self.replicas], 1244 metadata=self.metadata, 1245 ) 1246 return p4r.Entity( 1247 packet_replication_engine_entry=p4r.PacketReplicationEngineEntry( 1248 multicast_group_entry=entry 1249 ) 1250 )
Encode MulticastGroupEntry data as protobuf.
1252 @classmethod 1253 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1254 "Decode protobuf to MulticastGroupEntry data." 1255 entry = msg.packet_replication_engine_entry.multicast_group_entry 1256 return cls( 1257 multicast_group_id=entry.multicast_group_id, 1258 replicas=tuple(decode_replica(replica) for replica in entry.replicas), 1259 metadata=entry.metadata, 1260 )
Decode protobuf to MulticastGroupEntry data.
1897@decodable("packet") 1898@dataclass(slots=True) 1899class P4PacketIn: 1900 "Represents a P4Runtime PacketIn." 1901 1902 payload: bytes 1903 _: KW_ONLY 1904 metadata: _MetadataDictType 1905 1906 @classmethod 1907 def decode(cls, msg: p4r.StreamMessageResponse, schema: P4Schema) -> Self: 1908 "Decode protobuf to PacketIn data." 1909 packet = msg.packet 1910 cpm = schema.controller_packet_metadata.get("packet_in") 1911 if cpm is None: 1912 # There is no controller metadata. Warn if message has any. 1913 pkt_meta = packet.metadata 1914 if pkt_meta: 1915 LOGGER.warning("P4PacketIn unexpected metadata: %r", pkt_meta) 1916 return cls(packet.payload, metadata={}) 1917 1918 return cls( 1919 packet.payload, 1920 metadata=cpm.decode(packet.metadata), 1921 ) 1922 1923 def __getitem__(self, key: str) -> Any: 1924 "Retrieve metadata value." 1925 return self.metadata[key] 1926 1927 def __repr__(self) -> str: 1928 "Return friendlier hexadecimal description of packet." 1929 if self.metadata: 1930 return f"P4PacketIn(metadata={self.metadata!r}, payload=h'{self.payload.hex()}')" 1931 return f"P4PacketIn(payload=h'{self.payload.hex()}')"
Represents a P4Runtime PacketIn.
1906 @classmethod 1907 def decode(cls, msg: p4r.StreamMessageResponse, schema: P4Schema) -> Self: 1908 "Decode protobuf to PacketIn data." 1909 packet = msg.packet 1910 cpm = schema.controller_packet_metadata.get("packet_in") 1911 if cpm is None: 1912 # There is no controller metadata. Warn if message has any. 1913 pkt_meta = packet.metadata 1914 if pkt_meta: 1915 LOGGER.warning("P4PacketIn unexpected metadata: %r", pkt_meta) 1916 return cls(packet.payload, metadata={}) 1917 1918 return cls( 1919 packet.payload, 1920 metadata=cpm.decode(packet.metadata), 1921 )
Decode protobuf to PacketIn data.
1934@dataclass(slots=True) 1935class P4PacketOut: 1936 "Represents a P4Runtime PacketOut." 1937 1938 payload: bytes 1939 _: KW_ONLY 1940 metadata: _MetadataDictType 1941 1942 def __init__(self, __payload: bytes, /, **metadata: Any): 1943 self.payload = __payload 1944 self.metadata = metadata 1945 1946 def encode_update(self, schema: P4Schema) -> p4r.StreamMessageRequest: 1947 "Encode PacketOut data as protobuf." 1948 cpm = schema.controller_packet_metadata["packet_out"] 1949 return p4r.StreamMessageRequest( 1950 packet=p4r.PacketOut( 1951 payload=self.payload, 1952 metadata=cpm.encode(self.metadata), 1953 ) 1954 ) 1955 1956 def __getitem__(self, key: str) -> Any: 1957 "Retrieve metadata value." 1958 return self.metadata[key] 1959 1960 def __repr__(self) -> str: 1961 "Return friendlier hexadecimal description of packet." 1962 if self.metadata: 1963 return f"P4PacketOut(metadata={self.metadata!r}, payload=h'{self.payload.hex()}')" 1964 return f"P4PacketOut(payload=h'{self.payload.hex()}')"
Represents a P4Runtime PacketOut.
1946 def encode_update(self, schema: P4Schema) -> p4r.StreamMessageRequest: 1947 "Encode PacketOut data as protobuf." 1948 cpm = schema.controller_packet_metadata["packet_out"] 1949 return p4r.StreamMessageRequest( 1950 packet=p4r.PacketOut( 1951 payload=self.payload, 1952 metadata=cpm.encode(self.metadata), 1953 ) 1954 )
Encode PacketOut data as protobuf.
1169@decodable("register_entry") 1170@dataclass(slots=True) 1171class P4RegisterEntry(_P4ModifyOnly): 1172 "Represents a P4Runtime RegisterEntry." 1173 1174 register_id: str = "" 1175 _: KW_ONLY 1176 index: int | None = None 1177 data: _DataValueType | None = None 1178 1179 def encode(self, schema: P4Schema) -> p4r.Entity: 1180 "Encode RegisterEntry data as protobuf." 1181 if not self.register_id: 1182 return p4r.Entity(register_entry=p4r.RegisterEntry()) 1183 1184 register = schema.registers[self.register_id] 1185 1186 if self.index is not None: 1187 index = p4r.Index(index=self.index) 1188 else: 1189 index = None 1190 1191 if self.data is not None: 1192 data = register.type_spec.encode_data(self.data) 1193 else: 1194 data = None 1195 1196 entry = p4r.RegisterEntry( 1197 register_id=register.id, 1198 index=index, 1199 data=data, 1200 ) 1201 return p4r.Entity(register_entry=entry) 1202 1203 @classmethod 1204 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1205 "Decode protobuf to RegisterEntry data." 1206 entry = msg.register_entry 1207 if entry.register_id == 0: 1208 return cls() 1209 1210 register = schema.registers[entry.register_id] 1211 1212 if entry.HasField("index"): 1213 index = entry.index.index 1214 else: 1215 index = None 1216 1217 if entry.HasField("data"): 1218 data = register.type_spec.decode_data(entry.data) 1219 else: 1220 data = None 1221 1222 return cls( 1223 register.alias, 1224 index=index, 1225 data=data, 1226 )
Represents a P4Runtime RegisterEntry.
1179 def encode(self, schema: P4Schema) -> p4r.Entity: 1180 "Encode RegisterEntry data as protobuf." 1181 if not self.register_id: 1182 return p4r.Entity(register_entry=p4r.RegisterEntry()) 1183 1184 register = schema.registers[self.register_id] 1185 1186 if self.index is not None: 1187 index = p4r.Index(index=self.index) 1188 else: 1189 index = None 1190 1191 if self.data is not None: 1192 data = register.type_spec.encode_data(self.data) 1193 else: 1194 data = None 1195 1196 entry = p4r.RegisterEntry( 1197 register_id=register.id, 1198 index=index, 1199 data=data, 1200 ) 1201 return p4r.Entity(register_entry=entry)
Encode RegisterEntry data as protobuf.
1203 @classmethod 1204 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1205 "Decode protobuf to RegisterEntry data." 1206 entry = msg.register_entry 1207 if entry.register_id == 0: 1208 return cls() 1209 1210 register = schema.registers[entry.register_id] 1211 1212 if entry.HasField("index"): 1213 index = entry.index.index 1214 else: 1215 index = None 1216 1217 if entry.HasField("data"): 1218 data = register.type_spec.decode_data(entry.data) 1219 else: 1220 data = None 1221 1222 return cls( 1223 register.alias, 1224 index=index, 1225 data=data, 1226 )
Decode protobuf to RegisterEntry data.
Action
is an alias for P4TableAction.
439@dataclass(init=False, slots=True) 440class P4TableAction: 441 """Represents a P4Runtime Action reference for a direct table. 442 443 Attributes: 444 name (str): the name of the action. 445 args (dict[str, Any]): the action's arguments as a dictionary. 446 447 Example: 448 If the name of the action is "ipv4_forward" and it takes a single 449 "port" parameter, you can construct the action as: 450 451 ``` 452 action = P4TableAction("ipv4_forward", port=1) 453 ``` 454 455 Reference "9.1.2 Action Specification": 456 The Action Protobuf has fields: (action_id, params). Finsy translates 457 `name` to the appropriate `action_id` as determined by P4Info. It also 458 translates each named argument in `args` to the appropriate `param_id`. 459 460 See Also: 461 To specify an action for an indirect table, use `P4IndirectAction`. 462 Note that P4TableAction will automatically be promoted to an "indirect" 463 action if needed. 464 465 Operators: 466 A `P4TableAction` supports the multiplication operator (*) for 467 constructing "weighted actions". A weighted action is used in specifying 468 indirect actions. Here is an action with a weight of 3: 469 470 ``` 471 weighted_action = 3 * P4TableAction("ipv4_forward", port=1) 472 ``` 473 474 To specify a weight with a `watch_port`, use a tuple `(weight, port)`. 475 The weight is always a positive integer. 476 477 See Also: 478 - P4TableEntry 479 """ 480 481 name: str 482 "The name of the action." 483 args: dict[str, Any] 484 "The action's arguments as a dictionary." 485 486 def __init__(self, __name: str, /, **args: Any): 487 self.name = __name 488 self.args = args 489 490 def encode_table_action(self, table: P4Table) -> p4r.TableAction: 491 """Encode TableAction data as protobuf. 492 493 If the table is indirect, promote the action to a "one-shot" indirect 494 action. 495 """ 496 try: 497 action = table.actions[self.name] 498 except Exception as ex: 499 raise ValueError(f"{table.name!r}: {ex}") from ex 500 501 action_p4 = self._encode_action(action) 502 503 if table.action_profile is not None: 504 # Promote action to ActionProfileActionSet entry with weight=1. 505 return p4r.TableAction( 506 action_profile_action_set=p4r.ActionProfileActionSet( 507 action_profile_actions=[ 508 p4r.ActionProfileAction(action=action_p4, weight=1) 509 ] 510 ) 511 ) 512 513 return p4r.TableAction(action=action_p4) 514 515 def _fail_missing_params(self, action: P4ActionRef | P4Action) -> NoReturn: 516 "Report missing parameters." 517 seen = {param.name for param in action.params} 518 for name in self.args: 519 param = action.params[name] 520 seen.remove(param.name) 521 522 raise ValueError(f"Action {action.alias!r}: missing parameters {seen}") 523 524 def encode_action(self, schema: P4Schema | P4Table) -> p4r.Action: 525 "Encode Action data as protobuf." 526 action = schema.actions[self.name] 527 return self._encode_action(action) 528 529 def _encode_action(self, action: P4ActionRef | P4Action) -> p4r.Action: 530 "Helper to encode an action." 531 aps = action.params 532 try: 533 params = [ 534 aps[name].encode_param(value) for name, value in self.args.items() 535 ] 536 except ValueError as ex: 537 raise ValueError(f"{action.alias!r}: {ex}") from ex 538 539 # Check for missing action parameters. We always accept an action with 540 # no parameters (for wildcard ReadRequests). 541 param_count = len(params) 542 if param_count > 0 and param_count != len(aps): 543 self._fail_missing_params(action) 544 545 return p4r.Action(action_id=action.id, params=params) 546 547 @classmethod 548 def decode_table_action( 549 cls, msg: p4r.TableAction, table: P4Table 550 ) -> Self | "P4IndirectAction": 551 "Decode protobuf to TableAction data." 552 match msg.WhichOneof("type"): 553 case "action": 554 return cls.decode_action(msg.action, table) 555 case "action_profile_member_id": 556 return P4IndirectAction(member_id=msg.action_profile_member_id) 557 case "action_profile_group_id": 558 return P4IndirectAction(group_id=msg.action_profile_group_id) 559 case "action_profile_action_set": 560 return P4IndirectAction.decode_action_set( 561 msg.action_profile_action_set, table 562 ) 563 case other: 564 raise ValueError(f"unknown oneof: {other!r}") 565 566 @classmethod 567 def decode_action(cls, msg: p4r.Action, parent: P4Schema | P4Table) -> Self: 568 "Decode protobuf to Action data." 569 action = parent.actions[msg.action_id] 570 args = {} 571 for param in msg.params: 572 action_param = action.params[param.param_id] 573 value = action_param.decode_param(param) 574 args[action_param.name] = value 575 576 return cls(action.alias, **args) 577 578 def format_str(self, schema: P4Schema | P4Table) -> str: 579 """Format the table action as a human-readable string. 580 581 The result is formatted to look like a function call: 582 583 ``` 584 name(param1=value1, ...) 585 ``` 586 587 Where `name` is the action name, and `(param<N>, value<N>)` are the 588 action parameters. The format of `value<N>` is schema-dependent. 589 """ 590 aps = schema.actions[self.name].params 591 args = [ 592 f"{key}={aps[key].format_param(value)}" for key, value in self.args.items() 593 ] 594 595 return f"{self.name}({', '.join(args)})" 596 597 def __mul__(self, weight: P4Weight) -> P4WeightedAction: 598 "Make a weighted action." 599 if not isinstance( 600 weight, (int, tuple) 601 ): # pyright: ignore[reportUnnecessaryIsInstance] 602 raise NotImplementedError("expected P4Weight") 603 return (weight, self) 604 605 def __rmul__(self, weight: P4Weight) -> P4WeightedAction: 606 "Make a weighted action." 607 if not isinstance( 608 weight, (int, tuple) 609 ): # pyright: ignore[reportUnnecessaryIsInstance] 610 raise NotImplementedError("expected P4Weight") 611 return (weight, self) 612 613 def __call__(self, **params: Any) -> Self: 614 "Return a new action with the updated parameters." 615 return self.__class__(self.name, **(self.args | params))
Represents a P4Runtime Action reference for a direct table.
Attributes:
- name (str): the name of the action.
- args (dict[str, Any]): the action's arguments as a dictionary.
Example:
If the name of the action is "ipv4_forward" and it takes a single "port" parameter, you can construct the action as:
action = P4TableAction("ipv4_forward", port=1)
Reference "9.1.2 Action Specification":
The Action Protobuf has fields: (action_id, params). Finsy translates
name
to the appropriate action_id
as determined by P4Info. It also
translates each named argument in args
to the appropriate param_id
.
See Also:
To specify an action for an indirect table, use
P4IndirectAction
. Note that P4TableAction will automatically be promoted to an "indirect" action if needed.
Operators:
A
P4TableAction
supports the multiplication operator (*) for constructing "weighted actions". A weighted action is used in specifying indirect actions. Here is an action with a weight of 3:weighted_action = 3 * P4TableAction("ipv4_forward", port=1)
To specify a weight with a
watch_port
, use a tuple(weight, port)
. The weight is always a positive integer.
See Also:
- P4TableEntry
490 def encode_table_action(self, table: P4Table) -> p4r.TableAction: 491 """Encode TableAction data as protobuf. 492 493 If the table is indirect, promote the action to a "one-shot" indirect 494 action. 495 """ 496 try: 497 action = table.actions[self.name] 498 except Exception as ex: 499 raise ValueError(f"{table.name!r}: {ex}") from ex 500 501 action_p4 = self._encode_action(action) 502 503 if table.action_profile is not None: 504 # Promote action to ActionProfileActionSet entry with weight=1. 505 return p4r.TableAction( 506 action_profile_action_set=p4r.ActionProfileActionSet( 507 action_profile_actions=[ 508 p4r.ActionProfileAction(action=action_p4, weight=1) 509 ] 510 ) 511 ) 512 513 return p4r.TableAction(action=action_p4)
Encode TableAction data as protobuf.
If the table is indirect, promote the action to a "one-shot" indirect action.
524 def encode_action(self, schema: P4Schema | P4Table) -> p4r.Action: 525 "Encode Action data as protobuf." 526 action = schema.actions[self.name] 527 return self._encode_action(action)
Encode Action data as protobuf.
547 @classmethod 548 def decode_table_action( 549 cls, msg: p4r.TableAction, table: P4Table 550 ) -> Self | "P4IndirectAction": 551 "Decode protobuf to TableAction data." 552 match msg.WhichOneof("type"): 553 case "action": 554 return cls.decode_action(msg.action, table) 555 case "action_profile_member_id": 556 return P4IndirectAction(member_id=msg.action_profile_member_id) 557 case "action_profile_group_id": 558 return P4IndirectAction(group_id=msg.action_profile_group_id) 559 case "action_profile_action_set": 560 return P4IndirectAction.decode_action_set( 561 msg.action_profile_action_set, table 562 ) 563 case other: 564 raise ValueError(f"unknown oneof: {other!r}")
Decode protobuf to TableAction data.
566 @classmethod 567 def decode_action(cls, msg: p4r.Action, parent: P4Schema | P4Table) -> Self: 568 "Decode protobuf to Action data." 569 action = parent.actions[msg.action_id] 570 args = {} 571 for param in msg.params: 572 action_param = action.params[param.param_id] 573 value = action_param.decode_param(param) 574 args[action_param.name] = value 575 576 return cls(action.alias, **args)
Decode protobuf to Action data.
578 def format_str(self, schema: P4Schema | P4Table) -> str: 579 """Format the table action as a human-readable string. 580 581 The result is formatted to look like a function call: 582 583 ``` 584 name(param1=value1, ...) 585 ``` 586 587 Where `name` is the action name, and `(param<N>, value<N>)` are the 588 action parameters. The format of `value<N>` is schema-dependent. 589 """ 590 aps = schema.actions[self.name].params 591 args = [ 592 f"{key}={aps[key].format_param(value)}" for key, value in self.args.items() 593 ] 594 595 return f"{self.name}({', '.join(args)})"
Format the table action as a human-readable string.
The result is formatted to look like a function call:
name(param1=value1, ...)
Where name
is the action name, and (param<N>, value<N>)
are the
action parameters. The format of value<N>
is schema-dependent.
898@decodable("table_entry") 899@dataclass(slots=True) 900class P4TableEntry(_P4Writable): 901 """Represents a P4Runtime table entry. 902 903 Attributes: 904 table_id (str): Name of the table. 905 match (P4TableMatch | None): Entry's match fields. 906 action (P4TableAction | P4IndirectAction | None): Entry's action. 907 is_default_action (bool): True if entry is the default table entry. 908 priority (int): Priority of a table entry when match implies TCAM lookup. 909 metadata (bytes): Arbitrary controller cookie (1.2.0). 910 controller_metadata (int): Deprecated controller cookie (< 1.2.0). 911 meter_config (P4MeterConfig | None): Meter configuration. 912 counter_data (P4CounterData | None): Counter data for table entry. 913 meter_counter_data (P4MeterCounterData | None): Meter counter data (1.4.0). 914 idle_timeout_ns (int): Idle timeout in nanoseconds. 915 time_since_last_hit (int | None): Nanoseconds since entry last matched. 916 is_const (bool): True if entry is constant (1.4.0). 917 918 The most commonly used fields are table_id, match, action, is_default_action, 919 and priority. See the P4Runtime Spec for usage examples regarding the other 920 attributes. 921 922 When writing a P4TableEntry, you can specify the type of update using '+', 923 '-', and '~'. 924 925 Examples: 926 ``` 927 # Specify all tables when using "read". 928 entry = fy.P4TableEntry() 929 930 # Specify the table named "ipv4" when using "read". 931 entry = fy.P4TableEntry("ipv4") 932 933 # Specify the default entry in the "ipv4" table when using "read". 934 entry = fy.P4TableEntry("ipv4", is_default_action=True) 935 936 # Insert an entry into the "ipv4" table. 937 update = +fy.P4TableEntry( 938 "ipv4", 939 match=fy.Match(ipv4_dst="10.0.0.0/8"), 940 action=fy.Action("forward", port=1), 941 ) 942 943 # Modify the default action in the "ipv4" table. 944 update = ~fy.P4TableEntry( 945 "ipv4", 946 action=fy.Action("forward", port=5), 947 is_default_action=True 948 ) 949 ``` 950 951 Operators: 952 You can retrieve a match field from a table entry using `[]`. For 953 example, `entry["ipv4_dst"]` is the same as `entry.match["ipv4_dst"]`. 954 955 Formatting Helpers: 956 The `match_str` and `action_str` methods provide P4Info-aware formatting 957 of the match and action attributes. 958 """ 959 960 table_id: str = "" 961 "Name of the table." 962 _: KW_ONLY 963 match: P4TableMatch | None = None 964 "Entry's match fields." 965 action: P4TableAction | P4IndirectAction | None = None 966 "Entry's action." 967 is_default_action: bool = False 968 "True if entry is the default table entry." 969 priority: int = 0 970 "Priority of a table entry when match implies TCAM lookup." 971 metadata: bytes = b"" 972 "Arbitrary controller cookie. (1.2.0)." 973 controller_metadata: int = 0 974 "Deprecated controller cookie (< 1.2.0)." 975 meter_config: P4MeterConfig | None = None 976 "Meter configuration." 977 counter_data: P4CounterData | None = None 978 "Counter data for table entry." 979 meter_counter_data: P4MeterCounterData | None = None 980 "Meter counter data (1.4.0)." 981 idle_timeout_ns: int = 0 982 "Idle timeout in nanoseconds." 983 time_since_last_hit: int | None = None 984 "Nanoseconds since entry last matched." 985 is_const: bool = False 986 "True if entry is constant (1.4.0)." 987 988 def __getitem__(self, key: str) -> Any: 989 "Convenience accessor to retrieve a value from the `match` property." 990 if self.match is not None: 991 return self.match[key] 992 raise KeyError(key) 993 994 def encode(self, schema: P4Schema) -> p4r.Entity: 995 "Encode TableEntry data as protobuf." 996 return p4r.Entity(table_entry=self.encode_entry(schema)) 997 998 def encode_entry(self, schema: P4Schema) -> p4r.TableEntry: 999 "Encode TableEntry data as protobuf." 1000 if not self.table_id: 1001 return self._encode_empty() 1002 1003 table = schema.tables[self.table_id] 1004 1005 if self.match: 1006 match = self.match.encode(table) 1007 else: 1008 match = None 1009 1010 if self.action: 1011 action = self.action.encode_table_action(table) 1012 else: 1013 action = None 1014 1015 if self.meter_config: 1016 meter_config = self.meter_config.encode() 1017 else: 1018 meter_config = None 1019 1020 if self.counter_data: 1021 counter_data = self.counter_data.encode() 1022 else: 1023 counter_data = None 1024 1025 if self.meter_counter_data: 1026 meter_counter_data = self.meter_counter_data.encode() 1027 else: 1028 meter_counter_data = None 1029 1030 if self.time_since_last_hit is not None: 1031 time_since_last_hit = p4r.TableEntry.IdleTimeout( 1032 elapsed_ns=self.time_since_last_hit 1033 ) 1034 else: 1035 time_since_last_hit = None 1036 1037 return p4r.TableEntry( 1038 table_id=table.id, 1039 match=match, 1040 action=action, 1041 priority=self.priority, 1042 controller_metadata=self.controller_metadata, 1043 meter_config=meter_config, 1044 counter_data=counter_data, 1045 meter_counter_data=meter_counter_data, 1046 is_default_action=self.is_default_action, 1047 idle_timeout_ns=self.idle_timeout_ns, 1048 time_since_last_hit=time_since_last_hit, 1049 metadata=self.metadata, 1050 is_const=self.is_const, 1051 ) 1052 1053 def _encode_empty(self) -> p4r.TableEntry: 1054 "Encode an empty wildcard request." 1055 if self.counter_data is not None: 1056 counter_data = self.counter_data.encode() 1057 else: 1058 counter_data = None 1059 1060 # FIXME: time_since_last_hit not supported for wildcard reads? 1061 if self.time_since_last_hit is not None: 1062 time_since_last_hit = p4r.TableEntry.IdleTimeout( 1063 elapsed_ns=self.time_since_last_hit 1064 ) 1065 else: 1066 time_since_last_hit = None 1067 1068 return p4r.TableEntry( 1069 counter_data=counter_data, 1070 time_since_last_hit=time_since_last_hit, 1071 ) 1072 1073 @classmethod 1074 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1075 "Decode protobuf to TableEntry data." 1076 return cls.decode_entry(msg.table_entry, schema) 1077 1078 @classmethod 1079 def decode_entry(cls, entry: p4r.TableEntry, schema: P4Schema) -> Self: 1080 "Decode protobuf to TableEntry data." 1081 if entry.table_id == 0: 1082 return cls("") 1083 1084 table = schema.tables[entry.table_id] 1085 1086 if entry.match: 1087 match = P4TableMatch.decode(entry.match, table) 1088 else: 1089 match = None 1090 1091 if entry.HasField("action"): 1092 action = P4TableAction.decode_table_action(entry.action, table) 1093 else: 1094 action = None 1095 1096 if entry.HasField("time_since_last_hit"): 1097 last_hit = entry.time_since_last_hit.elapsed_ns 1098 else: 1099 last_hit = None 1100 1101 if entry.HasField("meter_config"): 1102 meter_config = P4MeterConfig.decode(entry.meter_config) 1103 else: 1104 meter_config = None 1105 1106 if entry.HasField("counter_data"): 1107 counter_data = P4CounterData.decode(entry.counter_data) 1108 else: 1109 counter_data = None 1110 1111 if entry.HasField("meter_counter_data"): 1112 meter_counter_data = P4MeterCounterData.decode(entry.meter_counter_data) 1113 else: 1114 meter_counter_data = None 1115 1116 return cls( 1117 table_id=table.alias, 1118 match=match, 1119 action=action, 1120 priority=entry.priority, 1121 controller_metadata=entry.controller_metadata, 1122 meter_config=meter_config, 1123 counter_data=counter_data, 1124 meter_counter_data=meter_counter_data, 1125 is_default_action=entry.is_default_action, 1126 idle_timeout_ns=entry.idle_timeout_ns, 1127 time_since_last_hit=last_hit, 1128 metadata=entry.metadata, 1129 is_const=entry.is_const, 1130 ) 1131 1132 def match_dict( 1133 self, 1134 schema: P4Schema, 1135 *, 1136 wildcard: str | None = None, 1137 ) -> dict[str, str]: 1138 """Format the match fields as a dictionary of strings. 1139 1140 If `wildcard` is None, only include match fields that have values. If 1141 `wildcard` is set, include all field names but replace unset values with 1142 given wildcard value (e.g. "*") 1143 """ 1144 table = schema.tables[self.table_id] 1145 if self.match is not None: 1146 return self.match.format_dict(table, wildcard=wildcard) 1147 return P4TableMatch().format_dict(table, wildcard=wildcard) 1148 1149 def match_str( 1150 self, 1151 schema: P4Schema, 1152 *, 1153 wildcard: str | None = None, 1154 ) -> str: 1155 "Format the match fields as a human-readable, canonical string." 1156 table = schema.tables[self.table_id] 1157 if self.match is not None: 1158 return self.match.format_str(table, wildcard=wildcard) 1159 return P4TableMatch().format_str(table, wildcard=wildcard) 1160 1161 def action_str(self, schema: P4Schema) -> str: 1162 "Format the actions as a human-readable, canonical string." 1163 table = schema.tables[self.table_id] 1164 if self.action is None: 1165 return NOACTION_STR 1166 return self.action.format_str(table)
Represents a P4Runtime table entry.
Attributes:
- table_id (str): Name of the table.
- match (P4TableMatch | None): Entry's match fields.
- action (P4TableAction | P4IndirectAction | None): Entry's action.
- is_default_action (bool): True if entry is the default table entry.
- priority (int): Priority of a table entry when match implies TCAM lookup.
- metadata (bytes): Arbitrary controller cookie (1.2.0).
- controller_metadata (int): Deprecated controller cookie (< 1.2.0).
- meter_config (P4MeterConfig | None): Meter configuration.
- counter_data (P4CounterData | None): Counter data for table entry.
- meter_counter_data (P4MeterCounterData | None): Meter counter data (1.4.0).
- idle_timeout_ns (int): Idle timeout in nanoseconds.
- time_since_last_hit (int | None): Nanoseconds since entry last matched.
- is_const (bool): True if entry is constant (1.4.0).
The most commonly used fields are table_id, match, action, is_default_action, and priority. See the P4Runtime Spec for usage examples regarding the other attributes.
When writing a P4TableEntry, you can specify the type of update using '+', '-', and '~'.
Examples:
# Specify all tables when using "read".
entry = fy.P4TableEntry()
# Specify the table named "ipv4" when using "read".
entry = fy.P4TableEntry("ipv4")
# Specify the default entry in the "ipv4" table when using "read".
entry = fy.P4TableEntry("ipv4", is_default_action=True)
# Insert an entry into the "ipv4" table.
update = +fy.P4TableEntry(
"ipv4",
match=fy.Match(ipv4_dst="10.0.0.0/8"),
action=fy.Action("forward", port=1),
)
# Modify the default action in the "ipv4" table.
update = ~fy.P4TableEntry(
"ipv4",
action=fy.Action("forward", port=5),
is_default_action=True
)
Operators:
You can retrieve a match field from a table entry using
[]
. For example,entry["ipv4_dst"]
is the same asentry.match["ipv4_dst"]
.
Formatting Helpers:
The
match_str
andaction_str
methods provide P4Info-aware formatting of the match and action attributes.
988 def __getitem__(self, key: str) -> Any: 989 "Convenience accessor to retrieve a value from the `match` property." 990 if self.match is not None: 991 return self.match[key] 992 raise KeyError(key)
Convenience accessor to retrieve a value from the match
property.
994 def encode(self, schema: P4Schema) -> p4r.Entity: 995 "Encode TableEntry data as protobuf." 996 return p4r.Entity(table_entry=self.encode_entry(schema))
Encode TableEntry data as protobuf.
998 def encode_entry(self, schema: P4Schema) -> p4r.TableEntry: 999 "Encode TableEntry data as protobuf." 1000 if not self.table_id: 1001 return self._encode_empty() 1002 1003 table = schema.tables[self.table_id] 1004 1005 if self.match: 1006 match = self.match.encode(table) 1007 else: 1008 match = None 1009 1010 if self.action: 1011 action = self.action.encode_table_action(table) 1012 else: 1013 action = None 1014 1015 if self.meter_config: 1016 meter_config = self.meter_config.encode() 1017 else: 1018 meter_config = None 1019 1020 if self.counter_data: 1021 counter_data = self.counter_data.encode() 1022 else: 1023 counter_data = None 1024 1025 if self.meter_counter_data: 1026 meter_counter_data = self.meter_counter_data.encode() 1027 else: 1028 meter_counter_data = None 1029 1030 if self.time_since_last_hit is not None: 1031 time_since_last_hit = p4r.TableEntry.IdleTimeout( 1032 elapsed_ns=self.time_since_last_hit 1033 ) 1034 else: 1035 time_since_last_hit = None 1036 1037 return p4r.TableEntry( 1038 table_id=table.id, 1039 match=match, 1040 action=action, 1041 priority=self.priority, 1042 controller_metadata=self.controller_metadata, 1043 meter_config=meter_config, 1044 counter_data=counter_data, 1045 meter_counter_data=meter_counter_data, 1046 is_default_action=self.is_default_action, 1047 idle_timeout_ns=self.idle_timeout_ns, 1048 time_since_last_hit=time_since_last_hit, 1049 metadata=self.metadata, 1050 is_const=self.is_const, 1051 )
Encode TableEntry data as protobuf.
1073 @classmethod 1074 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1075 "Decode protobuf to TableEntry data." 1076 return cls.decode_entry(msg.table_entry, schema)
Decode protobuf to TableEntry data.
1078 @classmethod 1079 def decode_entry(cls, entry: p4r.TableEntry, schema: P4Schema) -> Self: 1080 "Decode protobuf to TableEntry data." 1081 if entry.table_id == 0: 1082 return cls("") 1083 1084 table = schema.tables[entry.table_id] 1085 1086 if entry.match: 1087 match = P4TableMatch.decode(entry.match, table) 1088 else: 1089 match = None 1090 1091 if entry.HasField("action"): 1092 action = P4TableAction.decode_table_action(entry.action, table) 1093 else: 1094 action = None 1095 1096 if entry.HasField("time_since_last_hit"): 1097 last_hit = entry.time_since_last_hit.elapsed_ns 1098 else: 1099 last_hit = None 1100 1101 if entry.HasField("meter_config"): 1102 meter_config = P4MeterConfig.decode(entry.meter_config) 1103 else: 1104 meter_config = None 1105 1106 if entry.HasField("counter_data"): 1107 counter_data = P4CounterData.decode(entry.counter_data) 1108 else: 1109 counter_data = None 1110 1111 if entry.HasField("meter_counter_data"): 1112 meter_counter_data = P4MeterCounterData.decode(entry.meter_counter_data) 1113 else: 1114 meter_counter_data = None 1115 1116 return cls( 1117 table_id=table.alias, 1118 match=match, 1119 action=action, 1120 priority=entry.priority, 1121 controller_metadata=entry.controller_metadata, 1122 meter_config=meter_config, 1123 counter_data=counter_data, 1124 meter_counter_data=meter_counter_data, 1125 is_default_action=entry.is_default_action, 1126 idle_timeout_ns=entry.idle_timeout_ns, 1127 time_since_last_hit=last_hit, 1128 metadata=entry.metadata, 1129 is_const=entry.is_const, 1130 )
Decode protobuf to TableEntry data.
1132 def match_dict( 1133 self, 1134 schema: P4Schema, 1135 *, 1136 wildcard: str | None = None, 1137 ) -> dict[str, str]: 1138 """Format the match fields as a dictionary of strings. 1139 1140 If `wildcard` is None, only include match fields that have values. If 1141 `wildcard` is set, include all field names but replace unset values with 1142 given wildcard value (e.g. "*") 1143 """ 1144 table = schema.tables[self.table_id] 1145 if self.match is not None: 1146 return self.match.format_dict(table, wildcard=wildcard) 1147 return P4TableMatch().format_dict(table, wildcard=wildcard)
Format the match fields as a dictionary of strings.
If wildcard
is None, only include match fields that have values. If
wildcard
is set, include all field names but replace unset values with
given wildcard value (e.g. "*")
1149 def match_str( 1150 self, 1151 schema: P4Schema, 1152 *, 1153 wildcard: str | None = None, 1154 ) -> str: 1155 "Format the match fields as a human-readable, canonical string." 1156 table = schema.tables[self.table_id] 1157 if self.match is not None: 1158 return self.match.format_str(table, wildcard=wildcard) 1159 return P4TableMatch().format_str(table, wildcard=wildcard)
Format the match fields as a human-readable, canonical string.
1161 def action_str(self, schema: P4Schema) -> str: 1162 "Format the actions as a human-readable, canonical string." 1163 table = schema.tables[self.table_id] 1164 if self.action is None: 1165 return NOACTION_STR 1166 return self.action.format_str(table)
Format the actions as a human-readable, canonical string.
Match
is an alias for P4TableMatch.
292class P4TableMatch(dict[str, Any]): 293 """Represents a set of P4Runtime field matches. 294 295 Each match field is stored as a dictionary key, where the key is the name 296 of the match field. The field's value should be appropriate for the type 297 of match (EXACT, LPM, TERNARY, etc.) 298 299 Construct a match similar to a dictionary. 300 301 Example: 302 ``` 303 # Keyword arguments: 304 match = P4TableMatch(ipv4_dst="10.0.0.1") 305 306 # Dictionary argument: 307 match = P4TableMatch({"ipv4_dst": "10.0.0.1"}) 308 309 # List of 2-tuples: 310 match = P4TableMatch([("ipv4_dst", "10.0.0.1")]) 311 ``` 312 313 P4TableMatch is implemented as a subclass of `dict`. It supports all of the 314 standard dictionary methods: 315 ``` 316 match = P4TableMatch() 317 match["ipv4_dst"] = "10.0.0.1" 318 assert len(match) == 1 319 ``` 320 321 Reference "9.1.1 Match Format": 322 Each match field is translated to a FieldMatch Protobuf message by 323 translating the entry key to a `field_id`. The type of match (EXACT, 324 LPM, TERNARY, OPTIONAL, or RANGE) is determined by the P4Info, and the 325 value is converted to the Protobuf representation. 326 327 Supported String Values: 328 ``` 329 EXACT: "255", "0xFF", "10.0.0.1", "2000::1", "00:00:00:00:00:01" 330 331 LPM: "255/8", "0xFF/8", "10.0.0.1/32", "2000::1/128", 332 "00:00:00:00:00:01/48" 333 (+ all exact formats are promoted to all-1 masks) 334 335 TERNARY: "255/&255", "0xFF/&0xFF", "10.0.0.1/&255.255.255.255", 336 "2000::1/&128", "00:00:00:00:00:01/&48" 337 (+ all exact formats are promoted to all-1 masks) 338 (+ all lpm formats are promoted to the specified contiguous mask) 339 340 RANGE: "0...255", "0x00...0xFF", "10.0.0.1...10.0.0.9", 341 "2000::1...2001::9", "00:00:00:00:00:01...00:00:00:00:00:09" 342 (+ all exact formats are promoted to single-value ranges) 343 344 OPTIONAL: Same as exact format. 345 ``` 346 347 See the `p4values.py` module for all supported value classes. 348 349 TODO: 350 - Change range delimiter to '-' (and drop '-' delimited MAC's). 351 - Consider supporting ternary values with just '/' (and drop support 352 for decimal masks; mask must be hexadecimal number). 353 354 See Also: 355 - P4TableEntry 356 """ 357 358 def encode(self, table: P4Table) -> list[p4r.FieldMatch]: 359 "Encode TableMatch data as a list of Protobuf fields." 360 result: list[p4r.FieldMatch] = [] 361 match_fields = table.match_fields 362 363 for key, value in self.items(): 364 try: 365 field = match_fields[key].encode_field(value) 366 if field is not None: 367 result.append(field) 368 except Exception as ex: 369 raise ValueError(f"{table.name!r}: Match field {key!r}: {ex}") from ex 370 371 return result 372 373 @classmethod 374 def decode(cls, msgs: Iterable[p4r.FieldMatch], table: P4Table) -> Self: 375 "Decode Protobuf fields as TableMatch data." 376 result: dict[str, Any] = {} 377 match_fields = table.match_fields 378 379 for field in msgs: 380 fld = match_fields[field.field_id] 381 result[fld.alias] = fld.decode_field(field) 382 383 return cls(result) 384 385 def format_dict( 386 self, 387 table: P4Table, 388 *, 389 wildcard: str | None = None, 390 ) -> dict[str, str]: 391 """Format the table match fields as a human-readable dictionary. 392 393 The result is a dictionary showing the TableMatch data for fields 394 included in the match. If `wildcard` is specified, all fields defined 395 in P4Info will be included with their value set to the wildcard string. 396 397 Values are formatted using the format/type specified in P4Info. 398 """ 399 result: dict[str, str] = {} 400 401 for fld in table.match_fields: 402 value = self.get(fld.alias, None) 403 if value is not None: 404 result[fld.alias] = fld.format_field(value) 405 elif wildcard is not None: 406 result[fld.alias] = wildcard 407 408 return result 409 410 def format_str( 411 self, 412 table: P4Table, 413 *, 414 wildcard: str | None = None, 415 ) -> str: 416 """Format the table match fields as a human-readable string. 417 418 The result is a string showing the TableMatch data for fields included 419 in the match. If `wildcard` is specified, all fields defined in P4Info 420 will be included with their value set to the wildcard string. 421 422 All fields are formatted as "name=value" and they are delimited by 423 spaces. 424 425 Values are formatted using the format/type specified in P4Info. 426 """ 427 result: list[str] = [] 428 429 for fld in table.match_fields: 430 value = self.get(fld.alias, None) 431 if value is not None: 432 result.append(f"{fld.alias}={fld.format_field(value)}") 433 elif wildcard is not None: 434 result.append(f"{fld.alias}={wildcard}") 435 436 return " ".join(result)
Represents a set of P4Runtime field matches.
Each match field is stored as a dictionary key, where the key is the name of the match field. The field's value should be appropriate for the type of match (EXACT, LPM, TERNARY, etc.)
Construct a match similar to a dictionary.
Example:
# Keyword arguments:
match = P4TableMatch(ipv4_dst="10.0.0.1")
# Dictionary argument:
match = P4TableMatch({"ipv4_dst": "10.0.0.1"})
# List of 2-tuples:
match = P4TableMatch([("ipv4_dst", "10.0.0.1")])
P4TableMatch is implemented as a subclass of dict
. It supports all of the
standard dictionary methods:
match = P4TableMatch()
match["ipv4_dst"] = "10.0.0.1"
assert len(match) == 1
Reference "9.1.1 Match Format":
Each match field is translated to a FieldMatch Protobuf message by
translating the entry key to a field_id
. The type of match (EXACT,
LPM, TERNARY, OPTIONAL, or RANGE) is determined by the P4Info, and the
value is converted to the Protobuf representation.
Supported String Values:
EXACT: "255", "0xFF", "10.0.0.1", "2000::1", "00:00:00:00:00:01"
LPM: "255/8", "0xFF/8", "10.0.0.1/32", "2000::1/128",
"00:00:00:00:00:01/48"
(+ all exact formats are promoted to all-1 masks)
TERNARY: "255/&255", "0xFF/&0xFF", "10.0.0.1/&255.255.255.255",
"2000::1/&128", "00:00:00:00:00:01/&48"
(+ all exact formats are promoted to all-1 masks)
(+ all lpm formats are promoted to the specified contiguous mask)
RANGE: "0...255", "0x00...0xFF", "10.0.0.1...10.0.0.9",
"2000::1...2001::9", "00:00:00:00:00:01...00:00:00:00:00:09"
(+ all exact formats are promoted to single-value ranges)
OPTIONAL: Same as exact format.
See the p4values.py
module for all supported value classes.
TODO:
- Change range delimiter to '-' (and drop '-' delimited MAC's).
- Consider supporting ternary values with just '/' (and drop support for decimal masks; mask must be hexadecimal number).
See Also:
- P4TableEntry
358 def encode(self, table: P4Table) -> list[p4r.FieldMatch]: 359 "Encode TableMatch data as a list of Protobuf fields." 360 result: list[p4r.FieldMatch] = [] 361 match_fields = table.match_fields 362 363 for key, value in self.items(): 364 try: 365 field = match_fields[key].encode_field(value) 366 if field is not None: 367 result.append(field) 368 except Exception as ex: 369 raise ValueError(f"{table.name!r}: Match field {key!r}: {ex}") from ex 370 371 return result
Encode TableMatch data as a list of Protobuf fields.
373 @classmethod 374 def decode(cls, msgs: Iterable[p4r.FieldMatch], table: P4Table) -> Self: 375 "Decode Protobuf fields as TableMatch data." 376 result: dict[str, Any] = {} 377 match_fields = table.match_fields 378 379 for field in msgs: 380 fld = match_fields[field.field_id] 381 result[fld.alias] = fld.decode_field(field) 382 383 return cls(result)
Decode Protobuf fields as TableMatch data.
385 def format_dict( 386 self, 387 table: P4Table, 388 *, 389 wildcard: str | None = None, 390 ) -> dict[str, str]: 391 """Format the table match fields as a human-readable dictionary. 392 393 The result is a dictionary showing the TableMatch data for fields 394 included in the match. If `wildcard` is specified, all fields defined 395 in P4Info will be included with their value set to the wildcard string. 396 397 Values are formatted using the format/type specified in P4Info. 398 """ 399 result: dict[str, str] = {} 400 401 for fld in table.match_fields: 402 value = self.get(fld.alias, None) 403 if value is not None: 404 result[fld.alias] = fld.format_field(value) 405 elif wildcard is not None: 406 result[fld.alias] = wildcard 407 408 return result
Format the table match fields as a human-readable dictionary.
The result is a dictionary showing the TableMatch data for fields
included in the match. If wildcard
is specified, all fields defined
in P4Info will be included with their value set to the wildcard string.
Values are formatted using the format/type specified in P4Info.
410 def format_str( 411 self, 412 table: P4Table, 413 *, 414 wildcard: str | None = None, 415 ) -> str: 416 """Format the table match fields as a human-readable string. 417 418 The result is a string showing the TableMatch data for fields included 419 in the match. If `wildcard` is specified, all fields defined in P4Info 420 will be included with their value set to the wildcard string. 421 422 All fields are formatted as "name=value" and they are delimited by 423 spaces. 424 425 Values are formatted using the format/type specified in P4Info. 426 """ 427 result: list[str] = [] 428 429 for fld in table.match_fields: 430 value = self.get(fld.alias, None) 431 if value is not None: 432 result.append(f"{fld.alias}={fld.format_field(value)}") 433 elif wildcard is not None: 434 result.append(f"{fld.alias}={wildcard}") 435 436 return " ".join(result)
Format the table match fields as a human-readable string.
The result is a string showing the TableMatch data for fields included
in the match. If wildcard
is specified, all fields defined in P4Info
will be included with their value set to the wildcard string.
All fields are formatted as "name=value" and they are delimited by spaces.
Values are formatted using the format/type specified in P4Info.
1861@decodable("value_set_entry") 1862@dataclass(slots=True) 1863class P4ValueSetEntry(_P4ModifyOnly): 1864 "Represents a P4Runtime ValueSetEntry." 1865 1866 value_set_id: str 1867 _: KW_ONLY 1868 members: list[P4ValueSetMember] 1869 1870 def encode(self, schema: P4Schema) -> p4r.Entity: 1871 "Encode P4ValueSetEntry as protobuf." 1872 value_set = schema.value_sets[self.value_set_id] 1873 members = [ 1874 p4r.ValueSetMember(match=member.encode(value_set)) 1875 for member in self.members 1876 ] 1877 1878 return p4r.Entity( 1879 value_set_entry=p4r.ValueSetEntry( 1880 value_set_id=value_set.id, members=members 1881 ) 1882 ) 1883 1884 @classmethod 1885 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1886 "Decode protobuf to P4ValueSetEntry." 1887 entry = msg.value_set_entry 1888 value_set = schema.value_sets[entry.value_set_id] 1889 1890 members = [ 1891 P4ValueSetMember.decode(member.match, value_set) for member in entry.members 1892 ] 1893 1894 return cls(value_set.alias, members=members)
Represents a P4Runtime ValueSetEntry.
1870 def encode(self, schema: P4Schema) -> p4r.Entity: 1871 "Encode P4ValueSetEntry as protobuf." 1872 value_set = schema.value_sets[self.value_set_id] 1873 members = [ 1874 p4r.ValueSetMember(match=member.encode(value_set)) 1875 for member in self.members 1876 ] 1877 1878 return p4r.Entity( 1879 value_set_entry=p4r.ValueSetEntry( 1880 value_set_id=value_set.id, members=members 1881 ) 1882 )
Encode P4ValueSetEntry as protobuf.
1884 @classmethod 1885 def decode(cls, msg: p4r.Entity, schema: P4Schema) -> Self: 1886 "Decode protobuf to P4ValueSetEntry." 1887 entry = msg.value_set_entry 1888 value_set = schema.value_sets[entry.value_set_id] 1889 1890 members = [ 1891 P4ValueSetMember.decode(member.match, value_set) for member in entry.members 1892 ] 1893 1894 return cls(value_set.alias, members=members)
Decode protobuf to P4ValueSetEntry.
117class P4ConfigAction(_EnumBase): 118 "IntEnum equivalent to `p4r.SetForwardingPipelineConfigRequest.Action`." 119 UNSPECIFIED = p4r.SetForwardingPipelineConfigRequest.Action.UNSPECIFIED 120 VERIFY = p4r.SetForwardingPipelineConfigRequest.Action.VERIFY 121 VERIFY_AND_SAVE = p4r.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_SAVE 122 VERIFY_AND_COMMIT = p4r.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT 123 COMMIT = p4r.SetForwardingPipelineConfigRequest.Action.COMMIT 124 RECONCILE_AND_COMMIT = ( 125 p4r.SetForwardingPipelineConfigRequest.Action.RECONCILE_AND_COMMIT 126 ) 127 128 def vt(self) -> p4r.SetForwardingPipelineConfigRequest.Action.ValueType: 129 "Cast `self` to `ValueType`." 130 return cast(p4r.SetForwardingPipelineConfigRequest.Action.ValueType, self)
IntEnum equivalent to p4r.SetForwardingPipelineConfigRequest.Action
.
487class P4Schema(_ReprMixin): 488 """Represents a P4Info file and its associated P4 blob (optional). 489 490 ``` 491 p4 = P4Schema(Path("basic.p4info.txtpb")) 492 ``` 493 494 This class parses the P4Info contents to produce an in-memory representation 495 of the tables, actions, types, etc. inside. This in-memory graph of the 496 contents of the P4Info file may be shared when we parse identical 497 P4Info files. The sharing of P4Info data is controlled by the 498 `P4SchemaCache` class. 499 """ 500 501 _p4info: p4i.P4Info | None 502 _p4blob: Path | bytes | SupportsBytes | None 503 _p4defs: _P4Defs # possibly shared in-memory representation 504 _p4cookie: int = 0 505 506 def __init__( 507 self, 508 p4info: p4i.P4Info | Path | None = None, 509 p4blob: Path | bytes | SupportsBytes | None = None, 510 ): 511 self._p4blob = p4blob 512 self._p4info, self._p4defs, self._p4cookie = P4SchemaCache.load_p4info( 513 p4info, 514 self._p4blob, 515 ) 516 517 @property 518 def exists(self) -> bool: 519 "True if p4info is configured." 520 return self._p4info is not None 521 522 @property 523 def is_authoritative(self) -> bool: 524 "True if both p4info and p4blob are configured." 525 return self._p4info is not None and self._p4blob is not None 526 527 @property 528 def p4info(self) -> p4i.P4Info: 529 "P4Info value." 530 if self._p4info is None: 531 raise ValueError("No P4Info configured.") 532 return self._p4info 533 534 def set_p4info(self, p4info: p4i.P4Info) -> None: 535 "Set P4Info using value returned from switch." 536 self._p4info, self._p4defs, self._p4cookie = P4SchemaCache.load_p4info( 537 p4info, 538 self._p4blob, 539 ) 540 541 def has_p4info(self, p4info: p4i.P4Info) -> bool: 542 "Return true if the current P4Info equals the given P4Info." 543 if self._p4info is None: 544 return False 545 return self._p4info.SerializeToString( 546 deterministic=True 547 ) == p4info.SerializeToString(deterministic=True) 548 549 @property 550 def p4blob(self) -> bytes: 551 "P4Blob value a.k.a p4_device_config." 552 return _blob_bytes(self._p4blob) 553 554 @property 555 def p4cookie(self) -> int: 556 """Cookie value for p4info and p4blob.""" 557 return self._p4cookie 558 559 def get_pipeline_config(self) -> p4r.ForwardingPipelineConfig: 560 """The forwarding pipeline configuration.""" 561 return p4r.ForwardingPipelineConfig( 562 p4info=self.p4info, 563 p4_device_config=self.p4blob, 564 cookie=p4r.ForwardingPipelineConfig.Cookie(cookie=self.p4cookie), 565 ) 566 567 def get_pipeline_info(self) -> str: 568 "Concise string description of the pipeline (suitable for logging)." 569 if self.exists: 570 pipeline = self.name 571 version = self.version 572 arch = self.arch 573 return f"{pipeline=} {version=} {arch=}" 574 575 return "<No pipeline exists>" 576 577 @property 578 def name(self) -> str: 579 "Name from pkg_info." 580 if self._p4info is None: 581 return "" 582 return self._p4info.pkg_info.name 583 584 @property 585 def version(self) -> str: 586 "Version from pkg_info." 587 if self._p4info is None: 588 return "" 589 return self._p4info.pkg_info.version 590 591 @property 592 def arch(self) -> str: 593 "Arch from pkg_info." 594 if self._p4info is None: 595 return "" 596 return self._p4info.pkg_info.arch 597 598 @property 599 def pkg_info(self) -> p4i.PkgInfo: 600 """Protobuf message containing original `PkgInfo` header. 601 602 Use this to access less frequently used fields like `contact`, `url`, 603 and `platform_properties`. 604 """ 605 if self._p4info is None: 606 raise ValueError("P4Info: No pipeline configured") 607 return self._p4info.pkg_info 608 609 @property 610 def tables(self) -> P4EntityMap["P4Table"]: 611 "Collection of P4 tables." 612 return self._p4defs.tables 613 614 @property 615 def actions(self) -> P4EntityMap["P4Action"]: 616 "Collection of P4 actions." 617 return self._p4defs.actions 618 619 @property 620 def action_profiles(self) -> P4EntityMap["P4ActionProfile"]: 621 "Collection of P4 action profiles." 622 return self._p4defs.action_profiles 623 624 @property 625 def controller_packet_metadata(self) -> P4EntityMap["P4ControllerPacketMetadata"]: 626 "Collection of P4 controller packet metadata." 627 return self._p4defs.controller_packet_metadata 628 629 @property 630 def direct_counters(self) -> P4EntityMap["P4DirectCounter"]: 631 "Collection of P4 direct counters." 632 return self._p4defs.direct_counters 633 634 @property 635 def direct_meters(self) -> P4EntityMap["P4DirectMeter"]: 636 "Collection of P4 direct meters." 637 return self._p4defs.direct_meters 638 639 @property 640 def counters(self) -> P4EntityMap["P4Counter"]: 641 "Collection of P4 counters." 642 return self._p4defs.counters 643 644 @property 645 def meters(self) -> P4EntityMap["P4Meter"]: 646 "Collection of P4 meters." 647 return self._p4defs.meters 648 649 @property 650 def registers(self) -> P4EntityMap["P4Register"]: 651 "Collection of P4 registers." 652 return self._p4defs.registers 653 654 @property 655 def digests(self) -> P4EntityMap["P4Digest"]: 656 "Collection of P4 digests." 657 return self._p4defs.digests 658 659 @property 660 def value_sets(self) -> P4EntityMap["P4ValueSet"]: 661 "Collection of P4 value sets." 662 return self._p4defs.value_sets 663 664 @property 665 def type_info(self) -> "P4TypeInfo": 666 "Type Info object." 667 return self._p4defs.type_info 668 669 @property 670 def externs(self) -> "P4ExternMap": 671 "Collection of P4 extern instances." 672 return self._p4defs.externs 673 674 def __str__(self) -> str: 675 if self._p4info is None: 676 return "<P4Info: No pipeline configured>" 677 return str(P4SchemaDescription(self))
Represents a P4Info file and its associated P4 blob (optional).
p4 = P4Schema(Path("basic.p4info.txtpb"))
This class parses the P4Info contents to produce an in-memory representation
of the tables, actions, types, etc. inside. This in-memory graph of the
contents of the P4Info file may be shared when we parse identical
P4Info files. The sharing of P4Info data is controlled by the
P4SchemaCache
class.
517 @property 518 def exists(self) -> bool: 519 "True if p4info is configured." 520 return self._p4info is not None
True if p4info is configured.
527 @property 528 def p4info(self) -> p4i.P4Info: 529 "P4Info value." 530 if self._p4info is None: 531 raise ValueError("No P4Info configured.") 532 return self._p4info
P4Info value.
534 def set_p4info(self, p4info: p4i.P4Info) -> None: 535 "Set P4Info using value returned from switch." 536 self._p4info, self._p4defs, self._p4cookie = P4SchemaCache.load_p4info( 537 p4info, 538 self._p4blob, 539 )
Set P4Info using value returned from switch.
541 def has_p4info(self, p4info: p4i.P4Info) -> bool: 542 "Return true if the current P4Info equals the given P4Info." 543 if self._p4info is None: 544 return False 545 return self._p4info.SerializeToString( 546 deterministic=True 547 ) == p4info.SerializeToString(deterministic=True)
Return true if the current P4Info equals the given P4Info.
549 @property 550 def p4blob(self) -> bytes: 551 "P4Blob value a.k.a p4_device_config." 552 return _blob_bytes(self._p4blob)
P4Blob value a.k.a p4_device_config.
559 def get_pipeline_config(self) -> p4r.ForwardingPipelineConfig: 560 """The forwarding pipeline configuration.""" 561 return p4r.ForwardingPipelineConfig( 562 p4info=self.p4info, 563 p4_device_config=self.p4blob, 564 cookie=p4r.ForwardingPipelineConfig.Cookie(cookie=self.p4cookie), 565 )
The forwarding pipeline configuration.
567 def get_pipeline_info(self) -> str: 568 "Concise string description of the pipeline (suitable for logging)." 569 if self.exists: 570 pipeline = self.name 571 version = self.version 572 arch = self.arch 573 return f"{pipeline=} {version=} {arch=}" 574 575 return "<No pipeline exists>"
Concise string description of the pipeline (suitable for logging).
577 @property 578 def name(self) -> str: 579 "Name from pkg_info." 580 if self._p4info is None: 581 return "" 582 return self._p4info.pkg_info.name
Name from pkg_info.
584 @property 585 def version(self) -> str: 586 "Version from pkg_info." 587 if self._p4info is None: 588 return "" 589 return self._p4info.pkg_info.version
Version from pkg_info.
591 @property 592 def arch(self) -> str: 593 "Arch from pkg_info." 594 if self._p4info is None: 595 return "" 596 return self._p4info.pkg_info.arch
Arch from pkg_info.
598 @property 599 def pkg_info(self) -> p4i.PkgInfo: 600 """Protobuf message containing original `PkgInfo` header. 601 602 Use this to access less frequently used fields like `contact`, `url`, 603 and `platform_properties`. 604 """ 605 if self._p4info is None: 606 raise ValueError("P4Info: No pipeline configured") 607 return self._p4info.pkg_info
Protobuf message containing original PkgInfo
header.
Use this to access less frequently used fields like contact
, url
,
and platform_properties
.
609 @property 610 def tables(self) -> P4EntityMap["P4Table"]: 611 "Collection of P4 tables." 612 return self._p4defs.tables
Collection of P4 tables.
614 @property 615 def actions(self) -> P4EntityMap["P4Action"]: 616 "Collection of P4 actions." 617 return self._p4defs.actions
Collection of P4 actions.
619 @property 620 def action_profiles(self) -> P4EntityMap["P4ActionProfile"]: 621 "Collection of P4 action profiles." 622 return self._p4defs.action_profiles
Collection of P4 action profiles.
624 @property 625 def controller_packet_metadata(self) -> P4EntityMap["P4ControllerPacketMetadata"]: 626 "Collection of P4 controller packet metadata." 627 return self._p4defs.controller_packet_metadata
Collection of P4 controller packet metadata.
629 @property 630 def direct_counters(self) -> P4EntityMap["P4DirectCounter"]: 631 "Collection of P4 direct counters." 632 return self._p4defs.direct_counters
Collection of P4 direct counters.
634 @property 635 def direct_meters(self) -> P4EntityMap["P4DirectMeter"]: 636 "Collection of P4 direct meters." 637 return self._p4defs.direct_meters
Collection of P4 direct meters.
639 @property 640 def counters(self) -> P4EntityMap["P4Counter"]: 641 "Collection of P4 counters." 642 return self._p4defs.counters
Collection of P4 counters.
644 @property 645 def meters(self) -> P4EntityMap["P4Meter"]: 646 "Collection of P4 meters." 647 return self._p4defs.meters
Collection of P4 meters.
649 @property 650 def registers(self) -> P4EntityMap["P4Register"]: 651 "Collection of P4 registers." 652 return self._p4defs.registers
Collection of P4 registers.
654 @property 655 def digests(self) -> P4EntityMap["P4Digest"]: 656 "Collection of P4 digests." 657 return self._p4defs.digests
Collection of P4 digests.
659 @property 660 def value_sets(self) -> P4EntityMap["P4ValueSet"]: 661 "Collection of P4 value sets." 662 return self._p4defs.value_sets
Collection of P4 value sets.
154@final 155class Switch: 156 """Represents a P4Runtime Switch. 157 158 A `Switch` is constructed with a `name`, `address` and an optional 159 `SwitchOptions` configuration. 160 161 The `name` is up to the user but should uniquely identify the switch. 162 163 The `address` identifies the target endpoint of the GRPC channel. It should 164 have the format `<ADDRESS>:<PORT>` where `<ADDRESS>` can be a domain name, 165 IPv4 address, or IPv6 address in square brackets, and `<PORT>` is the TCP 166 port number. 167 168 The `options` is a `SwitchOptions` object that specifies how the `Switch` 169 will behave. 170 171 ``` 172 opts = SwitchOptions(p4info=..., p4blob=...) 173 sw1 = Switch('sw1', '10.0.0.1:50000', opts) 174 ``` 175 176 The `stash` is an optional dictionary for storing arbitrary per-switch 177 information. The dictionary keys are strings. You can use this to store 178 anything that you need to manage the switch. 179 180 Each switch object has an event emitter `ee`. Use the EventEmitter to listen 181 for port change events like PORT_UP and PORT_DOWN. See the `SwitchEvent` 182 class for a list of other switch events. 183 """ 184 185 _name: str 186 _address: str 187 _options: SwitchOptions 188 _stash: dict[str, Any] 189 _ee: "SwitchEmitter" 190 _p4client: P4Client | None 191 _p4schema: P4Schema 192 _tasks: "SwitchTasks | None" 193 _packet_queues: list[tuple[Callable[[bytes], bool], Queue[p4entity.P4PacketIn]]] 194 _digest_queues: dict[str, Queue[p4entity.P4DigestList]] 195 _timeout_queue: Queue[p4entity.P4IdleTimeoutNotification] | None 196 _arbitrator: "Arbitrator" 197 _gnmi_client: GNMIClient | None 198 _ports: SwitchPortList 199 _is_channel_up: bool = False 200 _api_version: ApiVersion = ApiVersion(1, 0, 0, "") 201 _control_task: asyncio.Task[Any] | None = None 202 203 def __init__( 204 self, 205 name: str, 206 address: str, 207 options: SwitchOptions | None = None, 208 *, 209 stash: dict[str, Any] | None = None, 210 ) -> None: 211 """Initialize switch with name, address and options. 212 213 Args: 214 name: A human-readable name to uniquely identify the switch. 215 address: The target address of the P4Runtime GRPC channel. 216 Format is `<ADDRESS>:<PORT>` where `<ADDRESS>` is a DNS name or 217 IP address, and `<PORT>` is the TCP port number. 218 options: Configuration options for the switch. 219 stash: Optional user-controlled dictionary. 220 Used to store information that user code needs to access or share. 221 """ 222 if options is None: 223 options = SwitchOptions() 224 225 self._name = name 226 self._address = address 227 self._options = options 228 self._stash = stash or {} 229 self._ee = SwitchEmitter(self) 230 self._p4client = None 231 self._p4schema = P4Schema(options.p4info, options.p4blob) 232 self._tasks = None 233 self._packet_queues = [] 234 self._digest_queues = {} 235 self._timeout_queue = None 236 self._arbitrator = Arbitrator( 237 options.initial_election_id, options.role_name, options.role_config 238 ) 239 self._gnmi_client = None 240 self._ports = SwitchPortList() 241 242 @property 243 def name(self) -> str: 244 "Name of the switch." 245 return self._name 246 247 @property 248 def address(self) -> str: 249 """Address of the switch formatted as a GRPC target. 250 251 Format is `<ADDRESS>:<PORT>` where `<ADDRESS>` is a DNS name or IP 252 address, and `<PORT>` is the TCP port number. 253 """ 254 return self._address 255 256 @property 257 def options(self) -> SwitchOptions: 258 "Switch options." 259 return self._options 260 261 @options.setter 262 def options(self, opts: SwitchOptions) -> None: 263 "Set switch options to a new value." 264 if self._p4client is not None: 265 raise RuntimeError("Cannot change switch options while client is open.") 266 267 self._options = opts 268 self._p4schema = P4Schema(opts.p4info, opts.p4blob) 269 self._arbitrator = Arbitrator( 270 opts.initial_election_id, opts.role_name, opts.role_config 271 ) 272 273 @property 274 def stash(self) -> dict[str, Any]: 275 "Switch stash. Used to store per-switch data for any purpose." 276 return self._stash 277 278 @property 279 def ee(self) -> "SwitchEmitter": 280 "Switch event emitter. See `SwitchEvent` for more details on events." 281 return self._ee 282 283 @property 284 def device_id(self) -> int: 285 "Switch's device ID." 286 return self._options.device_id 287 288 @property 289 def is_up(self) -> bool: 290 "True if switch is UP." 291 return self._is_channel_up 292 293 @property 294 def is_primary(self) -> bool: 295 "True if switch is primary." 296 return self._arbitrator.is_primary 297 298 @property 299 def primary_id(self) -> int: 300 "Election ID of switch that is currently primary." 301 return self._arbitrator.primary_id 302 303 @property 304 def election_id(self) -> int: 305 "Switch's current election ID." 306 return self._arbitrator.election_id 307 308 @property 309 def role_name(self) -> str: 310 "Switch's current role name." 311 return self._arbitrator.role_name 312 313 @property 314 def p4info(self) -> P4Schema: 315 "Switch's P4 schema." 316 return self._p4schema 317 318 @property 319 def gnmi_client(self) -> GNMIClient | None: 320 "Switch's gNMI client." 321 return self._gnmi_client 322 323 @property 324 def ports(self) -> SwitchPortList: 325 "Switch's list of interfaces." 326 return self._ports 327 328 @property 329 def api_version(self) -> ApiVersion: 330 "P4Runtime protocol version." 331 return self._api_version 332 333 @overload 334 async def read( 335 self, 336 entities: _ET, 337 ) -> AsyncGenerator[_ET, None]: 338 "Overload for read of a single P4Entity subtype." 339 ... # pragma: no cover 340 341 @overload 342 async def read( 343 self, 344 entities: Iterable[_ET], 345 ) -> AsyncGenerator[_ET, None]: 346 "Overload for read of an iterable of the same P4Entity subtype." 347 ... # pragma: no cover 348 349 @overload 350 async def read( 351 self, 352 entities: Iterable[p4entity.P4EntityList], 353 ) -> AsyncGenerator[p4entity.P4Entity, None]: 354 "Most general overload: we can't determine the return type exactly." 355 ... # pragma: no cover 356 357 async def read( 358 self, 359 entities: Iterable[p4entity.P4EntityList] | p4entity.P4Entity, 360 ) -> AsyncGenerator[p4entity.P4Entity, None]: 361 "Async iterator that reads entities from the switch." 362 assert self._p4client is not None 363 364 if not entities: 365 return 366 367 if isinstance(entities, p4entity.P4Entity): 368 entities = [entities] 369 370 request = p4r.ReadRequest( 371 device_id=self.device_id, 372 entities=p4entity.encode_entities(entities, self.p4info), 373 ) 374 375 async for reply in self._p4client.request_iter(request): 376 for ent in reply.entities: 377 yield p4entity.decode_entity(ent, self.p4info) 378 379 async def read_packets( 380 self, 381 *, 382 queue_size: int = _DEFAULT_QUEUE_SIZE, 383 eth_types: Iterable[int] | None = None, 384 ) -> AsyncIterator["p4entity.P4PacketIn"]: 385 "Async iterator for incoming packets (P4PacketIn)." 386 LOGGER.debug("read_packets: opening queue: eth_types=%r", eth_types) 387 388 if eth_types is None: 389 390 def _pkt_filter(_payload: bytes) -> bool: 391 return True 392 393 else: 394 _filter = {eth.to_bytes(2, "big") for eth in eth_types} 395 396 def _pkt_filter(_payload: bytes) -> bool: 397 return _payload[12:14] in _filter 398 399 queue = Queue[p4entity.P4PacketIn](queue_size) 400 queue_filter = (_pkt_filter, queue) 401 self._packet_queues.append(queue_filter) 402 403 try: 404 while True: 405 yield await queue.get() 406 finally: 407 LOGGER.debug("read_packets: closing queue: eth_types=%r", eth_types) 408 self._packet_queues.remove(queue_filter) 409 410 async def read_digests( 411 self, 412 digest_id: str, 413 *, 414 queue_size: int = _DEFAULT_QUEUE_SIZE, 415 ) -> AsyncIterator["p4entity.P4DigestList"]: 416 "Async iterator for incoming digest lists (P4DigestList)." 417 LOGGER.debug("read_digests: opening queue: digest_id=%r", digest_id) 418 419 if digest_id in self._digest_queues: 420 raise ValueError(f"queue for digest_id {digest_id!r} already open") 421 422 queue = Queue[p4entity.P4DigestList](queue_size) 423 self._digest_queues[digest_id] = queue 424 try: 425 while True: 426 yield await queue.get() 427 finally: 428 LOGGER.debug("read_digests: closing queue: digest_id=%r", digest_id) 429 del self._digest_queues[digest_id] 430 431 async def read_idle_timeouts( 432 self, 433 *, 434 queue_size: int = _DEFAULT_QUEUE_SIZE, 435 ) -> AsyncIterator["p4entity.P4IdleTimeoutNotification"]: 436 "Async iterator for incoming idle timeouts (P4IdleTimeoutNotification)." 437 LOGGER.debug("read_idle_timeouts: opening queue") 438 439 if self._timeout_queue is not None: 440 raise ValueError("timeout queue already open") 441 442 queue = Queue[p4entity.P4IdleTimeoutNotification](queue_size) 443 self._timeout_queue = queue 444 try: 445 while True: 446 yield await queue.get() 447 finally: 448 LOGGER.debug("read_idle_timeouts: closing queue") 449 self._timeout_queue = None 450 451 async def write( 452 self, 453 entities: Iterable[p4entity.P4UpdateList], 454 *, 455 strict: bool = True, 456 warn_only: bool = False, 457 ) -> None: 458 """Write updates and stream messages to the switch. 459 460 If `strict` is False, MODIFY and DELETE operations will NOT raise an 461 error if the entity does not exist (NOT_FOUND). 462 463 If `warn_only` is True, no operations will raise an error. Instead, 464 the exception will be logged as a WARNING and the method will return 465 normally. 466 """ 467 assert self._p4client is not None 468 469 if not entities: 470 return 471 472 msgs = p4entity.encode_updates(entities, self.p4info) 473 474 updates: list[p4r.Update] = [] 475 for msg in msgs: 476 if isinstance(msg, p4r.StreamMessageRequest): 477 # StreamMessageRequests are transmitted immediately. 478 # TODO: Understand what happens with backpressure? 479 await self._p4client.send(msg) 480 else: 481 updates.append(msg) 482 483 if updates: 484 await self._write_request(updates, strict, warn_only) 485 486 async def insert( 487 self, 488 entities: Iterable[p4entity.P4EntityList], 489 *, 490 warn_only: bool = False, 491 ) -> None: 492 """Insert the specified entities. 493 494 If `warn_only` is True, errors will be logged as warnings instead of 495 raising an exception. 496 """ 497 if entities: 498 await self._write_request( 499 [ 500 p4r.Update(type=p4r.Update.INSERT, entity=ent) 501 for ent in p4entity.encode_entities(entities, self.p4info) 502 ], 503 True, 504 warn_only, 505 ) 506 507 async def modify( 508 self, 509 entities: Iterable[p4entity.P4EntityList], 510 *, 511 strict: bool = True, 512 warn_only: bool = False, 513 ) -> None: 514 """Modify the specified entities. 515 516 If `strict` is False, NOT_FOUND errors will be ignored. 517 518 If `warn_only` is True, errors will be logged as warnings instead of 519 raising an exception. 520 """ 521 if entities: 522 await self._write_request( 523 [ 524 p4r.Update(type=p4r.Update.MODIFY, entity=ent) 525 for ent in p4entity.encode_entities(entities, self.p4info) 526 ], 527 strict, 528 warn_only, 529 ) 530 531 async def delete( 532 self, 533 entities: Iterable[p4entity.P4EntityList], 534 *, 535 strict: bool = True, 536 warn_only: bool = False, 537 ) -> None: 538 """Delete the specified entities. 539 540 If `strict` is False, NOT_FOUND errors will be ignored. 541 542 If `warn_only` is True, errors will be logged as warnings instead of 543 raising an exception. 544 """ 545 if entities: 546 await self._write_request( 547 [ 548 p4r.Update(type=p4r.Update.DELETE, entity=ent) 549 for ent in p4entity.encode_entities(entities, self.p4info) 550 ], 551 strict, 552 warn_only, 553 ) 554 555 async def delete_all(self) -> None: 556 """Delete all entities if no parameter is passed. Otherwise, delete 557 items that match `entities`. 558 559 This method does not attempt to delete entries in const tables. 560 561 TODO: This method does not affect indirect counters, meters or 562 value_sets. 563 """ 564 await self.delete_many( 565 [ 566 p4entity.P4TableEntry(), 567 p4entity.P4MulticastGroupEntry(), 568 p4entity.P4CloneSessionEntry(), 569 ] 570 ) 571 572 # Reset all default table entries. 573 default_entries = [ 574 p4entity.P4TableEntry(table.alias, is_default_action=True) 575 for table in self.p4info.tables 576 if table.const_default_action is None and table.action_profile is None 577 ] 578 if default_entries: 579 await self.modify(default_entries) 580 581 # Delete all P4ActionProfileGroup's and P4ActionProfileMember's. 582 # We do this after deleting the P4TableEntry's in case a client is using 583 # "one-shot" references; these are incompatible with separate 584 # action profiles. 585 await self.delete_many( 586 [ 587 p4entity.P4ActionProfileGroup(), 588 p4entity.P4ActionProfileMember(), 589 ] 590 ) 591 592 # Delete DigestEntry separately. Wildcard reads are not supported. 593 digest_entries = [ 594 p4entity.P4DigestEntry(digest.alias) for digest in self.p4info.digests 595 ] 596 if digest_entries: 597 await self.delete(digest_entries, strict=False) 598 599 async def delete_many(self, entities: Iterable[p4entity.P4EntityList]) -> None: 600 """Delete entities that match a wildcard read. 601 602 This method always skips over entries in const tables. It is an error 603 to attempt to delete those. 604 """ 605 assert self._p4client is not None 606 607 request = p4r.ReadRequest( 608 device_id=self.device_id, 609 entities=p4entity.encode_entities(entities, self.p4info), 610 ) 611 612 # Compute set of all const table ID's (may be empty). 613 to_skip = {table.id for table in self.p4info.tables if table.is_const} 614 615 async for reply in self._p4client.request_iter(request): 616 if reply.entities: 617 if to_skip: 618 await self.delete( 619 reply 620 for reply in reply.entities 621 if reply.HasField("table_entry") 622 and reply.table_entry.table_id not in to_skip 623 ) 624 else: 625 await self.delete(reply.entities) 626 627 async def run(self) -> None: 628 "Run the switch's lifecycle repeatedly." 629 assert self._p4client is None 630 assert self._tasks is None 631 632 self._tasks = SwitchTasks(self._options.fail_fast) 633 self._p4client = P4Client(self._address, self._options.channel_credentials) 634 self._switch_start() 635 636 try: 637 while True: 638 # If the switch fails and restarts too quickly, slow it down. 639 async with _throttle_failure(): 640 self.create_task(self._run(), background=True) 641 await self._tasks.wait() 642 self._arbitrator.reset() 643 644 finally: 645 self._p4client = None 646 self._tasks = None 647 self._switch_stop() 648 649 def create_task( 650 self, 651 coro: Coroutine[Any, Any, _T], 652 *, 653 background: bool = False, 654 name: str | None = None, 655 ) -> asyncio.Task[_T]: 656 "Create an asyncio task tied to the Switch's lifecycle." 657 assert self._tasks is not None 658 659 return self._tasks.create_task( 660 coro, 661 switch=self, 662 background=background, 663 name=name, 664 ) 665 666 async def _run(self): 667 "Main Switch task runs the stream." 668 assert not self._is_channel_up 669 assert self._p4client is not None 670 671 try: 672 await self._p4client.open( 673 schema=self.p4info, 674 complete_request=self._arbitrator.complete_request, 675 ) 676 await self._arbitrator.handshake(self) 677 await self._fetch_capabilities() 678 await self._start_gnmi() 679 self._channel_up() 680 681 # Receive messages from the stream until it closes. 682 await self._receive_until_closed() 683 684 finally: 685 await self._stop_gnmi() 686 await self._p4client.close() 687 self._channel_down() 688 689 async def _receive_until_closed(self): 690 "Receive messages from stream until EOF." 691 assert self._p4client is not None 692 693 client = self._p4client 694 695 while True: 696 try: 697 msg = await client.receive() 698 except P4ClientError as ex: 699 if not ex.is_election_id_used: 700 raise 701 # Handle "election ID in use" error. 702 await self._arbitrator.handshake(self, conflict=True) 703 else: 704 await self._handle_stream_message(msg) 705 706 async def _handle_stream_message(self, msg: p4r.StreamMessageResponse): 707 "Handle a P4Runtime StreamMessageResponse." 708 match msg.WhichOneof("update"): 709 case "packet": 710 self._stream_packet_message(msg) 711 case "digest": 712 self._stream_digest_message(msg) 713 case "idle_timeout_notification": 714 self._stream_timeout_message(msg) 715 case "arbitration": 716 await self._arbitrator.update(self, msg.arbitration) 717 case "error": 718 self._stream_error_message(msg) 719 case other: 720 LOGGER.error("_handle_stream_message: unknown update %r", other) 721 722 async def __aenter__(self) -> Self: 723 "Similar to run() but provides a one-time context manager interface." 724 assert self._p4client is None 725 assert self._tasks is None 726 727 self._tasks = SwitchTasks(self._options.fail_fast) 728 self._p4client = P4Client( 729 self._address, 730 self._options.channel_credentials, 731 wait_for_ready=False, 732 ) 733 self._switch_start() 734 735 try: 736 # Start the switch's `_run` task in the background. Then, wait for 737 # `_run` task to fire the CHANNEL_READY event. If the `_run` task 738 # cannot connect or fails in some other way, it will finish before 739 # the `ready` future. We need to handle the error in this case. 740 741 run = self.create_task(self._run(), background=True) 742 ready = self.ee.event_future(SwitchEvent.CHANNEL_READY) 743 done, _ = await asyncio.wait( 744 [run, ready], return_when=asyncio.FIRST_COMPLETED 745 ) 746 if run in done: 747 await run 748 749 except BaseException: 750 await self.__aexit__(None, None, None) 751 raise 752 753 return self 754 755 async def __aexit__( 756 self, 757 _exc_type: type[BaseException] | None, 758 _exc_val: BaseException | None, 759 _exc_tb: TracebackType | None, 760 ) -> bool | None: 761 "Similar to run() but provides a one-time context manager interface." 762 assert self._p4client is not None 763 assert self._tasks is not None 764 765 self._tasks.cancel_all() 766 await self._tasks.wait() 767 self._arbitrator.reset() 768 self._p4client = None 769 self._tasks = None 770 self._switch_stop() 771 772 def _switch_start(self): 773 "Called when switch starts its run() cycle." 774 assert not self._is_channel_up 775 776 LOGGER.info( 777 "Switch start (name=%r, address=%r, device_id=%r, role_name=%r, initial_election_id=%r)", 778 self.name, 779 self.address, 780 self.device_id, 781 self.role_name, 782 self.options.initial_election_id, 783 ) 784 self.ee.emit(SwitchEvent.SWITCH_START) 785 786 def _switch_stop(self): 787 "Called when switch stops its run() cycle." 788 assert not self._is_channel_up 789 790 LOGGER.info( 791 "Switch stop (name=%r, address=%r, device_id=%r, role_name=%r)", 792 self.name, 793 self.address, 794 self.device_id, 795 self.role_name, 796 ) 797 self.ee.emit(SwitchEvent.SWITCH_STOP) 798 799 def _channel_up(self): 800 "Called when P4Runtime channel is UP." 801 assert not self._is_channel_up 802 803 ports = " ".join(f"({port.id}){port.name}" for port in self.ports) 804 LOGGER.info( 805 "Channel up (is_primary=%r, role_name=%r, p4r=%s): %s", 806 self.is_primary, 807 self.role_name, 808 self.api_version, 809 ports, 810 ) 811 self._is_channel_up = True 812 self.create_task(self._ready()) 813 814 self.ee.emit(SwitchEvent.CHANNEL_UP, self) 815 816 def _channel_down(self): 817 "Called when P4Runtime channel is DOWN." 818 if not self._is_channel_up: 819 return # do nothing! 820 821 LOGGER.info( 822 "Channel down (is_primary=%r, role_name=%r)", 823 self.is_primary, 824 self.role_name, 825 ) 826 self._is_channel_up = False 827 828 self.ee.emit(SwitchEvent.CHANNEL_DOWN, self) 829 830 def _become_primary(self): 831 "Called when a P4Runtime backup channel becomes the primary." 832 assert self._tasks is not None 833 834 LOGGER.info( 835 "Become primary (is_primary=%r, role_name=%r)", 836 self.is_primary, 837 self.role_name, 838 ) 839 840 self._tasks.cancel_primary() 841 self.create_task(self._ready()) 842 843 self.ee.emit(SwitchEvent.BECOME_PRIMARY, self) 844 845 def _become_backup(self): 846 "Called when a P4Runtime primary channel becomes a backup." 847 assert self._tasks is not None 848 849 LOGGER.info( 850 "Become backup (is_primary=%r, role_name=%r)", 851 self.is_primary, 852 self.role_name, 853 ) 854 855 self._tasks.cancel_primary() 856 self.create_task(self._ready()) 857 858 self.ee.emit(SwitchEvent.BECOME_BACKUP, self) 859 860 def _channel_ready(self): 861 "Called when a P4Runtime channel is READY." 862 LOGGER.info( 863 "Channel ready (is_primary=%r, role_name=%r): %s", 864 self.is_primary, 865 self.role_name, 866 self.p4info.get_pipeline_info(), 867 ) 868 869 if self._options.ready_handler: 870 self.create_task(self._options.ready_handler(self)) 871 872 self.ee.emit(SwitchEvent.CHANNEL_READY, self) 873 874 def _stream_packet_message(self, msg: p4r.StreamMessageResponse): 875 "Called when a P4Runtime packet-in response is received." 876 packet = p4entity.decode_stream(msg, self.p4info) 877 878 was_queued = False 879 for pkt_filter, queue in self._packet_queues: 880 if not queue.full() and pkt_filter(packet.payload): 881 queue.put_nowait(packet) 882 was_queued = True 883 884 if not was_queued: 885 LOGGER.warning("packet ignored: %r", packet) 886 887 def _stream_digest_message(self, msg: p4r.StreamMessageResponse): 888 "Called when a P4Runtime digest response is received." 889 try: 890 # Decode the digest list message. 891 digest: p4entity.P4DigestList = p4entity.decode_stream(msg, self.p4info) 892 except ValueError as ex: 893 # It's possible to receive a digest for a different P4Info file, or 894 # even before a P4Info is fetched from the switch. 895 LOGGER.warning("digest decode failed: %s", ex) 896 else: 897 # Place the decoded digest list in a queue, if one is waiting. 898 queue = self._digest_queues.get(digest.digest_id) 899 if queue is not None and not queue.full(): 900 queue.put_nowait(digest) 901 else: 902 LOGGER.warning("digest ignored: %r", digest) 903 904 def _stream_timeout_message(self, msg: p4r.StreamMessageResponse): 905 "Called when a P4Runtime timeout notification is received." 906 timeout: p4entity.P4IdleTimeoutNotification = p4entity.decode_stream( 907 msg, self.p4info 908 ) 909 queue = self._timeout_queue 910 911 if queue is not None and not queue.full(): 912 queue.put_nowait(timeout) 913 else: 914 LOGGER.warning("timeout ignored: %r", timeout) 915 916 def _stream_error_message(self, msg: p4r.StreamMessageResponse): 917 "Called when a P4Runtime stream error response is received." 918 assert self._p4client is not None 919 920 # Log the message at the ERROR level. 921 pbutil.log_msg(self._p4client.channel, msg, self.p4info, level=logging.ERROR) 922 923 self.ee.emit(SwitchEvent.STREAM_ERROR, self, msg) 924 925 async def _ready(self): 926 "Prepare the pipeline." 927 if self.p4info.is_authoritative and self.is_primary: 928 await self._set_pipeline() 929 else: 930 await self._get_pipeline() 931 932 self._channel_ready() 933 934 async def _get_pipeline(self): 935 "Get the switch's P4Info." 936 has_pipeline = False 937 938 try: 939 reply = await self._get_pipeline_config_request( 940 response_type=P4ConfigResponseType.P4INFO_AND_COOKIE 941 ) 942 943 if reply.config.HasField("p4info"): 944 has_pipeline = True 945 p4info = reply.config.p4info 946 if not self.p4info.exists: 947 # If we don't have P4Info yet, set it. 948 self.p4info.set_p4info(p4info) 949 elif not self.p4info.has_p4info(p4info): 950 # If P4Info is not identical, log a warning message. 951 LOGGER.warning("Retrieved P4Info is different than expected!") 952 953 except P4ClientError as ex: 954 if not ex.is_pipeline_missing: 955 raise 956 957 if not has_pipeline and self.p4info.exists: 958 LOGGER.warning("Forwarding pipeline is not configured") 959 960 async def _set_pipeline(self): 961 """Set up the pipeline. 962 963 If `p4force` is false (the default), we first retrieve the cookie for 964 the current pipeline and see if it matches the new pipeline's cookie. 965 If the cookies match, we are done; there is no need to set the pipeline. 966 967 If `p4force` is true, we always load the new pipeline. 968 """ 969 cookie = -1 970 try: 971 if not self.options.p4force: 972 reply = await self._get_pipeline_config_request() 973 if reply.config.HasField("cookie"): 974 cookie = reply.config.cookie.cookie 975 976 except P4ClientError as ex: 977 if not ex.is_pipeline_missing: 978 raise 979 980 if cookie != self.p4info.p4cookie: 981 LOGGER.debug( 982 "cookie %#x does not match expected %#x", cookie, self.p4info.p4cookie 983 ) 984 await self._set_pipeline_config_request( 985 config=self.p4info.get_pipeline_config() 986 ) 987 LOGGER.info("Pipeline installed: %s", self.p4info.get_pipeline_info()) 988 989 async def _get_pipeline_config_request( 990 self, 991 *, 992 response_type: P4ConfigResponseType = P4ConfigResponseType.COOKIE_ONLY, 993 ) -> p4r.GetForwardingPipelineConfigResponse: 994 "Send a GetForwardingPipelineConfigRequest and await the response." 995 assert self._p4client is not None 996 997 return await self._p4client.request( 998 p4r.GetForwardingPipelineConfigRequest( 999 device_id=self.device_id, 1000 response_type=response_type.vt(), 1001 ) 1002 ) 1003 1004 async def _set_pipeline_config_request( 1005 self, 1006 *, 1007 action: P4ConfigAction = P4ConfigAction.VERIFY_AND_COMMIT, 1008 config: p4r.ForwardingPipelineConfig, 1009 ) -> p4r.SetForwardingPipelineConfigResponse: 1010 "Send a SetForwardingPipelineConfigRequest and await the response." 1011 assert self._p4client is not None 1012 1013 return await self._p4client.request( 1014 p4r.SetForwardingPipelineConfigRequest( 1015 device_id=self.device_id, 1016 action=action.vt(), 1017 config=config, 1018 ) 1019 ) 1020 1021 async def _write_request( 1022 self, 1023 updates: list[p4r.Update], 1024 strict: bool, 1025 warn_only: bool, 1026 ): 1027 "Send a P4Runtime WriteRequest." 1028 assert self._p4client is not None 1029 1030 try: 1031 await self._p4client.request( 1032 p4r.WriteRequest( 1033 device_id=self.device_id, 1034 updates=updates, 1035 ) 1036 ) 1037 except P4ClientError as ex: 1038 if strict or not ex.is_not_found_only: 1039 if warn_only: 1040 LOGGER.warning( 1041 "WriteRequest with `warn_only=True` failed", 1042 exc_info=True, 1043 ) 1044 else: 1045 raise 1046 1047 assert (not strict and ex.is_not_found_only) or warn_only 1048 1049 async def _fetch_capabilities(self): 1050 "Check the P4Runtime protocol version supported by the other end." 1051 assert self._p4client is not None 1052 1053 try: 1054 reply = await self._p4client.request(p4r.CapabilitiesRequest()) 1055 self._api_version = ApiVersion.parse(reply.p4runtime_api_version) 1056 1057 except P4ClientError as ex: 1058 if ex.code != GRPCStatusCode.UNIMPLEMENTED: 1059 raise 1060 LOGGER.warning("CapabilitiesRequest is not implemented") 1061 1062 async def _start_gnmi(self): 1063 "Start the associated gNMI client." 1064 assert self._gnmi_client is None 1065 assert self._p4client is not None 1066 1067 self._gnmi_client = GNMIClient(self._address, self._options.channel_credentials) 1068 await self._gnmi_client.open(channel=self._p4client.channel) 1069 1070 try: 1071 await self._ports.subscribe(self._gnmi_client) 1072 if self._ports: 1073 self.create_task(self._ports.listen(), background=True, name="_ports") 1074 1075 except GNMIClientError as ex: 1076 if ex.code != GRPCStatusCode.UNIMPLEMENTED: 1077 raise 1078 LOGGER.warning("gNMI is not implemented") 1079 await self._gnmi_client.close() 1080 self._gnmi_client = None 1081 1082 async def _stop_gnmi(self): 1083 "Stop the associated gNMI client." 1084 if self._gnmi_client is not None: 1085 self._ports.close() 1086 await self._gnmi_client.close() 1087 self._gnmi_client = None 1088 1089 def __repr__(self) -> str: 1090 "Return string representation of switch." 1091 return f"Switch(name={self._name!r}, address={self._address!r})"
Represents a P4Runtime Switch.
A Switch
is constructed with a name
, address
and an optional
SwitchOptions
configuration.
The name
is up to the user but should uniquely identify the switch.
The address
identifies the target endpoint of the GRPC channel. It should
have the format <ADDRESS>:<PORT>
where <ADDRESS>
can be a domain name,
IPv4 address, or IPv6 address in square brackets, and <PORT>
is the TCP
port number.
The options
is a SwitchOptions
object that specifies how the Switch
will behave.
opts = SwitchOptions(p4info=..., p4blob=...)
sw1 = Switch('sw1', '10.0.0.1:50000', opts)
The stash
is an optional dictionary for storing arbitrary per-switch
information. The dictionary keys are strings. You can use this to store
anything that you need to manage the switch.
Each switch object has an event emitter ee
. Use the EventEmitter to listen
for port change events like PORT_UP and PORT_DOWN. See the SwitchEvent
class for a list of other switch events.
203 def __init__( 204 self, 205 name: str, 206 address: str, 207 options: SwitchOptions | None = None, 208 *, 209 stash: dict[str, Any] | None = None, 210 ) -> None: 211 """Initialize switch with name, address and options. 212 213 Args: 214 name: A human-readable name to uniquely identify the switch. 215 address: The target address of the P4Runtime GRPC channel. 216 Format is `<ADDRESS>:<PORT>` where `<ADDRESS>` is a DNS name or 217 IP address, and `<PORT>` is the TCP port number. 218 options: Configuration options for the switch. 219 stash: Optional user-controlled dictionary. 220 Used to store information that user code needs to access or share. 221 """ 222 if options is None: 223 options = SwitchOptions() 224 225 self._name = name 226 self._address = address 227 self._options = options 228 self._stash = stash or {} 229 self._ee = SwitchEmitter(self) 230 self._p4client = None 231 self._p4schema = P4Schema(options.p4info, options.p4blob) 232 self._tasks = None 233 self._packet_queues = [] 234 self._digest_queues = {} 235 self._timeout_queue = None 236 self._arbitrator = Arbitrator( 237 options.initial_election_id, options.role_name, options.role_config 238 ) 239 self._gnmi_client = None 240 self._ports = SwitchPortList()
Initialize switch with name, address and options.
Arguments:
- name: A human-readable name to uniquely identify the switch.
- address: The target address of the P4Runtime GRPC channel.
Format is
<ADDRESS>:<PORT>
where<ADDRESS>
is a DNS name or IP address, and<PORT>
is the TCP port number. - options: Configuration options for the switch.
- stash: Optional user-controlled dictionary. Used to store information that user code needs to access or share.
247 @property 248 def address(self) -> str: 249 """Address of the switch formatted as a GRPC target. 250 251 Format is `<ADDRESS>:<PORT>` where `<ADDRESS>` is a DNS name or IP 252 address, and `<PORT>` is the TCP port number. 253 """ 254 return self._address
Address of the switch formatted as a GRPC target.
Format is <ADDRESS>:<PORT>
where <ADDRESS>
is a DNS name or IP
address, and <PORT>
is the TCP port number.
256 @property 257 def options(self) -> SwitchOptions: 258 "Switch options." 259 return self._options
Switch options.
273 @property 274 def stash(self) -> dict[str, Any]: 275 "Switch stash. Used to store per-switch data for any purpose." 276 return self._stash
Switch stash. Used to store per-switch data for any purpose.
278 @property 279 def ee(self) -> "SwitchEmitter": 280 "Switch event emitter. See `SwitchEvent` for more details on events." 281 return self._ee
Switch event emitter. See SwitchEvent
for more details on events.
283 @property 284 def device_id(self) -> int: 285 "Switch's device ID." 286 return self._options.device_id
Switch's device ID.
288 @property 289 def is_up(self) -> bool: 290 "True if switch is UP." 291 return self._is_channel_up
True if switch is UP.
293 @property 294 def is_primary(self) -> bool: 295 "True if switch is primary." 296 return self._arbitrator.is_primary
True if switch is primary.
298 @property 299 def primary_id(self) -> int: 300 "Election ID of switch that is currently primary." 301 return self._arbitrator.primary_id
Election ID of switch that is currently primary.
303 @property 304 def election_id(self) -> int: 305 "Switch's current election ID." 306 return self._arbitrator.election_id
Switch's current election ID.
308 @property 309 def role_name(self) -> str: 310 "Switch's current role name." 311 return self._arbitrator.role_name
Switch's current role name.
318 @property 319 def gnmi_client(self) -> GNMIClient | None: 320 "Switch's gNMI client." 321 return self._gnmi_client
Switch's gNMI client.
323 @property 324 def ports(self) -> SwitchPortList: 325 "Switch's list of interfaces." 326 return self._ports
Switch's list of interfaces.
328 @property 329 def api_version(self) -> ApiVersion: 330 "P4Runtime protocol version." 331 return self._api_version
P4Runtime protocol version.
357 async def read( 358 self, 359 entities: Iterable[p4entity.P4EntityList] | p4entity.P4Entity, 360 ) -> AsyncGenerator[p4entity.P4Entity, None]: 361 "Async iterator that reads entities from the switch." 362 assert self._p4client is not None 363 364 if not entities: 365 return 366 367 if isinstance(entities, p4entity.P4Entity): 368 entities = [entities] 369 370 request = p4r.ReadRequest( 371 device_id=self.device_id, 372 entities=p4entity.encode_entities(entities, self.p4info), 373 ) 374 375 async for reply in self._p4client.request_iter(request): 376 for ent in reply.entities: 377 yield p4entity.decode_entity(ent, self.p4info)
Async iterator that reads entities from the switch.
379 async def read_packets( 380 self, 381 *, 382 queue_size: int = _DEFAULT_QUEUE_SIZE, 383 eth_types: Iterable[int] | None = None, 384 ) -> AsyncIterator["p4entity.P4PacketIn"]: 385 "Async iterator for incoming packets (P4PacketIn)." 386 LOGGER.debug("read_packets: opening queue: eth_types=%r", eth_types) 387 388 if eth_types is None: 389 390 def _pkt_filter(_payload: bytes) -> bool: 391 return True 392 393 else: 394 _filter = {eth.to_bytes(2, "big") for eth in eth_types} 395 396 def _pkt_filter(_payload: bytes) -> bool: 397 return _payload[12:14] in _filter 398 399 queue = Queue[p4entity.P4PacketIn](queue_size) 400 queue_filter = (_pkt_filter, queue) 401 self._packet_queues.append(queue_filter) 402 403 try: 404 while True: 405 yield await queue.get() 406 finally: 407 LOGGER.debug("read_packets: closing queue: eth_types=%r", eth_types) 408 self._packet_queues.remove(queue_filter)
Async iterator for incoming packets (P4PacketIn).
410 async def read_digests( 411 self, 412 digest_id: str, 413 *, 414 queue_size: int = _DEFAULT_QUEUE_SIZE, 415 ) -> AsyncIterator["p4entity.P4DigestList"]: 416 "Async iterator for incoming digest lists (P4DigestList)." 417 LOGGER.debug("read_digests: opening queue: digest_id=%r", digest_id) 418 419 if digest_id in self._digest_queues: 420 raise ValueError(f"queue for digest_id {digest_id!r} already open") 421 422 queue = Queue[p4entity.P4DigestList](queue_size) 423 self._digest_queues[digest_id] = queue 424 try: 425 while True: 426 yield await queue.get() 427 finally: 428 LOGGER.debug("read_digests: closing queue: digest_id=%r", digest_id) 429 del self._digest_queues[digest_id]
Async iterator for incoming digest lists (P4DigestList).
431 async def read_idle_timeouts( 432 self, 433 *, 434 queue_size: int = _DEFAULT_QUEUE_SIZE, 435 ) -> AsyncIterator["p4entity.P4IdleTimeoutNotification"]: 436 "Async iterator for incoming idle timeouts (P4IdleTimeoutNotification)." 437 LOGGER.debug("read_idle_timeouts: opening queue") 438 439 if self._timeout_queue is not None: 440 raise ValueError("timeout queue already open") 441 442 queue = Queue[p4entity.P4IdleTimeoutNotification](queue_size) 443 self._timeout_queue = queue 444 try: 445 while True: 446 yield await queue.get() 447 finally: 448 LOGGER.debug("read_idle_timeouts: closing queue") 449 self._timeout_queue = None
Async iterator for incoming idle timeouts (P4IdleTimeoutNotification).
451 async def write( 452 self, 453 entities: Iterable[p4entity.P4UpdateList], 454 *, 455 strict: bool = True, 456 warn_only: bool = False, 457 ) -> None: 458 """Write updates and stream messages to the switch. 459 460 If `strict` is False, MODIFY and DELETE operations will NOT raise an 461 error if the entity does not exist (NOT_FOUND). 462 463 If `warn_only` is True, no operations will raise an error. Instead, 464 the exception will be logged as a WARNING and the method will return 465 normally. 466 """ 467 assert self._p4client is not None 468 469 if not entities: 470 return 471 472 msgs = p4entity.encode_updates(entities, self.p4info) 473 474 updates: list[p4r.Update] = [] 475 for msg in msgs: 476 if isinstance(msg, p4r.StreamMessageRequest): 477 # StreamMessageRequests are transmitted immediately. 478 # TODO: Understand what happens with backpressure? 479 await self._p4client.send(msg) 480 else: 481 updates.append(msg) 482 483 if updates: 484 await self._write_request(updates, strict, warn_only)
Write updates and stream messages to the switch.
If strict
is False, MODIFY and DELETE operations will NOT raise an
error if the entity does not exist (NOT_FOUND).
If warn_only
is True, no operations will raise an error. Instead,
the exception will be logged as a WARNING and the method will return
normally.
486 async def insert( 487 self, 488 entities: Iterable[p4entity.P4EntityList], 489 *, 490 warn_only: bool = False, 491 ) -> None: 492 """Insert the specified entities. 493 494 If `warn_only` is True, errors will be logged as warnings instead of 495 raising an exception. 496 """ 497 if entities: 498 await self._write_request( 499 [ 500 p4r.Update(type=p4r.Update.INSERT, entity=ent) 501 for ent in p4entity.encode_entities(entities, self.p4info) 502 ], 503 True, 504 warn_only, 505 )
Insert the specified entities.
If warn_only
is True, errors will be logged as warnings instead of
raising an exception.
507 async def modify( 508 self, 509 entities: Iterable[p4entity.P4EntityList], 510 *, 511 strict: bool = True, 512 warn_only: bool = False, 513 ) -> None: 514 """Modify the specified entities. 515 516 If `strict` is False, NOT_FOUND errors will be ignored. 517 518 If `warn_only` is True, errors will be logged as warnings instead of 519 raising an exception. 520 """ 521 if entities: 522 await self._write_request( 523 [ 524 p4r.Update(type=p4r.Update.MODIFY, entity=ent) 525 for ent in p4entity.encode_entities(entities, self.p4info) 526 ], 527 strict, 528 warn_only, 529 )
Modify the specified entities.
If strict
is False, NOT_FOUND errors will be ignored.
If warn_only
is True, errors will be logged as warnings instead of
raising an exception.
531 async def delete( 532 self, 533 entities: Iterable[p4entity.P4EntityList], 534 *, 535 strict: bool = True, 536 warn_only: bool = False, 537 ) -> None: 538 """Delete the specified entities. 539 540 If `strict` is False, NOT_FOUND errors will be ignored. 541 542 If `warn_only` is True, errors will be logged as warnings instead of 543 raising an exception. 544 """ 545 if entities: 546 await self._write_request( 547 [ 548 p4r.Update(type=p4r.Update.DELETE, entity=ent) 549 for ent in p4entity.encode_entities(entities, self.p4info) 550 ], 551 strict, 552 warn_only, 553 )
Delete the specified entities.
If strict
is False, NOT_FOUND errors will be ignored.
If warn_only
is True, errors will be logged as warnings instead of
raising an exception.
555 async def delete_all(self) -> None: 556 """Delete all entities if no parameter is passed. Otherwise, delete 557 items that match `entities`. 558 559 This method does not attempt to delete entries in const tables. 560 561 TODO: This method does not affect indirect counters, meters or 562 value_sets. 563 """ 564 await self.delete_many( 565 [ 566 p4entity.P4TableEntry(), 567 p4entity.P4MulticastGroupEntry(), 568 p4entity.P4CloneSessionEntry(), 569 ] 570 ) 571 572 # Reset all default table entries. 573 default_entries = [ 574 p4entity.P4TableEntry(table.alias, is_default_action=True) 575 for table in self.p4info.tables 576 if table.const_default_action is None and table.action_profile is None 577 ] 578 if default_entries: 579 await self.modify(default_entries) 580 581 # Delete all P4ActionProfileGroup's and P4ActionProfileMember's. 582 # We do this after deleting the P4TableEntry's in case a client is using 583 # "one-shot" references; these are incompatible with separate 584 # action profiles. 585 await self.delete_many( 586 [ 587 p4entity.P4ActionProfileGroup(), 588 p4entity.P4ActionProfileMember(), 589 ] 590 ) 591 592 # Delete DigestEntry separately. Wildcard reads are not supported. 593 digest_entries = [ 594 p4entity.P4DigestEntry(digest.alias) for digest in self.p4info.digests 595 ] 596 if digest_entries: 597 await self.delete(digest_entries, strict=False)
Delete all entities if no parameter is passed. Otherwise, delete
items that match entities
.
This method does not attempt to delete entries in const tables.
TODO: This method does not affect indirect counters, meters or value_sets.
599 async def delete_many(self, entities: Iterable[p4entity.P4EntityList]) -> None: 600 """Delete entities that match a wildcard read. 601 602 This method always skips over entries in const tables. It is an error 603 to attempt to delete those. 604 """ 605 assert self._p4client is not None 606 607 request = p4r.ReadRequest( 608 device_id=self.device_id, 609 entities=p4entity.encode_entities(entities, self.p4info), 610 ) 611 612 # Compute set of all const table ID's (may be empty). 613 to_skip = {table.id for table in self.p4info.tables if table.is_const} 614 615 async for reply in self._p4client.request_iter(request): 616 if reply.entities: 617 if to_skip: 618 await self.delete( 619 reply 620 for reply in reply.entities 621 if reply.HasField("table_entry") 622 and reply.table_entry.table_id not in to_skip 623 ) 624 else: 625 await self.delete(reply.entities)
Delete entities that match a wildcard read.
This method always skips over entries in const tables. It is an error to attempt to delete those.
627 async def run(self) -> None: 628 "Run the switch's lifecycle repeatedly." 629 assert self._p4client is None 630 assert self._tasks is None 631 632 self._tasks = SwitchTasks(self._options.fail_fast) 633 self._p4client = P4Client(self._address, self._options.channel_credentials) 634 self._switch_start() 635 636 try: 637 while True: 638 # If the switch fails and restarts too quickly, slow it down. 639 async with _throttle_failure(): 640 self.create_task(self._run(), background=True) 641 await self._tasks.wait() 642 self._arbitrator.reset() 643 644 finally: 645 self._p4client = None 646 self._tasks = None 647 self._switch_stop()
Run the switch's lifecycle repeatedly.
649 def create_task( 650 self, 651 coro: Coroutine[Any, Any, _T], 652 *, 653 background: bool = False, 654 name: str | None = None, 655 ) -> asyncio.Task[_T]: 656 "Create an asyncio task tied to the Switch's lifecycle." 657 assert self._tasks is not None 658 659 return self._tasks.create_task( 660 coro, 661 switch=self, 662 background=background, 663 name=name, 664 )
Create an asyncio task tied to the Switch's lifecycle.
722 async def __aenter__(self) -> Self: 723 "Similar to run() but provides a one-time context manager interface." 724 assert self._p4client is None 725 assert self._tasks is None 726 727 self._tasks = SwitchTasks(self._options.fail_fast) 728 self._p4client = P4Client( 729 self._address, 730 self._options.channel_credentials, 731 wait_for_ready=False, 732 ) 733 self._switch_start() 734 735 try: 736 # Start the switch's `_run` task in the background. Then, wait for 737 # `_run` task to fire the CHANNEL_READY event. If the `_run` task 738 # cannot connect or fails in some other way, it will finish before 739 # the `ready` future. We need to handle the error in this case. 740 741 run = self.create_task(self._run(), background=True) 742 ready = self.ee.event_future(SwitchEvent.CHANNEL_READY) 743 done, _ = await asyncio.wait( 744 [run, ready], return_when=asyncio.FIRST_COMPLETED 745 ) 746 if run in done: 747 await run 748 749 except BaseException: 750 await self.__aexit__(None, None, None) 751 raise 752 753 return self
Similar to run() but provides a one-time context manager interface.
1094class SwitchEvent(str, enum.Enum): 1095 "Events for Switch class." 1096 1097 CONTROLLER_ENTER = "controller_enter" # (switch) 1098 CONTROLLER_LEAVE = "controller_leave" # (switch) 1099 SWITCH_START = "switch_start" # (switch) 1100 SWITCH_STOP = "switch_stop" # (switch) 1101 CHANNEL_UP = "channel_up" # (switch) 1102 CHANNEL_DOWN = "channel_down" # (switch) 1103 CHANNEL_READY = "channel_ready" # (switch) 1104 BECOME_PRIMARY = "become_primary" # (switch) 1105 BECOME_BACKUP = "become_backup" # (switch) 1106 PORT_UP = "port_up" # (switch, port) 1107 PORT_DOWN = "port_down" # (switch, port) 1108 STREAM_ERROR = "stream_error" # (switch, p4r.StreamMessageResponse)
Events for Switch class.
70@final 71@dataclasses.dataclass(frozen=True) 72class SwitchOptions: 73 """Represents the configuration options for a `Switch`. 74 75 ``` 76 opts = SwitchOptions( 77 p4info=Path("basic.p4info.txtpb"), 78 p4blob=Path("basic.json"), 79 ready_handler=on_ready, 80 ) 81 ``` 82 83 Each `SwitchOptions` object is immutable and may be shared by multiple 84 switches. You should treat all values as read-only. 85 86 You can use function call syntax to return a copy of a `SwitchOptions` with 87 one or more propertise altered. 88 89 ``` 90 new_opts = opts(device_id=6) 91 ``` 92 """ 93 94 p4info: Path | None = None 95 "Path to P4Info protobuf text file." 96 97 p4blob: Path | SupportsBytes | None = None 98 "Path to P4Blob file, or an object that can provide the bytes value." 99 100 p4force: bool = False 101 "If true, always load the P4 program after initial handshake." 102 103 device_id: int = 1 104 "Default P4Runtime device ID." 105 106 initial_election_id: int = 10 107 "Initial P4Runtime election ID." 108 109 channel_credentials: GRPCCredentialsTLS | None = None 110 "P4Runtime channel credentials. Used for TLS support." 111 112 role_name: str = "" 113 "P4Runtime role configuration." 114 115 role_config: pbutil.PBMessage | None = None 116 "P4Runtime role configuration." 117 118 ready_handler: Callable[["Switch"], Coroutine[Any, Any, None]] | None = None 119 "Ready handler async function callback." 120 121 fail_fast: bool = False 122 """If true, log switch errors as CRITICAL and immediately abort when the 123 switch is running in a Controller.""" 124 125 def __call__(self, **kwds: Any) -> Self: 126 return dataclasses.replace(self, **kwds)
Represents the configuration options for a Switch
.
opts = SwitchOptions(
p4info=Path("basic.p4info.txtpb"),
p4blob=Path("basic.json"),
ready_handler=on_ready,
)
Each SwitchOptions
object is immutable and may be shared by multiple
switches. You should treat all values as read-only.
You can use function call syntax to return a copy of a SwitchOptions
with
one or more propertise altered.
new_opts = opts(device_id=6)
Path to P4Blob file, or an object that can provide the bytes value.
43@dataclass 44class SwitchPort: 45 "Represents a switch port." 46 47 id: int 48 name: str 49 oper_status: OperStatus = OperStatus.UNKNOWN 50 51 @property 52 def up(self) -> bool: 53 "Return true if port is basically up." 54 return self.oper_status == OperStatus.UP
Represents a switch port.
57class SwitchPortList: 58 "Represents a list of switch ports." 59 60 _ports: dict[str, SwitchPort] 61 _subscription: GNMISubscription | None = None 62 63 def __init__(self): 64 self._ports = {} 65 66 def __getitem__(self, key: str) -> SwitchPort: 67 "Retrieve interface by ID." 68 return self._ports[key] 69 70 def __len__(self) -> int: 71 "Return number of switch ports." 72 return len(self._ports) 73 74 def __iter__(self) -> Iterator[SwitchPort]: 75 "Iterate over switch ports." 76 return iter(self._ports.values()) 77 78 async def subscribe(self, client: GNMIClient) -> None: 79 """Obtain the initial list of ports and subscribe to switch port status 80 updates using GNMI.""" 81 assert self._subscription is None 82 83 self._ports = await self._get_ports(client) 84 if self._ports: 85 self._subscription = await self._get_subscription(client) 86 else: 87 LOGGER.warning("No switch ports exist") 88 89 async def listen(self, switch: "_sw.Switch | None" = None) -> None: 90 "Listen for switch port updates." 91 assert self._subscription is not None 92 93 async for update in self._subscription.updates(): 94 self._update(update, switch) 95 96 def close(self) -> None: 97 "Close the switch port subscription." 98 if self._subscription is not None: 99 self._subscription.cancel() 100 self._subscription = None 101 self._ports = {} 102 103 async def _get_ports(self, client: GNMIClient) -> dict[str, SwitchPort]: 104 "Retrieve ID and name of each port." 105 ports: dict[str, SwitchPort] = {} 106 107 result = await client.get(_ifIndex) 108 for update in result: 109 path = update.path 110 assert path.last == _ifIndex.last 111 112 port = SwitchPort(update.value, path["name"]) 113 ports[port.name] = port 114 115 return ports 116 117 async def _get_subscription(self, client: GNMIClient) -> GNMISubscription: 118 sub = client.subscribe() 119 120 # Subscribe to change notifications. 121 for port in self._ports.values(): 122 sub.on_change(_ifOperStatus.set(name=port.name)) 123 124 # Synchronize initial settings for ports. 125 async for update in sub.synchronize(): 126 self._update(update, None) 127 128 return sub 129 130 def _update(self, update: GNMIUpdate, switch: "_sw.Switch | None"): 131 path = update.path 132 if path.last == _ifOperStatus.last: 133 status = OperStatus(update.value) 134 self._update_port(path["name"], status, switch) 135 else: 136 LOGGER.warning(f"PortList: unknown gNMI path: {path}") 137 138 def _update_port(self, name: str, status: OperStatus, switch: "_sw.Switch | None"): 139 port = self._ports[name] 140 141 prev_up = port.up 142 port.oper_status = status 143 curr_up = port.up 144 145 if switch is not None and curr_up != prev_up: 146 if curr_up: 147 switch.ee.emit(_sw.SwitchEvent.PORT_UP, switch, port) 148 else: 149 switch.ee.emit(_sw.SwitchEvent.PORT_DOWN, switch, port)
Represents a list of switch ports.
66 def __getitem__(self, key: str) -> SwitchPort: 67 "Retrieve interface by ID." 68 return self._ports[key]
Retrieve interface by ID.
74 def __iter__(self) -> Iterator[SwitchPort]: 75 "Iterate over switch ports." 76 return iter(self._ports.values())
Iterate over switch ports.
78 async def subscribe(self, client: GNMIClient) -> None: 79 """Obtain the initial list of ports and subscribe to switch port status 80 updates using GNMI.""" 81 assert self._subscription is None 82 83 self._ports = await self._get_ports(client) 84 if self._ports: 85 self._subscription = await self._get_subscription(client) 86 else: 87 LOGGER.warning("No switch ports exist")
Obtain the initial list of ports and subscribe to switch port status updates using GNMI.
119class GNMIClient: 120 """Async GNMI client. 121 122 This client implements `get`, `set`, `subscribe` and `capabilities`. 123 124 The API depends on the protobuf definition of `gnmi.TypedValue`. 125 126 Get usage: 127 ``` 128 client = GNMIClient('127.0.0.1:9339') 129 await client.open() 130 131 path = GNMIPath("interfaces/interface") 132 async for update in client.get(path): 133 print(update) 134 ``` 135 136 Subscribe usage: 137 ``` 138 path = GNMIPath("interfaces/interface[name=eth1]/state/oper-status") 139 sub = client.subscribe() 140 sub.on_change(path) 141 142 async for initial_state in sub.synchronize(): 143 print(initial_state) 144 145 async for update in sub.updates(): 146 print(update) 147 ``` 148 149 Set usage: 150 ``` 151 enabled = GNMIPath("interfaces/interface[name=eth1]/config/enabled") 152 153 await client.set(update={ 154 enabled: gnmi.TypedValue(boolValue=True), 155 }) 156 ``` 157 """ 158 159 _address: str 160 _credentials: GRPCCredentialsTLS | None 161 _channel: grpc.aio.Channel | None = None 162 _stub: gnmi_grpc.gNMIStub | None = None 163 _channel_reused: bool = False 164 165 def __init__( 166 self, 167 address: str, 168 credentials: GRPCCredentialsTLS | None = None, 169 ): 170 self._address = address 171 self._credentials = credentials 172 173 async def __aenter__(self) -> Self: 174 await self.open() 175 return self 176 177 async def __aexit__(self, *_args: Any) -> bool | None: 178 await self.close() 179 180 async def open( 181 self, 182 *, 183 channel: grpc.aio.Channel | None = None, 184 ) -> None: 185 """Open the client channel. 186 187 Note: This method is `async` for forward-compatible reasons. 188 """ 189 if self._channel is not None: 190 raise RuntimeError("GNMIClient: client is already open") 191 192 assert self._stub is None 193 194 if channel is not None: 195 self._channel = channel 196 self._channel_reused = True 197 else: 198 self._channel = grpc_channel( 199 self._address, 200 credentials=self._credentials, 201 client_type="GNMIClient", 202 ) 203 204 self._stub = gnmi_grpc.gNMIStub(self._channel) 205 206 async def close(self) -> None: 207 "Close the client channel." 208 if self._channel is not None: 209 if not self._channel_reused: 210 LOGGER.debug("GNMIClient: close channel %r", self._address) 211 await self._channel.close() 212 213 self._channel = None 214 self._stub = None 215 self._channel_reused = False 216 217 async def get( 218 self, 219 *path: GNMIPath, 220 prefix: GNMIPath | None = None, 221 config: bool = False, 222 ) -> Sequence[GNMIUpdate]: 223 "Retrieve value(s) using a GetRequest." 224 if self._stub is None: 225 raise RuntimeError("GNMIClient: client is not open") 226 227 request = gnmi.GetRequest( 228 path=(i.path for i in path), 229 encoding=gnmi.Encoding.PROTO, 230 ) 231 232 if prefix is not None: 233 request.prefix.CopyFrom(prefix.path) 234 235 if config: 236 request.type = gnmi.GetRequest.CONFIG 237 238 self._log_msg(request) 239 try: 240 reply = cast( 241 gnmi.GetResponse, 242 await self._stub.Get(request), 243 ) 244 except grpc.RpcError as ex: 245 raise GNMIClientError(ex) from None 246 247 self._log_msg(reply) 248 249 result: list[GNMIUpdate] = [] 250 for notification in reply.notification: 251 for update in _read_updates(notification): 252 result.append(update) 253 254 return result 255 256 def subscribe( 257 self, 258 *, 259 prefix: GNMIPath | None = None, 260 ) -> "GNMISubscription": 261 """Subscribe to gNMI change notifications. 262 263 Usage: 264 ``` 265 sub = client.subscribe() 266 sub.on_change(path1, ...) 267 sub.sample(path3, path4, sample_interval=1000000000) 268 269 async for update in sub.synchronize(): 270 # do something with initial state 271 272 async for update in sub.updates(): 273 # do something with updates 274 ``` 275 276 You can also subscribe in "ONCE" mode: 277 ``` 278 sub = client.subscribe() 279 sub.once(path1, path2, ...) 280 281 async for info in sub.synchronize(): 282 # do something with info 283 ``` 284 285 The subscription object is not re-entrant, but a fully consumed 286 subscription may be reused. 287 """ 288 if self._stub is None: 289 raise RuntimeError("GNMIClient: client is not open") 290 291 return GNMISubscription(self, prefix) 292 293 async def capabilities(self) -> gnmi.CapabilityResponse: 294 "Issue a CapabilitiesRequest." 295 if self._stub is None: 296 raise RuntimeError("GNMIClient: client is not open") 297 298 request = gnmi.CapabilityRequest() 299 300 self._log_msg(request) 301 try: 302 reply = cast( 303 gnmi.CapabilityResponse, 304 await self._stub.Capabilities(request), 305 ) 306 except grpc.RpcError as ex: 307 raise GNMIClientError(ex) from None 308 309 self._log_msg(reply) 310 return reply 311 312 async def set( 313 self, 314 *, 315 update: Sequence[tuple[GNMIPath, GNMISetValueType]] | None = None, 316 replace: Sequence[tuple[GNMIPath, GNMISetValueType]] | None = None, 317 delete: Sequence[GNMIPath] | None = None, 318 prefix: GNMIPath | None = None, 319 ) -> int: 320 """Set value(s) using SetRequest. 321 322 Returns the timestamp from the successful `SetResponse`. 323 """ 324 if self._stub is None: 325 raise RuntimeError("GNMIClient: client is not open") 326 327 if update is not None: 328 updates = [gnmi_update(path, value) for path, value in update] 329 else: 330 updates = None 331 332 if replace is not None: 333 replaces = [gnmi_update(path, value) for path, value in replace] 334 else: 335 replaces = None 336 337 if delete is not None: 338 deletes = [path.path for path in delete] 339 else: 340 deletes = None 341 342 request = gnmi.SetRequest( 343 update=updates, 344 replace=replaces, 345 delete=deletes, 346 ) 347 348 if prefix is not None: 349 request.prefix.CopyFrom(prefix.path) 350 351 self._log_msg(request) 352 try: 353 reply = cast( 354 gnmi.SetResponse, 355 await self._stub.Set(request), 356 ) 357 except grpc.RpcError as ex: 358 raise GNMIClientError(ex) from None 359 360 self._log_msg(reply) 361 362 # According to the comments in the current protobuf, I expect all error 363 # results to be raised as an exception. The only useful value in a 364 # successful response is the timestamp. 365 366 if reply.HasField("message"): 367 raise NotImplementedError("SetResponse error not supported") 368 369 for result in reply.response: 370 if result.HasField("message"): 371 raise NotImplementedError("SetResponse suberror not supported") 372 373 return reply.timestamp 374 375 def _log_msg(self, msg: "pbutil.PBMessage"): 376 "Log a gNMI message." 377 pbutil.log_msg(self._channel, msg, None)
Async GNMI client.
This client implements get
, set
, subscribe
and capabilities
.
The API depends on the protobuf definition of gnmi.TypedValue
.
Get usage:
client = GNMIClient('127.0.0.1:9339')
await client.open()
path = GNMIPath("interfaces/interface")
async for update in client.get(path):
print(update)
Subscribe usage:
path = GNMIPath("interfaces/interface[name=eth1]/state/oper-status")
sub = client.subscribe()
sub.on_change(path)
async for initial_state in sub.synchronize():
print(initial_state)
async for update in sub.updates():
print(update)
Set usage:
enabled = GNMIPath("interfaces/interface[name=eth1]/config/enabled")
await client.set(update={
enabled: gnmi.TypedValue(boolValue=True),
})
180 async def open( 181 self, 182 *, 183 channel: grpc.aio.Channel | None = None, 184 ) -> None: 185 """Open the client channel. 186 187 Note: This method is `async` for forward-compatible reasons. 188 """ 189 if self._channel is not None: 190 raise RuntimeError("GNMIClient: client is already open") 191 192 assert self._stub is None 193 194 if channel is not None: 195 self._channel = channel 196 self._channel_reused = True 197 else: 198 self._channel = grpc_channel( 199 self._address, 200 credentials=self._credentials, 201 client_type="GNMIClient", 202 ) 203 204 self._stub = gnmi_grpc.gNMIStub(self._channel)
Open the client channel.
Note: This method is async
for forward-compatible reasons.
206 async def close(self) -> None: 207 "Close the client channel." 208 if self._channel is not None: 209 if not self._channel_reused: 210 LOGGER.debug("GNMIClient: close channel %r", self._address) 211 await self._channel.close() 212 213 self._channel = None 214 self._stub = None 215 self._channel_reused = False
Close the client channel.
217 async def get( 218 self, 219 *path: GNMIPath, 220 prefix: GNMIPath | None = None, 221 config: bool = False, 222 ) -> Sequence[GNMIUpdate]: 223 "Retrieve value(s) using a GetRequest." 224 if self._stub is None: 225 raise RuntimeError("GNMIClient: client is not open") 226 227 request = gnmi.GetRequest( 228 path=(i.path for i in path), 229 encoding=gnmi.Encoding.PROTO, 230 ) 231 232 if prefix is not None: 233 request.prefix.CopyFrom(prefix.path) 234 235 if config: 236 request.type = gnmi.GetRequest.CONFIG 237 238 self._log_msg(request) 239 try: 240 reply = cast( 241 gnmi.GetResponse, 242 await self._stub.Get(request), 243 ) 244 except grpc.RpcError as ex: 245 raise GNMIClientError(ex) from None 246 247 self._log_msg(reply) 248 249 result: list[GNMIUpdate] = [] 250 for notification in reply.notification: 251 for update in _read_updates(notification): 252 result.append(update) 253 254 return result
Retrieve value(s) using a GetRequest.
256 def subscribe( 257 self, 258 *, 259 prefix: GNMIPath | None = None, 260 ) -> "GNMISubscription": 261 """Subscribe to gNMI change notifications. 262 263 Usage: 264 ``` 265 sub = client.subscribe() 266 sub.on_change(path1, ...) 267 sub.sample(path3, path4, sample_interval=1000000000) 268 269 async for update in sub.synchronize(): 270 # do something with initial state 271 272 async for update in sub.updates(): 273 # do something with updates 274 ``` 275 276 You can also subscribe in "ONCE" mode: 277 ``` 278 sub = client.subscribe() 279 sub.once(path1, path2, ...) 280 281 async for info in sub.synchronize(): 282 # do something with info 283 ``` 284 285 The subscription object is not re-entrant, but a fully consumed 286 subscription may be reused. 287 """ 288 if self._stub is None: 289 raise RuntimeError("GNMIClient: client is not open") 290 291 return GNMISubscription(self, prefix)
Subscribe to gNMI change notifications.
Usage:
sub = client.subscribe()
sub.on_change(path1, ...)
sub.sample(path3, path4, sample_interval=1000000000)
async for update in sub.synchronize():
# do something with initial state
async for update in sub.updates():
# do something with updates
You can also subscribe in "ONCE" mode:
sub = client.subscribe()
sub.once(path1, path2, ...)
async for info in sub.synchronize():
# do something with info
The subscription object is not re-entrant, but a fully consumed subscription may be reused.
293 async def capabilities(self) -> gnmi.CapabilityResponse: 294 "Issue a CapabilitiesRequest." 295 if self._stub is None: 296 raise RuntimeError("GNMIClient: client is not open") 297 298 request = gnmi.CapabilityRequest() 299 300 self._log_msg(request) 301 try: 302 reply = cast( 303 gnmi.CapabilityResponse, 304 await self._stub.Capabilities(request), 305 ) 306 except grpc.RpcError as ex: 307 raise GNMIClientError(ex) from None 308 309 self._log_msg(reply) 310 return reply
Issue a CapabilitiesRequest.
312 async def set( 313 self, 314 *, 315 update: Sequence[tuple[GNMIPath, GNMISetValueType]] | None = None, 316 replace: Sequence[tuple[GNMIPath, GNMISetValueType]] | None = None, 317 delete: Sequence[GNMIPath] | None = None, 318 prefix: GNMIPath | None = None, 319 ) -> int: 320 """Set value(s) using SetRequest. 321 322 Returns the timestamp from the successful `SetResponse`. 323 """ 324 if self._stub is None: 325 raise RuntimeError("GNMIClient: client is not open") 326 327 if update is not None: 328 updates = [gnmi_update(path, value) for path, value in update] 329 else: 330 updates = None 331 332 if replace is not None: 333 replaces = [gnmi_update(path, value) for path, value in replace] 334 else: 335 replaces = None 336 337 if delete is not None: 338 deletes = [path.path for path in delete] 339 else: 340 deletes = None 341 342 request = gnmi.SetRequest( 343 update=updates, 344 replace=replaces, 345 delete=deletes, 346 ) 347 348 if prefix is not None: 349 request.prefix.CopyFrom(prefix.path) 350 351 self._log_msg(request) 352 try: 353 reply = cast( 354 gnmi.SetResponse, 355 await self._stub.Set(request), 356 ) 357 except grpc.RpcError as ex: 358 raise GNMIClientError(ex) from None 359 360 self._log_msg(reply) 361 362 # According to the comments in the current protobuf, I expect all error 363 # results to be raised as an exception. The only useful value in a 364 # successful response is the timestamp. 365 366 if reply.HasField("message"): 367 raise NotImplementedError("SetResponse error not supported") 368 369 for result in reply.response: 370 if result.HasField("message"): 371 raise NotImplementedError("SetResponse suberror not supported") 372 373 return reply.timestamp
Set value(s) using SetRequest.
Returns the timestamp from the successful SetResponse
.
25class GNMIPath: 26 """Concrete class for working with `gnmi.Path` objects. 27 28 A `GNMIPath` should be treated as an immutable object. You can access 29 the wrapped `gnmi.Path` protobuf class using the `.path` property. 30 `GNMIPath` objects are hashable, but you must be careful that you do not 31 mutate them via the underlying `gnmi.Path`. 32 33 You can construct a GNMIPath from a string: 34 ``` 35 path = GNMIPath("interfaces/interface[name=eth1]/state") 36 ``` 37 38 You can construct a GNMIPath object from a `gnmi.Path` directly. 39 ``` 40 path = GNMIPath(update.path) 41 ``` 42 43 You can create paths by using an existing path as a template, without 44 modifying the original path. Use the `set` method: 45 ``` 46 operStatus = GNMIPath("interfaces/interface/state/oper-status") 47 path = operStatus.set("interface", name="eth1") 48 ``` 49 50 Use [] to access the name/key of path elements: 51 ``` 52 path[1] == "interface" 53 path["interface", "name"] == "eth1" 54 path["name"] == "eth1" 55 ``` 56 """ 57 58 path: gnmi.Path 59 "The underlying `gnmi.Path` protobuf representation." 60 61 def __init__( 62 self, 63 path: "gnmi.Path | str | GNMIPath" = "", 64 *, 65 origin: str = "", 66 target: str = "", 67 ): 68 "Construct a `GNMIPath` from a string or `gnmi.Path`." 69 if isinstance(path, str): 70 path = gnmistring.parse(path) 71 elif isinstance(path, GNMIPath): 72 path = _copy_path(path.path) 73 74 assert isinstance(path, gnmi.Path) 75 76 if origin: 77 path.origin = origin 78 79 if target: 80 path.target = target 81 82 self.path = path 83 84 @property 85 def first(self) -> str: 86 "Return the first element of the path." 87 return self.path.elem[0].name 88 89 @property 90 def last(self) -> str: 91 "Return the last element of the path." 92 return self.path.elem[-1].name 93 94 @property 95 def origin(self) -> str: 96 "Return the path's origin." 97 return self.path.origin 98 99 @property 100 def target(self) -> str: 101 "Return the path's target." 102 return self.path.target 103 104 def set(self, __elem: str | int | None = None, **kwds: Any) -> "GNMIPath": 105 "Construct a new GNMIPath with keys set for the given elem." 106 if __elem is None: 107 return self._rekey(kwds) 108 109 if isinstance(__elem, str): 110 elem = _find_index(__elem, self.path) 111 else: 112 elem = __elem 113 114 result = self.copy() 115 keys = result.path.elem[elem].key 116 keys.clear() 117 keys.update({key: str(val) for key, val in kwds.items()}) 118 return result 119 120 def copy(self) -> "GNMIPath": 121 "Return a copy of the path." 122 return GNMIPath(_copy_path(self.path)) 123 124 @overload 125 def __getitem__( 126 self, key: int | str | tuple[int | str, str] 127 ) -> str: ... # pragma: no cover 128 129 @overload 130 def __getitem__(self, key: slice) -> "GNMIPath": ... # pragma: no cover 131 132 def __getitem__( 133 self, 134 key: int | str | tuple[int | str, str] | slice, 135 ) -> "str | GNMIPath": 136 "Return the specified element or key value." 137 match key: 138 case int(idx): 139 return self.path.elem[idx].name 140 case str(name): 141 return _retrieve_key(name, self.path) 142 case (int(idx), str(k)): 143 result = self.path.elem[idx].key.get(k) 144 if result is None: 145 raise KeyError(k) 146 return result 147 case (str(name), str(k)): 148 result = self.path.elem[_find_index(name, self.path)].key.get(k) 149 if result is None: 150 raise KeyError(k) 151 return result 152 case slice() as s: 153 return self._slice(s.start, s.stop, s.step) 154 case other: # pyright: ignore[reportUnnecessaryComparison] 155 raise TypeError(f"invalid key type: {other!r}") 156 157 def __eq__(self, rhs: Any) -> bool: 158 "Return True if path's are equal." 159 if not isinstance(rhs, GNMIPath): 160 return False 161 return self.path == rhs.path # pyright: ignore[reportUnknownVariableType] 162 163 def __hash__(self) -> int: 164 "Return hash of path." 165 return hash(tuple(_to_tuple(elem) for elem in self.path.elem)) 166 167 def __contains__(self, name: str) -> bool: 168 "Return True if element name is in path." 169 for elem in self.path.elem: 170 if elem.name == name: 171 return True 172 return False 173 174 def __repr__(self) -> str: 175 "Return string representation of path." 176 path = gnmistring.to_str(self.path) 177 if self.target or self.origin: 178 return f"GNMIPath({path!r}, origin={self.origin!r}, target={self.target!r})" 179 return f"GNMIPath({path!r})" 180 181 def __str__(self) -> str: 182 "Return path as string." 183 return gnmistring.to_str(self.path) 184 185 def __len__(self) -> int: 186 "Return the length of the path." 187 return len(self.path.elem) 188 189 def __truediv__(self, rhs: "GNMIPath | str") -> "GNMIPath": 190 "Append values to end of path." 191 if not isinstance(rhs, GNMIPath): 192 rhs = GNMIPath(rhs) 193 194 result = gnmi.Path( 195 elem=itertools.chain(self.path.elem, rhs.path.elem), 196 origin=self.origin, 197 target=self.target, 198 ) 199 return GNMIPath(result) 200 201 def __rtruediv__(self, lhs: str) -> "GNMIPath": 202 "Prepend values to the beginning of the path." 203 path = GNMIPath(lhs) 204 205 result = gnmi.Path(elem=itertools.chain(path.path.elem, self.path.elem)) 206 return GNMIPath(result) 207 208 def _slice( 209 self, start: int | None, stop: int | None, step: int | None 210 ) -> "GNMIPath": 211 "Return specified slice of GNMIPath." 212 if start is None: 213 start = 0 214 215 if stop is None: 216 stop = len(self) 217 elif stop < 0: 218 stop = len(self) + stop 219 220 if step is None: 221 step = 1 222 223 path = gnmi.Path() 224 for i in range(start, stop, step): 225 elem = self.path.elem[i] 226 path.elem.append(gnmi.PathElem(name=elem.name, key=elem.key)) 227 228 return GNMIPath(path) 229 230 def _rekey(self, keys: dict[str, Any]) -> "GNMIPath": 231 """Construct a new path with specified keys replaced.""" 232 if not keys: 233 raise ValueError("empty keys") 234 235 result = self.copy() 236 237 found = False 238 for elem in result.path.elem: 239 for key, value in keys.items(): 240 if key in elem.key: 241 elem.key[key] = str(value) 242 found = True 243 244 if not found: 245 raise ValueError(f"no keys found in path: {keys!r}") 246 247 return result
Concrete class for working with gnmi.Path
objects.
A GNMIPath
should be treated as an immutable object. You can access
the wrapped gnmi.Path
protobuf class using the .path
property.
GNMIPath
objects are hashable, but you must be careful that you do not
mutate them via the underlying gnmi.Path
.
You can construct a GNMIPath from a string:
path = GNMIPath("interfaces/interface[name=eth1]/state")
You can construct a GNMIPath object from a gnmi.Path
directly.
path = GNMIPath(update.path)
You can create paths by using an existing path as a template, without
modifying the original path. Use the set
method:
operStatus = GNMIPath("interfaces/interface/state/oper-status")
path = operStatus.set("interface", name="eth1")
Use [] to access the name/key of path elements:
path[1] == "interface"
path["interface", "name"] == "eth1"
path["name"] == "eth1"
61 def __init__( 62 self, 63 path: "gnmi.Path | str | GNMIPath" = "", 64 *, 65 origin: str = "", 66 target: str = "", 67 ): 68 "Construct a `GNMIPath` from a string or `gnmi.Path`." 69 if isinstance(path, str): 70 path = gnmistring.parse(path) 71 elif isinstance(path, GNMIPath): 72 path = _copy_path(path.path) 73 74 assert isinstance(path, gnmi.Path) 75 76 if origin: 77 path.origin = origin 78 79 if target: 80 path.target = target 81 82 self.path = path
Construct a GNMIPath
from a string or gnmi.Path
.
84 @property 85 def first(self) -> str: 86 "Return the first element of the path." 87 return self.path.elem[0].name
Return the first element of the path.
89 @property 90 def last(self) -> str: 91 "Return the last element of the path." 92 return self.path.elem[-1].name
Return the last element of the path.
99 @property 100 def target(self) -> str: 101 "Return the path's target." 102 return self.path.target
Return the path's target.
104 def set(self, __elem: str | int | None = None, **kwds: Any) -> "GNMIPath": 105 "Construct a new GNMIPath with keys set for the given elem." 106 if __elem is None: 107 return self._rekey(kwds) 108 109 if isinstance(__elem, str): 110 elem = _find_index(__elem, self.path) 111 else: 112 elem = __elem 113 114 result = self.copy() 115 keys = result.path.elem[elem].key 116 keys.clear() 117 keys.update({key: str(val) for key, val in kwds.items()}) 118 return result
Construct a new GNMIPath with keys set for the given elem.
120 def copy(self) -> "GNMIPath": 121 "Return a copy of the path." 122 return GNMIPath(_copy_path(self.path))
Return a copy of the path.
132 def __getitem__( 133 self, 134 key: int | str | tuple[int | str, str] | slice, 135 ) -> "str | GNMIPath": 136 "Return the specified element or key value." 137 match key: 138 case int(idx): 139 return self.path.elem[idx].name 140 case str(name): 141 return _retrieve_key(name, self.path) 142 case (int(idx), str(k)): 143 result = self.path.elem[idx].key.get(k) 144 if result is None: 145 raise KeyError(k) 146 return result 147 case (str(name), str(k)): 148 result = self.path.elem[_find_index(name, self.path)].key.get(k) 149 if result is None: 150 raise KeyError(k) 151 return result 152 case slice() as s: 153 return self._slice(s.start, s.stop, s.step) 154 case other: # pyright: ignore[reportUnnecessaryComparison] 155 raise TypeError(f"invalid key type: {other!r}")
Return the specified element or key value.
385class GNMISubscription: 386 """Represents a gNMI subscription stream. 387 388 Returned from `GNMIClient.subscribe()`. 389 """ 390 391 _client: GNMIClient 392 _stream: _StreamTypeAlias | None 393 394 def __init__(self, client: GNMIClient, prefix: GNMIPath | None = None): 395 "Initialize a GNMISubscription." 396 self._client = client 397 self._stream = None 398 self._sublist = gnmi.SubscriptionList( 399 mode=gnmi.SubscriptionList.Mode.STREAM, 400 ) 401 if prefix is not None: 402 self._sublist.prefix.CopyFrom(prefix.path) 403 404 def once(self, *paths: GNMIPath) -> None: 405 "Subscribe in `ONCE` mode to given paths." 406 self._sublist.mode = gnmi.SubscriptionList.Mode.ONCE 407 408 for path in paths: 409 sub = gnmi.Subscription(path=path.path) 410 self._sublist.subscription.append(sub) 411 412 def on_change(self, *paths: GNMIPath) -> None: 413 "Subscribe in `ON_CHANGE` mode to given paths." 414 assert self._sublist.mode == gnmi.SubscriptionList.Mode.STREAM 415 416 for path in paths: 417 sub = gnmi.Subscription( 418 path=path.path, 419 mode=gnmi.SubscriptionMode.ON_CHANGE, 420 ) 421 self._sublist.subscription.append(sub) 422 423 def sample( 424 self, 425 *paths: GNMIPath, 426 sample_interval: int, 427 suppress_redundant: bool = False, 428 heartbeat_interval: int = 0, 429 ) -> None: 430 "Subscribe in `SAMPLE` mode to given paths." 431 assert self._sublist.mode == gnmi.SubscriptionList.Mode.STREAM 432 433 for path in paths: 434 sub = gnmi.Subscription( 435 path=path.path, 436 mode=gnmi.SubscriptionMode.SAMPLE, 437 sample_interval=sample_interval, 438 suppress_redundant=suppress_redundant, 439 heartbeat_interval=heartbeat_interval, 440 ) 441 self._sublist.subscription.append(sub) 442 443 async def synchronize(self) -> AsyncIterator[GNMIUpdate]: 444 """Async iterator for initial subscription updates. 445 446 Retrieve all updates up to `sync_response` message. 447 """ 448 if self._stream is None: 449 await self._subscribe() 450 451 try: 452 async for result in self._read(True): 453 yield result 454 except grpc.RpcError as ex: 455 raise GNMIClientError(ex) from None 456 457 async def updates(self) -> AsyncIterator[GNMIUpdate]: 458 "Async iterator for all remaining subscription updates." 459 if self._stream is None: 460 await self._subscribe() 461 462 try: 463 async for result in self._read(False): 464 yield result 465 except grpc.RpcError as ex: 466 raise GNMIClientError(ex) from None 467 468 def cancel(self) -> None: 469 "Cancel the subscription." 470 if self._stream is not None: 471 self._stream.cancel() 472 self._stream = None 473 474 async def _subscribe(self): 475 assert self._stream is None 476 assert self._client._stub is not None 477 478 self._stream = cast( 479 _StreamTypeAlias, 480 self._client._stub.Subscribe(wait_for_ready=True), # type: ignore 481 ) 482 483 request = gnmi.SubscribeRequest(subscribe=self._sublist) 484 self._client._log_msg(request) 485 try: 486 await self._stream.write(request) 487 except grpc.RpcError as ex: 488 raise GNMIClientError(ex) from None 489 490 async def _read(self, stop_at_sync: bool) -> AsyncIterator[GNMIUpdate]: 491 assert self._stream is not None 492 493 while True: 494 msg = cast( 495 gnmi.SubscribeResponse, 496 await self._stream.read(), # type: ignore 497 ) 498 if msg is GRPC_EOF: 499 LOGGER.warning("gNMI _read: unexpected EOF") 500 return 501 502 self._client._log_msg(msg) 503 504 match msg.WhichOneof("response"): 505 case "update": 506 for update in _read_updates(msg.update): 507 yield update 508 case "sync_response": 509 if stop_at_sync: 510 if self._is_once(): 511 # FIXME? I expected Stratum to issue an EOF after 512 # the sync_response in "ONCE" mode, but it doesn't. 513 # For now, cancel the stream explictly. 514 self.cancel() 515 # Let grpc react to the cancellation immediately. 516 await asyncio.sleep(0) 517 return # All done! 518 LOGGER.warning("gNMI _read: ignored sync_response") 519 case other: 520 LOGGER.warning("gNMI _read: unexpected oneof: %s", other) 521 522 def _is_once(self) -> bool: 523 "Return true if the subscription is in ONCE mode." 524 return self._sublist.mode == gnmi.SubscriptionList.Mode.ONCE
Represents a gNMI subscription stream.
Returned from GNMIClient.subscribe()
.
394 def __init__(self, client: GNMIClient, prefix: GNMIPath | None = None): 395 "Initialize a GNMISubscription." 396 self._client = client 397 self._stream = None 398 self._sublist = gnmi.SubscriptionList( 399 mode=gnmi.SubscriptionList.Mode.STREAM, 400 ) 401 if prefix is not None: 402 self._sublist.prefix.CopyFrom(prefix.path)
Initialize a GNMISubscription.
404 def once(self, *paths: GNMIPath) -> None: 405 "Subscribe in `ONCE` mode to given paths." 406 self._sublist.mode = gnmi.SubscriptionList.Mode.ONCE 407 408 for path in paths: 409 sub = gnmi.Subscription(path=path.path) 410 self._sublist.subscription.append(sub)
Subscribe in ONCE
mode to given paths.
412 def on_change(self, *paths: GNMIPath) -> None: 413 "Subscribe in `ON_CHANGE` mode to given paths." 414 assert self._sublist.mode == gnmi.SubscriptionList.Mode.STREAM 415 416 for path in paths: 417 sub = gnmi.Subscription( 418 path=path.path, 419 mode=gnmi.SubscriptionMode.ON_CHANGE, 420 ) 421 self._sublist.subscription.append(sub)
Subscribe in ON_CHANGE
mode to given paths.
423 def sample( 424 self, 425 *paths: GNMIPath, 426 sample_interval: int, 427 suppress_redundant: bool = False, 428 heartbeat_interval: int = 0, 429 ) -> None: 430 "Subscribe in `SAMPLE` mode to given paths." 431 assert self._sublist.mode == gnmi.SubscriptionList.Mode.STREAM 432 433 for path in paths: 434 sub = gnmi.Subscription( 435 path=path.path, 436 mode=gnmi.SubscriptionMode.SAMPLE, 437 sample_interval=sample_interval, 438 suppress_redundant=suppress_redundant, 439 heartbeat_interval=heartbeat_interval, 440 ) 441 self._sublist.subscription.append(sub)
Subscribe in SAMPLE
mode to given paths.
443 async def synchronize(self) -> AsyncIterator[GNMIUpdate]: 444 """Async iterator for initial subscription updates. 445 446 Retrieve all updates up to `sync_response` message. 447 """ 448 if self._stream is None: 449 await self._subscribe() 450 451 try: 452 async for result in self._read(True): 453 yield result 454 except grpc.RpcError as ex: 455 raise GNMIClientError(ex) from None
Async iterator for initial subscription updates.
Retrieve all updates up to sync_response
message.
457 async def updates(self) -> AsyncIterator[GNMIUpdate]: 458 "Async iterator for all remaining subscription updates." 459 if self._stream is None: 460 await self._subscribe() 461 462 try: 463 async for result in self._read(False): 464 yield result 465 except grpc.RpcError as ex: 466 raise GNMIClientError(ex) from None
Async iterator for all remaining subscription updates.
63@dataclass 64class GNMIUpdate: 65 "Represents a gNMI update returned from GNMIClient." 66 67 timestamp: int 68 path: GNMIPath 69 typed_value: gnmi.TypedValue | None 70 71 @property 72 def value(self) -> Any: 73 "Return the value as a Python value." 74 if self.typed_value is None: 75 return None 76 77 attr = self.typed_value.WhichOneof("value") 78 if attr is None: 79 raise ValueError("typed_value is not set") 80 return getattr(self.typed_value, attr) 81 82 def __repr__(self) -> str: 83 "Override repr to strip newline from end of `TypedValue`." 84 value = repr(self.typed_value).rstrip() 85 return f"GNMIUpdate(timestamp={self.timestamp!r}, path={self.path!r}, typed_value=`{value}`)"
Represents a gNMI update returned from GNMIClient.
71 @property 72 def value(self) -> Any: 73 "Return the value as a Python value." 74 if self.typed_value is None: 75 return None 76 77 attr = self.typed_value.WhichOneof("value") 78 if attr is None: 79 raise ValueError("typed_value is not set") 80 return getattr(self.typed_value, attr)
Return the value as a Python value.
117@dataclass(kw_only=True) 118class GRPCCredentialsTLS: 119 "GRPC channel credentials for Transport Layer Security (TLS)." 120 121 cacert: Path | bytes | None 122 """Certificate authority used to authenticate the certificate at the other 123 end of the connection.""" 124 125 cert: Path | bytes | None 126 "Certificate that identifies this side of the connection." 127 128 private_key: Path | bytes | None 129 "Private key associated with this side's certificate identity." 130 131 target_name_override: str = "" 132 "Override the target name used for TLS host name checking (useful for testing)." 133 134 call_credentials: grpc.AuthMetadataPlugin | None = None 135 """Optional GRPC call credentials for the client channel. Be aware that the 136 auth plugin's callback takes place in a different thread.""" 137 138 def to_client_credentials(self) -> grpc.ChannelCredentials: 139 "Create native SSL client credentials object." 140 root_certificates = _coerce_tls_path(self.cacert) 141 certificate_chain = _coerce_tls_path(self.cert) 142 private_key = _coerce_tls_path(self.private_key) 143 144 return self._compose_credentials( 145 grpc.ssl_channel_credentials( # pyright: ignore[reportUnknownMemberType] 146 root_certificates=root_certificates, 147 private_key=private_key, 148 certificate_chain=certificate_chain, 149 ) 150 ) 151 152 def to_server_credentials(self) -> grpc.ServerCredentials: 153 """Create native SSL server credentials object. 154 155 On the server side, we ignore the `call_credentials`. 156 """ 157 root_certificates = _coerce_tls_path(self.cacert) 158 certificate_chain = _coerce_tls_path(self.cert) 159 private_key = _coerce_tls_path(self.private_key) 160 161 return grpc.ssl_server_credentials( # pyright: ignore[reportUnknownMemberType] 162 private_key_certificate_chain_pairs=[(private_key, certificate_chain)], 163 root_certificates=root_certificates, 164 require_client_auth=True, 165 ) 166 167 def _compose_credentials( 168 self, channel_cred: grpc.ChannelCredentials 169 ) -> grpc.ChannelCredentials: 170 "Compose call credentials with channel credentials." 171 if not self.call_credentials: 172 return channel_cred 173 174 call_cred = ( 175 grpc.metadata_call_credentials( # pyright: ignore[reportUnknownMemberType] 176 self.call_credentials 177 ) 178 ) 179 return grpc.composite_channel_credentials( # pyright: ignore[reportUnknownMemberType] 180 channel_cred, 181 call_cred, 182 )
GRPC channel credentials for Transport Layer Security (TLS).
Certificate authority used to authenticate the certificate at the other end of the connection.
Private key associated with this side's certificate identity.
Override the target name used for TLS host name checking (useful for testing).
Optional GRPC call credentials for the client channel. Be aware that the auth plugin's callback takes place in a different thread.
138 def to_client_credentials(self) -> grpc.ChannelCredentials: 139 "Create native SSL client credentials object." 140 root_certificates = _coerce_tls_path(self.cacert) 141 certificate_chain = _coerce_tls_path(self.cert) 142 private_key = _coerce_tls_path(self.private_key) 143 144 return self._compose_credentials( 145 grpc.ssl_channel_credentials( # pyright: ignore[reportUnknownMemberType] 146 root_certificates=root_certificates, 147 private_key=private_key, 148 certificate_chain=certificate_chain, 149 ) 150 )
Create native SSL client credentials object.
152 def to_server_credentials(self) -> grpc.ServerCredentials: 153 """Create native SSL server credentials object. 154 155 On the server side, we ignore the `call_credentials`. 156 """ 157 root_certificates = _coerce_tls_path(self.cacert) 158 certificate_chain = _coerce_tls_path(self.cert) 159 private_key = _coerce_tls_path(self.private_key) 160 161 return grpc.ssl_server_credentials( # pyright: ignore[reportUnknownMemberType] 162 private_key_certificate_chain_pairs=[(private_key, certificate_chain)], 163 root_certificates=root_certificates, 164 require_client_auth=True, 165 )
Create native SSL server credentials object.
On the server side, we ignore the call_credentials
.
41class GRPCStatusCode(_EnumBase): 42 "IntEnum equivalent to `grpc.StatusCode`." 43 44 OK = rpc_code.OK 45 CANCELLED = rpc_code.CANCELLED 46 UNKNOWN = rpc_code.UNKNOWN 47 FAILED_PRECONDITION = rpc_code.FAILED_PRECONDITION 48 INVALID_ARGUMENT = rpc_code.INVALID_ARGUMENT 49 DEADLINE_EXCEEDED = rpc_code.DEADLINE_EXCEEDED 50 NOT_FOUND = rpc_code.NOT_FOUND 51 ALREADY_EXISTS = rpc_code.ALREADY_EXISTS 52 PERMISSION_DENIED = rpc_code.PERMISSION_DENIED 53 UNAUTHENTICATED = rpc_code.UNAUTHENTICATED 54 RESOURCE_EXHAUSTED = rpc_code.RESOURCE_EXHAUSTED 55 ABORTED = rpc_code.ABORTED 56 OUT_OF_RANGE = rpc_code.OUT_OF_RANGE 57 UNIMPLEMENTED = rpc_code.UNIMPLEMENTED 58 INTERNAL = rpc_code.INTERNAL 59 UNAVAILABLE = rpc_code.UNAVAILABLE 60 DATA_LOSS = rpc_code.DATA_LOSS 61 62 @classmethod 63 def from_status_code(cls, val: grpc.StatusCode) -> "GRPCStatusCode": 64 "Create corresponding GRPCStatusCode from a grpc.StatusCode object." 65 n: Any = val.value[0] # pyright: ignore[reportUnknownMemberType] 66 assert isinstance(n, int) 67 return GRPCStatusCode(n) 68 69 @staticmethod 70 def _validate_enum() -> None: 71 "Verify that GRPCStatusCode covers every possible grpc.StatusCode." 72 for value in grpc.StatusCode: 73 n: Any = value.value[0] # pyright: ignore[reportUnknownMemberType] 74 assert isinstance(n, int) 75 assert GRPCStatusCode[value.name].value == n, value.name
IntEnum equivalent to grpc.StatusCode
.
62 @classmethod 63 def from_status_code(cls, val: grpc.StatusCode) -> "GRPCStatusCode": 64 "Create corresponding GRPCStatusCode from a grpc.StatusCode object." 65 n: Any = val.value[0] # pyright: ignore[reportUnknownMemberType] 66 assert isinstance(n, int) 67 return GRPCStatusCode(n)
Create corresponding GRPCStatusCode from a grpc.StatusCode object.