Skip to content

feat(python): add InputAligner on humble#281

Open
dvorak0 wants to merge 4 commits intoros2:humblefrom
UniflexAI:humble
Open

feat(python): add InputAligner on humble#281
dvorak0 wants to merge 4 commits intoros2:humblefrom
UniflexAI:humble

Conversation

@dvorak0
Copy link
Copy Markdown

@dvorak0 dvorak0 commented Apr 15, 2026

Summary

  • add a Python InputAligner implementation on top of the humble branch
  • export it from the Python package
  • add Python parity tests matching the C++ test_input_aligner.cpp coverage

Details

This PR targets ros2/message_filters:humble from UniflexAI/message_filters:humble.
The Python implementation follows the C++ InputAligner semantics referenced from the upstream sa-input_aligner work.

Test coverage

The Python test file covers the same 8 cases as the C++ test_input_aligner.cpp:

  • init
  • dispatch_inputs_in_order
  • ignores_inactive_inputs
  • input_timeout
  • drops_msgs
  • dispatch_by_timer
  • no_period_information
  • get_queue_status

Validation

Validated on a Humble environment with all 8 Python InputAligner tests passing.

* feat(python): add InputAligner on humble

* fix(python): use ROS time for InputAligner timestamps

* fix(python): avoid rclpy.clock_type dependency on Humble

* test(python): expand InputAligner parity coverage

* test(python): add remaining InputAligner parity cases

* style(python): polish InputAligner time helpers
@dvorak0
Copy link
Copy Markdown
Author

dvorak0 commented Apr 15, 2026

@ahcorde Hi Alejandro, could you please take a look at this PR when you have a moment? I’m not sure whether it would be better to bring InputAligner back to Humble.

If that makes sense, I can continue working on the C++ version.

Copy link
Copy Markdown
Contributor

@ahcorde ahcorde left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mind to target rolling ? This change is backportable

Comment thread src/message_filters/__init__.py Outdated
from rclpy.qos import QoSProfile
from rclpy.time import Time

from .input_aligner import InputAligner, QueueStatus
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
from .input_aligner import InputAligner, QueueStatus
from messaege_filters.input_aligner import InputAligner, QueueStatus

? Absolute imports recommended by PEP-8 over relative importls. This option is acceptible, but maybe it is better to use absoleute, as it is not that complex.

Comment thread src/message_filters/input_aligner.py Outdated
return QueueStatus(self.active, len(self.events), self.msgs_processed, self.msgs_dropped)


class InputAligner:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should not it be inheriting from SimpleFilter? It seems like a successor to that but is not. I understand that it will prevent it from being imported in the __init__.py. But the SimpleFilter is going to be removed from there in this task. After that, this filter may be easely imported there.

I may add the inheritance later, but not now? :)



class InputAligner:
def __init__(self, timeout, *filters):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add type hints?

Suggested change
def __init__(self, timeout, *filters):
def __init__(
self,
timeout: int,
*filters: list[SimpleFilter],
):

cb(*(msg + args))


def _ros_zero_time():
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def _ros_zero_time():
def _ros_zero_time() -> Time:

?

Comment on lines +36 to +38
def _ros_max_time():
zero = _ros_zero_time()
return Time(nanoseconds=9223372036854775807, clock_type=zero.clock_type)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def _ros_max_time():
zero = _ros_zero_time()
return Time(nanoseconds=9223372036854775807, clock_type=zero.clock_type)
def _ros_max_time() -> Time:
return Time(
nanoseconds=9223372036854775807,
clock_type=_ros_zero_time().clock_type,
)

?

Comment thread src/message_filters/input_aligner.py Outdated

class _EventQueue:
def __init__(self):
self.events = []
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use Queue.queue. As one of the advantages, it is threadsafe.

self.event_queues = []
self.input_connections = []
self.signals = []
self.dispatch_timer = None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add type hints for all these fields as well? May be useful for list fields for sure as the declaration does not provide any information on the stored data itself.

def disconnectAll(self):
self.input_connections = []

def registerCallback(self, index, cb, *args):
Copy link
Copy Markdown
Contributor

@EsipovPA EsipovPA Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def registerCallback(self, index, cb, *args):
def registerCallback(
self,
index: int,
callback: tp.Callable[*actual acceptible arguments and returns list*],
*args,
**kwargs,
):

Comment on lines +60 to +62
def pop_first(self):
self.events.pop(0)
self.msgs_processed += 1
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't pop type functions return the oped value?

Suggested change
def pop_first(self):
self.events.pop(0)
self.msgs_processed += 1
def pop_first(self) -> MsgT:
self.msgs_processed += 1
return self.events.pop(0)

