diff --git a/Makefile b/Makefile index b4aa3b736..ae2d88cbc 100644 --- a/Makefile +++ b/Makefile @@ -134,6 +134,18 @@ pyprof: buildext $(PROFFILES) $(WHICH_PYTHON) $${fn} > $${outfn} || exit 1; \ done +.PHONY: track +NASA_DATASET_DIR := download/Flight1_Catered_Dataset-20201013 +NASA_DLC_CSV := $(NASA_DATASET_DIR)/Data/dlc.csv +NASA_TRUTH_CSV := $(NASA_DATASET_DIR)/Data/truth.csv + +$(NASA_DLC_CSV) $(NASA_TRUTH_CSV) &: + @mkdir -p download + PYTHONPATH=$(MODMESH_ROOT)/modmesh $(WHICH_PYTHON) -m track.dataset + +track: $(NASA_DLC_CSV) $(NASA_TRUTH_CSV) + PYTHONPATH=$(MODMESH_ROOT)/modmesh $(WHICH_PYTHON) -m track.track_flight + .PHONY: pilot pilot: cmake cmake --build $(BUILD_PATH) --target $@ VERBOSE=$(VERBOSE) $(MAKE_PARALLEL) diff --git a/modmesh/__init__.py b/modmesh/__init__.py index e12806908..f47d6c9bb 100644 --- a/modmesh/__init__.py +++ b/modmesh/__init__.py @@ -41,7 +41,7 @@ from . import system # noqa: F401 from . import testing # noqa: F401 from . import toggle # noqa: F401 - +from . import track # noqa: F401 clinfo = core.ProcessInfo.instance.command_line diff --git a/modmesh/track/__init__.py b/modmesh/track/__init__.py new file mode 100644 index 000000000..8bc5833cf --- /dev/null +++ b/modmesh/track/__init__.py @@ -0,0 +1,16 @@ +# Copyright (c) 2026, Chun-Shih Chang +# BSD 3-Clause License, see COPYING + +""" +track: Track nasa flight dataset with Kalman Filter. +""" + +__all__ = [ + 'dataset', + 'earth', + 'npkalmanfilter', + 'track_flight', + 'attitude', +] + +# vim: set ff=unix fenc=utf8 et sw=4 ts=4 sts=4: diff --git a/modmesh/track/attitude.py b/modmesh/track/attitude.py new file mode 100644 index 000000000..53bf79277 --- /dev/null +++ b/modmesh/track/attitude.py @@ -0,0 +1,141 @@ +# Copyright (c) 2026, Chun-Shih Chang +# BSD 3-Clause License, see COPYING + +""" +Attitude representation and conversion. +""" + +import numpy as np + + +class attitude: + """ + Static utilities for attitude conversions. + """ + + @staticmethod + def skew(v): + """ + Return the skew-symmetric matrix of a 3D vector. + For a vector ``v``, this function returns matrix ``[v]_x`` such that + ``[v]_x @ a == np.cross(v, a)``. + + :param v: 3D vector. + :type v: numpy.ndarray + :return: 3x3 skew-symmetric matrix. + :rtype: numpy.ndarray + """ + return np.array( + [ + [0.0, -v[2], v[1]], + [v[2], 0.0, -v[0]], + [-v[1], v[0], 0.0], + ] + ) + + @staticmethod + def dangle_to_dcm(dtheta): + """ + Convert delta-angle vector to direction cosine matrix (DCM). + Uses Rodrigues' rotation formula. For very small angles, a first-order + approximation is used to avoid division by zero. + Reference1: + https://docs.opencv.org/4.x/d9/d0c/group__calib3d.html + Reference2: + reference: https://arxiv.org/abs/1312.0788 + + :param dtheta: Rotation vector in radians (axis * angle), + shape ``(3,)``. + :type dtheta: numpy.ndarray + :return: Rotation matrix, shape ``(3, 3)``. + :rtype: numpy.ndarray + """ + angle = np.linalg.norm(dtheta) + if angle < 1e-12: + return np.eye(3) + attitude.skew(dtheta) + + k = dtheta / angle + kx = attitude.skew(k) + c = np.cos(angle) + s = np.sin(angle) + return c * np.eye(3) + (1.0 - c) * np.outer(k, k) + s * kx + + @staticmethod + def quat_to_dcm(q): + """ + Convert quaternion to direction cosine matrix (DCM). + Reference: + https://naif.jpl.nasa.gov/pub/naif/toolkit_docs/C/req/rotation.html#Formation%20of%20a%20rotation%20matrix%20from%20a%20quaternion + + :param q: Quaternion in ``[x, y, z, w]`` order. + :type q: numpy.ndarray + :return: Rotation matrix, shape ``(3, 3)``. + :rtype: numpy.ndarray + """ + x, y, z, w = q + xx = x * x + yy = y * y + zz = z * z + xy = x * y + xz = x * z + yz = y * z + wx = w * x + wy = w * y + wz = w * z + return np.array( + [ + [1.0 - 2.0 * (yy + zz), 2.0 * (xy - wz), 2.0 * (xz + wy)], + [2.0 * (xy + wz), 1.0 - 2.0 * (xx + zz), 2.0 * (yz - wx)], + [2.0 * (xz - wy), 2.0 * (yz + wx), 1.0 - 2.0 * (xx + yy)], + ] + ) + + @staticmethod + def dcm_to_quat(dcm): + """ + Convert direction cosine matrix (DCM) to quaternion. + Reference: + https://www.euclideanspace.com/maths/geometry/rotations/conversions/matrixToQuaternion/ + The returned quaternion is normalized and follows ``[x, y, z, w]`` + order (scalar-last). + + :param dcm: Rotation matrix, shape ``(3, 3)``. + :type dcm: numpy.ndarray + :return: Unit quaternion in ``[x, y, z, w]`` order. + :rtype: numpy.ndarray + """ + m = np.asarray(dcm, dtype=np.float64) + trace = np.trace(m) + + if trace > 0.0: + s = 2.0 * np.sqrt(trace + 1.0) + w = 0.25 * s + x = (m[2, 1] - m[1, 2]) / s + y = (m[0, 2] - m[2, 0]) / s + z = (m[1, 0] - m[0, 1]) / s + elif m[0, 0] > m[1, 1] and m[0, 0] > m[2, 2]: + s = 2.0 * np.sqrt(1.0 + m[0, 0] - m[1, 1] - m[2, 2]) + w = (m[2, 1] - m[1, 2]) / s + x = 0.25 * s + y = (m[0, 1] + m[1, 0]) / s + z = (m[0, 2] + m[2, 0]) / s + elif m[1, 1] > m[2, 2]: + s = 2.0 * np.sqrt(1.0 + m[1, 1] - m[0, 0] - m[2, 2]) + w = (m[0, 2] - m[2, 0]) / s + x = (m[0, 1] + m[1, 0]) / s + y = 0.25 * s + z = (m[1, 2] + m[2, 1]) / s + else: + s = 2.0 * np.sqrt(1.0 + m[2, 2] - m[0, 0] - m[1, 1]) + w = (m[1, 0] - m[0, 1]) / s + x = (m[0, 2] + m[2, 0]) / s + y = (m[1, 2] + m[2, 1]) / s + z = 0.25 * s + + q = np.array([x, y, z, w], dtype=np.float64) + q_norm = np.linalg.norm(q) + if q_norm == 0.0: + return np.array([0.0, 0.0, 0.0, 1.0], dtype=np.float64) + return q / q_norm + +# vim: set ff=unix fenc=utf8 et sw=4 ts=4 sts=4: diff --git a/modmesh/track/dataset.py b/modmesh/track/dataset.py new file mode 100644 index 000000000..2e38697f9 --- /dev/null +++ b/modmesh/track/dataset.py @@ -0,0 +1,275 @@ +# Copyright (c) 2026, Chun-Shih Chang +# BSD 3-Clause License, see COPYING + +""" +Download, extract and preprocess nasa flight dataset. +""" + +import json +import numpy as np +import os +import ssl +import urllib.request +import zipfile + + +class Base: + """ + Shared helpers for dataset CSV loading and interpolation. + """ + + @staticmethod + def load_text(path): + """ + Load a CSV file as a float64 NumPy matrix. + + :param path: CSV file path. + :type path: str + :return: Matrix with shape ``(N, M)``. + :rtype: numpy.ndarray + """ + raw = np.genfromtxt( + path, + delimiter=",", + names=True, + dtype=None, + encoding=None, + ) + return raw.view((np.float64, len(raw.dtype.names))) + + @staticmethod + def interp_linear(t_tgt, t_src, y_src): + """ + Linearly interpolate source samples to target timestamps. + + :param t_tgt: Target timestamps. + :type t_tgt: numpy.ndarray + :param t_src: Source timestamps. + :type t_src: numpy.ndarray + :param y_src: Source samples with shape ``(N, C)``. + :type y_src: numpy.ndarray + :return: Interpolated samples with shape ``(len(t_tgt), C)``. + :rtype: numpy.ndarray + """ + out = np.empty((len(t_tgt), y_src.shape[1]), dtype=np.float64) + for i in range(y_src.shape[1]): + out[:, i] = np.interp(t_tgt, t_src, y_src[:, i]) + return out + + +class DLC(Base): + """ + DLC IMU dataset loader and timestamp aligner. + """ + + def __init__(self, root): + """ + Initialize DLC dataset reader. + + :param root: Root directory that contains extracted NASA files. + :type root: str + """ + self.path = "Flight1_Catered_Dataset-20201013/Data/dlc.csv" + self.root = root + self.imu_pos_in_con = np.array([-0.08035, 0.28390, -1.42333]) + self.dcm_con_imu = np.array([ + [-0.2477, -0.1673, 0.9543], + [-0.0478, 0.9859, 0.1604], + [-0.9677, -0.0059, -0.2522], + ]) + + def load(self): + """ + Load DLC IMU delta-velocity and delta-angle samples from CSV. + + :return: ``None``. + :rtype: None + """ + raw_data = Base.load_text(f"{self.root}/{self.path}") + self.timestamp = raw_data[:, 0] + self.imu_dvel_in_imu = raw_data[:, 1:4] + self.imu_dangle_in_imu = raw_data[:, 4:7] + + def alignment(self, target_timestamp): + """ + Align IMU increments to target timestamps by interpolation. + + The method integrates raw increments to cumulative signals, + interpolates those cumulative signals to target timestamps, and + differences them back + to per-step increments. + + :param target_timestamp: Target timestamps. + :type target_timestamp: numpy.ndarray + :return: ``None``. + :rtype: None + """ + imu_velocity = np.cumsum(self.imu_dvel_in_imu.copy(), axis=0) + imu_angle = np.cumsum(self.imu_dangle_in_imu.copy(), axis=0) + imu_velocity = Base.interp_linear( + target_timestamp, + self.timestamp, + imu_velocity, + ) + imu_angle = Base.interp_linear( + target_timestamp, + self.timestamp, + imu_angle, + ) + + aligned_diff_velocity = np.empty( + (len(target_timestamp), 3), + dtype=np.float64, + ) + aligned_diff_angle = np.empty( + (len(target_timestamp), 3), + dtype=np.float64, + ) + + aligned_diff_velocity[0, :] = 0.0 + aligned_diff_angle[0, :] = 0.0 + aligned_diff_velocity[1:, :] = ( + imu_velocity[1:, :] - imu_velocity[:-1, :] + ) + aligned_diff_angle[1:, :] = imu_angle[1:, :] - imu_angle[:-1, :] + + self.timestamp = target_timestamp + self.imu_dvel_in_imu = aligned_diff_velocity + self.imu_dangle_in_imu = aligned_diff_angle + + +class GroundTruth(Base): + """ + Ground-truth state dataset loader. + """ + + def __init__(self, root): + """ + Initialize ground-truth dataset reader. + + :param root: Root directory that contains extracted NASA files. + :type root: str + """ + self.path = "Flight1_Catered_Dataset-20201013/Data/truth.csv" + self.root = root + + def load(self): + """ + Load ground-truth position, velocity, and quaternion from CSV. + + :return: ``None``. + :rtype: None + """ + raw_data = Base.load_text(f"{self.root}/{self.path}") + self.timestamp = raw_data[:, 0] + self.gt_con_pos_in_ecef = raw_data[:, 1:4] + self.gt_con_vel_in_ecef = raw_data[:, 4:7] + self.gt_quat_con_ecef = raw_data[:, 7:11] + + +class NasaDataset: + """ + Helper for downloading, extracting, and loading NASA files. + """ + + def __init__(self): + """ + Initialize download/load configuration. + """ + + self.url = "https://techport.nasa.gov/api/file/presignedUrl/380503" + self.download_dir = "./download" + self.filename = "DDL-F1_Dataset-20201013.zip" + + def download(self): + """ + Download zipped dataset from NASA presigned URL. + + :return: ``None``. + :rtype: None + """ + response = urllib.request.urlopen(self.url) + presigned_url = json.loads(response.read())["presignedUrl"] + os.makedirs(self.download_dir, exist_ok=True) + urllib.request.urlretrieve( + presigned_url, + f"{self.download_dir}/{self.filename}", + reporthook=self._download_hook, + ) + print() + + def extract(self): + """ + Extract downloaded dataset zip into ``download_dir``. + + :return: ``None``. + :rtype: None + """ + with zipfile.ZipFile( + f"{self.download_dir}/{self.filename}", + "r", + ) as file: + file.extractall(self.download_dir) + + def load(self): + """ + Load DLC and ground-truth datasets, then align timestamps. + + :return: ``None``. + :rtype: None + """ + self.dlc = DLC(self.download_dir) + self.dlc.load() + self.ground_truth = GroundTruth(self.download_dir) + self.ground_truth.load() + self.alignment() + + def alignment(self): + """ + Align DLC timestamps to ground-truth timestamps. + + :return: ``None``. + :rtype: None + """ + self.dlc.alignment(self.ground_truth.timestamp) + + def _download_hook(self, block_num, block_size, total_size): + """ + Progress callback for ``urllib.request.urlretrieve``. + + :param block_num: Downloaded block count. + :type block_num: int + :param block_size: Block size in bytes. + :type block_size: int + :param total_size: Total file size in bytes. + :type total_size: int + :return: ``None``. + :rtype: None + """ + if total_size <= 0: + return + downloaded = block_num * block_size + ratio = min(downloaded / total_size, 1.0) + width = 30 + filled = int(width * ratio) + bar = "#" * filled + "-" * (width - filled) + print(f"\rDownloading [{bar}] {ratio:6.2%}", end="", flush=True) + + +def main(): + """ + Download and extract the NASA flight dataset. + + :return: ``None``. + :rtype: None + """ + ssl._create_default_https_context = ssl._create_stdlib_context + dataset = NasaDataset() + dataset.download() + dataset.extract() + + +if __name__ == "__main__": + main() + +# vim: set ff=unix fenc=utf8 et sw=4 ts=4 sts=4: diff --git a/modmesh/track/earth.py b/modmesh/track/earth.py new file mode 100644 index 000000000..86da9dd79 --- /dev/null +++ b/modmesh/track/earth.py @@ -0,0 +1,92 @@ +# Copyright (c) 2026, Chun-Shih Chang +# BSD 3-Clause License, see COPYING + +""" +Earth-related calculations, such as gravity, Coriolis force and +centrifugal force. +""" + +import numpy as np + + +class Earth: + """ + Static utilities for Earth constants and inertial-frame compensation. + Reference1: https://earth-info.nga.mil/?action=wgs84&dir=wgs84 + Reference2: + https://physics.stackexchange.com/questions/835050/equations-of-motion-in-earth-centered-earth-fixed-coordinates + """ + + OMEGA_EARTH = 7.292115e-5 + MU_EARTH = 3.986004418e14 + + @staticmethod + def earth_rotation_rate_ecef(): + """ + Return Earth rotation rate vector in ECEF frame. + + :return: Angular velocity vector ``[0, 0, omega]`` in rad/s. + :rtype: numpy.ndarray + """ + return np.array([0.0, 0.0, Earth.OMEGA_EARTH], dtype=np.float64) + + @staticmethod + def gravity_ecef(pos_ecef): + """ + Compute central gravity acceleration in ECEF frame. + + :param pos_ecef: Position in ECEF coordinates, shape ``(3,)``. + :type pos_ecef: numpy.ndarray + :return: Gravity acceleration in m/s^2, shape ``(3,)``. + :rtype: numpy.ndarray + """ + r = np.linalg.norm(pos_ecef) + return -Earth.MU_EARTH / (r ** 3) * pos_ecef + + @staticmethod + def coriolis_force_ecef(vel_ecef): + """ + Compute Coriolis acceleration term in ECEF frame. + + :param vel_ecef: Velocity in ECEF coordinates, shape ``(3,)``. + :type vel_ecef: numpy.ndarray + :return: Coriolis acceleration in m/s^2. + :rtype: numpy.ndarray + """ + omega = Earth.earth_rotation_rate_ecef() + return -2.0 * np.cross(omega, vel_ecef) + + @staticmethod + def centrifugal_force_accel(pos_ecef): + """ + Compute centrifugal acceleration term in ECEF frame. + + :param pos_ecef: Position in ECEF coordinates, shape ``(3,)``. + :type pos_ecef: numpy.ndarray + :return: Centrifugal acceleration in m/s^2. + :rtype: numpy.ndarray + """ + omega = Earth.earth_rotation_rate_ecef() + return -np.cross(omega, np.cross(omega, pos_ecef)) + + @staticmethod + def apply_earth_rotation_compensation(accel_ecef, vel_ecef, pos_ecef): + """ + Add Coriolis and centrifugal terms to acceleration. + + :param accel_ecef: Input acceleration in ECEF frame. + :type accel_ecef: numpy.ndarray + :param vel_ecef: Velocity in ECEF frame. + :type vel_ecef: numpy.ndarray + :param pos_ecef: Position in ECEF frame. + :type pos_ecef: numpy.ndarray + :return: Compensated acceleration in ECEF frame. + :rtype: numpy.ndarray + """ + return ( + accel_ecef + + Earth.coriolis_force_ecef(vel_ecef) + + Earth.centrifugal_force_accel(pos_ecef) + ) + +# vim: set ff=unix fenc=utf8 et sw=4 ts=4 sts=4: diff --git a/modmesh/track/npkalmanfilter.py b/modmesh/track/npkalmanfilter.py new file mode 100644 index 000000000..0d2930ded --- /dev/null +++ b/modmesh/track/npkalmanfilter.py @@ -0,0 +1,96 @@ +# Copyright (c) 2026, Chun-Shih Chang +# BSD 3-Clause License, see COPYING + + +""" +Kalman Filter implementation in numpy. +""" + + +import numpy as np + + +class KalmanFilter: + """ + Minimal linear Kalman filter implementation using NumPy. + """ + + def __init__(self, dim_x, dim_z): + """ + Initialize filter dimensions and default matrices. + + :param dim_x: State dimension. + :type dim_x: int + :param dim_z: Measurement dimension. + :type dim_z: int + :raises ValueError: If any dimension is non-positive. + """ + if dim_x <= 0 or dim_z <= 0: + raise ValueError("dim_x and dim_z must be positive") + + self.dim_x = dim_x + self.dim_z = dim_z + + self.x = np.zeros(dim_x, dtype=np.float64) + self.F = np.eye(dim_x, dtype=np.float64) + self.P = np.eye(dim_x, dtype=np.float64) + self.Q = np.eye(dim_x, dtype=np.float64) + self.H = np.zeros((dim_z, dim_x), dtype=np.float64) + self.R = np.eye(dim_z, dtype=np.float64) + + def predict(self, u=None, B=None): + """ + Perform linear prediction step. + + Implements: + ``x = F x + B u`` and ``P = F P F^T + Q``. + + :param u: Optional control input. + :type u: numpy.ndarray or None + :param B: Optional control-input matrix. + :type B: numpy.ndarray or None + :return: Predicted state and covariance. + :rtype: tuple[numpy.ndarray, numpy.ndarray] + :raises ValueError: If ``u`` is provided but ``B`` is ``None``. + """ + self.x = self.F @ self.x + if u is not None: + if B is None: + raise ValueError("B is required when u is provided") + self.x = self.x + B @ u + self.P = self.F @ self.P @ self.F.T + self.Q + return self.x, self.P + + def update(self, z, H=None, R=None): + """ + Perform linear measurement update step. + + :param z: Measurement vector. If ``None``, update is skipped. + :type z: numpy.ndarray or None + :param H: Optional measurement matrix overriding ``self.H``. + :type H: numpy.ndarray or None + :param R: Optional measurement covariance overriding ``self.R``. + :type R: numpy.ndarray or None + :return: Updated state and covariance. + :rtype: tuple[numpy.ndarray, numpy.ndarray] + """ + if z is None: + return self.x, self.P + + z = np.asarray(z, dtype=np.float64).reshape(-1) + h = self.H if H is None else np.asarray(H, dtype=np.float64) + r = self.R if R is None else np.asarray(R, dtype=np.float64) + + y = z - h @ self.x + s = h @ self.P @ h.T + r + k = self.P @ h.T @ np.linalg.inv(s) + + self.x = self.x + k @ y + + identity = np.eye(self.dim_x, dtype=np.float64) + i_kh = identity - k @ h + self.P = i_kh @ self.P @ i_kh.T + k @ r @ k.T + + return self.x, self.P + +# vim: set ff=unix fenc=utf8 et sw=4 ts=4 sts=4: diff --git a/modmesh/track/track_flight.py b/modmesh/track/track_flight.py new file mode 100644 index 000000000..bbf916682 --- /dev/null +++ b/modmesh/track/track_flight.py @@ -0,0 +1,216 @@ +# Copyright (c) 2026, Chun-Shih Chang +# BSD 3-Clause License, see COPYING + + +""" +Track nasa flight dataset with Kalman Filter. +""" + +import argparse +import numpy as np + +from .npkalmanfilter import KalmanFilter +from .dataset import NasaDataset +from .earth import Earth +from .attitude import attitude + + +def propagate_attitude(dcm_ecef_imu, imu_dangle_in_imu, dcm_con_imu): + """ + Propagate IMU attitude and convert to container quaternion. + + :param dcm_ecef_imu: Current IMU DCM in ECEF frame. + :type dcm_ecef_imu: numpy.ndarray + :param imu_dangle_in_imu: IMU delta-angle vector in IMU frame. + :type imu_dangle_in_imu: numpy.ndarray + :param dcm_con_imu: Fixed DCM from container frame to IMU frame. + :type dcm_con_imu: numpy.ndarray + :return: Updated ``(dcm_ecef_imu, quat_con_ecef)``. + :rtype: tuple[numpy.ndarray, numpy.ndarray] + """ + dcm_delta_imu = attitude.dangle_to_dcm(imu_dangle_in_imu) + dcm_ecef_imu = dcm_ecef_imu @ dcm_delta_imu + dcm_ecef_con = dcm_ecef_imu @ dcm_con_imu.T + quat_con_ecef = attitude.dcm_to_quat(dcm_ecef_con.T) + return dcm_ecef_imu, quat_con_ecef + + +def compute_accel_ecef( + dcm_ecef_imu, + imu_dvel_in_imu, + dt, + con_pos_in_ecef, + con_vel_in_ecef, + use_earth_rotation, +): + """ + Compute ECEF acceleration from IMU delta-velocity and compensation terms. + + :param dcm_ecef_imu: IMU DCM in ECEF frame. + :type dcm_ecef_imu: numpy.ndarray + :param imu_dvel_in_imu: IMU delta-velocity in IMU frame. + :type imu_dvel_in_imu: numpy.ndarray + :param dt: Time step in seconds. + :type dt: float + :param con_pos_in_ecef: Container position in ECEF. + :type con_pos_in_ecef: numpy.ndarray + :param con_vel_in_ecef: Container velocity in ECEF. + :type con_vel_in_ecef: numpy.ndarray + :param use_earth_rotation: Enable Coriolis/centrifugal compensation. + :type use_earth_rotation: bool + :return: Estimated ECEF acceleration. + :rtype: numpy.ndarray + """ + dv_ecef = dcm_ecef_imu @ imu_dvel_in_imu + accel_ecef = dv_ecef / dt + accel_ecef = accel_ecef + Earth.gravity_ecef(con_pos_in_ecef) + if use_earth_rotation: + accel_ecef = Earth.apply_earth_rotation_compensation( + accel_ecef, con_vel_in_ecef, con_pos_in_ecef + ) + return accel_ecef + + +def predict_translation(kf, accel_ecef, dt): + """ + Predict translational states with constant-acceleration discrete model. + + :param kf: Kalman filter instance. + :type kf: KalmanFilter + :param accel_ecef: Input acceleration in ECEF frame. + :type accel_ecef: numpy.ndarray + :param dt: Time step in seconds. + :type dt: float + :return: ``None``. + :rtype: None + """ + kf.F = np.eye(10) + kf.F[0:3, 3:6] = np.eye(3) * dt + control_input_matrix = np.zeros((10, 3)) + control_input_matrix[0:3, 0:3] = 0.5 * dt ** 2 * np.eye(3) + control_input_matrix[3:6, 0:3] = dt * np.eye(3) + kf.predict(u=accel_ecef, B=control_input_matrix) + + +def quat_error_angle_deg(predict, ground_truth): + """ + Compute absolute quaternion attitude error in degrees. + + :param predict: Predicted quaternion in ``[x, y, z, w]``. + :type predict: numpy.ndarray + :param ground_truth: Ground-truth quaternion in ``[x, y, z, w]``. + :type ground_truth: numpy.ndarray + :return: Attitude error angle in degrees. + :rtype: float + """ + dot = np.dot( + predict / np.linalg.norm(predict), + ground_truth / np.linalg.norm(ground_truth), + ) + dot = np.abs(dot) + dot = np.clip(dot, -1.0, 1.0) + return np.degrees(2.0 * np.arccos(dot)) + + +def run_kalman_filter(data, use_earth_rotation=True): + """ + Run prediction-only Kalman propagation on the NASA flight dataset. + + :param data: Loaded dataset object. + :type data: NasaDataset + :param use_earth_rotation: Enable Earth rotation compensation terms. + :type use_earth_rotation: bool + :return: Final-state summary dictionary. + :rtype: dict + """ + time_ns = data.dlc.timestamp + imu_dvel_in_imu = data.dlc.imu_dvel_in_imu + imu_dangle_in_imu = data.dlc.imu_dangle_in_imu + dcm_con_imu = data.dlc.dcm_con_imu + gt_con_pos_in_ecef = data.ground_truth.gt_con_pos_in_ecef + gt_con_vel_in_ecef = data.ground_truth.gt_con_vel_in_ecef + gt_quat_con_ecef = data.ground_truth.gt_quat_con_ecef + dcm_ecef_con = attitude.quat_to_dcm(gt_quat_con_ecef[0]).T + dcm_ecef_imu = dcm_ecef_con @ dcm_con_imu + + kf = KalmanFilter(dim_x=10, dim_z=1) + con_pos_in_ecef_0 = gt_con_pos_in_ecef[0] + con_vel_in_ecef_0 = gt_con_vel_in_ecef[0] + gt_quat_con_ecef_0 = gt_quat_con_ecef[0] + quat_con_ecef = gt_quat_con_ecef_0.copy() + kf.x = np.hstack( + [con_pos_in_ecef_0, con_vel_in_ecef_0, gt_quat_con_ecef_0] + ) + + for i in range(1, len(time_ns)): + dt = (time_ns[i] - time_ns[i - 1]) / 1e9 + dcm_ecef_imu, quat_con_ecef = propagate_attitude( + dcm_ecef_imu, imu_dangle_in_imu[i], dcm_con_imu + ) + + con_pos_in_ecef = kf.x[0:3] + con_vel_in_ecef = kf.x[3:6] + accel_ecef = compute_accel_ecef( + dcm_ecef_imu, + imu_dvel_in_imu[i], + dt, + con_pos_in_ecef, + con_vel_in_ecef, + use_earth_rotation + ) + predict_translation(kf, accel_ecef, dt) + kf.x[6:10] = quat_con_ecef + + con_pos_in_ecef_n1 = kf.x[0:3] + gt_con_pos_in_ecef_n1 = gt_con_pos_in_ecef[-1] + quat_con_ecef_n1 = kf.x[6:10].copy() + gt_quat_con_ecef_n1 = gt_quat_con_ecef[-1] + attitude_error_deg_n1 = quat_error_angle_deg( + quat_con_ecef_n1, + gt_quat_con_ecef_n1, + ) + + return { + "final_con_pos_in_ecef": con_pos_in_ecef_n1, + "final_gt_con_pos_in_ecef": gt_con_pos_in_ecef_n1, + "final_quat_con_ecef": quat_con_ecef_n1, + "final_gt_quat_con_ecef": gt_quat_con_ecef_n1, + "final_attitude_error_deg": attitude_error_deg_n1, + } + + +def main(): + """ + Entry point for running the track flight prediction. + + :return: ``None``. + :rtype: None + """ + parser = argparse.ArgumentParser() + parser.add_argument("--no-earth-rotation", action="store_true") + args = parser.parse_args() + + data = NasaDataset() + data.load() + result = run_kalman_filter( + data, use_earth_rotation=not args.no_earth_rotation + ) + + print("Kalman Filter Prediction Result:") + print(f"final_con_pos_in_ecef = {result['final_con_pos_in_ecef']}") + print(f"final_gt_con_pos_in_ecef = {result['final_gt_con_pos_in_ecef']}") + final_pos_error_ecef = ( + result["final_con_pos_in_ecef"] - result["final_gt_con_pos_in_ecef"] + ) + print(f"final_pos_error_ecef = {final_pos_error_ecef}") + print(f"final_error_m = {np.linalg.norm(final_pos_error_ecef)}") + + print(f"final_quat_con_ecef = {result['final_quat_con_ecef']}") + print(f"final_gt_quat_con_ecef = {result['final_gt_quat_con_ecef']}") + print(f"final_attitude_error_deg = {result['final_attitude_error_deg']}") + + +if __name__ == "__main__": + main() + +# vim: set ff=unix fenc=utf8 et sw=4 ts=4 sts=4: diff --git a/setup.py b/setup.py index f1734e57f..59562751e 100644 --- a/setup.py +++ b/setup.py @@ -71,6 +71,7 @@ def main(): 'modmesh.pilot.airfoil', 'modmesh.plot', 'modmesh.profiling', + 'modmesh.track', ], ext_modules=[CMakeExtension("_modmesh")], cmdclass={'build_ext': cmake_build_ext},