Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@
import org.apache.commons.lang3.StringUtils;

/**
* Worker thread that checks the configured input directory for
* Worker that checks the configured input directory for
* files that meet the provided filtering criteria, and publishes
* them to a work queue for processing by the FileConsumerThreads.
*/
public class FileListingThread extends Thread {
public class FileListingTask implements Runnable {

private final AtomicLong queueLastUpdated = new AtomicLong(0L);
private final Lock listingLock = new ReentrantLock();
Expand All @@ -53,10 +53,10 @@ public class FileListingThread extends Thread {
private final boolean keepOriginal;
private final long pollingInterval;

public FileListingThread(FileSourceConfig fileConfig,
BlockingQueue<File> workQueue,
BlockingQueue<File> inProcess,
BlockingQueue<File> recentlyProcessed) {
public FileListingTask(FileSourceConfig fileConfig,
BlockingQueue<File> workQueue,
BlockingQueue<File> inProcess,
BlockingQueue<File> recentlyProcessed) {
this.workQueue = workQueue;
this.inProcess = inProcess;
this.recentlyProcessed = recentlyProcessed;
Expand All @@ -68,9 +68,10 @@ public FileListingThread(FileSourceConfig fileConfig,
fileFilterRef.set(createFileFilter(fileConfig));
}

@Override
public void run() {
while (true) {
if ((queueLastUpdated.get() < System.currentTimeMillis() - pollingInterval) && listingLock.tryLock()) {
while (!Thread.currentThread().isInterrupted()) {
if ((queueLastUpdated.get() <= System.currentTimeMillis() - pollingInterval) && listingLock.tryLock()) {
try {
final File directory = new File(inputDir);
final Set<File> listing = performListing(directory, fileFilterRef.get(), recurseDirs);
Expand All @@ -97,9 +98,10 @@ public void run() {
}

try {
sleep(pollingInterval - 1);
Thread.sleep(pollingInterval);
} catch (InterruptedException e) {
// Just ignore
Thread.currentThread().interrupt();
break;
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions file/src/main/java/org/apache/pulsar/io/file/FileSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
FileSourceConfig fileConfig = FileSourceConfig.load(config);
fileConfig.validate();

// One extra for the File listing thread, and another for the cleanup thread
// One extra for the File listing task, and another for the cleanup thread
executor = Executors.newFixedThreadPool(fileConfig.getNumWorkers() + 2);
executor.execute(new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed));
executor.execute(new FileListingTask(fileConfig, workQueue, inProcess, recentlyProcessed));
executor.execute(new ProcessedFileThread(fileConfig, recentlyProcessed));

for (int idx = 0; idx < fileConfig.getNumWorkers(); idx++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public abstract class AbstractFileTest {
protected BlockingQueue<File> producedFiles;

protected TestFileGenerator generatorThread;
protected FileListingThread listingThread;
protected FileListingTask listingTask;
protected ExecutorService executor;

protected Path directory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ public final void singleFileTest() throws IOException {

try {
generateFiles(1);
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
executor.execute(listingThread);
executor.execute(listingTask);
executor.execute(consumerThread);
Thread.sleep(2000);

Expand Down Expand Up @@ -85,9 +85,9 @@ public final void mulitpleFileTest() throws IOException {

try {
generateFiles(50, 2);
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
executor.execute(listingThread);
executor.execute(listingTask);
executor.execute(consumerThread);
Thread.sleep(2000);

Expand Down Expand Up @@ -120,9 +120,9 @@ public final void multiLineFileTest() throws IOException {

try {
generateFiles(1, 10);
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
executor.execute(listingThread);
executor.execute(listingTask);
executor.execute(consumerThread);
Thread.sleep(2000);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.testng.annotations.Test;


public class FileListingThreadTest extends AbstractFileTest {
public class FileListingTaskTest extends AbstractFileTest {

@Test
public final void singleFileTest() throws IOException {
Expand All @@ -40,8 +40,8 @@ public final void singleFileTest() throws IOException {

try {
generateFiles(1);
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingThread);
listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingTask);
Thread.sleep(2000);
verify(producedFiles, times(1)).put(any(File.class));
verify(workQueue, times(1)).offer(any(File.class));
Expand All @@ -63,8 +63,8 @@ public final void fiftyFileTest() throws IOException {

try {
generateFiles(50);
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingThread);
listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingTask);
Thread.sleep(2000);
verify(workQueue, times(50)).offer(any(File.class));

Expand All @@ -86,8 +86,8 @@ public final void minimumSizeTest() throws IOException {
try {
// Create 50 zero size files
generateFiles(50, 0);
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingThread);
listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingTask);
Thread.sleep(2000);
verify(workQueue, times(0)).offer(any(File.class));
} catch (InterruptedException | ExecutionException e) {
Expand All @@ -106,8 +106,8 @@ public final void maximumSizeTest() throws IOException {
// Create 5 files that exceed the limit and 45 that don't
generateFiles(5, 1000);
generateFiles(45, 10);
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingThread);
listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingTask);
Thread.sleep(2000);
verify(workQueue, times(45)).offer(any(File.class));
} catch (InterruptedException | ExecutionException e) {
Expand All @@ -127,8 +127,8 @@ public final void minimumAgeTest() throws IOException {
try {
// Create 5 files that will be too "new" for processing
generateFiles(5);
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingThread);
listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingTask);
Thread.sleep(2000);
verify(workQueue, times(0)).offer(any(File.class));
} catch (InterruptedException | ExecutionException e) {
Expand All @@ -152,8 +152,8 @@ public final void maximumAgeTest() throws IOException {

// Create 5 files that will be too "old" for processing
generateFiles(5);
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingThread);
listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingTask);
Thread.sleep(2000);
verify(workQueue, times(5)).offer(any(File.class));
} catch (InterruptedException | ExecutionException e) {
Expand All @@ -163,6 +163,35 @@ public final void maximumAgeTest() throws IOException {
}
}

@Test
public void pollingIntervalTest() throws IOException {
int pollingInterval = 100;
int tolerance = 20;

Map<String, Object> map = new HashMap<>();
map.put("inputDirectory", directory.toString());
map.put("pollingInterval", pollingInterval);

try {
listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingTask);

generateFiles(1);
Thread.sleep(pollingInterval + tolerance);

verify(workQueue, times(1)).offer(any(File.class));

generateFiles(1);
Thread.sleep(pollingInterval + tolerance);

verify(workQueue, times(2)).offer(any(File.class));
Comment on lines +179 to +187
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is timing-sensitive and can be flaky on slower/loaded CI runners: the listing thread might not get scheduled within pollingInterval + tolerance, causing the times(1) / times(2) assertions to fail even when behavior is correct. Consider replacing the fixed Thread.sleep(...) + exact call counts with an eventual assertion (e.g., Mockito verify(..., timeout(...)).offer(...) / after(...), or a small polling loop with an overall deadline) so the test waits up to a bound for the expected offers.

Copilot uses AI. Check for mistakes.
} catch (InterruptedException | ExecutionException e) {
fail("Unable to generate files" + e.getLocalizedMessage());
} finally {
cleanUp();
}
}

@Test
public final void doRecurseTest() throws IOException {

Expand All @@ -176,8 +205,8 @@ public final void doRecurseTest() throws IOException {

// Create 5 files in a sub-folder
generateFiles(5, 1, directory.toString() + File.separator + "sub-dir");
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingThread);
listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingTask);
Thread.sleep(2000);
verify(workQueue, times(10)).offer(any(File.class));
} catch (InterruptedException | ExecutionException e) {
Expand All @@ -200,8 +229,8 @@ public final void doNotRecurseTest() throws IOException {

// Create 5 files in a sub-folder
generateFiles(5, 1, directory.toString() + File.separator + "sub-dir");
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingThread);
listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingTask);
Thread.sleep(2000);
verify(workQueue, times(5)).offer(any(File.class));
} catch (InterruptedException | ExecutionException e) {
Expand All @@ -223,8 +252,8 @@ public final void pathFilterTest() throws IOException {
// Create 5 files in a sub-folder
generateFiles(5, 1, directory.toString() + File.separator + "sub-dir-a");
generateFiles(5, 1, directory.toString() + File.separator + "dir-b");
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingThread);
listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingTask);
Thread.sleep(2000);
verify(workQueue, times(5)).offer(any(File.class));
} catch (InterruptedException | ExecutionException e) {
Expand All @@ -246,8 +275,8 @@ public final void processedFileFilterTest() throws IOException {
try {
generateFiles(5, 1, directory.toString(), ".txt");
generateFiles(1, 1, directory.toString(), processedFileSuffix);
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingThread);
listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingTask);
Thread.sleep(2000);
verify(workQueue, times(5)).offer(any(File.class));
} catch (InterruptedException | ExecutionException e) {
Expand All @@ -269,8 +298,8 @@ public final void processedFileFilterTest2() throws IOException {
try {
generateFiles(5, 1, directory.toString(), ".txt");
generateFiles(1, 1, directory.toString(), processedFileSuffix);
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingThread);
listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingTask);
Thread.sleep(2000);
verify(workQueue, times(6)).offer(any(File.class));
} catch (InterruptedException | ExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ public final void singleFileTest() throws IOException {
try {
generateFiles(1);
fileConfig = FileSourceConfig.load(map);
listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed);
listingTask = new FileListingTask(fileConfig, workQueue, inProcess, recentlyProcessed);
consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed);
executor.execute(listingThread);
executor.execute(listingTask);
executor.execute(consumerThread);
executor.execute(cleanupThread);
Thread.sleep(2000);
Expand Down Expand Up @@ -93,10 +93,10 @@ public final void multipleFileTest() throws IOException {
try {
generateFiles(50);
fileConfig = FileSourceConfig.load(map);
listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed);
listingTask = new FileListingTask(fileConfig, workQueue, inProcess, recentlyProcessed);
consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed);
executor.execute(listingThread);
executor.execute(listingTask);
executor.execute(consumerThread);
executor.execute(cleanupThread);
Thread.sleep(2000);
Expand Down Expand Up @@ -134,10 +134,10 @@ public final void keepFileTest() throws IOException {
try {
generateFiles(1);
fileConfig = FileSourceConfig.load(map);
listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed);
listingTask = new FileListingTask(fileConfig, workQueue, inProcess, recentlyProcessed);
consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed);
executor.execute(listingThread);
executor.execute(listingTask);
executor.execute(consumerThread);
executor.execute(cleanupThread);
Thread.sleep(7900); // Should pull the same file 5 times?
Expand Down Expand Up @@ -173,10 +173,10 @@ public final void continuousRunTest() throws IOException {
directory.toString(), "continuous", ".txt", getPermissions());
executor.execute(generatorThread);

listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed);
listingTask = new FileListingTask(fileConfig, workQueue, inProcess, recentlyProcessed);
consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed);
executor.execute(listingThread);
executor.execute(listingTask);
executor.execute(consumerThread);
executor.execute(cleanupThread);

Expand Down Expand Up @@ -222,14 +222,14 @@ public final void multipleConsumerTest() throws IOException {
directory.toString(), "continuous", ".txt", getPermissions());
executor.execute(generatorThread);

listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed);
listingTask = new FileListingTask(fileConfig, workQueue, inProcess, recentlyProcessed);
consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
FileConsumerThread consumerThread2 = new FileConsumerThread(consumer, workQueue, inProcess,
recentlyProcessed);
FileConsumerThread consumerThread3 = new FileConsumerThread(consumer, workQueue, inProcess,
recentlyProcessed);
cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed);
executor.execute(listingThread);
executor.execute(listingTask);
executor.execute(consumerThread);
executor.execute(consumerThread2);
executor.execute(consumerThread3);
Expand Down Expand Up @@ -280,10 +280,10 @@ public final void renameFileTest() throws IOException {
"continuous", ".txt", getPermissions());
executor.execute(generatorThread);

listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed);
listingTask = new FileListingTask(fileConfig, workQueue, inProcess, recentlyProcessed);
consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed);
executor.execute(listingThread);
executor.execute(listingTask);
executor.execute(consumerThread);
executor.execute(cleanupThread);

Expand Down