diff --git a/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java b/file/src/main/java/org/apache/pulsar/io/file/FileListingTask.java similarity index 91% rename from file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java rename to file/src/main/java/org/apache/pulsar/io/file/FileListingTask.java index 5958fc3748..7e399dc870 100644 --- a/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java +++ b/file/src/main/java/org/apache/pulsar/io/file/FileListingTask.java @@ -35,11 +35,11 @@ import org.apache.commons.lang3.StringUtils; /** - * Worker thread that checks the configured input directory for + * Worker that checks the configured input directory for * files that meet the provided filtering criteria, and publishes * them to a work queue for processing by the FileConsumerThreads. */ -public class FileListingThread extends Thread { +public class FileListingTask implements Runnable { private final AtomicLong queueLastUpdated = new AtomicLong(0L); private final Lock listingLock = new ReentrantLock(); @@ -53,10 +53,10 @@ public class FileListingThread extends Thread { private final boolean keepOriginal; private final long pollingInterval; - public FileListingThread(FileSourceConfig fileConfig, - BlockingQueue workQueue, - BlockingQueue inProcess, - BlockingQueue recentlyProcessed) { + public FileListingTask(FileSourceConfig fileConfig, + BlockingQueue workQueue, + BlockingQueue inProcess, + BlockingQueue recentlyProcessed) { this.workQueue = workQueue; this.inProcess = inProcess; this.recentlyProcessed = recentlyProcessed; @@ -68,9 +68,10 @@ public FileListingThread(FileSourceConfig fileConfig, fileFilterRef.set(createFileFilter(fileConfig)); } + @Override public void run() { - while (true) { - if ((queueLastUpdated.get() < System.currentTimeMillis() - pollingInterval) && listingLock.tryLock()) { + while (!Thread.currentThread().isInterrupted()) { + if ((queueLastUpdated.get() <= System.currentTimeMillis() - pollingInterval) && listingLock.tryLock()) { try { final File directory = new File(inputDir); final Set listing = performListing(directory, fileFilterRef.get(), recurseDirs); @@ -97,9 +98,10 @@ public void run() { } try { - sleep(pollingInterval - 1); + Thread.sleep(pollingInterval); } catch (InterruptedException e) { - // Just ignore + Thread.currentThread().interrupt(); + break; } } } diff --git a/file/src/main/java/org/apache/pulsar/io/file/FileSource.java b/file/src/main/java/org/apache/pulsar/io/file/FileSource.java index c201977d56..85d46c519f 100644 --- a/file/src/main/java/org/apache/pulsar/io/file/FileSource.java +++ b/file/src/main/java/org/apache/pulsar/io/file/FileSource.java @@ -45,9 +45,9 @@ public void open(Map config, SourceContext sourceContext) throws FileSourceConfig fileConfig = FileSourceConfig.load(config); fileConfig.validate(); - // One extra for the File listing thread, and another for the cleanup thread + // One extra for the File listing task, and another for the cleanup thread executor = Executors.newFixedThreadPool(fileConfig.getNumWorkers() + 2); - executor.execute(new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed)); + executor.execute(new FileListingTask(fileConfig, workQueue, inProcess, recentlyProcessed)); executor.execute(new ProcessedFileThread(fileConfig, recentlyProcessed)); for (int idx = 0; idx < fileConfig.getNumWorkers(); idx++) { diff --git a/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTest.java b/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTest.java index 1cfa2202eb..70e820a3fd 100644 --- a/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTest.java +++ b/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTest.java @@ -49,7 +49,7 @@ public abstract class AbstractFileTest { protected BlockingQueue producedFiles; protected TestFileGenerator generatorThread; - protected FileListingThread listingThread; + protected FileListingTask listingTask; protected ExecutorService executor; protected Path directory; diff --git a/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTest.java b/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTest.java index 6f1d958dde..6f2a182e57 100644 --- a/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTest.java +++ b/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTest.java @@ -50,9 +50,9 @@ public final void singleFileTest() throws IOException { try { generateFiles(1); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + executor.execute(listingTask); executor.execute(consumerThread); Thread.sleep(2000); @@ -85,9 +85,9 @@ public final void mulitpleFileTest() throws IOException { try { generateFiles(50, 2); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + executor.execute(listingTask); executor.execute(consumerThread); Thread.sleep(2000); @@ -120,9 +120,9 @@ public final void multiLineFileTest() throws IOException { try { generateFiles(1, 10); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + executor.execute(listingTask); executor.execute(consumerThread); Thread.sleep(2000); diff --git a/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTest.java b/file/src/test/java/org/apache/pulsar/io/file/FileListingTaskTest.java similarity index 76% rename from file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTest.java rename to file/src/test/java/org/apache/pulsar/io/file/FileListingTaskTest.java index 8df748d887..d4fdeef9b1 100644 --- a/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTest.java +++ b/file/src/test/java/org/apache/pulsar/io/file/FileListingTaskTest.java @@ -30,7 +30,7 @@ import org.testng.annotations.Test; -public class FileListingThreadTest extends AbstractFileTest { +public class FileListingTaskTest extends AbstractFileTest { @Test public final void singleFileTest() throws IOException { @@ -40,8 +40,8 @@ public final void singleFileTest() throws IOException { try { generateFiles(1); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); Thread.sleep(2000); verify(producedFiles, times(1)).put(any(File.class)); verify(workQueue, times(1)).offer(any(File.class)); @@ -63,8 +63,8 @@ public final void fiftyFileTest() throws IOException { try { generateFiles(50); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); Thread.sleep(2000); verify(workQueue, times(50)).offer(any(File.class)); @@ -86,8 +86,8 @@ public final void minimumSizeTest() throws IOException { try { // Create 50 zero size files generateFiles(50, 0); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); Thread.sleep(2000); verify(workQueue, times(0)).offer(any(File.class)); } catch (InterruptedException | ExecutionException e) { @@ -106,8 +106,8 @@ public final void maximumSizeTest() throws IOException { // Create 5 files that exceed the limit and 45 that don't generateFiles(5, 1000); generateFiles(45, 10); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); Thread.sleep(2000); verify(workQueue, times(45)).offer(any(File.class)); } catch (InterruptedException | ExecutionException e) { @@ -127,8 +127,8 @@ public final void minimumAgeTest() throws IOException { try { // Create 5 files that will be too "new" for processing generateFiles(5); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); Thread.sleep(2000); verify(workQueue, times(0)).offer(any(File.class)); } catch (InterruptedException | ExecutionException e) { @@ -152,8 +152,8 @@ public final void maximumAgeTest() throws IOException { // Create 5 files that will be too "old" for processing generateFiles(5); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); Thread.sleep(2000); verify(workQueue, times(5)).offer(any(File.class)); } catch (InterruptedException | ExecutionException e) { @@ -163,6 +163,35 @@ public final void maximumAgeTest() throws IOException { } } + @Test + public void pollingIntervalTest() throws IOException { + int pollingInterval = 100; + int tolerance = 20; + + Map map = new HashMap<>(); + map.put("inputDirectory", directory.toString()); + map.put("pollingInterval", pollingInterval); + + try { + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); + + generateFiles(1); + Thread.sleep(pollingInterval + tolerance); + + verify(workQueue, times(1)).offer(any(File.class)); + + generateFiles(1); + Thread.sleep(pollingInterval + tolerance); + + verify(workQueue, times(2)).offer(any(File.class)); + } catch (InterruptedException | ExecutionException e) { + fail("Unable to generate files" + e.getLocalizedMessage()); + } finally { + cleanUp(); + } + } + @Test public final void doRecurseTest() throws IOException { @@ -176,8 +205,8 @@ public final void doRecurseTest() throws IOException { // Create 5 files in a sub-folder generateFiles(5, 1, directory.toString() + File.separator + "sub-dir"); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); Thread.sleep(2000); verify(workQueue, times(10)).offer(any(File.class)); } catch (InterruptedException | ExecutionException e) { @@ -200,8 +229,8 @@ public final void doNotRecurseTest() throws IOException { // Create 5 files in a sub-folder generateFiles(5, 1, directory.toString() + File.separator + "sub-dir"); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); Thread.sleep(2000); verify(workQueue, times(5)).offer(any(File.class)); } catch (InterruptedException | ExecutionException e) { @@ -223,8 +252,8 @@ public final void pathFilterTest() throws IOException { // Create 5 files in a sub-folder generateFiles(5, 1, directory.toString() + File.separator + "sub-dir-a"); generateFiles(5, 1, directory.toString() + File.separator + "dir-b"); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); Thread.sleep(2000); verify(workQueue, times(5)).offer(any(File.class)); } catch (InterruptedException | ExecutionException e) { @@ -246,8 +275,8 @@ public final void processedFileFilterTest() throws IOException { try { generateFiles(5, 1, directory.toString(), ".txt"); generateFiles(1, 1, directory.toString(), processedFileSuffix); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); Thread.sleep(2000); verify(workQueue, times(5)).offer(any(File.class)); } catch (InterruptedException | ExecutionException e) { @@ -269,8 +298,8 @@ public final void processedFileFilterTest2() throws IOException { try { generateFiles(5, 1, directory.toString(), ".txt"); generateFiles(1, 1, directory.toString(), processedFileSuffix); - listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); - executor.execute(listingThread); + listingTask = new FileListingTask(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingTask); Thread.sleep(2000); verify(workQueue, times(6)).offer(any(File.class)); } catch (InterruptedException | ExecutionException e) { diff --git a/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTest.java b/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTest.java index 0b1f8b1584..4d50eb8dc1 100644 --- a/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTest.java +++ b/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTest.java @@ -54,10 +54,10 @@ public final void singleFileTest() throws IOException { try { generateFiles(1); fileConfig = FileSourceConfig.load(map); - listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed); + listingTask = new FileListingTask(fileConfig, workQueue, inProcess, recentlyProcessed); consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed); - executor.execute(listingThread); + executor.execute(listingTask); executor.execute(consumerThread); executor.execute(cleanupThread); Thread.sleep(2000); @@ -93,10 +93,10 @@ public final void multipleFileTest() throws IOException { try { generateFiles(50); fileConfig = FileSourceConfig.load(map); - listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed); + listingTask = new FileListingTask(fileConfig, workQueue, inProcess, recentlyProcessed); consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed); - executor.execute(listingThread); + executor.execute(listingTask); executor.execute(consumerThread); executor.execute(cleanupThread); Thread.sleep(2000); @@ -134,10 +134,10 @@ public final void keepFileTest() throws IOException { try { generateFiles(1); fileConfig = FileSourceConfig.load(map); - listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed); + listingTask = new FileListingTask(fileConfig, workQueue, inProcess, recentlyProcessed); consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed); - executor.execute(listingThread); + executor.execute(listingTask); executor.execute(consumerThread); executor.execute(cleanupThread); Thread.sleep(7900); // Should pull the same file 5 times? @@ -173,10 +173,10 @@ public final void continuousRunTest() throws IOException { directory.toString(), "continuous", ".txt", getPermissions()); executor.execute(generatorThread); - listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed); + listingTask = new FileListingTask(fileConfig, workQueue, inProcess, recentlyProcessed); consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed); - executor.execute(listingThread); + executor.execute(listingTask); executor.execute(consumerThread); executor.execute(cleanupThread); @@ -222,14 +222,14 @@ public final void multipleConsumerTest() throws IOException { directory.toString(), "continuous", ".txt", getPermissions()); executor.execute(generatorThread); - listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed); + listingTask = new FileListingTask(fileConfig, workQueue, inProcess, recentlyProcessed); consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); FileConsumerThread consumerThread2 = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); FileConsumerThread consumerThread3 = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed); - executor.execute(listingThread); + executor.execute(listingTask); executor.execute(consumerThread); executor.execute(consumerThread2); executor.execute(consumerThread3); @@ -280,10 +280,10 @@ public final void renameFileTest() throws IOException { "continuous", ".txt", getPermissions()); executor.execute(generatorThread); - listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed); + listingTask = new FileListingTask(fileConfig, workQueue, inProcess, recentlyProcessed); consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed); - executor.execute(listingThread); + executor.execute(listingTask); executor.execute(consumerThread); executor.execute(cleanupThread);