Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ Use `pip` to install [the package from pypi](https://pypi.org/project/aioartnet/
$ python -m aioartnet.main
INFO:aioartnet:preferred interfaces: [(1, 'wlp4s0'), (10, 'br-ee82b9af434e'), (10, 'docker0'), (10, 'lo')]
INFO:aioartnet:using interface wlp4s0 with ip 192.168.1.205 broadcast ip 192.168.1.255
INFO:aioartnet:configured own port Port<Input,DMX,0:0:1>
INFO:aioartnet:configured own port Port<Output,DMX,0:0:5>
status:
ArtNetNode<aioartnet,192.168.1.205:6454> [Port<Input,DMX,0:0:1>, Port<Output,DMX,0:0:5>]
ArtNetNode<ODE Mk3,192.168.1.238:6454> [Port<Output,DMX,0:0:0>, Port<Output,DMX,0:0:1>]
Expand All @@ -31,7 +29,6 @@ Use `pip` to install [the package from pypi](https://pypi.org/project/aioartnet/
There is a built in command-line lighting console for quick Art-Net testing, which maps channels 1:1 to a single universe:

$ python -m aioartnet.console --universe 0:0:0
INFO:aioartnet:configured own port Port<Input,DMX,0:0:0>
INFO:aioartnet:preferred interfaces: [(1, 'wlp4s0'), (10, 'br-ee82b9af434e'), (10, 'docker0'), (10, 'lo'), (10, 'lxdbr0')]
INFO:aioartnet:using interface wlp4s0 with ip 192.168.1.204 broadcast ip 192.168.1.255, listening on 0.0.0.0 our mac 4485009be628
> live on
Expand Down
19 changes: 18 additions & 1 deletion aioartnet/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,25 @@
from .aio_artnet import DMX_UNIVERSE_SIZE, ArtNetClient, ArtNetNode, ArtNetUniverse
from .client import ArtNetClient
from .events import (
NodeChanged,
NodeDiscovered,
NodeLost,
NodePortAdded,
NodePortRemoved,
UniverseDiscovered,
UniverseDMX,
)
from .models import DMX_UNIVERSE_SIZE, ArtNetNode, ArtNetUniverse

__all__ = [
"ArtNetClient",
"ArtNetUniverse",
"ArtNetNode",
"DMX_UNIVERSE_SIZE",
"UniverseDiscovered",
"UniverseDMX",
"NodeDiscovered",
"NodeLost",
"NodeChanged",
"NodePortAdded",
"NodePortRemoved",
]
182 changes: 75 additions & 107 deletions aioartnet/aio_artnet.py → aioartnet/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,25 @@
import socket
import struct
import time
from collections import defaultdict
from typing import Any, Optional, Tuple, Union, cast

from typing import Any, AsyncGenerator, Optional, Union, cast

from .events import (
ArtNetEvent,
NodeChanged,
NodeDiscovered,
NodeLost,
NodePortAdded,
NodePortRemoved,
UniverseDiscovered,
UniverseDMX,
)
from .models import (
DMX_UNIVERSE_SIZE,
ArtNetNode,
ArtNetPort,
ArtNetUniverse,
DatagramAddr,
)
from .network import AF_PACKET, getifaddrs

# Art-Net implementation for Python asyncio
Expand All @@ -21,7 +37,6 @@
ARTNET_PORT = 6454
ARTNET_PREFIX = bytes("Art-Net".encode() + b"\000")

DMX_UNIVERSE_SIZE = 512

# We need to interrogate network interfaces to check which are configured for IP and
# have a valid broadcast address. For unix-like systems this is fone with socket fnctl
Expand Down Expand Up @@ -73,87 +88,6 @@ def swap16(x: int) -> int:
# Each set fixes a single net and sub_net value, however the choice
# of universe nibble is determined per-port (net:sub_net:universe).

DGAddr = tuple[Union[str, Any], int]


class ArtNetNode:
def __init__(
self,
longName: str,
portName: str,
style: int,
ip: str,
udpport: int,
) -> None:
self.portName = portName
self.longName = longName
self._portBinds: defaultdict[int, list[ArtNetPort]] = defaultdict(list)
self.ports: list[ArtNetPort] = []
self.style: int = style
self.udpport = udpport
self.ip = ip
self.last_reply: float = 0.0

def __repr__(self) -> str:
return f"ArtNetNode<{self.portName},{self.ip}:{self.udpport}>"


class ArtNetUniverse:
def __init__(self, portaddress: int, client: "ArtNetClient"):
if portaddress > 0x7FFF:
raise ValueError("Invalid net:subnet:universe, as net>128")
self.portaddress = portaddress
self.publishers: list[ArtNetNode] = list()
self.subscribers: list[ArtNetNode] = list()
self.last_data = bytearray(DMX_UNIVERSE_SIZE)
self._last_seq = 1
self._last_publish: float = 0.0
self.publisherseq: dict[Tuple[DGAddr, int], int] = {}
self._client = client

def split(self) -> Tuple[int, int, int]:
# name net:sub_net:universe
# bits 8:15 4:8 0:4
net = self.portaddress >> 8
sub_net = (self.portaddress >> 4) & 0x0F
universe = self.portaddress & 0x0F
return net, sub_net, universe

def __repr__(self) -> str:
net, sub_net, universe = self.split()
return f"{net}:{sub_net}:{universe}"

def set_dmx(self, data: bytes) -> None:
assert len(data) == DMX_UNIVERSE_SIZE
self.last_data[:] = data[:]
self._client._send_dmx(self)

def get_dmx(self) -> bytes:
return self.last_data


class ArtNetPort:
def __init__(
self,
node: Union[ArtNetNode, "ArtNetClient"],
is_input: bool,
media: int,
portaddr: int,
universe: ArtNetUniverse,
):
self.node = node
self.is_input = is_input
self.media = 0
self.portaddr = portaddr
self.universe = universe

def __repr__(self) -> str:
inout = {True: "Input", False: "Output"}[self.is_input]
media = ["DMX", "MIDI", "Avab", "Colortran CMX", "ADB 62.5", "Art-Net", "DALI"][
self.media
]
return f"Port<{inout},{media},{self.universe}>"


class ArtNetClientProtocol(asyncio.DatagramProtocol):
def __init__(self, client: "ArtNetClient"):
Expand All @@ -174,7 +108,7 @@ def connection_made(self, transport: asyncio.BaseTransport) -> None:
if sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

def datagram_received(self, data: bytes, addr: DGAddr) -> None:
def datagram_received(self, data: bytes, addr: DatagramAddr) -> None:
if data[0:8] == ARTNET_PREFIX:
(opcode,) = struct.unpack("H", data[8:10])
h = self.handlers.get(opcode, None)
Expand All @@ -187,15 +121,15 @@ def datagram_received(self, data: bytes, addr: DGAddr) -> None:
else:
logger.debug(f"Received non Art-Net data from {addr}: {data!r}")

def on_art_poll(self, addr: DGAddr, data: bytes) -> None:
def on_art_poll(self, addr: DatagramAddr, data: bytes) -> None:
ver, flags, priority = struct.unpack("<HBB", data)
ver = swap16(ver)
logger.debug(
f"Received Art-Net Poll: ver {ver} flags {flags} prio: {priority} from {addr}"
)
self.send_art_poll_reply()

def on_art_poll_reply(self, addr: DGAddr, data: bytes) -> None:
def on_art_poll_reply(self, addr: DatagramAddr, data: bytes) -> None:
# everything up to the mac address field is mandatory, the rest must be
# parsed only if it is sent (field at a time)
ip, port, fw, netsw, subsw, oemCode = struct.unpack("<IHHBBH", data[0:12])
Expand Down Expand Up @@ -242,8 +176,8 @@ def on_art_poll_reply(self, addr: DGAddr, data: bytes) -> None:
changed |= nn.style != style
logger.debug(f"change detection on {nn} => {changed}")

# FIXME: what if a node changes ip address?

# if a node changes ip address, we will send NodeDiscovered, and then
# eventually the old node gets timed out NodeLost
if nn is None or changed:
newnode = ArtNetNode(
ip=f"{ipa}",
Expand All @@ -252,11 +186,10 @@ def on_art_poll_reply(self, addr: DGAddr, data: bytes) -> None:
portName=portName,
style=style,
)
if changed:
logger.debug(f"change detected: from {nn} to {newnode}")

# TODO: make client.node observable dict and remove this method
self.client.add_node(ip, newnode)
self.client.nodes[ip] = newnode
Comment thread
shuckc marked this conversation as resolved.
e = NodeChanged(node=newnode) if changed else NodeDiscovered(node=newnode)
logger.debug(f"new node created {nn} => {newnode} with {e}")
self.client._dispatch_event(e)
nn = newnode

nn.last_reply = time.time()
Expand All @@ -281,26 +214,32 @@ def on_art_poll_reply(self, addr: DGAddr, data: bytes) -> None:

# track which 'pages' of port bindings we have seen
old_ports = nn._portBinds[bindindex]

# print(f"{nn} desired bindindex={bindindex} of {portList}, existing {old_ports}")
for p in portList:
if p not in old_ports:
logging.debug(f"node {nn} added port {p}")
nn.ports.append(p)
nn._portBinds[bindindex].append(p)
{True: p.universe.publishers, False: p.universe.subscribers}[
p.is_input
].append(nn)
self.client._dispatch_event(NodePortAdded(node=nn, port=p))

for p in list(old_ports):
if p not in portList:
logging.debug(f"node {nn} removed port {p}")
nn.ports.remove(p)
nn._portBinds[bindindex].remove(p)
{True: p.universe.publishers, False: p.universe.subscribers}[
p.is_input
].remove(nn)
self.client._dispatch_event(NodePortRemoved(node=nn, port=p))
logger.debug(
f"Received Art-Net PollReply from {ip} fw {fw} portName {portName} longName: {longName} bindindex {bindindex} ports:{portList}"
)

def on_art_dmx(self, addr: DGAddr, data: bytes) -> None:
def on_art_dmx(self, addr: DatagramAddr, data: bytes) -> None:
ver, seq, phys, sub, net, chlen = struct.unpack("<HBBBBH", data[0:8])
ver = swap16(ver)
chlen = swap16(chlen)
Expand Down Expand Up @@ -338,7 +277,12 @@ def on_art_dmx(self, addr: DGAddr, data: bytes) -> None:
# TODO: HTP/LTP merging with Merge Mode, see "Data Merging" spec p61
# See ArtAddress AcCancelMerge flags spec p39
# Only two sources are allowed to contribute to the values in the universe
u.last_data[0:chlen] = data[8 : 8 + chlen]
channel_data = data[8 : 8 + chlen]
u.last_data[0:chlen] = channel_data

# if we are subscribed for this data, create UniverseDMX Event
if u in self.client._subscribing:
self.client._dispatch_event(UniverseDMX(u, channel_data))

async def art_poll_task(self) -> None:
while True:
Expand All @@ -352,6 +296,11 @@ async def art_poll_task(self) -> None:
if t > self._last_poll + 2.0:
self._send_art_poll()

for ip, node in list(self.client.nodes.items()):
if t > node.last_reply + 120:
self.client.nodes.pop(ip, None)
self.client._dispatch_event(NodeLost(node=node))

def _send_art_poll(self) -> None:
self._last_poll = time.time()
self.node_report_counter = (self.node_report_counter + 1) % 10000
Expand Down Expand Up @@ -511,10 +460,13 @@ def __init__(
self.mac: bytes = b"\01\22\33\44\55\66"

self.protocol: Optional[ArtNetClientProtocol] = None
self._publishing: list[ArtNetUniverse] = []
self._publishing: set[ArtNetUniverse] = set()
self._subscribing: set[ArtNetUniverse] = set()
self.interface: Optional[str] = interface
self._task: Optional[asyncio.Task[None]] = None

self._event_listeners: list[asyncio.Queue[ArtNetEvent]] = []

async def connect(self) -> asyncio.DatagramTransport:
loop = asyncio.get_running_loop()

Expand Down Expand Up @@ -569,9 +521,6 @@ def _send_dmx(self, universe: ArtNetUniverse) -> None:
def get_nodes(self) -> list[ArtNetNode]:
return list(self.nodes.values())

def add_node(self, ip: int, node: ArtNetNode) -> None:
self.nodes[ip] = node

def _parse_universe(self, universe: UniverseKey) -> int:
if isinstance(universe, str):
# parse to int
Expand Down Expand Up @@ -607,14 +556,14 @@ def set_port_config(
break
if port:
self.ports.remove(port)
logger.info(f"removed own port {port}")
logger.debug(f"removed own port {port}")

if is_input or is_output:
port = ArtNetPort(
node=self, is_input=is_input, media=0, portaddr=port_addr, universe=u
node=None, is_input=is_input, media=0, portaddr=port_addr, universe=u
)
self.ports.append(port)
logger.info(f"configured own port {port}")
logger.debug(f"configured own port {port}")

# TODO: optimise the layour of self.ports to self._portBinds
# up to four ports with a common (net,sub-net) can be listed on the same page
Expand All @@ -625,10 +574,12 @@ def set_port_config(
self._portBinds = {1: []}

# used for the timer-based DMX repeating
if u in self._publishing:
self._publishing.remove(u)
self._publishing.discard(u)
self._subscribing.discard(u)
if is_input:
self._publishing.append(u)
self._publishing.add(u)
if is_output:
self._subscribing.add(u)

if not self.passive and self.protocol:
self.protocol.send_art_poll_reply()
Expand All @@ -639,8 +590,25 @@ def _get_create_universe(self, port_addr: int) -> ArtNetUniverse:
if (u := self.universes.get(port_addr, None)) is None:
u = ArtNetUniverse(port_addr, self)
self.universes[port_addr] = u
self._dispatch_event(UniverseDiscovered(universe=u))
return u

# EVENT LISTENERS
async def events(self) -> AsyncGenerator[ArtNetEvent, None]:
q: asyncio.Queue[ArtNetEvent] = asyncio.Queue()
self._event_listeners.append(q)
try:
while True:
yield await q.get()
finally:
self._event_listeners.remove(q)

def _dispatch_event(self, event: ArtNetEvent) -> None:
for q in self._event_listeners:
q.put_nowait(event)
# for .on() .off() support
# text = event.text

# MUTABLE PROPERTIES
# if you need to change a lot of properties in bulk, set client.passive=True,
# make the required changes, then clear .passive
Expand Down
8 changes: 8 additions & 0 deletions aioartnet/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,12 @@ async def main(client: ArtNetClient, engine: Engine, interpreter: Interpreter) -
await client.connect()
await engine.start_ticking()

async def print_events() -> None:
async for event in client.events():
print(event)

t = asyncio.create_task(print_events())

# Run echo loop. Read text from stdin, and reply it back.
while True:
try:
Expand All @@ -324,6 +330,8 @@ async def main(client: ArtNetClient, engine: Engine, interpreter: Interpreter) -
break
except Exception:
traceback.print_exc(limit=-2)

t.cancel()
Comment thread
shuckc marked this conversation as resolved.
return None


Expand Down
Loading