?

self.msgs_processed = 0
self.msgs_dropped = 0

def first_timestamp(self):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_first_timestamp? activate? name suggests a property that returns timestamp, but actually it is a method that changes the state of a class instance.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to keep consistency between py version and

rclcpp::Time firstTimeStamp()
. If you could help to make sure get_first_timestamp looks better, I can take it.

@dvorak0
Copy link
Copy Markdown
Author

dvorak0 commented Apr 17, 2026

Thanks for reviewing this PR.

I’m planning to contribute the same change to Rolling once this gets merged. However, I ran into an issue while testing in a ros:rolling Docker container: the current rolling branch does not seem to compile due to a CMake target export / dependency resolution problem (related to exported targets such as std_msgs).

Would you happen to know the correct fix or the intended approach here? If I can get Rolling building properly on my side, I’d be happy to validate everything there as well before sending a follow-up patch.

Thanks again for your help.

Comment on lines +131 to +136
with self.lock:
if not any(queue.events for queue in self.event_queues):
return
input_available = True
while input_available:
input_available = self._dispatch_first_message()
Copy link
Copy Markdown
Contributor

@EsipovPA EsipovPA Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
with self.lock:
if not any(queue.events for queue in self.event_queues):
return
input_available = True
while input_available:
input_available = self._dispatch_first_message()
with self.lock:
if all(queue.events for queue in self.event_queues):
input_available = True
while input_available:
input_available = self._dispatch_first_message()

should it be all though?

Comment thread src/message_filters/input_aligner.py Outdated
timestamps = [queue.first_timestamp() for queue in self.event_queues]
idx = min(range(len(timestamps)), key=lambda i: timestamps[i].nanoseconds)
queue = self.event_queues[idx]
if queue.events:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that with the previous logic based on not any(queue.events for queue in self.event_queues) this condition will always be True. And with the suggested in my previous comment as well. At least there is another place, where this method is invoked. If you switch to any(queue.events for queue in self.event_queues), it could be False. But it seems to me that these two functions should be rearranged so you will not do so many loops over the lists and queues to dispatch a single message. I'll try to think on a better solution and come back as soon as I have one. Soory for not having one already. It may require some time.

Copy link
Copy Markdown
Author

@dvorak0 dvorak0 Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a case:

  1. [dispatchMessages] only part of the queue are empty.
  2. [_dispatch_first_message] queue.events could be False.

That's why we might not switch to all.

By "not do so many loops", if I understand well, it's because we need to search for the "latest timestamp" by loop, even there is only single message in a single queue. A solution to that might be to switch to priority queue.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔

@EsipovPA
Copy link
Copy Markdown
Contributor

EsipovPA commented Apr 17, 2026

A bit nitpicky comment, but shouldn't a PR be linked to an issue?

Thanks for reviewing this PR.

I’m planning to contribute the same change to Rolling once this gets merged. However, I ran into an issue while testing in a ros:rolling Docker container: the current rolling branch does not seem to compile due to a CMake target export / dependency resolution problem (related to exported targets such as std_msgs).

Would you happen to know the correct fix or the intended approach here? If I can get Rolling building properly on my side, I’d be happy to validate everything there as well before sending a follow-up patch.

Thanks again for your help.

I'll try to take a look at the rolling build during the weekend. Though, why not target this to rolling already? As @ahcorde have proposed here. If the build is fixed, these changes may just be backported without the need to open another PR.

upd

Failed to find some time to take a look at the rolling build during the weekend. I'll try to do it during this week (20.04.26 - 24.04.26).

@dvorak0
Copy link
Copy Markdown
Author

dvorak0 commented Apr 17, 2026

A bit nitpicky comment, but shouldn't a PR be linked to an issue?

Thanks for reviewing this PR.
I’m planning to contribute the same change to Rolling once this gets merged. However, I ran into an issue while testing in a ros:rolling Docker container: the current rolling branch does not seem to compile due to a CMake target export / dependency resolution problem (related to exported targets such as std_msgs).
Would you happen to know the correct fix or the intended approach here? If I can get Rolling building properly on my side, I’d be happy to validate everything there as well before sending a follow-up patch.
Thanks again for your help.

I'll try to take a look at the rolling build during the weekend. Though, why not target this to rolling already? If the build is fixed, these changes may just be backported without the need to open another PR.

A bit nitpicky comment, but shouldn't a PR be linked to an issue?

Thanks for reviewing this PR.
I’m planning to contribute the same change to Rolling once this gets merged. However, I ran into an issue while testing in a ros:rolling Docker container: the current rolling branch does not seem to compile due to a CMake target export / dependency resolution problem (related to exported targets such as std_msgs).
Would you happen to know the correct fix or the intended approach here? If I can get Rolling building properly on my side, I’d be happy to validate everything there as well before sending a follow-up patch.
Thanks again for your help.

