diff --git a/README.md b/README.md
index fe3a311..30e00db 100644
--- a/README.md
+++ b/README.md
@@ -18,8 +18,6 @@ Use `pip` to install [the package from pypi](https://pypi.org/project/aioartnet/
$ python -m aioartnet.main
INFO:aioartnet:preferred interfaces: [(1, 'wlp4s0'), (10, 'br-ee82b9af434e'), (10, 'docker0'), (10, 'lo')]
INFO:aioartnet:using interface wlp4s0 with ip 192.168.1.205 broadcast ip 192.168.1.255
- INFO:aioartnet:configured own port Port
- INFO:aioartnet:configured own port Port
status:
ArtNetNode [Port , Port]
ArtNetNode [Port, Port]
@@ -31,7 +29,6 @@ Use `pip` to install [the package from pypi](https://pypi.org/project/aioartnet/
There is a built in command-line lighting console for quick Art-Net testing, which maps channels 1:1 to a single universe:
$ python -m aioartnet.console --universe 0:0:0
- INFO:aioartnet:configured own port Port
INFO:aioartnet:preferred interfaces: [(1, 'wlp4s0'), (10, 'br-ee82b9af434e'), (10, 'docker0'), (10, 'lo'), (10, 'lxdbr0')]
INFO:aioartnet:using interface wlp4s0 with ip 192.168.1.204 broadcast ip 192.168.1.255, listening on 0.0.0.0 our mac 4485009be628
> live on
diff --git a/aioartnet/__init__.py b/aioartnet/__init__.py
index b5044e4..a9ead4d 100644
--- a/aioartnet/__init__.py
+++ b/aioartnet/__init__.py
@@ -1,8 +1,25 @@
-from .aio_artnet import DMX_UNIVERSE_SIZE, ArtNetClient, ArtNetNode, ArtNetUniverse
+from .client import ArtNetClient
+from .events import (
+ NodeChanged,
+ NodeDiscovered,
+ NodeLost,
+ NodePortAdded,
+ NodePortRemoved,
+ UniverseDiscovered,
+ UniverseDMX,
+)
+from .models import DMX_UNIVERSE_SIZE, ArtNetNode, ArtNetUniverse
__all__ = [
"ArtNetClient",
"ArtNetUniverse",
"ArtNetNode",
"DMX_UNIVERSE_SIZE",
+ "UniverseDiscovered",
+ "UniverseDMX",
+ "NodeDiscovered",
+ "NodeLost",
+ "NodeChanged",
+ "NodePortAdded",
+ "NodePortRemoved",
]
diff --git a/aioartnet/aio_artnet.py b/aioartnet/client.py
similarity index 86%
rename from aioartnet/aio_artnet.py
rename to aioartnet/client.py
index 0b14f12..8c9c374 100644
--- a/aioartnet/aio_artnet.py
+++ b/aioartnet/client.py
@@ -5,9 +5,25 @@
import socket
import struct
import time
-from collections import defaultdict
-from typing import Any, Optional, Tuple, Union, cast
-
+from typing import Any, AsyncGenerator, Optional, Union, cast
+
+from .events import (
+ ArtNetEvent,
+ NodeChanged,
+ NodeDiscovered,
+ NodeLost,
+ NodePortAdded,
+ NodePortRemoved,
+ UniverseDiscovered,
+ UniverseDMX,
+)
+from .models import (
+ DMX_UNIVERSE_SIZE,
+ ArtNetNode,
+ ArtNetPort,
+ ArtNetUniverse,
+ DatagramAddr,
+)
from .network import AF_PACKET, getifaddrs
# Art-Net implementation for Python asyncio
@@ -21,7 +37,6 @@
ARTNET_PORT = 6454
ARTNET_PREFIX = bytes("Art-Net".encode() + b"\000")
-DMX_UNIVERSE_SIZE = 512
# We need to interrogate network interfaces to check which are configured for IP and
# have a valid broadcast address. For unix-like systems this is fone with socket fnctl
@@ -73,87 +88,6 @@ def swap16(x: int) -> int:
# Each set fixes a single net and sub_net value, however the choice
# of universe nibble is determined per-port (net:sub_net:universe).
-DGAddr = tuple[Union[str, Any], int]
-
-
-class ArtNetNode:
- def __init__(
- self,
- longName: str,
- portName: str,
- style: int,
- ip: str,
- udpport: int,
- ) -> None:
- self.portName = portName
- self.longName = longName
- self._portBinds: defaultdict[int, list[ArtNetPort]] = defaultdict(list)
- self.ports: list[ArtNetPort] = []
- self.style: int = style
- self.udpport = udpport
- self.ip = ip
- self.last_reply: float = 0.0
-
- def __repr__(self) -> str:
- return f"ArtNetNode<{self.portName},{self.ip}:{self.udpport}>"
-
-
-class ArtNetUniverse:
- def __init__(self, portaddress: int, client: "ArtNetClient"):
- if portaddress > 0x7FFF:
- raise ValueError("Invalid net:subnet:universe, as net>128")
- self.portaddress = portaddress
- self.publishers: list[ArtNetNode] = list()
- self.subscribers: list[ArtNetNode] = list()
- self.last_data = bytearray(DMX_UNIVERSE_SIZE)
- self._last_seq = 1
- self._last_publish: float = 0.0
- self.publisherseq: dict[Tuple[DGAddr, int], int] = {}
- self._client = client
-
- def split(self) -> Tuple[int, int, int]:
- # name net:sub_net:universe
- # bits 8:15 4:8 0:4
- net = self.portaddress >> 8
- sub_net = (self.portaddress >> 4) & 0x0F
- universe = self.portaddress & 0x0F
- return net, sub_net, universe
-
- def __repr__(self) -> str:
- net, sub_net, universe = self.split()
- return f"{net}:{sub_net}:{universe}"
-
- def set_dmx(self, data: bytes) -> None:
- assert len(data) == DMX_UNIVERSE_SIZE
- self.last_data[:] = data[:]
- self._client._send_dmx(self)
-
- def get_dmx(self) -> bytes:
- return self.last_data
-
-
-class ArtNetPort:
- def __init__(
- self,
- node: Union[ArtNetNode, "ArtNetClient"],
- is_input: bool,
- media: int,
- portaddr: int,
- universe: ArtNetUniverse,
- ):
- self.node = node
- self.is_input = is_input
- self.media = 0
- self.portaddr = portaddr
- self.universe = universe
-
- def __repr__(self) -> str:
- inout = {True: "Input", False: "Output"}[self.is_input]
- media = ["DMX", "MIDI", "Avab", "Colortran CMX", "ADB 62.5", "Art-Net", "DALI"][
- self.media
- ]
- return f"Port<{inout},{media},{self.universe}>"
-
class ArtNetClientProtocol(asyncio.DatagramProtocol):
def __init__(self, client: "ArtNetClient"):
@@ -174,7 +108,7 @@ def connection_made(self, transport: asyncio.BaseTransport) -> None:
if sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- def datagram_received(self, data: bytes, addr: DGAddr) -> None:
+ def datagram_received(self, data: bytes, addr: DatagramAddr) -> None:
if data[0:8] == ARTNET_PREFIX:
(opcode,) = struct.unpack("H", data[8:10])
h = self.handlers.get(opcode, None)
@@ -187,7 +121,7 @@ def datagram_received(self, data: bytes, addr: DGAddr) -> None:
else:
logger.debug(f"Received non Art-Net data from {addr}: {data!r}")
- def on_art_poll(self, addr: DGAddr, data: bytes) -> None:
+ def on_art_poll(self, addr: DatagramAddr, data: bytes) -> None:
ver, flags, priority = struct.unpack(" None:
)
self.send_art_poll_reply()
- def on_art_poll_reply(self, addr: DGAddr, data: bytes) -> None:
+ def on_art_poll_reply(self, addr: DatagramAddr, data: bytes) -> None:
# everything up to the mac address field is mandatory, the rest must be
# parsed only if it is sent (field at a time)
ip, port, fw, netsw, subsw, oemCode = struct.unpack(" None:
changed |= nn.style != style
logger.debug(f"change detection on {nn} => {changed}")
- # FIXME: what if a node changes ip address?
-
+ # if a node changes ip address, we will send NodeDiscovered, and then
+ # eventually the old node gets timed out NodeLost
if nn is None or changed:
newnode = ArtNetNode(
ip=f"{ipa}",
@@ -252,11 +186,10 @@ def on_art_poll_reply(self, addr: DGAddr, data: bytes) -> None:
portName=portName,
style=style,
)
- if changed:
- logger.debug(f"change detected: from {nn} to {newnode}")
-
- # TODO: make client.node observable dict and remove this method
- self.client.add_node(ip, newnode)
+ self.client.nodes[ip] = newnode
+ e = NodeChanged(node=newnode) if changed else NodeDiscovered(node=newnode)
+ logger.debug(f"new node created {nn} => {newnode} with {e}")
+ self.client._dispatch_event(e)
nn = newnode
nn.last_reply = time.time()
@@ -281,26 +214,32 @@ def on_art_poll_reply(self, addr: DGAddr, data: bytes) -> None:
# track which 'pages' of port bindings we have seen
old_ports = nn._portBinds[bindindex]
+
+ # print(f"{nn} desired bindindex={bindindex} of {portList}, existing {old_ports}")
for p in portList:
if p not in old_ports:
+ logging.debug(f"node {nn} added port {p}")
nn.ports.append(p)
nn._portBinds[bindindex].append(p)
{True: p.universe.publishers, False: p.universe.subscribers}[
p.is_input
].append(nn)
+ self.client._dispatch_event(NodePortAdded(node=nn, port=p))
for p in list(old_ports):
if p not in portList:
+ logging.debug(f"node {nn} removed port {p}")
nn.ports.remove(p)
nn._portBinds[bindindex].remove(p)
{True: p.universe.publishers, False: p.universe.subscribers}[
p.is_input
].remove(nn)
+ self.client._dispatch_event(NodePortRemoved(node=nn, port=p))
logger.debug(
f"Received Art-Net PollReply from {ip} fw {fw} portName {portName} longName: {longName} bindindex {bindindex} ports:{portList}"
)
- def on_art_dmx(self, addr: DGAddr, data: bytes) -> None:
+ def on_art_dmx(self, addr: DatagramAddr, data: bytes) -> None:
ver, seq, phys, sub, net, chlen = struct.unpack(" None:
# TODO: HTP/LTP merging with Merge Mode, see "Data Merging" spec p61
# See ArtAddress AcCancelMerge flags spec p39
# Only two sources are allowed to contribute to the values in the universe
- u.last_data[0:chlen] = data[8 : 8 + chlen]
+ channel_data = data[8 : 8 + chlen]
+ u.last_data[0:chlen] = channel_data
+
+ # if we are subscribed for this data, create UniverseDMX Event
+ if u in self.client._subscribing:
+ self.client._dispatch_event(UniverseDMX(u, channel_data))
async def art_poll_task(self) -> None:
while True:
@@ -352,6 +296,11 @@ async def art_poll_task(self) -> None:
if t > self._last_poll + 2.0:
self._send_art_poll()
+ for ip, node in list(self.client.nodes.items()):
+ if t > node.last_reply + 120:
+ self.client.nodes.pop(ip, None)
+ self.client._dispatch_event(NodeLost(node=node))
+
def _send_art_poll(self) -> None:
self._last_poll = time.time()
self.node_report_counter = (self.node_report_counter + 1) % 10000
@@ -511,10 +460,13 @@ def __init__(
self.mac: bytes = b"\01\22\33\44\55\66"
self.protocol: Optional[ArtNetClientProtocol] = None
- self._publishing: list[ArtNetUniverse] = []
+ self._publishing: set[ArtNetUniverse] = set()
+ self._subscribing: set[ArtNetUniverse] = set()
self.interface: Optional[str] = interface
self._task: Optional[asyncio.Task[None]] = None
+ self._event_listeners: list[asyncio.Queue[ArtNetEvent]] = []
+
async def connect(self) -> asyncio.DatagramTransport:
loop = asyncio.get_running_loop()
@@ -569,9 +521,6 @@ def _send_dmx(self, universe: ArtNetUniverse) -> None:
def get_nodes(self) -> list[ArtNetNode]:
return list(self.nodes.values())
- def add_node(self, ip: int, node: ArtNetNode) -> None:
- self.nodes[ip] = node
-
def _parse_universe(self, universe: UniverseKey) -> int:
if isinstance(universe, str):
# parse to int
@@ -607,14 +556,14 @@ def set_port_config(
break
if port:
self.ports.remove(port)
- logger.info(f"removed own port {port}")
+ logger.debug(f"removed own port {port}")
if is_input or is_output:
port = ArtNetPort(
- node=self, is_input=is_input, media=0, portaddr=port_addr, universe=u
+ node=None, is_input=is_input, media=0, portaddr=port_addr, universe=u
)
self.ports.append(port)
- logger.info(f"configured own port {port}")
+ logger.debug(f"configured own port {port}")
# TODO: optimise the layour of self.ports to self._portBinds
# up to four ports with a common (net,sub-net) can be listed on the same page
@@ -625,10 +574,12 @@ def set_port_config(
self._portBinds = {1: []}
# used for the timer-based DMX repeating
- if u in self._publishing:
- self._publishing.remove(u)
+ self._publishing.discard(u)
+ self._subscribing.discard(u)
if is_input:
- self._publishing.append(u)
+ self._publishing.add(u)
+ if is_output:
+ self._subscribing.add(u)
if not self.passive and self.protocol:
self.protocol.send_art_poll_reply()
@@ -639,8 +590,25 @@ def _get_create_universe(self, port_addr: int) -> ArtNetUniverse:
if (u := self.universes.get(port_addr, None)) is None:
u = ArtNetUniverse(port_addr, self)
self.universes[port_addr] = u
+ self._dispatch_event(UniverseDiscovered(universe=u))
return u
+ # EVENT LISTENERS
+ async def events(self) -> AsyncGenerator[ArtNetEvent, None]:
+ q: asyncio.Queue[ArtNetEvent] = asyncio.Queue()
+ self._event_listeners.append(q)
+ try:
+ while True:
+ yield await q.get()
+ finally:
+ self._event_listeners.remove(q)
+
+ def _dispatch_event(self, event: ArtNetEvent) -> None:
+ for q in self._event_listeners:
+ q.put_nowait(event)
+ # for .on() .off() support
+ # text = event.text
+
# MUTABLE PROPERTIES
# if you need to change a lot of properties in bulk, set client.passive=True,
# make the required changes, then clear .passive
diff --git a/aioartnet/console.py b/aioartnet/console.py
index ca02660..a02d0b0 100644
--- a/aioartnet/console.py
+++ b/aioartnet/console.py
@@ -307,6 +307,12 @@ async def main(client: ArtNetClient, engine: Engine, interpreter: Interpreter) -
await client.connect()
await engine.start_ticking()
+ async def print_events() -> None:
+ async for event in client.events():
+ print(event)
+
+ t = asyncio.create_task(print_events())
+
# Run echo loop. Read text from stdin, and reply it back.
while True:
try:
@@ -324,6 +330,8 @@ async def main(client: ArtNetClient, engine: Engine, interpreter: Interpreter) -
break
except Exception:
traceback.print_exc(limit=-2)
+
+ t.cancel()
return None
diff --git a/aioartnet/events.py b/aioartnet/events.py
new file mode 100644
index 0000000..a2466b8
--- /dev/null
+++ b/aioartnet/events.py
@@ -0,0 +1,54 @@
+from dataclasses import dataclass, field
+from typing import Protocol
+
+from .models import ArtNetNode, ArtNetPort, ArtNetUniverse
+
+
+@dataclass
+class ArtNetEvent(Protocol):
+ text: str = field(init=False, repr=False)
+
+
+@dataclass
+class NodeDiscovered(ArtNetEvent):
+ node: ArtNetNode
+ text: str = field(init=False, repr=False, default="node-added")
+
+
+@dataclass
+class NodeLost(ArtNetEvent):
+ node: ArtNetNode
+ text: str = field(init=False, repr=False, default="node-removed")
+
+
+@dataclass
+class NodeChanged(ArtNetEvent):
+ node: ArtNetNode
+ text: str = field(init=False, repr=False, default="node-changed")
+
+
+@dataclass
+class NodePortAdded(ArtNetEvent):
+ node: ArtNetNode
+ port: ArtNetPort
+ text: str = field(init=False, repr=False, default="node-port-changed")
+
+
+@dataclass
+class NodePortRemoved(ArtNetEvent):
+ node: ArtNetNode
+ port: ArtNetPort
+ text: str = field(init=False, repr=False, default="node-port-changed")
+
+
+@dataclass
+class UniverseDiscovered(ArtNetEvent):
+ universe: ArtNetUniverse
+ text: str = field(init=False, repr=False, default="universe-added")
+
+
+@dataclass
+class UniverseDMX(ArtNetEvent):
+ universe: ArtNetUniverse
+ data: bytes
+ text: str = field(init=False, repr=False, default="universe-dmx")
diff --git a/aioartnet/main.py b/aioartnet/main.py
index a2f2458..dd4f653 100644
--- a/aioartnet/main.py
+++ b/aioartnet/main.py
@@ -2,23 +2,16 @@
import asyncio
import logging
-from . import ArtNetClient
+from . import (
+ ArtNetClient,
+)
async def main(client: ArtNetClient) -> None:
await client.connect()
- # u5 = client.set_port_config("0:0:5", isoutput=True)
-
- while True:
- await asyncio.sleep(5)
- print("nodes:")
- for n, node in client.nodes.items():
- print(f" {node!r: <60} {node.ports}")
- print("universes:")
- for univ in client.universes.values():
- print(f" {univ} pubs:{univ.publishers} subs:{univ.subscribers}")
-
- # print(u5.last_data[0:20].hex())
+
+ async for event in client.events():
+ print(event)
if __name__ == "__main__":
diff --git a/aioartnet/models.py b/aioartnet/models.py
new file mode 100644
index 0000000..0501d55
--- /dev/null
+++ b/aioartnet/models.py
@@ -0,0 +1,93 @@
+from collections import defaultdict
+from dataclasses import dataclass
+from typing import Any, Optional, Protocol, Tuple, Union
+
+DMX_UNIVERSE_SIZE = 512
+
+DatagramAddr = tuple[Union[str, Any], int]
+
+
+class UnivActuator(Protocol):
+ def _send_dmx(self, univ: "ArtNetUniverse") -> None: ...
+
+
+class ArtNetNode:
+ def __init__(
+ self,
+ longName: str,
+ portName: str,
+ style: int,
+ ip: str,
+ udpport: int,
+ ) -> None:
+ self.portName = portName
+ self.longName = longName
+ self._portBinds: defaultdict[int, list[ArtNetPort]] = defaultdict(list)
+ self.ports: list[ArtNetPort] = []
+ self.style: int = style
+ self.udpport = udpport
+ self.ip = ip
+ self.last_reply: float = 0.0
+
+ def __repr__(self) -> str:
+ return f"ArtNetNode<{self.portName},{self.ip}:{self.udpport}>"
+
+
+class ArtNetUniverse:
+ def __init__(self, portaddress: int, client: UnivActuator):
+ if portaddress > 0x7FFF:
+ raise ValueError("Invalid net:subnet:universe, as net>128")
+ self.portaddress = portaddress
+ self.publishers: list[ArtNetNode] = list()
+ self.subscribers: list[ArtNetNode] = list()
+ self.last_data = bytearray(DMX_UNIVERSE_SIZE)
+ self._last_seq = 1
+ self._last_publish: float = 0.0
+ self.publisherseq: dict[Tuple[DatagramAddr, int], int] = {}
+ self._client = client
+
+ def split(self) -> Tuple[int, int, int]:
+ # name net:sub_net:universe
+ # bits 8:15 4:8 0:4
+ net = self.portaddress >> 8
+ sub_net = (self.portaddress >> 4) & 0x0F
+ universe = self.portaddress & 0x0F
+ return net, sub_net, universe
+
+ def __repr__(self) -> str:
+ net, sub_net, universe = self.split()
+ return f"{net}:{sub_net}:{universe}"
+
+ def set_dmx(self, data: bytes) -> None:
+ assert len(data) == DMX_UNIVERSE_SIZE
+ self.last_data[:] = data[:]
+ self._client._send_dmx(self)
+
+ def get_dmx(self) -> bytes:
+ return self.last_data
+
+ # eq/hash based on portaddress only
+ def __hash__(self) -> int:
+ return hash(self.portaddress)
+
+ def __eq__(self, other: Any) -> bool:
+ if isinstance(other, self.__class__):
+ return self.portaddress == other.portaddress
+ else:
+ return False
+
+
+@dataclass
+class ArtNetPort:
+ node: Optional[ArtNetNode]
+ is_input: bool
+ media: int
+ portaddr: int
+ universe: ArtNetUniverse
+
+ def __repr__(self) -> str:
+ inout = {True: "Input", False: "Output"}[self.is_input]
+ media = ["DMX", "MIDI", "Avab", "Colortran CMX", "ADB 62.5", "Art-Net", "DALI"][
+ self.media
+ ]
+ return f"Port<{inout},{media},{self.universe}>"
diff --git a/test/test_aio_artnet.py b/test/test_aio_artnet.py
index ef320e5..368f8a6 100644
--- a/test/test_aio_artnet.py
+++ b/test/test_aio_artnet.py
@@ -1,3 +1,4 @@
+import asyncio
import socket
import struct
from asyncio import BaseTransport
@@ -11,7 +12,16 @@
ArtNetClient,
ArtNetUniverse,
)
-from aioartnet.aio_artnet import ArtNetClientProtocol, DGAddr
+from aioartnet.client import ArtNetClientProtocol
+from aioartnet.events import (
+ ArtNetEvent,
+ NodeChanged,
+ NodeDiscovered,
+ NodePortAdded,
+ UniverseDiscovered,
+ UniverseDMX,
+)
+from aioartnet.models import DatagramAddr
def test_universe() -> None:
@@ -50,12 +60,12 @@ def packet_reader(file: str) -> Iterator[Tuple[float, bytes]]:
class MockTransport(BaseTransport):
def __init__(self) -> None:
- self.sent: list[tuple[bytes, DGAddr]] = []
+ self.sent: list[tuple[bytes, DatagramAddr]] = []
def get_extra_info(self, name: str, default: Any = None) -> Any:
return None
- def sendto(self, data: bytes, addr: DGAddr) -> None:
+ def sendto(self, data: bytes, addr: DatagramAddr) -> None:
self.sent.append((data, addr))
@@ -150,7 +160,7 @@ def connect_protocol(self, protocol: ArtNetClientProtocol) -> None:
def get_extra_info(self, name: str, default: Any = None) -> Any:
return None
- def sendto(self, data: bytes, addr: DGAddr) -> None:
+ def sendto(self, data: bytes, addr: DatagramAddr) -> None:
self.pending.append((data, addr))
def drain(self) -> None:
@@ -160,6 +170,24 @@ def drain(self) -> None:
p.datagram_received(*msg)
+async def event_consumer(
+ client: ArtNetClient, received_events: list[ArtNetEvent]
+) -> None:
+ async for event in client.events():
+ received_events.append(event)
+ print(f"event_consumer got {event}")
+ print("Consumer task finished.")
+
+
+async def await_events(events: list[ArtNetEvent], count: int) -> list[ArtNetEvent]:
+ # async with asyncio.timeout(2):
+ while len(events) < count:
+ await asyncio.sleep(0)
+ ret = events[:count]
+ events[:count] = []
+ return ret
+
+
@pytest.mark.asyncio
async def test_artnet_back_to_back_nodes() -> None:
# use two instances of our client linked by a mock transport to test
@@ -172,6 +200,11 @@ async def test_artnet_back_to_back_nodes() -> None:
clB = ArtNetClient(interface="dummy", portName="bravo")
clB.broadcast_ip = "10.10.10.255"
clB.unicast_ip = "10.10.10.2"
+ events: list[ArtNetEvent] = []
+
+ consumer_task = asyncio.create_task(event_consumer(clB, events))
+ while len(clB._event_listeners) == 0:
+ await asyncio.sleep(0)
protoA = ArtNetClientProtocol(clA)
protoB = ArtNetClientProtocol(clB)
@@ -188,6 +221,10 @@ async def test_artnet_back_to_back_nodes() -> None:
== "[ArtNetNode, ArtNetNode]"
)
+ na, nb = await await_events(events, 2)
+ assert isinstance(na, NodeDiscovered)
+ assert isinstance(nb, NodeDiscovered)
+
# when a client has a property modified, it automatically sends an unsolicited PollReply
clB.portName = "charlie"
assert len(transport.pending) == 1
@@ -202,6 +239,35 @@ async def test_artnet_back_to_back_nodes() -> None:
== "[ArtNetNode, ArtNetNode]"
)
+ (nc,) = await await_events(events, 1)
+ assert isinstance(nc, NodeChanged)
+
+ u1p = clA.set_port_config("2:2:2", is_input=True)
+ u1s = clB.set_port_config("2:2:2", is_output=True)
+ transport.drain()
+
+ euniv, npa1, npa2 = await await_events(events, 3)
+ assert isinstance(euniv, UniverseDiscovered)
+ assert isinstance(npa1, NodePortAdded)
+ assert isinstance(npa2, NodePortAdded)
+
+ test_pattern = bytearray(512)
+ test_pattern[1] = 255
+ u1p.set_dmx(test_pattern)
+ transport.drain()
+
+ assert u1s.get_dmx() == test_pattern
+ (dmx1,) = await await_events(events, 1)
+ assert isinstance(dmx1, UniverseDMX)
+ assert dmx1.data == test_pattern
+
+ consumer_task.cancel()
+ try:
+ await consumer_task
+ except asyncio.CancelledError:
+ # expected
+ pass
+
@pytest.mark.asyncio
async def test_ports() -> None: