From 39ec039e7fcaada7196c5b42a106b5c5399e9bbb Mon Sep 17 00:00:00 2001 From: Philipp Dolif Date: Sat, 4 Apr 2026 12:44:12 +0200 Subject: [PATCH 1/4] [fix][io] Fix skipped file listing loop runs in file connector --- .../pulsar/io/file/FileListingThread.java | 4 +-- .../pulsar/io/file/FileListingThreadTest.java | 29 +++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java b/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java index 5958fc3748..7b6612ccd9 100644 --- a/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java +++ b/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java @@ -70,7 +70,7 @@ public FileListingThread(FileSourceConfig fileConfig, public void run() { while (true) { - if ((queueLastUpdated.get() < System.currentTimeMillis() - pollingInterval) && listingLock.tryLock()) { + if ((queueLastUpdated.get() <= System.currentTimeMillis() - pollingInterval) && listingLock.tryLock()) { try { final File directory = new File(inputDir); final Set listing = performListing(directory, fileFilterRef.get(), recurseDirs); @@ -97,7 +97,7 @@ public void run() { } try { - sleep(pollingInterval - 1); + sleep(pollingInterval); } catch (InterruptedException e) { // Just ignore } diff --git a/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTest.java b/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTest.java index 8df748d887..864f48d39f 100644 --- a/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTest.java +++ b/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTest.java @@ -163,6 +163,35 @@ public final void maximumAgeTest() throws IOException { } } + @Test + public void pollingIntervalTest() throws IOException { + int pollingInterval = 100; + int tolerance = 20; + + Map map = new HashMap<>(); + map.put("inputDirectory", directory.toString()); + map.put("pollingInterval", pollingInterval); + + try { + listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingThread); + + generateFiles(1); + Thread.sleep(pollingInterval + tolerance); + + verify(workQueue, times(1)).offer(any(File.class)); + + generateFiles(1); + Thread.sleep(pollingInterval + tolerance); + + verify(workQueue, times(2)).offer(any(File.class)); + } catch (InterruptedException | ExecutionException e) { + fail("Unable to generate files" + e.getLocalizedMessage()); + } finally { + cleanUp(); + } + } + @Test public final void doRecurseTest() throws IOException { From 8648b9f71e4729a79b6fba0f20bc1b13d4876000 Mon Sep 17 00:00:00 2001 From: Philipp Dolif Date: Sat, 18 Apr 2026 10:36:52 +0200 Subject: [PATCH 2/4] FileListingThread: Change from Thread to Runnable --- .../java/org/apache/pulsar/io/file/FileListingThread.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java b/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java index 7b6612ccd9..be2eb614da 100644 --- a/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java +++ b/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java @@ -35,11 +35,11 @@ import org.apache.commons.lang3.StringUtils; /** - * Worker thread that checks the configured input directory for + * Worker that checks the configured input directory for * files that meet the provided filtering criteria, and publishes * them to a work queue for processing by the FileConsumerThreads. */ -public class FileListingThread extends Thread { +public class FileListingThread implements Runnable { private final AtomicLong queueLastUpdated = new AtomicLong(0L); private final Lock listingLock = new ReentrantLock(); @@ -68,6 +68,7 @@ public FileListingThread(FileSourceConfig fileConfig, fileFilterRef.set(createFileFilter(fileConfig)); } + @Override public void run() { while (true) { if ((queueLastUpdated.get() <= System.currentTimeMillis() - pollingInterval) && listingLock.tryLock()) { @@ -97,7 +98,7 @@ public void run() { } try { - sleep(pollingInterval); + Thread.sleep(pollingInterval); } catch (InterruptedException e) { // Just ignore } From 6d040071323dbb0620c41fa7d410a78dd73d49b1 Mon Sep 17 00:00:00 2001 From: Philipp Dolif Date: Sat, 18 Apr 2026 10:41:18 +0200 Subject: [PATCH 3/4] Rename FileListingThread to FileListingTask --- ...istingThread.java => FileListingTask.java} | 10 ++-- .../org/apache/pulsar/io/file/FileSource.java | 4 +- .../pulsar/io/file/AbstractFileTest.java | 2 +- .../io/file/FileConsumerThreadTest.java | 12 ++--- ...readTest.java => FileListingTaskTest.java} | 50 +++++++++---------- .../io/file/ProcessedFileThreadTest.java | 24 ++++----- 6 files changed, 51 insertions(+), 51 deletions(-) rename file/src/main/java/org/apache/pulsar/io/file/{FileListingThread.java => FileListingTask.java} (96%) rename file/src/test/java/org/apache/pulsar/io/file/{FileListingThreadTest.java => FileListingTaskTest.java} (82%) diff --git a/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java b/file/src/main/java/org/apache/pulsar/io/file/FileListingTask.java similarity index 96% rename from file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java rename to file/src/main/java/org/apache/pulsar/io/file/FileListingTask.java index be2eb614da..76cfb356bb 100644 --- a/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java +++ b/file/src/main/java/org/apache/pulsar/io/file/FileListingTask.java @@ -39,7 +39,7 @@ * files that meet the provided filtering criteria, and publishes * them to a work queue for processing by the FileConsumerThreads. */ -public class FileListingThread implements Runnable { +public class FileListingTask implements Runnable { private final AtomicLong queueLastUpdated = new AtomicLong(0L); private final Lock listingLock = new ReentrantLock(); @@ -53,10 +53,10 @@ public class FileListingThread implements Runnable { private final boolean keepOriginal; private final long pollingInterval; - public FileListingThread(FileSourceConfig fileConfig, - BlockingQueue workQueue, - BlockingQueue inProcess, - BlockingQueue recentlyProcessed) { + public FileListingTask(FileSourceConfig fileConfig, + BlockingQueue workQueue, + BlockingQueue inProcess, + BlockingQueue recentlyProcessed) { this.workQueue = workQueue; this.inProcess = inProcess; this.recentlyProcessed = recentlyProcessed; diff --git a/file/src/main/java/org/apache/pulsar/io/file/FileSource.java b/file/src/main/java/org/apache/pulsar/io/file/FileSource.java index c201977d56..85d46c519f 100644 --- a/file/src/main/java/org/apache/pulsar/io/file/FileSource.java +++ b/file/src/main/java/org/apache/pulsar/io/file/FileSource.java @@ -45,9 +45,9 @@ public void open(Map config, SourceContext sourceContext) throws FileSourceConfig fileConfig = FileSourceConfig.load(config); fileConfig.validate(); - // One extra for the File listing thread, and another for the cleanup thread + // One extra for the File listing task, and another for the cleanup thread executor = Executors.newFixedThreadPool(fileConfig.getNumWorkers() + 2); - executor.execute(new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed)); + executor.execute(new FileListingTask(fileConfig, workQueue, inProcess, recentlyProcessed)); executor.execute(new ProcessedFileThread(fileConfig, recentlyProcessed)); for (int idx = 0; idx < fileConfig.getNumWorkers(); idx++) { diff --git a/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTest.java b/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTest.java index 1cfa2202eb..70e820a3fd 100644 --- a/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTest.java +++ b/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTest.java @@ -49,7 +49,7 @@ public abstract class AbstractFileTest { protected BlockingQueue producedFiles; protected TestFileGenerator generatorThread; - protected FileListingThread listingThread; + protected FileListingTask listingTask; protected ExecutorService executor; protected Path directory; diff --git a/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTest.java b/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTest.java index 6f1d958dde..6f2a182e57 100644 --- a/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTest.java +++ b/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTest.java @@ -50,9 +50,9 @@ public final void singleFileTest() throws IOException { try { generateFiles(1); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + executor.execute(listingTask); executor.execute(consumerThread); Thread.sleep(2000); @@ -85,9 +85,9 @@ public final void mulitpleFileTest() throws IOException { try { generateFiles(50, 2); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + executor.execute(listingTask); executor.execute(consumerThread); Thread.sleep(2000); @@ -120,9 +120,9 @@ public final void multiLineFileTest() throws IOException { try { generateFiles(1, 10); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + executor.execute(listingTask); executor.execute(consumerThread); Thread.sleep(2000); diff --git a/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTest.java b/file/src/test/java/org/apache/pulsar/io/file/FileListingTaskTest.java similarity index 82% rename from file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTest.java rename to file/src/test/java/org/apache/pulsar/io/file/FileListingTaskTest.java index 864f48d39f..d4fdeef9b1 100644 --- a/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTest.java +++ b/file/src/test/java/org/apache/pulsar/io/file/FileListingTaskTest.java @@ -30,7 +30,7 @@ import org.testng.annotations.Test; -public class FileListingThreadTest extends AbstractFileTest { +public class FileListingTaskTest extends AbstractFileTest { @Test public final void singleFileTest() throws IOException { @@ -40,8 +40,8 @@ public final void singleFileTest() throws IOException { try { generateFiles(1); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); Thread.sleep(2000); verify(producedFiles, times(1)).put(any(File.class)); verify(workQueue, times(1)).offer(any(File.class)); @@ -63,8 +63,8 @@ public final void fiftyFileTest() throws IOException { try { generateFiles(50); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); Thread.sleep(2000); verify(workQueue, times(50)).offer(any(File.class)); @@ -86,8 +86,8 @@ public final void minimumSizeTest() throws IOException { try { // Create 50 zero size files generateFiles(50, 0); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); Thread.sleep(2000); verify(workQueue, times(0)).offer(any(File.class)); } catch (InterruptedException | ExecutionException e) { @@ -106,8 +106,8 @@ public final void maximumSizeTest() throws IOException { // Create 5 files that exceed the limit and 45 that don't generateFiles(5, 1000); generateFiles(45, 10); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); Thread.sleep(2000); verify(workQueue, times(45)).offer(any(File.class)); } catch (InterruptedException | ExecutionException e) { @@ -127,8 +127,8 @@ public final void minimumAgeTest() throws IOException { try { // Create 5 files that will be too "new" for processing generateFiles(5); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); Thread.sleep(2000); verify(workQueue, times(0)).offer(any(File.class)); } catch (InterruptedException | ExecutionException e) { @@ -152,8 +152,8 @@ public final void maximumAgeTest() throws IOException { // Create 5 files that will be too "old" for processing generateFiles(5); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); Thread.sleep(2000); verify(workQueue, times(5)).offer(any(File.class)); } catch (InterruptedException | ExecutionException e) { @@ -173,8 +173,8 @@ public void pollingIntervalTest() throws IOException { map.put("pollingInterval", pollingInterval); try { - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); generateFiles(1); Thread.sleep(pollingInterval + tolerance); @@ -205,8 +205,8 @@ public final void doRecurseTest() throws IOException { // Create 5 files in a sub-folder generateFiles(5, 1, directory.toString() + File.separator + "sub-dir"); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); Thread.sleep(2000); verify(workQueue, times(10)).offer(any(File.class)); } catch (InterruptedException | ExecutionException e) { @@ -229,8 +229,8 @@ public final void doNotRecurseTest() throws IOException { // Create 5 files in a sub-folder generateFiles(5, 1, directory.toString() + File.separator + "sub-dir"); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); Thread.sleep(2000); verify(workQueue, times(5)).offer(any(File.class)); } catch (InterruptedException | ExecutionException e) { @@ -252,8 +252,8 @@ public final void pathFilterTest() throws IOException { // Create 5 files in a sub-folder generateFiles(5, 1, directory.toString() + File.separator + "sub-dir-a"); generateFiles(5, 1, directory.toString() + File.separator + "dir-b"); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); Thread.sleep(2000); verify(workQueue, times(5)).offer(any(File.class)); } catch (InterruptedException | ExecutionException e) { @@ -275,8 +275,8 @@ public final void processedFileFilterTest() throws IOException { try { generateFiles(5, 1, directory.toString(), ".txt"); generateFiles(1, 1, directory.toString(), processedFileSuffix); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); Thread.sleep(2000); verify(workQueue, times(5)).offer(any(File.class)); } catch (InterruptedException | ExecutionException e) { @@ -298,8 +298,8 @@ public final void processedFileFilterTest2() throws IOException { try { generateFiles(5, 1, directory.toString(), ".txt"); generateFiles(1, 1, directory.toString(), processedFileSuffix); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); Thread.sleep(2000); verify(workQueue, times(6)).offer(any(File.class)); } catch (InterruptedException | ExecutionException e) { diff --git a/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTest.java b/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTest.java index 0b1f8b1584..4d50eb8dc1 100644 --- a/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTest.java +++ b/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTest.java @@ -54,10 +54,10 @@ public final void singleFileTest() throws IOException { try { generateFiles(1); fileConfig = FileSourceConfig.load(map); - listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed); + listingTask = new FileListingTask(fileConfig, workQueue, inProcess, recentlyProcessed); consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed); - executor.execute(listingThread); + executor.execute(listingTask); executor.execute(consumerThread); executor.execute(cleanupThread); Thread.sleep(2000); @@ -93,10 +93,10 @@ public final void multipleFileTest() throws IOException { try { generateFiles(50); fileConfig = FileSourceConfig.load(map); - listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed); + listingTask = new FileListingTask(fileConfig, workQueue, inProcess, recentlyProcessed); consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed); - executor.execute(listingThread); + executor.execute(listingTask); executor.execute(consumerThread); executor.execute(cleanupThread); Thread.sleep(2000); @@ -134,10 +134,10 @@ public final void keepFileTest() throws IOException { try { generateFiles(1); fileConfig = FileSourceConfig.load(map); - listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed); + listingTask = new FileListingTask(fileConfig, workQueue, inProcess, recentlyProcessed); consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed); - executor.execute(listingThread); + executor.execute(listingTask); executor.execute(consumerThread); executor.execute(cleanupThread); Thread.sleep(7900); // Should pull the same file 5 times? @@ -173,10 +173,10 @@ public final void continuousRunTest() throws IOException { directory.toString(), "continuous", ".txt", getPermissions()); executor.execute(generatorThread); - listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed); + listingTask = new FileListingTask(fileConfig, workQueue, inProcess, recentlyProcessed); consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed); - executor.execute(listingThread); + executor.execute(listingTask); executor.execute(consumerThread); executor.execute(cleanupThread); @@ -222,14 +222,14 @@ public final void multipleConsumerTest() throws IOException { directory.toString(), "continuous", ".txt", getPermissions()); executor.execute(generatorThread); - listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed); + listingTask = new FileListingTask(fileConfig, workQueue, inProcess, recentlyProcessed); consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); FileConsumerThread consumerThread2 = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); FileConsumerThread consumerThread3 = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed); - executor.execute(listingThread); + executor.execute(listingTask); executor.execute(consumerThread); executor.execute(consumerThread2); executor.execute(consumerThread3); @@ -280,10 +280,10 @@ public final void renameFileTest() throws IOException { "continuous", ".txt", getPermissions()); executor.execute(generatorThread); - listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed); + listingTask = new FileListingTask(fileConfig, workQueue, inProcess, recentlyProcessed); consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed); - executor.execute(listingThread); + executor.execute(listingTask); executor.execute(consumerThread); executor.execute(cleanupThread); From 05f687f59c7bb69eca2a7c1de15aad28d324dc63 Mon Sep 17 00:00:00 2001 From: Philipp Dolif Date: Sat, 18 Apr 2026 10:43:08 +0200 Subject: [PATCH 4/4] Respect interrupts in FileListingTask polling loop --- .../main/java/org/apache/pulsar/io/file/FileListingTask.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/file/src/main/java/org/apache/pulsar/io/file/FileListingTask.java b/file/src/main/java/org/apache/pulsar/io/file/FileListingTask.java index 76cfb356bb..7e399dc870 100644 --- a/file/src/main/java/org/apache/pulsar/io/file/FileListingTask.java +++ b/file/src/main/java/org/apache/pulsar/io/file/FileListingTask.java @@ -70,7 +70,7 @@ public FileListingTask(FileSourceConfig fileConfig, @Override public void run() { - while (true) { + while (!Thread.currentThread().isInterrupted()) { if ((queueLastUpdated.get() <= System.currentTimeMillis() - pollingInterval) && listingLock.tryLock()) { try { final File directory = new File(inputDir); @@ -100,7 +100,8 @@ public void run() { try { Thread.sleep(pollingInterval); } catch (InterruptedException e) { - // Just ignore + Thread.currentThread().interrupt(); + break; } } }