I'll try to take a look at the rolling build during the weekend. Though, why not target this to rolling already? If the build is fixed, these changes may just be backported without the need to open another PR.

I agree to the idea of targeting this to rolling. What prevents me before, is the problem of that I could not compile the rolling branch, and thus the testing. If that's solved, that would be perfect.

@dvorak0
Copy link
Copy Markdown
Author

dvorak0 commented Apr 17, 2026

Here is the issue: #282

@EsipovPA
Copy link
Copy Markdown
Contributor

Hello @dvorak0 !

I've managed to take a look at the building with rolling issue. I've started with the rolling branch and just cherry-picked all 4 of your commits and managed to complete the build and test your changes successfully. There were some issues with copuright and flake8, but these are minor. I did not encounter any fatal issues while building and testing these changes.

So I think you may do the same. Just rebase your branch on rolling and you should be good to go. Here is my branch: https://github.com/EsipovPA/message_filters/commits/282_input_aligner_rolling/

You may cherry-pick your commits on top of the rolling branch, or just do the rebase origin/rolling. Not sure which approach will be easyer to do in this case. I've preferred the cherry-picking, as is this case ti is required to completely swap the base branch. But it is totally up to you.

@dvorak0
Copy link
Copy Markdown
Author

dvorak0 commented Apr 24, 2026

Hello @dvorak0 !

I've managed to take a look at the building with rolling issue. I've started with the rolling branch and just cherry-picked all 4 of your commits and managed to complete the build and test your changes successfully. There were some issues with copuright and flake8, but these are minor. I did not encounter any fatal issues while building and testing these changes.

So I think you may do the same. Just rebase your branch on rolling and you should be good to go. Here is my branch: https://github.com/EsipovPA/message_filters/commits/282_input_aligner_rolling/

You may cherry-pick your commits on top of the rolling branch, or just do the rebase origin/rolling. Not sure which approach will be easyer to do in this case. I've preferred the cherry-picking, as is this case ti is required to completely swap the base branch. But it is totally up to you.

I got error with

docker run --rm -it ros:rolling bash -lc '
apt-get update &&
apt-get install -y git python3-colcon-common-extensions python3-rosdep build-essential &&
mkdir -p /ws/src &&
cd /ws/src &&
git clone -b 282_input_aligner_rolling https://github.com/EsipovPA/message_filters.git &&
cd /ws &&
. /opt/ros/rolling/setup.sh &&
colcon build --symlink-install
'

The log shows:

it is already the newest version (1:2.43.0-1ubuntu7.3).
python3-colcon-common-extensions is already the newest version (0.3.0-100).
python3-rosdep is already the newest version (0.26.0-1).
build-essential is already the newest version (12.10ubuntu1).
0 upgraded, 0 newly installed, 0 to remove and 9 not upgraded.
Cloning into 'message_filters'...
remote: Enumerating objects: 2906, done.
remote: Counting objects: 100% (1315/1315), done.
remote: Compressing objects: 100% (504/504), done.
remote: Total 2906 (delta 1118), reused 821 (delta 811), pack-reused 1591 (from 3)
Receiving objects: 100% (2906/2906), 865.43 KiB | 1.13 MiB/s, done.
Resolving deltas: 100% (1838/1838), done.
Starting >>> message_filters
--- stderr: message_filters                         
CMake Error at CMakeLists.txt:29 (target_link_libraries):
  Target "message_filters" links to:

    std_msgs::std_msgs

  but the target was not found.  Possible reasons include:

    * There is a typo in the target name.
    * A find_package call is missing for an IMPORTED target.
    * An ALIAS target is missing.



CMake Error at CMakeLists.txt:91 (target_link_libraries):
  Target "message_filters-test_subscriber" links to:

    sensor_msgs::sensor_msgs

  but the target was not found.  Possible reasons include:

    * There is a typo in the target name.
    * A find_package call is missing for an IMPORTED target.
    * An ALIAS target is missing.



CMake Error at CMakeLists.txt:127 (target_link_libraries):
  Target "message_filters-test_fuzz" links to:

    sensor_msgs::sensor_msgs

  but the target was not found.  Possible reasons include:

    * There is a typo in the target name.
    * A find_package call is missing for an IMPORTED target.
    * An ALIAS target is missing.



CMake Error at CMakeLists.txt:132 (target_link_libraries):
  Target "message_filters-test_message_traits" links to:

    std_msgs::std_msgs

  but the target was not found.  Possible reasons include:

    * There is a typo in the target name.
    * A find_package call is missing for an IMPORTED target.
    * An ALIAS target is missing.



