diff --git a/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 73ba6b712f..3f2477a384 100644
--- a/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ b/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -33,6 +33,7 @@
 import java.util.Properties;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -78,8 +79,16 @@ private enum State {
     private Deque<Record<T>> incomingList;
     private AtomicBoolean isFlushing;
     private int batchSize;
+    // Queue capacity resolved in open(); write() only enforces it when the value is positive.
+    private int maxQueueSize;
     private ScheduledExecutorService flushExecutor;
+    // Assigned in open(), cleared in close(), read in fatal(), possibly from different threads.
+    private volatile ScheduledFuture<?> scheduledFlushTask;
     private SinkContext sinkContext;
+    // Retained after open() so ensureConnection() can reconnect with the same credentials.
+    private Properties connectionProperties;
+    // Set on the first queue-full rejection and cleared after a successful flush to avoid log spam.
+    private volatile boolean queueFullLogged = false;
     private final AtomicReference<State> state = new AtomicReference<>(State.OPEN);
 
     @Override
@@ -93,17 +102,17 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
             throw new IllegalArgumentException("Required jdbc Url not set.");
         }
 
-        Properties properties = new Properties();
+        connectionProperties = new Properties();
         String username = jdbcSinkConfig.getUserName();
         String password = jdbcSinkConfig.getPassword();
         if (username != null) {
-            properties.setProperty("user", username);
+            connectionProperties.setProperty("user", username);
         }
         if (password != null) {
-            properties.setProperty("password", password);
+            connectionProperties.setProperty("password", password);
         }
 
-        connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), properties);
+        connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), connectionProperties);
         connection.setAutoCommit(!jdbcSinkConfig.isUseTransactions());
         log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, connection.getAutoCommit());
 
@@ -114,12 +123,21 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
 
         int timeoutMs = jdbcSinkConfig.getTimeoutMs();
         batchSize = jdbcSinkConfig.getBatchSize();
+        maxQueueSize = jdbcSinkConfig.getMaxQueueSize();
+        if (maxQueueSize == 0) {
+            // Auto-size: default to 10x batch size (overflow-safe)
+            long calculated = batchSize > 0 ? (long) batchSize * 10L : 10000L;
+            maxQueueSize = calculated > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) calculated;
+        }
+        // maxQueueSize < 0 (i.e. -1) means unbounded (legacy behavior)
+        log.info("JDBC sink queue capacity: {}", maxQueueSize > 0 ? maxQueueSize : "unbounded");
         incomingList = new LinkedList<>();
         isFlushing = new AtomicBoolean(false);
 
         flushExecutor = Executors.newScheduledThreadPool(1);
         if (timeoutMs > 0) {
-            flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
+            scheduledFlushTask = flushExecutor.scheduleAtFixedRate(
+                    this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
         }
     }
 
@@ -158,6 +176,10 @@ private static List<String> getListFromConfig(String jdbcSinkConfig) {
     @Override
     public void close() throws Exception {
         state.set(State.CLOSED);
+        if (scheduledFlushTask != null) {
+            scheduledFlushTask.cancel(false);
+            scheduledFlushTask = null;
+        }
         if (flushExecutor != null) {
             int timeoutMs = jdbcSinkConfig.getTimeoutMs() * 2;
             flushExecutor.shutdown();
@@ -188,10 +210,36 @@ public void close() throws Exception {
 
     @Override
     public void write(Record<T> record) throws Exception {
-        int number;
+        final State currentState = state.get();
+        if (currentState != State.OPEN) {
+            log.warn("Sink is not in OPEN state (current: {}), failing record", currentState);
+            record.fail();
+            return;
+        }
+        int number = 0;
+        boolean shouldFail = false;
+        boolean shouldLogQueueFull = false;
+        int queueSizeSnapshot = 0;
         synchronized (incomingList) {
-            incomingList.add(record);
-            number = incomingList.size();
+            if (maxQueueSize > 0 && incomingList.size() >= maxQueueSize) {
+                if (!queueFullLogged) {
+                    queueFullLogged = true;
+                    shouldLogQueueFull = true;
+                    queueSizeSnapshot = incomingList.size();
+                }
+                shouldFail = true;
+            } else {
+                incomingList.add(record);
+                number = incomingList.size();
+            }
+        }
+        if (shouldFail) {
+            if (shouldLogQueueFull) {
+                log.warn("Internal queue is full ({} >= {}), failing records to apply back-pressure",
+                        queueSizeSnapshot, maxQueueSize);
+            }
+            record.fail();
+            return;
         }
         if (batchSize > 0 && number >= batchSize) {
             if (log.isDebugEnabled()) {
@@ -239,26 +287,42 @@ protected enum MutationType {
 
 
     private void flush() {
-        if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
-            boolean needAnotherRound;
-            final Deque<Record<T>> swapList = new LinkedList<>();
-
-            synchronized (incomingList) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Starting flush, queue size: {}", incomingList.size());
+        if (state.get() == State.CLOSED) {
+            return;
+        }
+        if (!isFlushing.compareAndSet(false, true)) {
+            if (log.isDebugEnabled()) {
+                synchronized (incomingList) {
+                    log.debug("Already in flushing state, will not flush, queue size: {}", incomingList.size());
                 }
-                final int actualBatchSize = batchSize > 0 ? Math.min(incomingList.size(), batchSize) :
-                        incomingList.size();
+            }
+            return;
+        }
+        boolean needAnotherRound;
+        final Deque<Record<T>> swapList = new LinkedList<>();
 
-                for (int i = 0; i < actualBatchSize; i++) {
-                    swapList.add(incomingList.removeFirst());
-                }
-                needAnotherRound = batchSize > 0 && !incomingList.isEmpty() && incomingList.size() >= batchSize;
+        synchronized (incomingList) {
+            if (incomingList.isEmpty()) {
+                isFlushing.set(false);
+                return;
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Starting flush, queue size: {}", incomingList.size());
+            }
+            final int actualBatchSize = batchSize > 0 ? Math.min(incomingList.size(), batchSize) :
+                    incomingList.size();
+
+            for (int i = 0; i < actualBatchSize; i++) {
+                swapList.add(incomingList.removeFirst());
             }
+            needAnotherRound = batchSize > 0 && !incomingList.isEmpty() && incomingList.size() >= batchSize;
+        }
             long start = System.nanoTime();
 
             int count = 0;
             try {
+                ensureConnection();
+
                 PreparedStatement currentBatch = null;
                 final List<Mutation> mutations = swapList
                         .stream()
@@ -308,6 +372,7 @@ private void flush() {
                 } else {
                     internalFlush(swapList);
                 }
+                queueFullLogged = false;
             } catch (Exception e) {
                 log.error("Got exception {} after {} ms, failing {} messages",
                         e.getMessage(),
@@ -329,10 +394,56 @@ private void flush() {
             if (needAnotherRound) {
                 flush();
             }
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug("Already in flushing state, will not flush, queue size: {}", incomingList.size());
+    }
+
+    /**
+     * Verifies that the JDBC connection is usable before a batch is written and reconnects if not.
+     *
+     * <p>After a reconnect, {@code tableId} is looked up again and {@code initStatement()} is
+     * re-run against the new connection.
+     *
+     * @throws Exception if opening the new connection or re-initializing against it fails
+     */
+    private void ensureConnection() throws Exception {
+        try {
+            // The argument to isValid() is the validation timeout in seconds.
+            if (connection != null && connection.isValid(2)) {
+                return;
+            }
+        } catch (SQLException e) {
+            log.warn("Connection validation failed: {}", e.getMessage());
+        }
+
+        log.info("JDBC connection is invalid, attempting to reconnect to: {}", jdbcUrl);
+        closeConnectionQuietly();
+
+        connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), connectionProperties);
+        try {
+            connection.setAutoCommit(!jdbcSinkConfig.isUseTransactions());
+            tableId = JdbcUtils.getTableId(connection, tableName);
+            initStatement();
+        } catch (Exception e) {
+            // A half-initialized connection would pass the isValid() check on the next flush and
+            // skip the rebuild, so drop it and let the next attempt start from scratch.
+            closeConnectionQuietly();
+            throw e;
+        }
+
+        log.info("Successfully reconnected to: {}", jdbcUrl);
+    }
+
+    /**
+     * Closes the current connection, if any, and clears the field. Close failures are only
+     * logged at debug level because the connection is being discarded anyway.
+     */
+    private void closeConnectionQuietly() {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (Exception e) {
+                log.debug("Error closing stale connection", e);
             }
+            connection = null;
         }
     }
 
@@ -404,6 +515,12 @@ private static boolean isBatchItemFailed(int returnCode) {
      */
     private void fatal(Exception e) {
         if (sinkContext != null && state.compareAndSet(State.OPEN, State.FAILED)) {
+            log.error("Fatal error in JDBC sink, signaling framework for shutdown", e);
+            // close() clears the field concurrently, so null-check and cancel the same reference.
+            final ScheduledFuture<?> flushTask = scheduledFlushTask;
+            if (flushTask != null) {
+                flushTask.cancel(false);
+            }
             sinkContext.fatal(e);
         }
     }
diff --git a/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java b/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
index 854d683813..a59067b370 100644
--- a/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
+++ b/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -130,6 +130,18 @@ public class JdbcSinkConfig implements Serializable {
     )
     private NullValueAction nullValueAction = NullValueAction.FAIL;
 
+    @FieldDoc(
+        required = false,
+        defaultValue = "-1",
+        help = "Maximum number of records to buffer in the internal queue before applying back-pressure. "
+                + "When the queue is full, incoming records will be failed (negatively acknowledged) so that "
+                + "the Pulsar consumer can redeliver them later. This prevents out-of-memory errors when the "
+                + "database connection is slow or broken. "
+                + "A value of 0 auto-sizes to batchSize * 10 (or 10000 when batchSize is not positive). "
+                + "A value of -1 (default) disables the limit (unbounded, legacy behavior)."
+    )
+    private int maxQueueSize = -1;
+
     public enum InsertMode {
         INSERT,
         UPSERT,
@@ -155,6 +167,10 @@ public void validate() {
         if (timeoutMs <= 0 && batchSize <= 0) {
             throw new IllegalArgumentException("timeoutMs or batchSize must be set to a positive value.");
         }
+        if (maxQueueSize < -1) {
+            throw new IllegalArgumentException("maxQueueSize must be -1 (unbounded), 0 (auto-size), "
+                    + "or a positive value.");
+        }
     }
 
 }
diff --git a/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java b/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
index 7cab33df30..c76436a138 100644
--- a/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
+++ b/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
@@ -932,6 +932,173 @@ public void testFatalCalledOnFlushException() throws Exception {
         }
     }
 
+    /**
+     * Test that write() rejects records when the sink is in FAILED state.
+     * After fatal() is called, records should be failed immediately instead of queuing.
+     */
+    @Test
+    public void testWriteRejectsRecordsAfterFatal() throws Exception {
+        jdbcSink.close();
+        jdbcSink = null;
+
+        String jdbcUrl = sqliteUtils.sqliteUri();
+        Map<String, Object> conf = Maps.newHashMap();
+        conf.put("jdbcUrl", jdbcUrl);
+        conf.put("tableName", tableName);
+        conf.put("key", "field3");
+        conf.put("nonKey", "field1,field2");
+        conf.put("batchSize", 1);
+
+        SinkContext mockSinkContext = mock(SinkContext.class);
+        SqliteJdbcAutoSchemaSink sinkWithContext = new SqliteJdbcAutoSchemaSink();
+        try {
+            sinkWithContext.open(conf, mockSinkContext);
+
+            // Force a fatal error by replacing insertStatement with a mock that throws
+            PreparedStatement mockStatement = mock(PreparedStatement.class);
+            doThrow(new SQLException("Connection lost")).when(mockStatement).execute();
+            FieldUtils.writeField(sinkWithContext, "insertStatement", mockStatement, true);
+
+            // Write first record to trigger fatal
+            Foo obj1 = new Foo("f1", "f2", 10);
+            CompletableFuture<Boolean> future1 = new CompletableFuture<>();
+            sinkWithContext.write(createMockFooRecord(obj1, Maps.newHashMap(), future1));
+
+            // Wait for fatal to be called
+            Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
+                    verify(mockSinkContext).fatal(any(Throwable.class)));
+
+            // Now write another record — it should be failed immediately
+            Foo obj2 = new Foo("f3", "f4", 11);
+            CompletableFuture<Boolean> future2 = new CompletableFuture<>();
+            sinkWithContext.write(createMockFooRecord(obj2, Maps.newHashMap(), future2));
+
+            // Record should be failed (false), not acked (true)
+            Assert.assertFalse(future2.get(1, TimeUnit.SECONDS));
+        } finally {
+            sinkWithContext.close();
+        }
+    }
+
+    /**
+     * Test that the bounded queue applies back-pressure by failing records when full.
+     */
+    @Test
+    public void testBoundedQueueBackPressure() throws Exception {
+        jdbcSink.close();
+        jdbcSink = null;
+
+        String jdbcUrl = sqliteUtils.sqliteUri();
+        Map<String, Object> conf = Maps.newHashMap();
+        conf.put("jdbcUrl", jdbcUrl);
+        conf.put("tableName", tableName);
+        conf.put("key", "field3");
+        conf.put("nonKey", "field1,field2");
+        // Large batch size so flush is not triggered by writes
+        conf.put("batchSize", 1000);
+        // No time-based flush
+        conf.put("timeoutMs", 0);
+        // Small queue to test back-pressure
+        conf.put("maxQueueSize", 5);
+
+        SqliteJdbcAutoSchemaSink boundedSink = new SqliteJdbcAutoSchemaSink();
+        try {
+            boundedSink.open(conf, null);
+
+            // Fill the queue to capacity
+            List<CompletableFuture<Boolean>> futures = new java.util.ArrayList<>();
+            for (int i = 0; i < 5; i++) {
+                CompletableFuture<Boolean> f = new CompletableFuture<>();
+                futures.add(f);
+                boundedSink.write(createMockFooRecord(new Foo("f1", "f2", i + 100), Maps.newHashMap(), f));
+            }
+
+            // Next write should be rejected due to queue full
+            CompletableFuture<Boolean> overflowFuture = new CompletableFuture<>();
+            boundedSink.write(createMockFooRecord(new Foo("overflow", "val", 999), Maps.newHashMap(), overflowFuture));
+
+            // The overflow record should be failed immediately
+            Assert.assertFalse(overflowFuture.get(1, TimeUnit.SECONDS));
+        } finally {
+            boundedSink.close();
+        }
+    }
+
+    /**
+     * Test that open() rejects a maxQueueSize below -1 with an IllegalArgumentException.
+     */
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = ".*maxQueueSize.*")
+    public void testInvalidMaxQueueSizeRejected() throws Exception {
+        jdbcSink.close();
+        jdbcSink = null;
+
+        Map<String, Object> conf = Maps.newHashMap();
+        conf.put("jdbcUrl", sqliteUtils.sqliteUri());
+        conf.put("tableName", tableName);
+        conf.put("key", "field3");
+        conf.put("nonKey", "field1,field2");
+        conf.put("batchSize", 200);
+        conf.put("timeoutMs", 500);
+        conf.put("maxQueueSize", -2);
+
+        SqliteJdbcAutoSchemaSink sink = new SqliteJdbcAutoSchemaSink();
+        sink.open(conf, null);
+    }
+
+    /**
+     * Test that ensureConnection() reconnects when the existing connection becomes invalid.
+     * Simulates a database going away and coming back by closing the connection mid-flight,
+     * then verifying the sink recovers and processes the next batch successfully.
+     */
+    @Test
+    public void testReconnectOnBrokenConnection() throws Exception {
+        jdbcSink.close();
+        jdbcSink = null;
+
+        String jdbcUrl = sqliteUtils.sqliteUri();
+        Map<String, Object> conf = Maps.newHashMap();
+        conf.put("jdbcUrl", jdbcUrl);
+        conf.put("tableName", tableName);
+        conf.put("key", "field3");
+        conf.put("nonKey", "field1,field2");
+        conf.put("batchSize", 1);
+
+        SqliteJdbcAutoSchemaSink reconnectSink = new SqliteJdbcAutoSchemaSink();
+        try {
+            reconnectSink.open(conf, null);
+
+            // First write should succeed — connection is healthy
+            Foo obj1 = new Foo("reconnect1", "val1", 50);
+            CompletableFuture<Boolean> future1 = new CompletableFuture<>();
+            reconnectSink.write(createMockFooRecord(obj1, Maps.newHashMap(), future1));
+            Assert.assertTrue(future1.get(5, TimeUnit.SECONDS));
+
+            // Verify record was persisted
+            int count = sqliteUtils.select("SELECT * FROM " + tableName + " WHERE field3=50", (rs) -> {
+                Assert.assertEquals(rs.getString(1), "reconnect1");
+            });
+            Assert.assertEquals(count, 1);
+
+            // Now break the connection by closing it behind the sink's back
+            reconnectSink.getConnection().close();
+
+            // Next write should trigger ensureConnection() → reconnect → succeed
+            Foo obj2 = new Foo("reconnect2", "val2", 51);
+            CompletableFuture<Boolean> future2 = new CompletableFuture<>();
+            reconnectSink.write(createMockFooRecord(obj2, Maps.newHashMap(), future2));
+            Assert.assertTrue(future2.get(5, TimeUnit.SECONDS));
+
+            // Verify second record was also persisted after reconnect
+            count = sqliteUtils.select("SELECT * FROM " + tableName + " WHERE field3=51", (rs) -> {
+                Assert.assertEquals(rs.getString(1), "reconnect2");
+            });
+            Assert.assertEquals(count, 1);
+        } finally {
+            reconnectSink.close();
+        }
+    }
+
     @SuppressWarnings("unchecked")
     private Record<GenericObject> createMockFooRecord(Foo record, Map<String, String> actionProperties,
                                                          CompletableFuture<Boolean> future) {