From e8b1495552a75c01db17e48e185e76531d36f7d6 Mon Sep 17 00:00:00 2001 From: YANG Zhenfei Date: Sun, 29 Mar 2026 17:10:59 +0800 Subject: [PATCH] refactor(perception): remove per-frame asyncio.run --- tinynav/core/models_trt.py | 157 ++++++++++++++++++++++++++++---- tinynav/core/perception_node.py | 23 +++-- 2 files changed, 152 insertions(+), 28 deletions(-) diff --git a/tinynav/core/models_trt.py b/tinynav/core/models_trt.py index 6f924ecb..a708c6d0 100644 --- a/tinynav/core/models_trt.py +++ b/tinynav/core/models_trt.py @@ -4,7 +4,7 @@ from codetiming import Timer import platform import asyncio -from tinynav.core.func import alru_cache_numpy +from tinynav.core.func import alru_cache_numpy, lru_cache_numpy from cuda import cudart import ctypes @@ -126,6 +126,36 @@ async def run_graph(self): results[out["name"]] = out["host"].copy() return results + def run_graph_sync(self): + if "aarch64" not in platform.machine(): + for inp in self.inputs: + cudart.cudaMemcpyAsync( + inp["device"], + inp["host"].ctypes.data, + inp["nbytes"], + cudart.cudaMemcpyKind.cudaMemcpyHostToDevice, + self.stream, + ) + + cudart.cudaGraphLaunch(self.graph_exec, self.stream) + + if "aarch64" not in platform.machine(): + for out in self.outputs: + cudart.cudaMemcpyAsync( + out["host"].ctypes.data, + out["device"], + out["nbytes"], + cudart.cudaMemcpyKind.cudaMemcpyDeviceToHost, + self.stream, + ) + + cudart.cudaStreamSynchronize(self.stream) + + results = {} + for out in self.outputs: + results[out["name"]] = out["host"].copy() + return results + class SuperPointTRT(TRTBase): def __init__(self, engine_path=f"/tinynav/tinynav/models/superpoint_fp16_dynamic_{platform.machine()}.plan"): @@ -133,13 +163,26 @@ def __init__(self, engine_path=f"/tinynav/tinynav/models/superpoint_fp16_dynamic # model input [1,1,H,W] self.input_shape = self.inputs[0]["shape"][2:4] # [H,W] + def _postprocess(self, input_image: np.ndarray, results: dict): + h_in, w_in = input_image.shape[0], input_image.shape[1] + h_net, w_net = self.input_shape[0], self.input_shape[1] + scale_x = w_in / w_net + scale_y = h_in / h_net + k = results["kpts"][0] + if k.shape[0] == 2: + k[0] = (k[0] + 0.5) * scale_x - 0.5 + k[1] = (k[1] + 0.5) * scale_y - 0.5 + else: + k[:, 0] = (k[:, 0] + 0.5) * scale_x - 0.5 + k[:, 1] = (k[:, 1] + 0.5) * scale_y - 0.5 + results["mask"] = results["mask"][:, :, None] + return results + # default threshold as # https://github.com/cvg/LightGlue/blob/746fac2c042e05d1865315b1413419f1c1e7ba55/lightglue/superpoint.py#L111 # @alru_cache_numpy(maxsize=32) async def infer(self, input_image:np.ndarray, threshold = np.array([[0.0005]], dtype=np.float32)): - # Resize to engine input size (may change aspect ratio for non-matching resolutions). - h_in, w_in = input_image.shape[0], input_image.shape[1] h_net, w_net = self.input_shape[0], self.input_shape[1] image = cv2.resize(input_image, (w_net, h_net)) image = image[None, None, :, :] @@ -148,20 +191,19 @@ async def infer(self, input_image:np.ndarray, threshold = np.array([[0.0005]], d np.copyto(self.inputs[1]["host"], threshold) results = await self.run_graph() + return self._postprocess(input_image, results) - # Scale keypoints from network coords (h_net, w_net) back to input image coords (h_in, w_in). - # Use per-axis scale so Looper (640x544) and other resolutions match; img_shape is (width, height). - scale_x = w_in / w_net - scale_y = h_in / h_net - k = results["kpts"][0] - if k.shape[0] == 2: - k[0] = (k[0] + 0.5) * scale_x - 0.5 - k[1] = (k[1] + 0.5) * scale_y - 0.5 - else: - k[:, 0] = (k[:, 0] + 0.5) * scale_x - 0.5 - k[:, 1] = (k[:, 1] + 0.5) * scale_y - 0.5 - results["mask"] = results["mask"][:, :, None] - return results + @lru_cache_numpy(maxsize=32) + def infer_sync(self, input_image:np.ndarray, threshold = np.array([[0.0005]], dtype=np.float32)): + h_net, w_net = self.input_shape[0], self.input_shape[1] + image = cv2.resize(input_image, (w_net, h_net)) + image = image[None, None, :, :] + + np.copyto(self.inputs[0]["host"], image) + np.copyto(self.inputs[1]["host"], threshold) + + results = self.run_graph_sync() + return self._postprocess(input_image, results) class LightGlueTRT(TRTBase): def __init__(self, engine_path=f"/tinynav/tinynav/models/lightglue_fp16_{platform.machine()}.plan"): @@ -184,6 +226,20 @@ async def infer(self, kpts0, kpts1, desc0, desc1, mask0, mask1, img_shape0, img_ return await self.run_graph() + @lru_cache_numpy(maxsize=32) + def infer_sync(self, kpts0, kpts1, desc0, desc1, mask0, mask1, img_shape0, img_shape1, match_threshold = np.array([[0.1]], dtype=np.float32)): + np.copyto(self.inputs[0]["host"], kpts0) + np.copyto(self.inputs[1]["host"], kpts1) + np.copyto(self.inputs[2]["host"], desc0) + np.copyto(self.inputs[3]["host"], desc1) + np.copyto(self.inputs[4]["host"], mask0) + np.copyto(self.inputs[5]["host"], mask1) + np.copyto(self.inputs[6]["host"], img_shape0) + np.copyto(self.inputs[7]["host"], img_shape1) + np.copyto(self.inputs[8]["host"], match_threshold) + + return self.run_graph_sync() + class Dinov2TRT(TRTBase): def __init__(self, engine_path=f"/tinynav/tinynav/models/dinov2_base_224x224_fp16_{platform.machine()}.plan"): super().__init__(engine_path) @@ -206,6 +262,12 @@ async def infer(self, image): results = await self.run_graph() return results["last_hidden_state"][:, 0, :].squeeze(0) + def infer_sync(self, image): + image = self.preprocess_image(image) + np.copyto(self.inputs[0]["host"], image) + results = self.run_graph_sync() + return results["last_hidden_state"][:, 0, :].squeeze(0) + class StereoEngineTRT(TRTBase): def _get_static_shape(self, name): @@ -285,6 +347,47 @@ async def run_graph(self): results[name] = np.array(arr).copy() if arr.ndim == 0 else arr.copy() return results + def run_graph_sync(self): + input_shapes = self._current_input_shapes + if "aarch64" not in platform.machine(): + cudart.cudaMemcpyAsync(self.inputs[0]["device"], self.inputs[0]["host"].ctypes.data, + self._current_input_nbytes, cudart.cudaMemcpyKind.cudaMemcpyHostToDevice, self.stream) + cudart.cudaMemcpyAsync(self.inputs[1]["device"], self.inputs[1]["host"].ctypes.data, + self._current_input_nbytes, cudart.cudaMemcpyKind.cudaMemcpyHostToDevice, self.stream) + for inp in self.inputs[2:]: + cudart.cudaMemcpyAsync(inp["device"], inp["host"].ctypes.data, + inp["nbytes"], cudart.cudaMemcpyKind.cudaMemcpyHostToDevice, self.stream) + self.context.set_optimization_profile_async(0, self.stream) + self.context.set_input_shape("left", input_shapes) + self.context.set_input_shape("right", input_shapes) + self.context.execute_async_v3(stream_handle=self.stream) + h_net, w_net = input_shapes[2], input_shapes[3] + if "aarch64" not in platform.machine(): + for out in self.outputs: + if out["name"] in ("disp", "depth"): + nbytes = input_shapes[2] * input_shapes[3] * np.float32().itemsize + else: + nbytes = out["nbytes"] + cudart.cudaMemcpyAsync( + out["host"].ctypes.data, + out["device"], + nbytes, + cudart.cudaMemcpyKind.cudaMemcpyDeviceToHost, + self.stream, + ) + cudart.cudaStreamSynchronize(self.stream) + results = {} + for out in self.outputs: + arr = out["host"] + name = out["name"] + if name in ("disp", "depth"): + flat = np.asarray(arr).reshape(-1) + needed = h_net * w_net + results[name] = flat[:needed].reshape(h_net, w_net).copy() + else: + results[name] = np.array(arr).copy() if arr.ndim == 0 else arr.copy() + return results + async def infer(self, left_img, right_img, baseline, focal_length): h_in, w_in = left_img.shape[0], left_img.shape[1] @@ -308,6 +411,28 @@ async def infer(self, left_img, right_img, baseline, focal_length): ) return disp.astype(np.float32), depth.astype(np.float32) + def infer_sync(self, left_img, right_img, baseline, focal_length): + h_in, w_in = left_img.shape[0], left_img.shape[1] + + self._current_input_shapes = (1, 1, h_in, w_in) + self._current_input_nbytes = h_in * w_in * np.uint8().itemsize + + left_tensor = left_img.astype(np.uint8).ravel() + right_tensor = right_img.astype(np.uint8).ravel() + np.copyto(self.inputs[0]["host"].reshape(-1)[: left_tensor.size], left_tensor) + np.copyto(self.inputs[1]["host"].reshape(-1)[: right_tensor.size], right_tensor) + np.copyto(self.inputs[2]["host"], baseline) + np.copyto(self.inputs[3]["host"], focal_length) + + results = self.run_graph_sync() + disp = results["disp"] + depth = results["depth"] + if disp.shape != (h_in, w_in) or depth.shape != (h_in, w_in): + raise RuntimeError( + f"StereoEngine output shape mismatch: got disp {disp.shape}, depth {depth.shape}, expected ({h_in}, {w_in})" + ) + return disp.astype(np.float32), depth.astype(np.float32) + if __name__ == "__main__": # Synthetic sanity test for both RealSense and Looper resolutions. diff --git a/tinynav/core/perception_node.py b/tinynav/core/perception_node.py index 1ead4e24..69dd15b0 100644 --- a/tinynav/core/perception_node.py +++ b/tinynav/core/perception_node.py @@ -17,7 +17,6 @@ from math_utils import rot_from_two_vector, np2msg, np2tf, estimate_pose, se3_inv from math_utils import uf_init, uf_union, uf_all_sets_list from tf2_ros import TransformBroadcaster -import asyncio import gtsam import gtsam_unstable from collections import deque @@ -191,12 +190,12 @@ def images_callback(self, left_msg, right_msg): self.last_processed_timestamp = current_timestamp loop_start = time.perf_counter() with Timer(name="Perception Loop", text="[{name}] Elapsed time: {milliseconds:.0f} ms\n\n", logger=self.logger.info): - processed = asyncio.run(self.process(left_msg, right_msg)) + processed = self.process(left_msg, right_msg) if processed: loop_ms = (time.perf_counter() - loop_start) * 1000.0 self.stats_pub.publish(Float32MultiArray(data=[float(loop_ms)])) - async def process(self, left_msg, right_msg): + def process(self, left_msg, right_msg): if self.K is None or self.T_body_last is None: return False self.process_cnt += 1 @@ -204,7 +203,7 @@ async def process(self, left_msg, right_msg): right_img = self.bridge.imgmsg_to_cv2(right_msg, "mono8") current_timestamp = stamp2second(left_msg.header.stamp) if len(self.keyframe_queue) == 0: # first frame - disparity, depth = await self.stereo_engine.infer(left_img, right_img, np.array([[self.baseline]]), np.array([[self.K[0,0]]])) + disparity, depth = self.stereo_engine.infer_sync(left_img, right_img, np.array([[self.baseline]]), np.array([[self.K[0,0]]])) self.keyframe_queue.append( Keyframe( timestamp=current_timestamp, @@ -221,12 +220,12 @@ async def process(self, left_msg, right_msg): return True with Timer(name="[Stereo Inference]", text="[{name}] Elapsed time: {milliseconds:.0f} ms", logger=self.logger.debug): - disparity, depth = await self.stereo_engine.infer(left_img, right_img, np.array([[self.baseline]]), np.array([[self.K[0,0]]])) + disparity, depth = self.stereo_engine.infer_sync(left_img, right_img, np.array([[self.baseline]]), np.array([[self.K[0,0]]])) kf_prev = self.keyframe_queue[-1] - prev_left_extract_result = await self.superpoint.infer(kf_prev.image) - current_left_extract_result = await self.superpoint.infer(left_img) + prev_left_extract_result = self.superpoint.infer_sync(kf_prev.image) + current_left_extract_result = self.superpoint.infer_sync(left_img) - match_result = await self.light_glue.infer( + match_result = self.light_glue.infer_sync( prev_left_extract_result["kpts"], current_left_extract_result["kpts"], prev_left_extract_result["descps"], @@ -337,7 +336,7 @@ async def process(self, left_msg, right_msg): #current_i = len(self.keyframe_queue[-_N:]) with Timer(name="[init extract info]", text="[{name}] Elapsed time: {milliseconds:.0f} ms", logger=self.logger.debug): - extract_info = [await self.superpoint.infer(kf.image) for kf in self.keyframe_queue[-_N:]] + extract_info = [self.superpoint.infer_sync(kf.image) for kf in self.keyframe_queue[-_N:]] parent, rank = uf_init(len(self.keyframe_queue[-_N:]) * _M) self.logger.debug(f"Processing {len(self.keyframe_queue)} keyframes for data association.") @@ -355,12 +354,12 @@ async def process(self, left_msg, right_msg): self.logger.debug("timestamp prev: ", kf_prev.timestamp) self.logger.debug("timestamp curr: ", kf_curr.timestamp) with Timer(name="[cached result[1.1/3]]", text="[{name}] Elapsed time: {milliseconds:.03f} ms", logger=self.logger.debug): - prev_left_extract_result = await self.superpoint.infer(kf_prev.image) + prev_left_extract_result = self.superpoint.infer_sync(kf_prev.image) with Timer(name="[cached result[1.2/3]]", text="[{name}] Elapsed time: {milliseconds:.03f} ms", logger=self.logger.debug): - current_left_extract_result = await self.superpoint.infer(kf_curr.image) + current_left_extract_result = self.superpoint.infer_sync(kf_curr.image) with Timer(name="[cached result[1.3/3]]", text="[{name}] Elapsed time: {milliseconds:.03f} ms", logger=self.logger.debug): - match_result = await self.light_glue.infer( + match_result = self.light_glue.infer_sync( prev_left_extract_result["kpts"], current_left_extract_result["kpts"], prev_left_extract_result["descps"],