
[improve][io] JDBC sink: replace synchronized queue with LinkedBlockingDeque for proper back-pressure #17

Open

harangozop wants to merge 1 commit into apache:master from harangozop:fix/jdbc-sink-blocking-backpressure

Conversation

@harangozop
Contributor

@harangozop harangozop commented Apr 8, 2026

Motivation

Follow-up to #9. The record.fail() back-pressure introduced in #9 causes a nack/redeliver storm under sustained load: the consumer delivers messages, the sink immediately fails them, they get redelivered, and the cycle repeats endlessly. This wastes CPU time on both the broker and the sink.

Modifications

Replace LinkedList + synchronized with LinkedBlockingDeque:

  • write(): offer(record, 1, TimeUnit.SECONDS) blocks the Pulsar IO thread when the queue is full — this IS the back-pressure (it stops consumption). At most 1 nack per second on timeout, versus thousands per second before.
  • flush(): drainTo(swapList, batchSize) — non-blocking atomic drain, no synchronized blocks. Automatically wakes blocked offer() calls by making space.
  • Recursive → iterative: flush() uses a while loop instead of recursive self-calls.
  • isFlushing in finally: prevents stuck flag after exceptions.
  • close() drains queue: fails remaining records for clean shutdown.

Follows the pattern already used by Aerospike (LinkedBlockingDeque), HDFS, Kinesis, and DynamoDB (LinkedBlockingQueue) connectors in this repo.
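The modifications above can be sketched roughly as follows. This is an illustrative standalone sketch, not the actual connector code: the class and member names (`JdbcSinkSketch`, `incomingQueue`, `batchSize`) and the `Consumer<List<T>>` batch callback are placeholders for the real JDBC sink internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical sketch of the write()/flush() pattern described above.
public class JdbcSinkSketch<T> {
    private final LinkedBlockingDeque<T> incomingQueue;
    private final AtomicBoolean isFlushing = new AtomicBoolean(false);
    private final int batchSize;

    public JdbcSinkSketch(int capacity, int batchSize) {
        this.incomingQueue = new LinkedBlockingDeque<>(capacity);
        this.batchSize = batchSize;
    }

    // write(): blocks the calling (Pulsar IO) thread for up to 1 second
    // when the queue is full. Returns false on timeout, so the caller can
    // nack at most once per second instead of instantly on every record.
    public boolean write(T record) throws InterruptedException {
        return incomingQueue.offer(record, 1, TimeUnit.SECONDS);
    }

    // flush(): iterative drain loop (no recursion, no synchronized blocks).
    // drainTo() atomically moves up to batchSize records; freeing capacity
    // wakes any offer() currently blocked in write().
    public void flush(Consumer<List<T>> sinkBatch) {
        if (!isFlushing.compareAndSet(false, true)) {
            return; // another flush is already in progress
        }
        try {
            List<T> swapList = new ArrayList<>(batchSize);
            while (incomingQueue.drainTo(swapList, batchSize) > 0) {
                sinkBatch.accept(swapList); // e.g. execute the JDBC batch
                swapList.clear();
            }
        } finally {
            isFlushing.set(false); // never leave the flag stuck after an exception
        }
    }
}
```

Note the `finally` block: it is what prevents a failed batch from leaving `isFlushing` permanently set, which would silently stop all future flushes.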

Verifying this change

  • Existing tests pass (queue semantics preserved)
  • testBoundedQueueBackPressure may need timeout adjustment (6th write now blocks 1s instead of failing instantly)
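To make the timing change concrete, the following self-contained snippet (illustrative only; the capacity of 5 and the record names are made up, and this is not the connector's test code) shows why a test asserting instant failure on a full queue now needs a longer timeout:

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// With a bounded deque of capacity 5, a 6th timed offer() blocks for
// about 1 second and then returns false, instead of failing instantly.
public class BackPressureTimingSketch {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingDeque<String> queue = new LinkedBlockingDeque<>(5);
        for (int i = 0; i < 5; i++) {
            queue.offer("record-" + i); // fills the queue immediately
        }
        long start = System.nanoTime();
        boolean accepted = queue.offer("record-5", 1, TimeUnit.SECONDS);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        // The ~1s wait IS the back-pressure window: the IO thread pauses
        // instead of nacking, and only gives up after the timeout.
        System.out.println("accepted=" + accepted + " elapsedMs=" + elapsedMs);
    }
}
```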

Fixes #16

…ngDeque for proper back-pressure

Replaces manual `synchronized(incomingList)` blocks with JDK's
`LinkedBlockingDeque` for proper blocking back-pressure instead of
the nack/redeliver storm caused by `record.fail()`.

Key changes:
- Use `offer(record, 1s timeout)` in write() to block the Pulsar IO
  thread when queue is full, stopping message consumption naturally
- Use `drainTo()` in flush() for non-blocking atomic batch drain
- Replace recursive flush() with iterative while loop
- Move isFlushing.set(false) to finally block
- Drain remaining records in close() for clean shutdown

Follows the established pattern used by Aerospike, HDFS, Kinesis,
and DynamoDB connectors in this repository.

Fixes apache#16