[improve][io] JDBC sink: replace synchronized queue with LinkedBlockingDeque for proper back-pressure #16

@harangozop

Motivation

This is a follow-up to #9, which introduced bounded-queue back-pressure by calling record.fail() when the queue is full. That prevents OOM, but under sustained load it causes a nack/redeliver storm: the consumer keeps delivering messages, the sink immediately fails them, they get redelivered, and the cycle repeats. CPU time is spent rejecting and redelivering messages instead of on actual DB writes.

Changes

1. Replace LinkedList + synchronized with LinkedBlockingDeque

Replace the manual synchronized(incomingList) blocks with JDK's LinkedBlockingDeque, which handles all synchronization internally. This follows the established pattern already used by other connectors in this repo (Aerospike sink uses LinkedBlockingDeque, HDFS/Kinesis/DynamoDB sinks use LinkedBlockingQueue).
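A minimal sketch of the swap, assuming a simplified buffer of String records. QueueSwapSketch, enqueue, takeBatch and pending are hypothetical names for illustration, not the connector's actual members:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical stand-in for the sink's internal buffer, not the real connector class.
final class QueueSwapSketch {
    // Before (conceptually): a LinkedList guarded by synchronized (incomingList) { ... }.
    // After: the deque does its own locking, so callers need no synchronized blocks.
    private final BlockingDeque<String> incomingList = new LinkedBlockingDeque<>();

    void enqueue(String record) {
        incomingList.add(record); // thread-safe without an external lock
    }

    List<String> takeBatch(int batchSize) {
        List<String> batch = new ArrayList<>(batchSize);
        // Moves up to batchSize elements from the head of the deque, in FIFO order.
        incomingList.drainTo(batch, batchSize);
        return batch;
    }

    int pending() {
        return incomingList.size();
    }
}
```

drainTo removes a whole batch in one call, which is roughly what a synchronized block around repeated removeFirst() calls would otherwise do by hand.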

2. Blocking back-pressure in write()

Instead of calling record.fail() (which triggers an immediate nack/redeliver), use incomingList.offer(record, 1, TimeUnit.SECONDS), which blocks the Pulsar IO thread for up to 1 second while it waits for space in the queue. Blocking the IO thread is the correct back-pressure signal: it stops the consumer from fetching more messages for this sink's subscription.

  • If space becomes available within 1 second (flush drains the queue), the record is accepted — zero nacks.
  • If the timeout expires (flush thread stuck or very slow), the record is failed — at most 1 nack/second vs. thousands/second with the current approach.
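The two outcomes above could look roughly like this. This is a sketch under assumptions: records are plain Strings, the timeout is a constructor parameter (the proposal uses 1 second), and BlockingWriteSketch and pollOne are hypothetical names. A false return stands for the point where the real sink would call record.fail():

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical simplification of the sink's write() path.
final class BlockingWriteSketch {
    private final BlockingDeque<String> incomingList;
    private final long timeoutMillis;

    BlockingWriteSketch(int capacity, long timeoutMillis) {
        this.incomingList = new LinkedBlockingDeque<>(capacity);
        this.timeoutMillis = timeoutMillis;
    }

    /** Returns true if the record was queued; false means the caller should fail it. */
    boolean write(String record) {
        try {
            // Waits up to timeoutMillis for free space instead of rejecting immediately.
            return incomingList.offer(record, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Stands in for the flush thread draining the queue. */
    String pollOne() {
        return incomingList.poll();
    }
}
```

With a capacity of 1 and nothing draining the queue, the second write returns false only after the timeout has elapsed, so a stuck flush yields at most one failure per timeout interval on the writing thread.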

3. Replace recursive flush() with iterative loop

The current flush() calls itself recursively when needAnotherRound is true. Under sustained load with large queues, this can build a deep call stack and risk a StackOverflowError. Replace the recursion with a while loop.
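A sketch of the loop shape, assuming String records and a no-op database write. The rule used here for needAnotherRound (at least one full batch is still queued) is an assumption for illustration, and IterativeFlushSketch, add and pending are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical simplification of flush(); the real method writes to the database.
final class IterativeFlushSketch {
    private final BlockingDeque<String> incomingList = new LinkedBlockingDeque<>();
    private final int batchSize;

    IterativeFlushSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    void add(String record) {
        incomingList.add(record);
    }

    int pending() {
        return incomingList.size();
    }

    /** Runs rounds in a loop rather than by recursion; returns the number of batches written. */
    int flush() {
        int rounds = 0;
        boolean needAnotherRound = true;
        while (needAnotherRound) {
            List<String> batch = new ArrayList<>(batchSize);
            incomingList.drainTo(batch, batchSize);
            if (batch.isEmpty()) {
                break; // nothing left to write
            }
            // The DB write for this batch would happen here.
            rounds++;
            // Assumed rule: go again only if a full batch is still waiting.
            needAnotherRound = incomingList.size() >= batchSize;
        }
        return rounds;
    }
}
```

Stack depth stays constant no matter how many rounds run, which is the point of the change.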

4. Move isFlushing.set(false) to finally block

Prevents the isFlushing flag from getting stuck if an exception occurs before the flag is cleared, which would prevent all future flushes.
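A sketch of the guard, assuming isFlushing is an AtomicBoolean and the flush body is passed in as a Runnable for illustration. FlushGuardSketch is a hypothetical name:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper showing only the flag handling around a flush.
final class FlushGuardSketch {
    private final AtomicBoolean isFlushing = new AtomicBoolean(false);

    /** Returns false if another flush is already running; otherwise runs work and returns true. */
    boolean flush(Runnable work) {
        if (!isFlushing.compareAndSet(false, true)) {
            return false; // a flush is already in progress
        }
        try {
            work.run(); // may throw, e.g. when the database is unavailable
            return true;
        } finally {
            isFlushing.set(false); // always cleared, even when work.run() throws
        }
    }

    boolean isFlushing() {
        return isFlushing.get();
    }
}
```

Without the finally block, an exception thrown from the flush body would leave the flag at true and every later flush would be skipped.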

5. Drain remaining records in close()

On shutdown, drain any records still in the queue and fail them. Failing them, rather than silently dropping them, triggers redelivery, so shutdown is clean and no messages are lost.
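A sketch of the drain step. PendingRecord is a hypothetical stand-in for the record type, exposing only fail(), and CloseDrainSketch and drainAndFail are illustrative names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;

// Hypothetical helper for the tail end of close().
final class CloseDrainSketch {
    /** Minimal stand-in for the record type; only fail() matters here. */
    interface PendingRecord {
        void fail();
    }

    /** Removes everything still queued and fails each record; returns how many were failed. */
    static int drainAndFail(BlockingDeque<PendingRecord> queue) {
        List<PendingRecord> remaining = new ArrayList<>();
        queue.drainTo(remaining); // empties the deque in one call
        for (PendingRecord record : remaining) {
            record.fail(); // signals that the record was not written
        }
        return remaining.size();
    }
}
```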

Compatibility

  • maxQueueSize config is fully preserved: -1 (unbounded), 0 (auto), positive (bounded)
  • LinkedBlockingDeque with no capacity argument has a capacity of Integer.MAX_VALUE, which is effectively unbounded (same behavior as the legacy LinkedList)
  • No new configuration parameters required
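One way the three maxQueueSize modes could map onto deque construction is sketched below. How the auto capacity is derived is not stated here, so it is passed in as a hypothetical autoCapacity parameter. Treating every negative value like -1 is also an assumption, and QueueFactorySketch and newQueue are illustrative names:

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical factory; the real sink may derive its capacity differently.
final class QueueFactorySketch {
    static <T> BlockingDeque<T> newQueue(int maxQueueSize, int autoCapacity) {
        if (maxQueueSize < 0) {
            // No-arg constructor: capacity is Integer.MAX_VALUE, effectively unbounded.
            return new LinkedBlockingDeque<>();
        }
        if (maxQueueSize == 0) {
            // "Auto" mode: capacity supplied by the caller (assumed, see lead-in).
            return new LinkedBlockingDeque<>(autoCapacity);
        }
        return new LinkedBlockingDeque<>(maxQueueSize); // explicit bound
    }
}
```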
