From 1bc31973ab1ad968f23d2fc1cdf9d3ec19d8e147 Mon Sep 17 00:00:00 2001 From: Chris Shucksmith Date: Wed, 25 Jun 2025 10:07:44 +0100 Subject: [PATCH 1/3] make info to debug --- README.md | 2 -- aioartnet/aio_artnet.py | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index fe3a311..e3d9d89 100644 --- a/README.md +++ b/README.md @@ -18,8 +18,6 @@ Use `pip` to install [the package from pypi](https://pypi.org/project/aioartnet/ $ python -m aioartnet.main INFO:aioartnet:preferred interfaces: [(1, 'wlp4s0'), (10, 'br-ee82b9af434e'), (10, 'docker0'), (10, 'lo')] INFO:aioartnet:using interface wlp4s0 with ip 192.168.1.205 broadcast ip 192.168.1.255 - INFO:aioartnet:configured own port Port - INFO:aioartnet:configured own port Port status: ArtNetNode [Port, Port] ArtNetNode [Port, Port] diff --git a/aioartnet/aio_artnet.py b/aioartnet/aio_artnet.py index 0b14f12..4ef634c 100644 --- a/aioartnet/aio_artnet.py +++ b/aioartnet/aio_artnet.py @@ -607,14 +607,14 @@ def set_port_config( break if port: self.ports.remove(port) - logger.info(f"removed own port {port}") + logger.debug(f"removed own port {port}") if is_input or is_output: port = ArtNetPort( node=self, is_input=is_input, media=0, portaddr=port_addr, universe=u ) self.ports.append(port) - logger.info(f"configured own port {port}") + logger.debug(f"configured own port {port}") # TODO: optimise the layour of self.ports to self._portBinds # up to four ports with a common (net,sub-net) can be listed on the same page From c0ac3e4e130cc1e635046bbfad5b76ca62a204f6 Mon Sep 17 00:00:00 2001 From: Chris Shucksmith Date: Wed, 25 Jun 2025 16:23:34 +0100 Subject: [PATCH 2/3] add event stream --- README.md | 1 - aioartnet/__init__.py | 19 ++- aioartnet/{aio_artnet.py => client.py} | 153 +++++++++---------------- aioartnet/console.py | 8 ++ aioartnet/events.py | 54 +++++++++ aioartnet/main.py | 19 +-- aioartnet/models.py | 83 ++++++++++++++ test/test_aio_artnet.py | 9 +- 8 files changed, 230 insertions(+), 116 deletions(-) rename aioartnet/{aio_artnet.py => client.py} (87%) create mode 100644 aioartnet/events.py create mode 100644 aioartnet/models.py diff --git a/README.md b/README.md index e3d9d89..30e00db 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,6 @@ Use `pip` to install [the package from pypi](https://pypi.org/project/aioartnet/ There is a built in command-line lighting console for quick Art-Net testing, which maps channels 1:1 to a single universe: $ python -m aioartnet.console --universe 0:0:0 - INFO:aioartnet:configured own port Port INFO:aioartnet:preferred interfaces: [(1, 'wlp4s0'), (10, 'br-ee82b9af434e'), (10, 'docker0'), (10, 'lo'), (10, 'lxdbr0')] INFO:aioartnet:using interface wlp4s0 with ip 192.168.1.204 broadcast ip 192.168.1.255, listening on 0.0.0.0 our mac 4485009be628 > live on diff --git a/aioartnet/__init__.py b/aioartnet/__init__.py index b5044e4..a9ead4d 100644 --- a/aioartnet/__init__.py +++ b/aioartnet/__init__.py @@ -1,8 +1,25 @@ -from .aio_artnet import DMX_UNIVERSE_SIZE, ArtNetClient, ArtNetNode, ArtNetUniverse +from .client import ArtNetClient +from .events import ( + NodeChanged, + NodeDiscovered, + NodeLost, + NodePortAdded, + NodePortRemoved, + UniverseDiscovered, + UniverseDMX, +) +from .models import DMX_UNIVERSE_SIZE, ArtNetNode, ArtNetUniverse __all__ = [ "ArtNetClient", "ArtNetUniverse", "ArtNetNode", "DMX_UNIVERSE_SIZE", + "UniverseDiscovered", + "UniverseDMX", + "NodeDiscovered", + "NodeLost", + "NodeChanged", + "NodePortAdded", + "NodePortRemoved", ] diff --git a/aioartnet/aio_artnet.py b/aioartnet/client.py similarity index 87% rename from aioartnet/aio_artnet.py rename to aioartnet/client.py index 4ef634c..db22291 100644 --- a/aioartnet/aio_artnet.py +++ b/aioartnet/client.py @@ -5,9 +5,24 @@ import socket import struct import time -from collections import defaultdict -from typing import Any, Optional, Tuple, Union, cast - +from typing import Any, AsyncGenerator, Optional, Union, cast + +from .events import ( + ArtNetEvent, + NodeChanged, + NodeDiscovered, + NodeLost, + NodePortAdded, + NodePortRemoved, + UniverseDiscovered, +) +from .models import ( + DMX_UNIVERSE_SIZE, + ArtNetNode, + ArtNetPort, + ArtNetUniverse, + DatagramAddr, +) from .network import AF_PACKET, getifaddrs # Art-Net implementation for Python asyncio @@ -21,7 +36,6 @@ ARTNET_PORT = 6454 ARTNET_PREFIX = bytes("Art-Net".encode() + b"\000") -DMX_UNIVERSE_SIZE = 512 # We need to interrogate network interfaces to check which are configured for IP and # have a valid broadcast address. For unix-like systems this is fone with socket fnctl @@ -73,87 +87,6 @@ def swap16(x: int) -> int: # Each set fixes a single net and sub_net value, however the choice # of universe nibble is determined per-port (net:sub_net:universe). -DGAddr = tuple[Union[str, Any], int] - - -class ArtNetNode: - def __init__( - self, - longName: str, - portName: str, - style: int, - ip: str, - udpport: int, - ) -> None: - self.portName = portName - self.longName = longName - self._portBinds: defaultdict[int, list[ArtNetPort]] = defaultdict(list) - self.ports: list[ArtNetPort] = [] - self.style: int = style - self.udpport = udpport - self.ip = ip - self.last_reply: float = 0.0 - - def __repr__(self) -> str: - return f"ArtNetNode<{self.portName},{self.ip}:{self.udpport}>" - - -class ArtNetUniverse: - def __init__(self, portaddress: int, client: "ArtNetClient"): - if portaddress > 0x7FFF: - raise ValueError("Invalid net:subnet:universe, as net>128") - self.portaddress = portaddress - self.publishers: list[ArtNetNode] = list() - self.subscribers: list[ArtNetNode] = list() - self.last_data = bytearray(DMX_UNIVERSE_SIZE) - self._last_seq = 1 - self._last_publish: float = 0.0 - self.publisherseq: dict[Tuple[DGAddr, int], int] = {} - self._client = client - - def split(self) -> Tuple[int, int, int]: - # name net:sub_net:universe - # bits 8:15 4:8 0:4 - net = self.portaddress >> 8 - sub_net = (self.portaddress >> 4) & 0x0F - universe = self.portaddress & 0x0F - return net, sub_net, universe - - def __repr__(self) -> str: - net, sub_net, universe = self.split() - return f"{net}:{sub_net}:{universe}" - - def set_dmx(self, data: bytes) -> None: - assert len(data) == DMX_UNIVERSE_SIZE - self.last_data[:] = data[:] - self._client._send_dmx(self) - - def get_dmx(self) -> bytes: - return self.last_data - - -class ArtNetPort: - def __init__( - self, - node: Union[ArtNetNode, "ArtNetClient"], - is_input: bool, - media: int, - portaddr: int, - universe: ArtNetUniverse, - ): - self.node = node - self.is_input = is_input - self.media = 0 - self.portaddr = portaddr - self.universe = universe - - def __repr__(self) -> str: - inout = {True: "Input", False: "Output"}[self.is_input] - media = ["DMX", "MIDI", "Avab", "Colortran CMX", "ADB 62.5", "Art-Net", "DALI"][ - self.media - ] - return f"Port<{inout},{media},{self.universe}>" - class ArtNetClientProtocol(asyncio.DatagramProtocol): def __init__(self, client: "ArtNetClient"): @@ -174,7 +107,7 @@ def connection_made(self, transport: asyncio.BaseTransport) -> None: if sock: sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - def datagram_received(self, data: bytes, addr: DGAddr) -> None: + def datagram_received(self, data: bytes, addr: DatagramAddr) -> None: if data[0:8] == ARTNET_PREFIX: (opcode,) = struct.unpack("H", data[8:10]) h = self.handlers.get(opcode, None) @@ -187,7 +120,7 @@ def datagram_received(self, data: bytes, addr: DGAddr) -> None: else: logger.debug(f"Received non Art-Net data from {addr}: {data!r}") - def on_art_poll(self, addr: DGAddr, data: bytes) -> None: + def on_art_poll(self, addr: DatagramAddr, data: bytes) -> None: ver, flags, priority = struct.unpack(" None: ) self.send_art_poll_reply() - def on_art_poll_reply(self, addr: DGAddr, data: bytes) -> None: + def on_art_poll_reply(self, addr: DatagramAddr, data: bytes) -> None: # everything up to the mac address field is mandatory, the rest must be # parsed only if it is sent (field at a time) ip, port, fw, netsw, subsw, oemCode = struct.unpack(" None: portName=portName, style=style, ) - if changed: - logger.debug(f"change detected: from {nn} to {newnode}") - # TODO: make client.node observable dict and remove this method - self.client.add_node(ip, newnode) + self.client.nodes[ip] = newnode + e = NodeChanged(node=newnode) if changed else NodeDiscovered(node=newnode) + self.client._dispatch_event(e) nn = newnode nn.last_reply = time.time() @@ -281,26 +213,32 @@ def on_art_poll_reply(self, addr: DGAddr, data: bytes) -> None: # track which 'pages' of port bindings we have seen old_ports = nn._portBinds[bindindex] + + # print(f"{nn} desired bindindex={bindindex} of {portList}, existing {old_ports}") for p in portList: if p not in old_ports: + logging.debug(f"node {nn} added port {p}") nn.ports.append(p) nn._portBinds[bindindex].append(p) {True: p.universe.publishers, False: p.universe.subscribers}[ p.is_input ].append(nn) + self.client._dispatch_event(NodePortAdded(node=nn, port=p)) for p in list(old_ports): if p not in portList: + logging.debug(f"node {nn} removed port {p}") nn.ports.remove(p) nn._portBinds[bindindex].remove(p) {True: p.universe.publishers, False: p.universe.subscribers}[ p.is_input ].remove(nn) + self.client._dispatch_event(NodePortRemoved(node=nn, port=p)) logger.debug( f"Received Art-Net PollReply from {ip} fw {fw} portName {portName} longName: {longName} bindindex {bindindex} ports:{portList}" ) - def on_art_dmx(self, addr: DGAddr, data: bytes) -> None: + def on_art_dmx(self, addr: DatagramAddr, data: bytes) -> None: ver, seq, phys, sub, net, chlen = struct.unpack(" None: if t > self._last_poll + 2.0: self._send_art_poll() + for ip, node in list(self.client.nodes.items()): + if t > node.last_reply + 120: + self.client.nodes.pop(ip, None) + self.client._dispatch_event(NodeLost(node=node)) + def _send_art_poll(self) -> None: self._last_poll = time.time() self.node_report_counter = (self.node_report_counter + 1) % 10000 @@ -515,6 +458,8 @@ def __init__( self.interface: Optional[str] = interface self._task: Optional[asyncio.Task[None]] = None + self._event_listeners: list[asyncio.Queue[ArtNetEvent]] = [] + async def connect(self) -> asyncio.DatagramTransport: loop = asyncio.get_running_loop() @@ -569,9 +514,6 @@ def _send_dmx(self, universe: ArtNetUniverse) -> None: def get_nodes(self) -> list[ArtNetNode]: return list(self.nodes.values()) - def add_node(self, ip: int, node: ArtNetNode) -> None: - self.nodes[ip] = node - def _parse_universe(self, universe: UniverseKey) -> int: if isinstance(universe, str): # parse to int @@ -611,7 +553,7 @@ def set_port_config( if is_input or is_output: port = ArtNetPort( - node=self, is_input=is_input, media=0, portaddr=port_addr, universe=u + node=None, is_input=is_input, media=0, portaddr=port_addr, universe=u ) self.ports.append(port) logger.debug(f"configured own port {port}") @@ -639,8 +581,25 @@ def _get_create_universe(self, port_addr: int) -> ArtNetUniverse: if (u := self.universes.get(port_addr, None)) is None: u = ArtNetUniverse(port_addr, self) self.universes[port_addr] = u + self._dispatch_event(UniverseDiscovered(universe=u)) return u + # EVENT LISTENERS + async def events(self) -> AsyncGenerator[ArtNetEvent, None]: + q: asyncio.Queue[ArtNetEvent] = asyncio.Queue() + self._event_listeners.append(q) + try: + while True: + yield await q.get() + finally: + self._event_listeners.remove(q) + + def _dispatch_event(self, event: ArtNetEvent) -> None: + for q in self._event_listeners: + q.put_nowait(event) + # for .on() .off() support + # text = event.text + # MUTABLE PROPERTIES # if you need to change a lot of properties in bulk, set client.passive=True, # make the required changes, then clear .passive diff --git a/aioartnet/console.py b/aioartnet/console.py index ca02660..a02d0b0 100644 --- a/aioartnet/console.py +++ b/aioartnet/console.py @@ -307,6 +307,12 @@ async def main(client: ArtNetClient, engine: Engine, interpreter: Interpreter) - await client.connect() await engine.start_ticking() + async def print_events() -> None: + async for event in client.events(): + print(event) + + t = asyncio.create_task(print_events()) + # Run echo loop. Read text from stdin, and reply it back. while True: try: @@ -324,6 +330,8 @@ async def main(client: ArtNetClient, engine: Engine, interpreter: Interpreter) - break except Exception: traceback.print_exc(limit=-2) + + t.cancel() return None diff --git a/aioartnet/events.py b/aioartnet/events.py new file mode 100644 index 0000000..1a1b36c --- /dev/null +++ b/aioartnet/events.py @@ -0,0 +1,54 @@ +from dataclasses import dataclass, field +from typing import Protocol + +from .models import ArtNetNode, ArtNetPort, ArtNetUniverse + + +@dataclass +class ArtNetEvent(Protocol): + text: str = field(init=False, repr=False) + + +@dataclass +class NodeDiscovered(ArtNetEvent): + node: ArtNetNode + text: str = field(init=False, repr=False, default="node-added") + + +@dataclass +class NodeLost(ArtNetEvent): + node: ArtNetNode + text: str = field(init=False, repr=False, default="node-removed") + + +@dataclass +class NodeChanged(ArtNetEvent): + node: ArtNetNode + text: str = field(init=False, repr=False, default="node-changed") + + +@dataclass +class NodePortAdded(ArtNetEvent): + node: ArtNetNode + port: ArtNetPort + text: str = field(init=False, repr=False, default="node-port-changed") + + +@dataclass +class NodePortRemoved(ArtNetEvent): + node: ArtNetNode + port: ArtNetPort + text: str = field(init=False, repr=False, default="node-port-changed") + + +@dataclass +class UniverseDiscovered(ArtNetEvent): + universe: ArtNetUniverse + text: str = field(init=False, repr=False, default="universe-added") + + +@dataclass +class UniverseDMX(ArtNetEvent): + universe: ArtNetUniverse + data: bytes + text: str = field(init=False, repr=False, default="universe-added") diff --git a/aioartnet/main.py b/aioartnet/main.py index a2f2458..dd4f653 100644 --- a/aioartnet/main.py +++ b/aioartnet/main.py @@ -2,23 +2,16 @@ import asyncio import logging -from . import ArtNetClient +from . import ( + ArtNetClient, +) async def main(client: ArtNetClient) -> None: await client.connect() - # u5 = client.set_port_config("0:0:5", isoutput=True) - - while True: - await asyncio.sleep(5) - print("nodes:") - for n, node in client.nodes.items(): - print(f" {node!r: <60} {node.ports}") - print("universes:") - for univ in client.universes.values(): - print(f" {univ} pubs:{univ.publishers} subs:{univ.subscribers}") - - # print(u5.last_data[0:20].hex()) + + async for event in client.events(): + print(event) if __name__ == "__main__": diff --git a/aioartnet/models.py b/aioartnet/models.py new file mode 100644 index 0000000..7313191 --- /dev/null +++ b/aioartnet/models.py @@ -0,0 +1,83 @@ +from collections import defaultdict +from dataclasses import dataclass +from typing import Any, Optional, Protocol, Tuple, Union + +DMX_UNIVERSE_SIZE = 512 + +DatagramAddr = tuple[Union[str, Any], int] + + +class UnivActuator(Protocol): + def _send_dmx(self, univ: "ArtNetUniverse") -> None: ... + + +class ArtNetNode: + def __init__( + self, + longName: str, + portName: str, + style: int, + ip: str, + udpport: int, + ) -> None: + self.portName = portName + self.longName = longName + self._portBinds: defaultdict[int, list[ArtNetPort]] = defaultdict(list) + self.ports: list[ArtNetPort] = [] + self.style: int = style + self.udpport = udpport + self.ip = ip + self.last_reply: float = 0.0 + + def __repr__(self) -> str: + return f"ArtNetNode<{self.portName},{self.ip}:{self.udpport}>" + + +class ArtNetUniverse: + def __init__(self, portaddress: int, client: UnivActuator): + if portaddress > 0x7FFF: + raise ValueError("Invalid net:subnet:universe, as net>128") + self.portaddress = portaddress + self.publishers: list[ArtNetNode] = list() + self.subscribers: list[ArtNetNode] = list() + self.last_data = bytearray(DMX_UNIVERSE_SIZE) + self._last_seq = 1 + self._last_publish: float = 0.0 + self.publisherseq: dict[Tuple[DatagramAddr, int], int] = {} + self._client = client + + def split(self) -> Tuple[int, int, int]: + # name net:sub_net:universe + # bits 8:15 4:8 0:4 + net = self.portaddress >> 8 + sub_net = (self.portaddress >> 4) & 0x0F + universe = self.portaddress & 0x0F + return net, sub_net, universe + + def __repr__(self) -> str: + net, sub_net, universe = self.split() + return f"{net}:{sub_net}:{universe}" + + def set_dmx(self, data: bytes) -> None: + assert len(data) == DMX_UNIVERSE_SIZE + self.last_data[:] = data[:] + self._client._send_dmx(self) + + def get_dmx(self) -> bytes: + return self.last_data + + +@dataclass +class ArtNetPort: + node: Optional[ArtNetNode] + is_input: bool + media: int + portaddr: int + universe: ArtNetUniverse + + def __repr__(self) -> str: + inout = {True: "Input", False: "Output"}[self.is_input] + media = ["DMX", "MIDI", "Avab", "Colortran CMX", "ADB 62.5", "Art-Net", "DALI"][ + self.media + ] + return f"Port<{inout},{media},{self.universe}>" diff --git a/test/test_aio_artnet.py b/test/test_aio_artnet.py index ef320e5..2136b48 100644 --- a/test/test_aio_artnet.py +++ b/test/test_aio_artnet.py @@ -11,7 +11,8 @@ ArtNetClient, ArtNetUniverse, ) -from aioartnet.aio_artnet import ArtNetClientProtocol, DGAddr +from aioartnet.client import ArtNetClientProtocol +from aioartnet.models import DatagramAddr def test_universe() -> None: @@ -50,12 +51,12 @@ def packet_reader(file: str) -> Iterator[Tuple[float, bytes]]: class MockTransport(BaseTransport): def __init__(self) -> None: - self.sent: list[tuple[bytes, DGAddr]] = [] + self.sent: list[tuple[bytes, DatagramAddr]] = [] def get_extra_info(self, name: str, default: Any = None) -> Any: return None - def sendto(self, data: bytes, addr: DGAddr) -> None: + def sendto(self, data: bytes, addr: DatagramAddr) -> None: self.sent.append((data, addr)) @@ -150,7 +151,7 @@ def connect_protocol(self, protocol: ArtNetClientProtocol) -> None: def get_extra_info(self, name: str, default: Any = None) -> Any: return None - def sendto(self, data: bytes, addr: DGAddr) -> None: + def sendto(self, data: bytes, addr: DatagramAddr) -> None: self.pending.append((data, addr)) def drain(self) -> None: From 2a35797e10effa8af0c3a78cd9acf40e591a1b0f Mon Sep 17 00:00:00 2001 From: Chris Shucksmith Date: Thu, 26 Jun 2025 13:16:34 +0100 Subject: [PATCH 3/3] events: send UniverseDMX event when subscribed, test coverage --- aioartnet/client.py | 25 +++++++++++----- aioartnet/events.py | 2 +- aioartnet/models.py | 10 +++++++ test/test_aio_artnet.py | 65 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 93 insertions(+), 9 deletions(-) diff --git a/aioartnet/client.py b/aioartnet/client.py index db22291..8c9c374 100644 --- a/aioartnet/client.py +++ b/aioartnet/client.py @@ -15,6 +15,7 @@ NodePortAdded, NodePortRemoved, UniverseDiscovered, + UniverseDMX, ) from .models import ( DMX_UNIVERSE_SIZE, @@ -175,8 +176,8 @@ def on_art_poll_reply(self, addr: DatagramAddr, data: bytes) -> None: changed |= nn.style != style logger.debug(f"change detection on {nn} => {changed}") - # FIXME: what if a node changes ip address? - + # if a node changes ip address, we will send NodeDiscovered, and then + # eventually the old node gets timed out NodeLost if nn is None or changed: newnode = ArtNetNode( ip=f"{ipa}", @@ -185,9 +186,9 @@ def on_art_poll_reply(self, addr: DatagramAddr, data: bytes) -> None: portName=portName, style=style, ) - self.client.nodes[ip] = newnode e = NodeChanged(node=newnode) if changed else NodeDiscovered(node=newnode) + logger.debug(f"new node created {nn} => {newnode} with {e}") self.client._dispatch_event(e) nn = newnode @@ -276,7 +277,12 @@ def on_art_dmx(self, addr: DatagramAddr, data: bytes) -> None: # TODO: HTP/LTP merging with Merge Mode, see "Data Merging" spec p61 # See ArtAddress AcCancelMerge flags spec p39 # Only two sources are allowed to contribute to the values in the universe - u.last_data[0:chlen] = data[8 : 8 + chlen] + channel_data = data[8 : 8 + chlen] + u.last_data[0:chlen] = channel_data + + # if we are subscribed for this data, create UniverseDMX Event + if u in self.client._subscribing: + self.client._dispatch_event(UniverseDMX(u, channel_data)) async def art_poll_task(self) -> None: while True: @@ -454,7 +460,8 @@ def __init__( self.mac: bytes = b"\01\22\33\44\55\66" self.protocol: Optional[ArtNetClientProtocol] = None - self._publishing: list[ArtNetUniverse] = [] + self._publishing: set[ArtNetUniverse] = set() + self._subscribing: set[ArtNetUniverse] = set() self.interface: Optional[str] = interface self._task: Optional[asyncio.Task[None]] = None @@ -567,10 +574,12 @@ def set_port_config( self._portBinds = {1: []} # used for the timer-based DMX repeating - if u in self._publishing: - self._publishing.remove(u) + self._publishing.discard(u) + self._subscribing.discard(u) if is_input: - self._publishing.append(u) + self._publishing.add(u) + if is_output: + self._subscribing.add(u) if not self.passive and self.protocol: self.protocol.send_art_poll_reply() diff --git a/aioartnet/events.py b/aioartnet/events.py index 1a1b36c..a2466b8 100644 --- a/aioartnet/events.py +++ b/aioartnet/events.py @@ -51,4 +51,4 @@ class UniverseDiscovered(ArtNetEvent): class UniverseDMX(ArtNetEvent): universe: ArtNetUniverse data: bytes - text: str = field(init=False, repr=False, default="universe-added") + text: str = field(init=False, repr=False, default="universe-dmx") diff --git a/aioartnet/models.py b/aioartnet/models.py index 7313191..0501d55 100644 --- a/aioartnet/models.py +++ b/aioartnet/models.py @@ -66,6 +66,16 @@ def set_dmx(self, data: bytes) -> None: def get_dmx(self) -> bytes: return self.last_data + # eq/hash based on portaddress only + def __hash__(self) -> int: + return hash(self.portaddress) + + def __eq__(self, other: Any) -> bool: + if isinstance(other, self.__class__): + return self.portaddress == other.portaddress + else: + return False + @dataclass class ArtNetPort: diff --git a/test/test_aio_artnet.py b/test/test_aio_artnet.py index 2136b48..368f8a6 100644 --- a/test/test_aio_artnet.py +++ b/test/test_aio_artnet.py @@ -1,3 +1,4 @@ +import asyncio import socket import struct from asyncio import BaseTransport @@ -12,6 +13,14 @@ ArtNetUniverse, ) from aioartnet.client import ArtNetClientProtocol +from aioartnet.events import ( + ArtNetEvent, + NodeChanged, + NodeDiscovered, + NodePortAdded, + UniverseDiscovered, + UniverseDMX, +) from aioartnet.models import DatagramAddr @@ -161,6 +170,24 @@ def drain(self) -> None: p.datagram_received(*msg) +async def event_consumer( + client: ArtNetClient, received_events: list[ArtNetEvent] +) -> None: + async for event in client.events(): + received_events.append(event) + print(f"event_consumer got {event}") + print("Consumer task finished.") + + +async def await_events(events: list[ArtNetEvent], count: int) -> list[ArtNetEvent]: + # async with asyncio.timeout(2): + while len(events) < count: + await asyncio.sleep(0) + ret = events[:count] + events[:count] = [] + return ret + + @pytest.mark.asyncio async def test_artnet_back_to_back_nodes() -> None: # use two instances of our client linked by a mock transport to test @@ -173,6 +200,11 @@ async def test_artnet_back_to_back_nodes() -> None: clB = ArtNetClient(interface="dummy", portName="bravo") clB.broadcast_ip = "10.10.10.255" clB.unicast_ip = "10.10.10.2" + events: list[ArtNetEvent] = [] + + consumer_task = asyncio.create_task(event_consumer(clB, events)) + while len(clB._event_listeners) == 0: + await asyncio.sleep(0) protoA = ArtNetClientProtocol(clA) protoB = ArtNetClientProtocol(clB) @@ -189,6 +221,10 @@ async def test_artnet_back_to_back_nodes() -> None: == "[ArtNetNode, ArtNetNode]" ) + na, nb = await await_events(events, 2) + assert isinstance(na, NodeDiscovered) + assert isinstance(nb, NodeDiscovered) + # when a client has a property modified, it automatically sends an unsolicited PollReply clB.portName = "charlie" assert len(transport.pending) == 1 @@ -203,6 +239,35 @@ async def test_artnet_back_to_back_nodes() -> None: == "[ArtNetNode, ArtNetNode]" ) + (nc,) = await await_events(events, 1) + assert isinstance(nc, NodeChanged) + + u1p = clA.set_port_config("2:2:2", is_input=True) + u1s = clB.set_port_config("2:2:2", is_output=True) + transport.drain() + + euniv, npa1, npa2 = await await_events(events, 3) + assert isinstance(euniv, UniverseDiscovered) + assert isinstance(npa1, NodePortAdded) + assert isinstance(npa2, NodePortAdded) + + test_pattern = bytearray(512) + test_pattern[1] = 255 + u1p.set_dmx(test_pattern) + transport.drain() + + assert u1s.get_dmx() == test_pattern + (dmx1,) = await await_events(events, 1) + assert isinstance(dmx1, UniverseDMX) + assert dmx1.data == test_pattern + + consumer_task.cancel() + try: + await consumer_task + except asyncio.CancelledError: + # expected + pass + @pytest.mark.asyncio async def test_ports() -> None: