[BUG] Silent consumer stall with Azure Service Bus + Spring Cloud Stream (LINK_CREDIT=0, IdleTimeout) #48929

@RafaelGomez1

Describe the bug
We are experiencing a silent consumer stall when using Azure Service Bus with Spring Boot and Spring Cloud Stream. Under certain conditions, a consumer bound to a Service Bus topic subscription stops consuming messages without any application-level errors, exceptions, or restarts.
Eventually, Azure Service Bus emits a terminal AMQP error indicating the receiver link was force detached due to idle timeout with LINK_CREDIT = 0. After this happens, the consumer does not recover or reconnect automatically, and consumption only resumes after restarting the application/pod.

This issue started appearing after:

  • Adding a second consumer to the same application.

Things we've tried before opening the issue

Message Completion Experiments
We tested both supported completion approaches:

  • ServiceBusReceivedMessageContext: context.complete() / context.abandon()
  • Checkpointer (recommended approach): checkpointer.success().block() / checkpointer.failure().block()

✅ Switching to Checkpointer improved consistency
❌ It did not eliminate the silent stall

Prefetch / Concurrency Experiments
We tested multiple permutations:

  • High prefetch vs low prefetch
  • prefetch-count == max-concurrent-calls
  • Very low prefetch (1)
  • Larger lock renewal duration (max-auto-lock-renew-duration)

✅ These changes influenced lock renewal behavior
❌ They did not prevent the silent stop when the issue manifests

QUESTIONS

  1. Is this a known failure mode when using blocking work inside Spring Cloud Stream functional consumers?
  2. Is the receiver expected to recover automatically after a LINK_CREDIT=0 / idle detach scenario?
  3. Are there recommended guardrails or best practices when running:
  • multiple consumers inside one app
  • the Azure Service Bus binder (async client underneath)?

  4. Is there any supported way to isolate execution between consumers within the same application?
  5. We also saw some thread sharing between both consumers. From what we read online, the binder uses one shared thread pool per binder, so our two consumers (each on its own binder) should not be sharing threads, right? Have we misunderstood this, or is it due to our configuration?
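
To make the isolation question concrete, here is a minimal sketch of what application-level isolation could look like: each consumer hands its blocking HTTP call to its own bounded pool and waits with a timeout. `IsolatedWorker` is a hypothetical helper written for this issue, not something the binder provides, and it does not change which threads the binder itself uses. It is unverified whether this avoids the stall.

```java
import java.time.Duration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: one bounded worker pool per consumer (not part of the binder). */
final class IsolatedWorker implements AutoCloseable {

    private final ExecutorService pool;

    IsolatedWorker(String consumerName, int threads) {
        AtomicInteger counter = new AtomicInteger();
        // Fixed-size pool with a bounded queue; when both are full, submit()
        // throws RejectedExecutionException (default AbortPolicy).
        this.pool = new ThreadPoolExecutor(
            threads, threads, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(threads),
            runnable -> {
                Thread thread = new Thread(
                    runnable, consumerName + "-worker-" + counter.incrementAndGet());
                thread.setDaemon(true);
                return thread;
            });
    }

    /**
     * Runs the task on this consumer's pool. The calling thread waits at most
     * {@code timeout}. Returns true if the task finished, false if it timed out
     * or the caller was interrupted (the task is then cancelled via interrupt).
     */
    boolean run(Runnable task, Duration timeout) {
        Future<?> future = pool.submit(task);
        try {
            future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true);
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.cancel(true);
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("Task failed", e.getCause());
        }
    }

    @Override
    public void close() {
        pool.shutdownNow();
    }
}
```

In `process`, the blocking call could then be wrapped as `worker.run(() -> httpClient.send(event), Duration.ofSeconds(30))`, abandoning the message when it returns false. The 30 second value is an arbitrary placeholder.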

Exception or Stack Trace

{
  "az.sdk.message": "Receiver emitted terminal error.",
  "connectionId": "random",
  "entity.guid": "random",
  "entity.guids": "random",
  "entity.name": "entity-name",
  "entityPath": "topic-and-subscription-name",
  "exception": "The link 'G31:85797156:topic-and-subscription-name' is force detached. Code: consumer(link1798640(85797157_G31)). Details: AmqpMessageConsumer.IdleTimerExpired: Idle timeout: 00:10:00. TrackingId:2bd053db000271a9001b71f069e76bc6_G31_B24, SystemTracker:gi::G31:85797156:topic-and-subscription-name, bi::link1798640(85797157_G31), Timestamp:2026-04-21T12:36:38, errorContext[NAMESPACE: servicebus-namespace. ERROR CONTEXT: N/A, PATH: topic-and-subscription-name, REFERENCE_ID: topic-and-subscription-name, LINK_CREDIT: 0]",
  "hostname": "hostname",
  "instrumentation": "logback-classic-1.5.20",
  "level": "WARN",
  "linkName": "topic-and-subscription-link-name",
  "logger.fqcn": "ch.qos.logback.classic.Logger",
  "logger.name": "com.azure.core.amqp.implementation.MessageFlux",
  "messageFlux": "mf_80204d_1776774085512",
  "newrelic.source": "logs.APM",
  "thread.id": 83,
  "thread.name": "receiverPump-2",
  "timestamp": 1776774998097
}
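
Since this WARN entry is the only signal we get, one option is to alert on it. Below is a minimal sketch that extracts the idle timeout and `LINK_CREDIT` value from the `exception` text. `ForceDetachInfo` is a hypothetical class, and it assumes the message keeps the shape shown in the log entry above, which is not a documented contract.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for the "exception" field of the log entry above. */
final class ForceDetachInfo {

    private static final Pattern IDLE =
        Pattern.compile("IdleTimerExpired: Idle timeout: (\\d{2}):(\\d{2}):(\\d{2})");
    private static final Pattern CREDIT = Pattern.compile("LINK_CREDIT: (\\d+)");

    final Duration idleTimeout;
    final int linkCredit;

    private ForceDetachInfo(Duration idleTimeout, int linkCredit) {
        this.idleTimeout = idleTimeout;
        this.linkCredit = linkCredit;
    }

    /** Returns empty when the text is not an idle-timeout force detach. */
    static Optional<ForceDetachInfo> parse(String exceptionText) {
        Matcher idle = IDLE.matcher(exceptionText);
        Matcher credit = CREDIT.matcher(exceptionText);
        if (!idle.find() || !credit.find()) {
            return Optional.empty();
        }
        Duration timeout = Duration.ofHours(Long.parseLong(idle.group(1)))
            .plusMinutes(Long.parseLong(idle.group(2)))
            .plusSeconds(Long.parseLong(idle.group(3)));
        return Optional.of(
            new ForceDetachInfo(timeout, Integer.parseInt(credit.group(1))));
    }

    /** True for the pattern reported here: idle detach while the link had no credit. */
    boolean isIdleDetachWithoutCredit() {
        return linkCredit == 0;
    }
}
```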

To Reproduce
Steps to reproduce the behavior:

  1. Create both consumers with the architecture flow shown in additional context
  2. Start seeding traffic to the first consumer; eventually the second one stops consuming
  3. 15 minutes later, the idle timeout is logged as a terminal error
  4. Consumption from that topic stops and does not resume unless the container is restarted

Code Snippet
Failing Consumer

The code snippets have been simplified in order to avoid sharing sensitive details.

package com.example.consumer;

import com.azure.spring.messaging.AzureHeaders;
import com.azure.spring.messaging.checkpoint.Checkpointer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.propagation.Propagator;
import java.time.Duration;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component("notification-consumer")
public class NotificationConsumer extends BaseConsumer {

    private static final Logger LOGGER =
        LoggerFactory.getLogger(NotificationConsumer.class);

    private static final String EVENT_TYPE_A = "EventTypeA";
    private static final String EVENT_TYPE_B = "EventTypeB";

    private final ObjectMapper objectMapper;
    private final BlockingHttpClient httpClient;

    public NotificationConsumer(
        ObjectMapper objectMapper,
        BlockingHttpClient httpClient,
        Tracer tracer,
        Propagator propagator
    ) {
        super(tracer, propagator);
        this.objectMapper = objectMapper;
        this.httpClient = httpClient;
    }

    @Override
    protected void process(Message<String> message) {

        LOGGER.info("[NotificationConsumer] Event received");

        Checkpointer checkpointer = extractCheckpointer(message);

        try {
            processEvent(message);
            completeMessage(checkpointer);
            LOGGER.info("[NotificationConsumer] Event processed successfully");

        } catch (JsonProcessingException | InvalidEventException e) {
            handleNonRetriableError(e, checkpointer, message);

        } catch (Exception e) {
            handleRetriableError(message, checkpointer, e);
        }
    }

    private Checkpointer extractCheckpointer(Message<String> message) {
        return message
            .getHeaders()
            .get(AzureHeaders.CHECKPOINTER, Checkpointer.class);
    }

    private void processEvent(Message<String> message)
        throws JsonProcessingException {

        String eventType = extractEventType(message);
        validateEventType(eventType, message);

        switch (eventType) {
            case EVENT_TYPE_A -> processTypeA(message);
            case EVENT_TYPE_B -> processTypeB(message);
            default -> { }
        }
    }

    private void processTypeA(Message<String> message)
        throws JsonProcessingException {

        DomainEventA event =
            objectMapper.readValue(message.getPayload(), DomainEventA.class);

        // Synchronous / blocking external call
        httpClient.send(event);
    }

    private void processTypeB(Message<String> message)
        throws JsonProcessingException {

        DomainEventB event =
            objectMapper.readValue(message.getPayload(), DomainEventB.class);

        // Synchronous / blocking external call
        httpClient.send(event);
    }

    private void validateEventType(String eventType, Message<String> message) {
        Optional.ofNullable(eventType)
            .orElseThrow(() ->
                new InvalidEventException(
                    "Event type is missing for message: " + message
                )
            );
    }

    private String extractEventType(Message<String> message) {
        return (String) message.getHeaders().get("eventType");
    }

    private void completeMessage(Checkpointer checkpointer) {
        checkpointer
            .success()
            .doOnSuccess(
                s -> LOGGER.info(
                    "[NotificationConsumer] Message checkpointed successfully"
                )
            )
            .doOnError(
                e -> LOGGER.error(
                    "[NotificationConsumer] Failed to checkpoint message",
                    e
                )
            )
            .block(Duration.ofSeconds(1));
    }

    private void handleNonRetriableError(
        Exception error,
        Checkpointer checkpointer,
        Message<String> message
    ) {
        LOGGER.error(
            "Non-retriable error, message will be dead-lettered",
            error
        );

        checkpointer
            .failure()
            .doOnSuccess(
                s -> LOGGER.info(
                    "[NotificationConsumer] Message checkpointed as failure"
                )
            )
            .doOnError(
                e -> LOGGER.error(
                    "[NotificationConsumer] Failed to checkpoint message as failure",
                    e
                )
            )
            .block(Duration.ofSeconds(1));
    }

    private void handleRetriableError(
        Message<String> message,
        Checkpointer checkpointer,
        Exception error
    ) {
        LOGGER.error(
            "Retriable error, message will be retried",
            error
        );

        checkpointer
            .failure()
            .doOnSuccess(
                s -> LOGGER.info(
                    "[NotificationConsumer] Message checkpointed as failure"
                )
            )
            .doOnError(
                e -> LOGGER.error(
                    "[NotificationConsumer] Failed to checkpoint message as failure",
                    e
                )
            )
            .block(Duration.ofSeconds(1));
    }
}
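
One detail of the consumer above that may be worth ruling out: `completeMessage` runs inside the `try` block, and Reactor's `Mono.block(Duration)` throws `IllegalStateException` on timeout, so a slow checkpoint lands in the generic `catch (Exception e)` branch and triggers `checkpointer.failure()` for the same message. Whether this contributes to the stall is not established. A minimal sketch of keeping processing and settlement separate, using a hypothetical `Settlement` interface as a stand-in for `Checkpointer`:

```java
/** Stand-in for the two Checkpointer outcomes; hypothetical, not the Azure type. */
interface Settlement {
    void success();
    void failure();
}

final class SettleOnce {

    private SettleOnce() {
    }

    /**
     * Runs the processing step, then settles exactly once. A failure while
     * settling propagates to the caller instead of triggering a second
     * settlement attempt for the same message.
     */
    static void handle(Runnable processing, Settlement settlement) {
        boolean processed;
        try {
            processing.run();
            processed = true;
        } catch (RuntimeException e) {
            processed = false;
        }
        if (processed) {
            settlement.success();
        } else {
            settlement.failure();
        }
    }
}
```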

Base Consumer
Needed because the binder does not seem to propagate W3C trace context out of the box, and our current observability agent does not do it for Service Bus either. We need this to get distributed tracing.

package com.example.consumer;

import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.propagation.Propagator;
import java.util.Map;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

/**
 * Base consumer that wraps message processing with tracing logic.
 * Concrete consumers implement {@link #process(Message)}.
 */
public abstract class BaseConsumer implements Consumer<Message<String>> {

    private static final Logger LOGGER =
        LoggerFactory.getLogger(BaseConsumer.class);

    private static final String TRACEPARENT_HEADER = "traceparent";
    private static final String TRACESTATE_HEADER = "tracestate";

    private final Tracer tracer;
    private final Propagator propagator;

    protected BaseConsumer(Tracer tracer, Propagator propagator) {
        this.tracer = tracer;
        this.propagator = propagator;
    }

    @Override
    public final void accept(Message<String> message) {
        try {
            Span span = extractSpan(message);
            try (var ignored = tracer.withSpan(span)) {
                process(message);
            } finally {
                span.end();
            }
        } catch (Exception e) {
            LOGGER.error("Unhandled exception in BaseConsumer", e);
            throw e;
        }
    }

    /**
     * Implemented by concrete consumers to process the message.
     */
    protected abstract void process(Message<String> message);

    private Span extractSpan(Message<String> message) {
        var headers = message.getHeaders();
        Object traceparent = headers.get(TRACEPARENT_HEADER);

        if (traceparent == null) {
            LOGGER.info(
                "[{}] No trace context found, starting new root span",
                getClass().getSimpleName()
            );
            return tracer
                .nextSpan()
                .name(getClass().getSimpleName())
                .tag("span.kind", "CONSUMER")
                .start();
        }

        Object tracestate = headers.get(TRACESTATE_HEADER);

        Map<String, String> carrier =
            Map.of(
                TRACEPARENT_HEADER,
                traceparent.toString(),
                TRACESTATE_HEADER,
                tracestate != null ? tracestate.toString() : ""
            );

        return propagator
            .extract(carrier, Map::get)
            .name(getClass().getSimpleName())
            .tag("span.kind", "CONSUMER")
            .start();
    }
}

Configuration

spring:
  cloud:
    stream:
      default:
        consumer:
          observation-enabled: false

      binders:
        servicebus-consumer:
          type: servicebus
          environment:
            spring.cloud.azure.servicebus.connection-string: ${CONN_1}

        servicebus-partner-notification:
          type: servicebus
          environment:
            spring.cloud.azure.servicebus.connection-string: ${CONN_2}

      bindings:
        sensor-data-consumer-in-0:
          binder: servicebus-consumer
          destination: sensor-data-topic
          group: sensor-data-subscription

        partner-notifications-in-0:
          binder: servicebus-partner-notification
          destination: partner-notifications-topic
          group: partner-notifications-subscription

      servicebus:
        bindings:
          sensor-data-consumer-in-0:
            consumer:
              auto-complete: false
              max-concurrent-calls: 6
              prefetch-count: 6

          partner-notifications-in-0:
            consumer:
              auto-complete: false
              max-concurrent-calls: 4
              prefetch-count: 4

Expected behavior
If a consumer stalls internally, we expect:

  • errors
  • binder health check to fail
  • retries or automatic receiver recovery

Instead, the application enters a permanent silent stall until restart.
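
Until the stall surfaces as an error or health failure, the only workaround we can think of is an application-level watchdog. A minimal sketch follows, assuming each consumer records a timestamp per message (for example at the start of `BaseConsumer.accept`). `ConsumerStallDetector` is hypothetical and not provided by the binder. A quiet topic looks the same as a stalled consumer here, so the threshold has to fit the expected traffic.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/** Hypothetical watchdog that tracks the last message seen per binding. */
final class ConsumerStallDetector {

    private final Map<String, Instant> lastSeen = new ConcurrentHashMap<>();
    private final Duration maxSilence;

    ConsumerStallDetector(Duration maxSilence) {
        this.maxSilence = maxSilence;
    }

    /** Call once per received message with the binding name and current time. */
    void recordMessage(String bindingName, Instant now) {
        lastSeen.put(bindingName, now);
    }

    /** Bindings whose last message is older than maxSilence at the given time. */
    List<String> stalledBindings(Instant now) {
        return lastSeen.entrySet().stream()
            .filter(entry ->
                Duration.between(entry.getValue(), now).compareTo(maxSilence) > 0)
            .map(Map.Entry::getKey)
            .sorted()
            .collect(Collectors.toList());
    }
}
```

The result of `stalledBindings` could feed a custom health indicator or liveness probe so that the platform restarts the container instead of us doing it manually.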

Screenshots
All attempts to add screenshots failed

Setup (please complete the following information):

  • OS: Linux (containerized)
  • IDE: IntelliJ IDEA
  • Java version: Java 25
  • Runtime / Environment: Azure Container Apps

Frameworks:

  • Spring Boot 3.5.13
  • Spring Cloud Stream 4.3.1 (functional model)
  • Spring Cloud Azure Service Bus Binder 6.2.0
  • Spring Cloud Azure BOM 6.2.0

Messaging / Azure SDK:

  • com.azure.spring:spring-cloud-azure-stream-binder-servicebus:6.2.0
  • com.azure.spring:spring-cloud-azure-dependencies:6.2.0
  • Azure Service Bus client (transitive via the binder)

HTTP Client:

  • Apache HTTP Client 5 5.5.1 (synchronous / blocking)

Additional context
Architecture Overview

Service Bus Topic
        |
     (consume)
        v
Foo Service Consumer A
        |
     (publish)
        v
Service Bus Topic
        |
     (consume)
        v
Foo Service Consumer B
        |
     (HTTP call)
        v
3rd‑Party System

Information Checklist
Kindly make sure that you have added all the following information above and check off the required fields; otherwise we will treat the issue as an incomplete report

  • Bug Description Added
  • Repro Steps Added
  • Setup information Added

Labels: azure-spring, azure-spring-servicebus, customer-reported, needs-team-attention, question