Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
291 changes: 190 additions & 101 deletions src/gh/components/DF_tcp_listener/code.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#! python3

from ghpythonlib.componentbase import executingcomponent as component
import socket
import threading
Expand All @@ -8,150 +6,241 @@
import scriptcontext as sc
import Rhino.Geometry as rg
import System.Drawing as sd
import Grasshopper
from diffCheck import df_gh_canvas_utils

class DFTCPListener(component):
def __init__(self):
super(DFTCPListener, self).__init__()
try:
ghenv.Component.ExpireSolution(True) # noqa: F821
ghenv.Component.Attributes.PerformLayout() # noqa: F821
except NameError:
except:
pass

for idx, label in enumerate(("Start", "Stop", "Load")):
df_gh_canvas_utils.add_button(
ghenv.Component, label, idx, x_offset=60) # noqa: F821
df_gh_canvas_utils.add_button(ghenv.Component, label, idx, x_offset=60) # noqa: F821
df_gh_canvas_utils.add_panel(ghenv.Component, "Host", "127.0.0.1", 3, 60, 20) # noqa: F821
df_gh_canvas_utils.add_panel(ghenv.Component, "Port", "5000", 4, 60, 20) # noqa: F821

def RunScript(self,
i_start: bool,
i_stop: bool,
i_load: bool,
i_host: str,
i_port: int):

prefix = 'tcp'
def RunScript(self, i_start, i_stop, i_load, i_host, i_port):
prefix = "tcp"

# ----------------------------
# Sticky initialization
sc.sticky.setdefault(f'{prefix}_server_sock', None)
sc.sticky.setdefault(f'{prefix}_server_started', False)
sc.sticky.setdefault(f'{prefix}_cloud_buffer_raw', [])
sc.sticky.setdefault(f'{prefix}_latest_cloud', None)
sc.sticky.setdefault(f'{prefix}_status_message', 'Waiting..')
sc.sticky.setdefault(f'{prefix}_prev_start', False)
sc.sticky.setdefault(f'{prefix}_prev_stop', False)
sc.sticky.setdefault(f'{prefix}_prev_load', False)

# Client handler
def handle_client(conn: socket.socket) -> None:
"""
Reads the incoming bytes from a single TCP client socket and stores valid data in a shared buffer.

:param conn: A socket object returned by `accept()` representing a live client connection.
The client is expected to send newline-delimited JSON-encoded data, where each
message is a list of 6D values: [x, y, z, r, g, b].

:returns: None
"""
buf = b''
# ----------------------------
sc.sticky.setdefault(prefix + "_server_sock", None)
sc.sticky.setdefault(prefix + "_server_started", False)
sc.sticky.setdefault(prefix + "_cloud_buffer_raw", [])
sc.sticky.setdefault(prefix + "_latest_cloud", None)
sc.sticky.setdefault(prefix + "_status_message", "Waiting..")
sc.sticky.setdefault(prefix + "_prev_start", False)
sc.sticky.setdefault(prefix + "_prev_stop", False)
sc.sticky.setdefault(prefix + "_prev_load", False)

# Receiving state
sc.sticky.setdefault(prefix + "_is_receiving", False)
sc.sticky.setdefault(prefix + "_recv_bytes", 0)

# Loading state
sc.sticky.setdefault(prefix + "_is_loading", False)
sc.sticky.setdefault(prefix + "_load_progress", (0, 0)) # (done, total)
sc.sticky.setdefault(prefix + "_load_started_at", None)
sc.sticky.setdefault(prefix + "_load_duration_s", None)

# ----------------------------
# Helper: schedule safe refresh on GH UI/solution thread
# ----------------------------
def request_expire(delay_ms=200, recompute=True):
try:
comp = ghenv.Component # noqa: F821
doc = comp.OnPingDocument()
if doc is None:
return

def cb(_):
try:
comp.ExpireSolution(recompute)
except:
pass

doc.ScheduleSolution(int(delay_ms), Grasshopper.Kernel.GH_Document.GH_ScheduleDelegate(cb))
except:
pass

# ----------------------------
# TCP receive thread
# ----------------------------
def handle_client(conn):
buf = b""
sc.sticky[prefix + "_is_receiving"] = True
sc.sticky[prefix + "_recv_bytes"] = 0
sc.sticky[prefix + "_status_message"] = "Client connected; receiving..."
request_expire(0, True)

with conn:
while sc.sticky.get(f'{prefix}_server_started', False):
while sc.sticky.get(prefix + "_server_started", False):
try:
chunk = conn.recv(4096)
chunk = conn.recv(65536)
if not chunk:
break
sc.sticky[prefix + "_recv_bytes"] += len(chunk)
buf += chunk
while b'\n' in buf:
line, buf = buf.split(b'\n', 1)