CMake Generate step failed.  Build files cannot be regenerated correctly.
---
Failed   <<< message_filters [1.70s, exited with code 1]
                                
Summary: 0 packages finished [1.77s]
  1 package failed: message_filters
  1 package had stderr output: message_filters

@EsipovPA I'm guess there is issue regarding CMake exports. But I'm not sure.

@EsipovPA
Copy link
Copy Markdown
Contributor

EsipovPA commented Apr 24, 2026

@dvorak0
I've fixed it by rebuilding from scratch.

In my case, I had a pretty old version of a rolling branch of all the ROS2 code, so I removed build and install folders, redownloaded all the sources and built everything from scratch with

colcol build --packages-up-to message_filters --symlink-install.

But I've needed the update anyways. Maybe for you it will only take the removing of build/*_msgs and install/*_msgs. Maybe not. Needs some trying to figure it out.

@dvorak0
Copy link
Copy Markdown
Author

dvorak0 commented Apr 24, 2026

@dvorak0 I've fixed it by rebuilding from scratch.

In my case, I had a pretty old version of a rolling branch of all the ROS2 code, so I removed build and install folders, redownloaded all the sources and built everything from scratch with

colcol build --packages-up-to message_filters --symlink-install.

But I've needed the update anyways. Maybe for you it will only take the removing of build/*_msgs and install/*_msgs. Maybe not. Needs some trying to figure it out.

@EsipovPA
I have to "fixed the compilation and test" -> "passed the tests" -> “carefully get the CMakeLists.txt back".

Then I get #283 . What do you think?

I believe the std_msgs::std_msgs is the modern CMake way compared with ament_target. So the docker image of "ros:rolling" is still not compatible to compile message_filters.

What do you think of the "docker run" compilation? By using that, we could keep in the same page of dev envs.

@EsipovPA
Copy link
Copy Markdown
Contributor

EsipovPA commented Apr 24, 2026

@dvorak0 I'm not sure about the ros:rolling image. I was building and testing it on a developer's build. Like is described here.

In my case, I've successfully built a package and completed almost all the tests. Here is my output

The build

$ colcon build --packages-up-to message_filters 

...

Finished <<< message_filters [25.5s]                                

Summary: 147 packages finished [1min 15s]

The test

colcon test --packages-select message_filters --event-handlers console_cohesion+

...

93% tests passed, 2 tests failed out of 28

Label Time Summary:
copyright     =   0.23 sec*proc (1 test)
cppcheck      =   0.19 sec*proc (1 test)
cpplint       =   1.36 sec*proc (1 test)
flake8        =   0.48 sec*proc (1 test)
gtest         =   5.74 sec*proc (15 tests)
lint_cmake    =   0.19 sec*proc (1 test)
linter        =   5.26 sec*proc (8 tests)
pep257        =   0.29 sec*proc (1 test)
pytest        =   3.61 sec*proc (5 tests)
uncrustify    =   0.40 sec*proc (1 test)
xmllint       =   2.11 sec*proc (1 test)

Total Test time (real) =  14.63 sec

The following tests did not run:
         12 - message_filters-test_fuzz (Skipped)

The following tests FAILED:
         21 - copyright (Failed)
         24 - flake8 (Failed)
Errors while running CTest
Output from these tests are in: /root/ros2_rolling/build/message_filters/Testing/Temporary/LastTest.log
Use "--rerun-failed --output-on-failure" to re-run the failed cases verbosely.
---
--- stderr: message_filters
Errors while running CTest
Output from these tests are in: /root/ros2_rolling/build/message_filters/Testing/Temporary/LastTest.log
Use "--rerun-failed --output-on-failure" to re-run the failed cases verbosely.
---
Finished <<< message_filters [15.4s]    [ with test failures ]

Summary: 1 package finished [16.7s]
  1 package had stderr output: message_filters
  1 package had test failures: message_filters

Do you use the same build, or do you download a docker image and build using pre-built packages?

@dvorak0
Copy link
Copy Markdown
Author

dvorak0 commented Apr 24, 2026

@dvorak0 I'm not sure about the ros:rolling image. I was building and testing it on a developer's build. Like is described here. Do you use the same build, or do you download a docker image and build using pre-built packages?

@EsipovPA I downloaded the image and thus used the pre-built packages from the image. It might explain.

@EsipovPA
Copy link
Copy Markdown
Contributor

@dvorak0 Well, anyway, since you've opened the new PR, it should run the build with the latest rolling branch, as far as I know. So, it should build in the end.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants