From f4aa7fe2ca12bdbc3eb7f2ce5a338c2f9293e503 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Harangoz=C3=B3?= Date: Fri, 3 Apr 2026 08:18:50 +0200 Subject: [PATCH 1/2] [fix][io] JDBC sink: prevent OOM from unbounded queue on connection failure The JDBC sink's internal queue (incomingList) is unbounded. When the database connection drops, executeBatch() hangs until the TCP socket times out. During this period, isFlushing stays true, preventing any draining, while write() continues accepting records without limit. This causes OutOfMemoryError in production. This commit fixes 4 issues: 1. Bounded internal queue: write() now rejects records when queue exceeds maxQueueSize (configurable, defaults to 10x batchSize), applying Pulsar-level back-pressure via negative acknowledgment. 2. State check in write(): records are failed immediately when the sink state is not OPEN (after fatal() or close()). 3. Connection validation and reconnection: ensureConnection() validates the JDBC connection before each flush and reconnects automatically on failure, allowing recovery from transient database outages. 4. Scheduled flush cancellation: fatal() and close() now cancel the periodic flush task to prevent repeated failures on a broken connection. Fixes https://github.com/apache/pulsar/issues/25030 --- .../pulsar/io/jdbc/JdbcAbstractSink.java | 83 +++++++++- .../apache/pulsar/io/jdbc/JdbcSinkConfig.java | 12 ++ .../pulsar/io/jdbc/SqliteJdbcSinkTest.java | 145 ++++++++++++++++++ 3 files changed, 235 insertions(+), 5 deletions(-) diff --git a/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java index 73ba6b712f..e6eda75bfe 100644 --- a/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java +++ b/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java @@ -33,6 +33,7 @@ import java.util.Properties; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -78,8 +79,12 @@ private enum State { private Deque> incomingList; private AtomicBoolean isFlushing; private int batchSize; + private int maxQueueSize; private ScheduledExecutorService flushExecutor; + private ScheduledFuture scheduledFlushTask; private SinkContext sinkContext; + private Properties connectionProperties; + private volatile boolean queueFullLogged = false; private final AtomicReference state = new AtomicReference<>(State.OPEN); @Override @@ -93,17 +98,17 @@ public void open(Map config, SinkContext sinkContext) throws Exc throw new IllegalArgumentException("Required jdbc Url not set."); } - Properties properties = new Properties(); + connectionProperties = new Properties(); String username = jdbcSinkConfig.getUserName(); String password = jdbcSinkConfig.getPassword(); if (username != null) { - properties.setProperty("user", username); + connectionProperties.setProperty("user", username); } if (password != null) { - properties.setProperty("password", password); + connectionProperties.setProperty("password", password); } - connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), properties); + connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), connectionProperties); connection.setAutoCommit(!jdbcSinkConfig.isUseTransactions()); log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, connection.getAutoCommit()); @@ -114,12 +119,20 @@ public void open(Map config, SinkContext sinkContext) throws Exc int timeoutMs = jdbcSinkConfig.getTimeoutMs(); batchSize = jdbcSinkConfig.getBatchSize(); + maxQueueSize = jdbcSinkConfig.getMaxQueueSize(); + if (maxQueueSize == 0) { + // Auto-size: default to 10x batch size + maxQueueSize = batchSize > 0 ? batchSize * 10 : 10000; + } + // maxQueueSize < 0 (e.g. -1) means unbounded (legacy behavior) + log.info("JDBC sink queue capacity: {}", maxQueueSize > 0 ? maxQueueSize : "unbounded"); incomingList = new LinkedList<>(); isFlushing = new AtomicBoolean(false); flushExecutor = Executors.newScheduledThreadPool(1); if (timeoutMs > 0) { - flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS); + scheduledFlushTask = flushExecutor.scheduleAtFixedRate( + this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS); } } @@ -158,6 +171,10 @@ private static List getListFromConfig(String jdbcSinkConfig) { @Override public void close() throws Exception { state.set(State.CLOSED); + if (scheduledFlushTask != null) { + scheduledFlushTask.cancel(false); + scheduledFlushTask = null; + } if (flushExecutor != null) { int timeoutMs = jdbcSinkConfig.getTimeoutMs() * 2; flushExecutor.shutdown(); @@ -188,8 +205,22 @@ public void close() throws Exception { @Override public void write(Record record) throws Exception { + if (state.get() != State.OPEN) { + log.warn("Sink is not in OPEN state (current: {}), failing record", state.get()); + record.fail(); + return; + } int number; synchronized (incomingList) { + if (maxQueueSize > 0 && incomingList.size() >= maxQueueSize) { + if (!queueFullLogged) { + log.warn("Internal queue is full ({} >= {}), failing records to apply back-pressure", + incomingList.size(), maxQueueSize); + queueFullLogged = true; + } + record.fail(); + return; + } incomingList.add(record); number = incomingList.size(); } @@ -239,6 +270,9 @@ protected enum MutationType { private void flush() { + if (state.get() == State.CLOSED) { + return; + } if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) { boolean needAnotherRound; final Deque> swapList = new LinkedList<>(); @@ -259,6 +293,8 @@ private void flush() { int count = 0; try { + ensureConnection(); + PreparedStatement currentBatch = null; final List mutations = swapList .stream() @@ -308,6 +344,7 @@ private void flush() { } else { internalFlush(swapList); } + queueFullLogged = false; } catch (Exception e) { log.error("Got exception {} after {} ms, failing {} messages", e.getMessage(), @@ -336,6 +373,38 @@ private void flush() { } } + private void ensureConnection() throws Exception { + try { + if (connection != null && connection.isValid(2)) { + return; + } + } catch (SQLException e) { + log.warn("Connection validation failed: {}", e.getMessage()); + } + + log.info("JDBC connection is invalid, attempting to reconnect to: {}", jdbcUrl); + closeConnectionQuietly(); + + connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), connectionProperties); + connection.setAutoCommit(!jdbcSinkConfig.isUseTransactions()); + + tableId = JdbcUtils.getTableId(connection, tableName); + initStatement(); + + log.info("Successfully reconnected to: {}", jdbcUrl); + } + + private void closeConnectionQuietly() { + if (connection != null) { + try { + connection.close(); + } catch (Exception e) { + log.debug("Error closing stale connection", e); + } + connection = null; + } + } + private void internalFlush(Deque> swapList) throws SQLException { if (jdbcSinkConfig.isUseTransactions()) { connection.commit(); @@ -404,6 +473,10 @@ private static boolean isBatchItemFailed(int returnCode) { */ private void fatal(Exception e) { if (sinkContext != null && state.compareAndSet(State.OPEN, State.FAILED)) { + log.error("Fatal error in JDBC sink, signaling framework for shutdown", e); + if (scheduledFlushTask != null) { + scheduledFlushTask.cancel(false); + } sinkContext.fatal(e); } } diff --git a/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java b/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java index 854d683813..acd55f9de6 100644 --- a/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java +++ b/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java @@ -130,6 +130,18 @@ public class JdbcSinkConfig implements Serializable { ) private NullValueAction nullValueAction = NullValueAction.FAIL; + @FieldDoc( + required = false, + defaultValue = "0", + help = "Maximum number of records to buffer in the internal queue before applying back-pressure. " + + "When the queue is full, incoming records will be failed (negatively acknowledged) so that " + + "the Pulsar consumer can redeliver them later. This prevents out-of-memory errors when the " + + "database connection is slow or broken. " + + "A value of 0 (default) auto-sizes to batchSize * 10. " + + "A value of -1 disables the limit (unbounded, legacy behavior)." + ) + private int maxQueueSize = 0; + public enum InsertMode { INSERT, UPSERT, diff --git a/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java b/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java index 7cab33df30..c516fc20ae 100644 --- a/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java +++ b/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java @@ -932,6 +932,151 @@ public void testFatalCalledOnFlushException() throws Exception { } } + /** + * Test that write() rejects records when the sink is in FAILED state. + * After fatal() is called, records should be failed immediately instead of queuing. + */ + @Test + public void testWriteRejectsRecordsAfterFatal() throws Exception { + jdbcSink.close(); + jdbcSink = null; + + String jdbcUrl = sqliteUtils.sqliteUri(); + Map conf = Maps.newHashMap(); + conf.put("jdbcUrl", jdbcUrl); + conf.put("tableName", tableName); + conf.put("key", "field3"); + conf.put("nonKey", "field1,field2"); + conf.put("batchSize", 1); + + SinkContext mockSinkContext = mock(SinkContext.class); + SqliteJdbcAutoSchemaSink sinkWithContext = new SqliteJdbcAutoSchemaSink(); + try { + sinkWithContext.open(conf, mockSinkContext); + + // Force a fatal error by replacing insertStatement with a mock that throws + PreparedStatement mockStatement = mock(PreparedStatement.class); + doThrow(new SQLException("Connection lost")).when(mockStatement).execute(); + FieldUtils.writeField(sinkWithContext, "insertStatement", mockStatement, true); + + // Write first record to trigger fatal + Foo obj1 = new Foo("f1", "f2", 10); + CompletableFuture future1 = new CompletableFuture<>(); + sinkWithContext.write(createMockFooRecord(obj1, Maps.newHashMap(), future1)); + + // Wait for fatal to be called + Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> + verify(mockSinkContext).fatal(any(Throwable.class))); + + // Now write another record — it should be failed immediately + Foo obj2 = new Foo("f3", "f4", 11); + CompletableFuture future2 = new CompletableFuture<>(); + sinkWithContext.write(createMockFooRecord(obj2, Maps.newHashMap(), future2)); + + // Record should be failed (false), not acked (true) + Assert.assertFalse(future2.get(1, TimeUnit.SECONDS)); + } finally { + sinkWithContext.close(); + } + } + + /** + * Test that the bounded queue applies back-pressure by failing records when full. + */ + @Test + public void testBoundedQueueBackPressure() throws Exception { + jdbcSink.close(); + jdbcSink = null; + + String jdbcUrl = sqliteUtils.sqliteUri(); + Map conf = Maps.newHashMap(); + conf.put("jdbcUrl", jdbcUrl); + conf.put("tableName", tableName); + conf.put("key", "field3"); + conf.put("nonKey", "field1,field2"); + // Large batch size so flush is not triggered by writes + conf.put("batchSize", 1000); + // No time-based flush + conf.put("timeoutMs", 0); + // Small queue to test back-pressure + conf.put("maxQueueSize", 5); + + SqliteJdbcAutoSchemaSink boundedSink = new SqliteJdbcAutoSchemaSink(); + try { + boundedSink.open(conf, null); + + // Fill the queue to capacity + List> futures = new java.util.ArrayList<>(); + for (int i = 0; i < 5; i++) { + CompletableFuture f = new CompletableFuture<>(); + futures.add(f); + boundedSink.write(createMockFooRecord(new Foo("f1", "f2", i + 100), Maps.newHashMap(), f)); + } + + // Next write should be rejected due to queue full + CompletableFuture overflowFuture = new CompletableFuture<>(); + boundedSink.write(createMockFooRecord(new Foo("overflow", "val", 999), Maps.newHashMap(), overflowFuture)); + + // The overflow record should be failed immediately + Assert.assertFalse(overflowFuture.get(1, TimeUnit.SECONDS)); + } finally { + boundedSink.close(); + } + } + + /** + * Test that ensureConnection() reconnects when the existing connection becomes invalid. + * Simulates a database going away and coming back by closing the connection mid-flight, + * then verifying the sink recovers and processes the next batch successfully. + */ + @Test + public void testReconnectOnBrokenConnection() throws Exception { + jdbcSink.close(); + jdbcSink = null; + + String jdbcUrl = sqliteUtils.sqliteUri(); + Map conf = Maps.newHashMap(); + conf.put("jdbcUrl", jdbcUrl); + conf.put("tableName", tableName); + conf.put("key", "field3"); + conf.put("nonKey", "field1,field2"); + conf.put("batchSize", 1); + + SqliteJdbcAutoSchemaSink reconnectSink = new SqliteJdbcAutoSchemaSink(); + try { + reconnectSink.open(conf, null); + + // First write should succeed — connection is healthy + Foo obj1 = new Foo("reconnect1", "val1", 50); + CompletableFuture future1 = new CompletableFuture<>(); + reconnectSink.write(createMockFooRecord(obj1, Maps.newHashMap(), future1)); + Assert.assertTrue(future1.get(5, TimeUnit.SECONDS)); + + // Verify record was persisted + int count = sqliteUtils.select("SELECT * FROM " + tableName + " WHERE field3=50", (rs) -> { + Assert.assertEquals(rs.getString(1), "reconnect1"); + }); + Assert.assertEquals(count, 1); + + // Now break the connection by closing it behind the sink's back + reconnectSink.getConnection().close(); + + // Next write should trigger ensureConnection() → reconnect → succeed + Foo obj2 = new Foo("reconnect2", "val2", 51); + CompletableFuture future2 = new CompletableFuture<>(); + reconnectSink.write(createMockFooRecord(obj2, Maps.newHashMap(), future2)); + Assert.assertTrue(future2.get(5, TimeUnit.SECONDS)); + + // Verify second record was also persisted after reconnect + count = sqliteUtils.select("SELECT * FROM " + tableName + " WHERE field3=51", (rs) -> { + Assert.assertEquals(rs.getString(1), "reconnect2"); + }); + Assert.assertEquals(count, 1); + } finally { + reconnectSink.close(); + } + } + @SuppressWarnings("unchecked") private Record createMockFooRecord(Foo record, Map actionProperties, CompletableFuture future) { From c141a2d51380a938700f13187814a2ec458bc801 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Harangoz=C3=B3?= Date: Fri, 3 Apr 2026 08:19:02 +0200 Subject: [PATCH 2/2] [fix][connector] Address review: thread-safety, default, validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move record.fail() outside synchronized(incomingList) to avoid holding the lock during framework callbacks - Move incomingList.size() check inside synchronized in flush() to fix data race on non-thread-safe LinkedList - Change maxQueueSize default from 0 (auto-bounded) to -1 (unbounded) to preserve backwards-compatible legacy behavior; users opt-in to bounded queue by setting maxQueueSize=0 (auto) or a positive value - Add overflow-safe auto-sizing (long arithmetic capped at MAX_VALUE) - Validate maxQueueSize in JdbcSinkConfig.validate() — reject < -1 - Add test for invalid maxQueueSize rejection --- .../pulsar/io/jdbc/JdbcAbstractSink.java | 74 ++++++++++++------- .../apache/pulsar/io/jdbc/JdbcSinkConfig.java | 12 ++- .../pulsar/io/jdbc/SqliteJdbcSinkTest.java | 19 +++++ 3 files changed, 73 insertions(+), 32 deletions(-) diff --git a/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java index e6eda75bfe..3f2477a384 100644 --- a/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java +++ b/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java @@ -121,10 +121,11 @@ public void open(Map config, SinkContext sinkContext) throws Exc batchSize = jdbcSinkConfig.getBatchSize(); maxQueueSize = jdbcSinkConfig.getMaxQueueSize(); if (maxQueueSize == 0) { - // Auto-size: default to 10x batch size - maxQueueSize = batchSize > 0 ? batchSize * 10 : 10000; + // Auto-size: default to 10x batch size (overflow-safe) + long calculated = batchSize > 0 ? (long) batchSize * 10L : 10000L; + maxQueueSize = calculated > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) calculated; } - // maxQueueSize < 0 (e.g. -1) means unbounded (legacy behavior) + // maxQueueSize < 0 (i.e. -1) means unbounded (legacy behavior) log.info("JDBC sink queue capacity: {}", maxQueueSize > 0 ? maxQueueSize : "unbounded"); incomingList = new LinkedList<>(); isFlushing = new AtomicBoolean(false); @@ -210,19 +211,30 @@ public void write(Record record) throws Exception { record.fail(); return; } - int number; + int number = 0; + boolean shouldFail = false; + boolean shouldLogQueueFull = false; + int queueSizeSnapshot = 0; synchronized (incomingList) { if (maxQueueSize > 0 && incomingList.size() >= maxQueueSize) { if (!queueFullLogged) { - log.warn("Internal queue is full ({} >= {}), failing records to apply back-pressure", - incomingList.size(), maxQueueSize); queueFullLogged = true; + shouldLogQueueFull = true; + queueSizeSnapshot = incomingList.size(); } - record.fail(); - return; + shouldFail = true; + } else { + incomingList.add(record); + number = incomingList.size(); + } + } + if (shouldFail) { + if (shouldLogQueueFull) { + log.warn("Internal queue is full ({} >= {}), failing records to apply back-pressure", + queueSizeSnapshot, maxQueueSize); } - incomingList.add(record); - number = incomingList.size(); + record.fail(); + return; } if (batchSize > 0 && number >= batchSize) { if (log.isDebugEnabled()) { @@ -273,22 +285,33 @@ private void flush() { if (state.get() == State.CLOSED) { return; } - if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) { - boolean needAnotherRound; - final Deque> swapList = new LinkedList<>(); - - synchronized (incomingList) { - if (log.isDebugEnabled()) { - log.debug("Starting flush, queue size: {}", incomingList.size()); + if (!isFlushing.compareAndSet(false, true)) { + if (log.isDebugEnabled()) { + synchronized (incomingList) { + log.debug("Already in flushing state, will not flush, queue size: {}", incomingList.size()); } - final int actualBatchSize = batchSize > 0 ? Math.min(incomingList.size(), batchSize) : - incomingList.size(); + } + return; + } + boolean needAnotherRound; + final Deque> swapList = new LinkedList<>(); - for (int i = 0; i < actualBatchSize; i++) { - swapList.add(incomingList.removeFirst()); - } - needAnotherRound = batchSize > 0 && !incomingList.isEmpty() && incomingList.size() >= batchSize; + synchronized (incomingList) { + if (incomingList.isEmpty()) { + isFlushing.set(false); + return; + } + if (log.isDebugEnabled()) { + log.debug("Starting flush, queue size: {}", incomingList.size()); + } + final int actualBatchSize = batchSize > 0 ? Math.min(incomingList.size(), batchSize) : + incomingList.size(); + + for (int i = 0; i < actualBatchSize; i++) { + swapList.add(incomingList.removeFirst()); } + needAnotherRound = batchSize > 0 && !incomingList.isEmpty() && incomingList.size() >= batchSize; + } long start = System.nanoTime(); int count = 0; @@ -366,11 +389,6 @@ private void flush() { if (needAnotherRound) { flush(); } - } else { - if (log.isDebugEnabled()) { - log.debug("Already in flushing state, will not flush, queue size: {}", incomingList.size()); - } - } } private void ensureConnection() throws Exception { diff --git a/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java b/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java index acd55f9de6..a59067b370 100644 --- a/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java +++ b/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java @@ -132,15 +132,15 @@ public class JdbcSinkConfig implements Serializable { @FieldDoc( required = false, - defaultValue = "0", + defaultValue = "-1", help = "Maximum number of records to buffer in the internal queue before applying back-pressure. " + "When the queue is full, incoming records will be failed (negatively acknowledged) so that " + "the Pulsar consumer can redeliver them later. This prevents out-of-memory errors when the " + "database connection is slow or broken. " - + "A value of 0 (default) auto-sizes to batchSize * 10. " - + "A value of -1 disables the limit (unbounded, legacy behavior)." + + "A value of 0 auto-sizes to batchSize * 10. " + + "A value of -1 (default) disables the limit (unbounded, legacy behavior)." ) - private int maxQueueSize = 0; + private int maxQueueSize = -1; public enum InsertMode { INSERT, @@ -167,6 +167,10 @@ public void validate() { if (timeoutMs <= 0 && batchSize <= 0) { throw new IllegalArgumentException("timeoutMs or batchSize must be set to a positive value."); } + if (maxQueueSize < -1) { + throw new IllegalArgumentException("maxQueueSize must be -1 (unbounded), 0 (auto-size), " + + "or a positive value."); + } } } diff --git a/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java b/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java index c516fc20ae..c76436a138 100644 --- a/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java +++ b/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java @@ -1024,6 +1024,25 @@ public void testBoundedQueueBackPressure() throws Exception { } } + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = ".*maxQueueSize.*") + public void testInvalidMaxQueueSizeRejected() throws Exception { + jdbcSink.close(); + jdbcSink = null; + + Map conf = Maps.newHashMap(); + conf.put("jdbcUrl", sqliteUtils.sqliteUri()); + conf.put("tableName", tableName); + conf.put("key", "field3"); + conf.put("nonKey", "field1,field2"); + conf.put("batchSize", 200); + conf.put("timeoutMs", 500); + conf.put("maxQueueSize", -2); + + SqliteJdbcAutoSchemaSink sink = new SqliteJdbcAutoSchemaSink(); + sink.open(conf, null); + } + /** * Test that ensureConnection() reconnects when the existing connection becomes invalid. * Simulates a database going away and coming back by closing the connection mid-flight,