# Expect ONE message terminated by '\n'
if b"\n" in buf:
line, buf = buf.split(b"\n", 1)

try:
raw = json.loads(line.decode())
except Exception:
continue
raw = json.loads(line.decode("utf-8"))
except Exception as e:
sc.sticky[prefix + "_status_message"] = "JSON error: {}".format(repr(e))
request_expire(0, True)
break

if isinstance(raw, list) and all(isinstance(pt, list) and len(pt) == 6 for pt in raw):
sc.sticky[f'{prefix}_cloud_buffer_raw'] = raw
except Exception:
break
time.sleep(0.05) # sleep briefly to prevent CPU spin
sc.sticky[prefix + "_cloud_buffer_raw"] = raw
sc.sticky[prefix + "_status_message"] = "Buffered {} pts".format(len(raw))
else:
sc.sticky[prefix + "_status_message"] = "Invalid payload (expected [[x,y,z,r,g,b],...])"

request_expire(0, True)
break

# thread to accept incoming connections
def server_loop(sock: socket.socket) -> None:
"""
Accepts a single client connection and starts a background thread to handle it.
except Exception as e:
sc.sticky[prefix + "_status_message"] = "Recv error: {}".format(repr(e))
request_expire(0, True)
break

:param sock: A bound and listening TCP socket created by start_server().
This socket will accept one incoming connection, then delegate it to handle_client().
sc.sticky[prefix + "_is_receiving"] = False
request_expire(0, True)

:returns: None. This runs as a background thread and blocks on accept().
"""
def server_loop(sock):
try:
conn, _ = sock.accept()
handle_client(conn)
except Exception:
pass

# Start TCP server
def start_server() -> None:
"""
creates and binds a TCP socket on the given host/port, marks the server as started and then starts the accept_loop in a background thread
except Exception as e:
sc.sticky[prefix + "_status_message"] = "Accept error: {}".format(repr(e))
request_expire(0, True)

:returns: None.
"""
def start_server():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((i_host, i_port))
sock.bind((i_host, int(i_port)))
sock.listen(1)
sc.sticky[f'{prefix}_server_sock'] = sock
sc.sticky[f'{prefix}_server_started'] = True
sc.sticky[f'{prefix}_status_message'] = f'Listening on {i_host}:{i_port}'
# Only accept one connection to keep it long-lived
threading.Thread(target=server_loop, args=(sock,), daemon=True).start()

def stop_server() -> None:
"""
Stops the running TCP server by closing the listening socket and resetting internal state.
sc.sticky[prefix + "_server_sock"] = sock
sc.sticky[prefix + "_server_started"] = True
sc.sticky[prefix + "_status_message"] = "Listening on {}:{}".format(i_host, i_port)

threading.Thread(target=server_loop, args=(sock,), daemon=True).start()
request_expire(0, True)

:returns: None.
"""
sock = sc.sticky.get(f'{prefix}_server_sock')
def stop_server():
sock = sc.sticky.get(prefix + "_server_sock")
if sock:
try:
sock.close()
except Exception:
except:
pass
sc.sticky[f'{prefix}_server_sock'] = None
sc.sticky[f'{prefix}_server_started'] = False
sc.sticky[f'{prefix}_cloud_buffer_raw'] = []
sc.sticky[f'{prefix}_status_message'] = 'Stopped'

# Start or stop server based on inputs
if i_start and not sc.sticky[f'{prefix}_prev_start']:
sc.sticky[prefix + "_server_sock"] = None
sc.sticky[prefix + "_server_started"] = False
sc.sticky[prefix + "_is_receiving"] = False
sc.sticky[prefix + "_is_loading"] = False
sc.sticky[prefix + "_cloud_buffer_raw"] = []
sc.sticky[prefix + "_status_message"] = "Stopped"
request_expire(0, True)

# ----------------------------
# Async load (build Rhino PointCloud)
# ----------------------------
def build_pointcloud_async(raw_snapshot):
try:
sc.sticky[prefix + "_is_loading"] = True
sc.sticky[prefix + "_load_started_at"] = time.time()
sc.sticky[prefix + "_load_duration_s"] = None

total = len(raw_snapshot)
sc.sticky[prefix + "_load_progress"] = (0, total)
sc.sticky[prefix + "_status_message"] = "Loading {} pts...".format(total)
request_expire(0, True)

pc = rg.PointCloud()

step = 25000 if total > 25000 else 5000
last_ui = time.time()

