[fix][io] Fix skipped file listing loop runs in file connector #14

Open
pdolif wants to merge 4 commits into apache:master from pdolif:fix-13

Conversation

@pdolif pdolif commented Apr 4, 2026

Fixes #13

Motivation

The polling interval of the FileListingThread does not always work as expected, and file listing runs can be skipped.
After performing a file listing, the thread sleeps for only pollingInterval - 1 milliseconds. In addition, a listing runs only if the time elapsed since the last run exceeds the polling interval. For example, with a polling interval of 100 ms, the thread may sleep 99 ms, but the elapsed time since the last file listing (when files were found) must be greater than 100 ms, i.e., at least 101 ms. If the rest of the code execution takes less than 2 ms, one loop run is skipped.

Modifications

  • Check if the time elapsed is greater than or equal to the polling interval.
  • Sleep for pollingInterval instead of pollingInterval - 1 milliseconds.
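
The boundary condition can be illustrated with a minimal sketch. The class and method names below (PollingBoundary, isDueStrict, isDueInclusive) are hypothetical and are not taken from FileListingThread; timestamps are passed in explicitly so that the 100 ms example from the motivation can be replayed deterministically.

```java
/**
 * Hypothetical illustration of the polling boundary condition.
 * None of these names come from the connector's source.
 */
final class PollingBoundary {

    private PollingBoundary() { }

    /** Behaviour described in the motivation: elapsed time must exceed the interval. */
    static boolean isDueStrict(long lastRunMillis, long nowMillis, long pollingIntervalMillis) {
        return nowMillis - lastRunMillis > pollingIntervalMillis;
    }

    /** Behaviour after the change: elapsed time may also equal the interval. */
    static boolean isDueInclusive(long lastRunMillis, long nowMillis, long pollingIntervalMillis) {
        return nowMillis - lastRunMillis >= pollingIntervalMillis;
    }

    public static void main(String[] args) {
        long interval = 100;
        long lastRun = 1_000;
        // The thread wakes up exactly one interval after the last listing.
        long now = lastRun + interval;
        System.out.println("strict:    " + isDueStrict(lastRun, now, interval));
        System.out.println("inclusive: " + isDueInclusive(lastRun, now, interval));
    }
}
```

With an elapsed time of exactly 100 ms, the strict check rejects the run and the inclusive check accepts it, which is the case that previously caused a loop run to be skipped.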

Verifying this change

  • Make sure that the change passes the CI checks.

This PR adds one test for the polling interval.

Does this pull request potentially affect one of the following parts:

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: pdolif#1

Member

@lhotari lhotari left a comment

Thanks for the contribution

Copilot AI left a comment

Pull request overview

Fixes an off-by-one/timing issue in the file connector’s polling loop that could cause directory listings to be skipped (effectively doubling the configured polling interval in some cases).

Changes:

  • Adjust listing eligibility check to use >= semantics (via <= now - pollingInterval) so runs aren’t skipped at the exact interval boundary.
  • Sleep for pollingInterval instead of pollingInterval - 1.
  • Add a test intended to validate polling interval behavior.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File | Description
--- | ---
file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java | Fixes the polling loop boundary condition and sleep duration.
file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTest.java | Adds a test to exercise the polling interval behavior.

Comment on lines +179 to +187
generateFiles(1);
Thread.sleep(pollingInterval + tolerance);

verify(workQueue, times(1)).offer(any(File.class));

generateFiles(1);
Thread.sleep(pollingInterval + tolerance);

verify(workQueue, times(2)).offer(any(File.class));

Copilot AI Apr 7, 2026

This test is timing-sensitive and can be flaky on slower/loaded CI runners: the listing thread might not get scheduled within pollingInterval + tolerance, causing the times(1) / times(2) assertions to fail even when behavior is correct. Consider replacing the fixed Thread.sleep(...) + exact call counts with an eventual assertion (e.g., Mockito verify(..., timeout(...)).offer(...) / after(...), or a small polling loop with an overall deadline) so the test waits up to a bound for the expected offers.
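
The "small polling loop with an overall deadline" mentioned above could look roughly like the following sketch. The helper name Eventually.await and its parameters are hypothetical and not part of the PR or of any test library; it relies only on the JDK.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/**
 * Hypothetical test helper: polls a condition until it holds or a deadline passes.
 */
final class Eventually {

    private Eventually() { }

    /**
     * Returns true as soon as the condition holds, or false once timeoutMillis
     * has elapsed or the calling thread is interrupted.
     */
    static boolean await(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt status for the caller and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

A test would then assert on the result of await with a generous timeout instead of sleeping for a fixed pollingInterval + tolerance, so a slow CI runner only makes the test take longer rather than fail. With Mockito, verify(workQueue, timeout(...).times(1)).offer(any(File.class)) serves the same purpose.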

- sleep(pollingInterval - 1);
+ sleep(pollingInterval);
  } catch (InterruptedException e) {
  // Just ignore

Copilot AI Apr 7, 2026

InterruptedException is swallowed here, so the thread cannot be shut down cleanly and the interrupt flag is lost. Since this code is being modified, consider restoring the interrupt status (Thread.currentThread().interrupt()) and breaking out of the loop (or otherwise terminating) to make the thread responsive to shutdown/cancellation.

Suggested change
- // Just ignore
+ Thread.currentThread().interrupt();
+ break;

Member

@pdolif This looks like a valid point made by the review, although it isn't related to your change. Another detail is that while (true) { should be replaced with while (!Thread.currentThread().isInterrupted()) { so that the thread shuts down gracefully when the JVM stops.

By the way, this class shouldn't extend java.lang.Thread at all, since it's used this way:

// One extra for the File listing thread, and another for the cleanup thread
executor = Executors.newFixedThreadPool(fileConfig.getNumWorkers() + 2);
executor.execute(new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed));
executor.execute(new ProcessedFileThread(fileConfig, recentlyProcessed));
for (int idx = 0; idx < fileConfig.getNumWorkers(); idx++) {
    executor.execute(new FileConsumerThread(this, workQueue, inProcess, recentlyProcessed));
}

A better approach would be to just implement Runnable.
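
The two suggestions in this thread (an interrupt-aware loop and a plain Runnable) could be combined roughly as follows. This is a sketch under assumptions, not the connector's code: PollingTask and its fields are hypothetical, and the listing work is replaced by a counter.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical stand-in for a listing task; not the connector's actual class.
 */
final class PollingTask implements Runnable {

    private final long pollingIntervalMillis;
    private final AtomicInteger runs = new AtomicInteger();

    PollingTask(long pollingIntervalMillis) {
        this.pollingIntervalMillis = pollingIntervalMillis;
    }

    /** Number of loop iterations executed so far. */
    int runs() {
        return runs.get();
    }

    @Override
    public void run() {
        // Exit as soon as the executing thread has been interrupted.
        while (!Thread.currentThread().isInterrupted()) {
            runs.incrementAndGet(); // placeholder for the real listing work
            try {
                Thread.sleep(pollingIntervalMillis);
            } catch (InterruptedException e) {
                // Restore the flag so the executor can observe it, then stop.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        executor.execute(new PollingTask(10));
        Thread.sleep(50);
        executor.shutdownNow(); // interrupts the worker thread
        System.out.println("terminated: " + executor.awaitTermination(1, TimeUnit.SECONDS));
    }
}
```

Because the task is a Runnable, the executor owns the thread, and shutdownNow() is enough to stop the loop; nothing depends on the task being a java.lang.Thread subclass.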

Author

@lhotari It is reflected now. What about FileConsumerThread and ProcessedFileThread? I think for them it is the same and should also be changed. I am just not sure whether it should be done in this PR or in a separate one.

Development

Successfully merging this pull request may close these issues.

File connector: Some file listing loop runs might be skipped

3 participants