for i, pt in enumerate(raw_snapshot):
x, y, z, r, g, b = pt
pc.Add(rg.Point3d(x, y, z), sd.Color.FromArgb(int(r), int(g), int(b)))

if (i + 1) % step == 0:
sc.sticky[prefix + "_load_progress"] = (i + 1, total)
now = time.time()
if now - last_ui > 0.3:
request_expire(0, True)
last_ui = now

sc.sticky[prefix + "_latest_cloud"] = pc

dur = time.time() - sc.sticky[prefix + "_load_started_at"]
sc.sticky[prefix + "_load_duration_s"] = dur
sc.sticky[prefix + "_load_progress"] = (total, total)
sc.sticky[prefix + "_status_message"] = "Loaded {} pts in {:.2f}s".format(pc.Count, dur)

# Force final recompute so output updates immediately
request_expire(0, True)

except Exception as e:
sc.sticky[prefix + "_status_message"] = "Load error: {}".format(repr(e))
request_expire(0, True)
finally:
sc.sticky[prefix + "_is_loading"] = False
request_expire(0, True)

# ----------------------------
# UI: start/stop/load button edges
# ----------------------------
if i_start and not sc.sticky[prefix + "_prev_start"]:
start_server()
if i_stop and not sc.sticky[f'{prefix}_prev_stop']:

if i_stop and not sc.sticky[prefix + "_prev_stop"]:
stop_server()

# Load buffered points into Rhino PointCloud
if i_load and not sc.sticky[f'{prefix}_prev_load']:
if not sc.sticky.get(f'{prefix}_server_started', False):
sc.sticky[f'{prefix}_status_message'] = "Start Server First!"
if i_load and not sc.sticky[prefix + "_prev_load"]:
if not sc.sticky.get(prefix + "_server_started", False):
sc.sticky[prefix + "_status_message"] = "Start Server First!"
elif sc.sticky.get(prefix + "_is_loading", False):
sc.sticky[prefix + "_status_message"] = "Already loading..."
else:
raw = sc.sticky.get(f'{prefix}_cloud_buffer_raw', [])
raw = sc.sticky.get(prefix + "_cloud_buffer_raw", [])
if raw:
pc = rg.PointCloud()
for x, y, z, r, g, b in raw:
pc.Add(rg.Point3d(x, y, z), sd.Color.FromArgb(int(r), int(g), int(b)))
sc.sticky[f'{prefix}_latest_cloud'] = pc
sc.sticky[f'{prefix}_status_message'] = f'Loaded pcd with {pc.Count} pts'
raw_snapshot = list(raw) # snapshot
threading.Thread(target=build_pointcloud_async, args=(raw_snapshot,), daemon=True).start()
else:
sc.sticky[f'{prefix}_status_message'] = 'No data buffered'
sc.sticky[prefix + "_status_message"] = "No data buffered"
request_expire(0, True)

# ----------------------------
# Live status while receiving/loading
# ----------------------------
if sc.sticky.get(prefix + "_is_receiving", False):
b = sc.sticky.get(prefix + "_recv_bytes", 0)
sc.sticky[prefix + "_status_message"] = "Receiving... {:.1f} MB".format(b / (1024.0 * 1024.0))
request_expire(300, True)

if sc.sticky.get(prefix + "_is_loading", False):
done, total = sc.sticky.get(prefix + "_load_progress", (0, 0))
if total:
sc.sticky[prefix + "_status_message"] = "Loading... {}/{} pts".format(done, total)
request_expire(300, True)

# Update previous states
sc.sticky[f'{prefix}_prev_start'] = i_start
sc.sticky[f'{prefix}_prev_stop'] = i_stop
sc.sticky[f'{prefix}_prev_load'] = i_load

# Update UI and output
ghenv.Component.Message = sc.sticky[f'{prefix}_status_message'] # noqa: F821
sc.sticky[prefix + "_prev_start"] = i_start
sc.sticky[prefix + "_prev_stop"] = i_stop
sc.sticky[prefix + "_prev_load"] = i_load

o_cloud = sc.sticky[f'{prefix}_latest_cloud']
return [o_cloud]
# Output
ghenv.Component.Message = sc.sticky[prefix + "_status_message"] # noqa: F821
return [sc.sticky[prefix + "_latest_cloud"]]
2 changes: 1 addition & 1 deletion src/gh/examples/simple_tcp_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def random_colored_point():
with socket.create_connection((host, port)) as s:
print("Connected to GH")
while True:
cloud = [random_colored_point() for _ in range(1000)]
cloud = [random_colored_point() for _ in range(1000000)]
msg = json.dumps(cloud) + "\n"
s.sendall(msg.encode())
print("Sent cloud with", len(cloud), "colored points")
Expand Down
Loading