Add RxJava 3 instrumentation #11849

Merged
gh-worker-dd-mergequeue-cf854d[bot] merged 29 commits into master from vzakharov/rxjava3
Jul 3, 2026
Conversation

@ValentinZakharov

Contributor

What Does This Do

We already trace RxJava 1 and 2 but not 3. RxJava 3 moved its types to the io.reactivex.rxjava3.core.* namespace, so the existing rxjava-2.0 instrumentation never matches it. This adds a new rxjava-3.0 module that brings RxJava 3 to parity.

What's in it:

  • New dd-java-agent/instrumentation/rxjava/rxjava-3.0 module. It ports the rxjava-2.0 logic to the RxJava 3 namespace: context capture on the five reactive types (Flowable, Observable, Single, Maybe, Completable) and re-attachment around subscriber callbacks, plus the async result extension that finishes @WithSpan / @Trace spans when the stream completes, errors, or is cancelled.
  • Muzzle has a pass for rxjava3 [3.0.0,) and a fail block that asserts the advice can never match the rxjava2 artifact, so the two versions stay isolated. The module also pulls in rxjava-2.0 as a test runtime dependency to confirm both instrumenters coexist.
  • Registered in the GraalVM native image build time list next to the rxjava2 entry.
  • Tests are JUnit 5 (44 cases): subscription propagation, the core propagation suite, the @WithSpan result extension across all five types, and a Java 8 interop check (fromCompletionStage, fromStream, fromOptional). latestDepTest runs the same suite against the newest published rxjava3.
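
The capture and re-attach idea from the first bullet can be sketched without RxJava at all. The following is a minimal illustration, not the module's code: `ContextCaptureSketch`, `Source`, and the `CURRENT` thread-local are hypothetical stand-ins for the reactive type, the advice, and the tracer's current context.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Illustration only: all names are hypothetical, not the dd-trace-java API. */
final class ContextCaptureSketch {
  /** Stand-in for the tracer's "current context" on each thread. */
  static final ThreadLocal<String> CURRENT = ThreadLocal.withInitial(() -> "root");

  /** A toy reactive source that remembers the context active when it was assembled. */
  static final class Source<T> {
    private final T value;
    private final String capturedContext = CURRENT.get(); // capture at construction

    Source(T value) {
      this.value = value;
    }

    /** Re-attaches the captured context around the callback, then restores the previous one. */
    void subscribe(Consumer<T> onNext) {
      String previous = CURRENT.get();
      CURRENT.set(capturedContext);
      try {
        onNext.accept(value);
      } finally {
        CURRENT.set(previous);
      }
    }
  }

  /** Assembles under "span-1", subscribes on another thread, returns what the callback saw. */
  static List<String> run() {
    List<String> seen = new ArrayList<>();
    CURRENT.set("span-1");
    Source<Integer> source = new Source<>(42);
    CURRENT.set("root");
    Thread worker = new Thread(() -> source.subscribe(v -> seen.add(CURRENT.get() + ":" + v)));
    worker.start();
    try {
      worker.join(); // join makes the worker's write to `seen` visible here
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [span-1:42]
  }
}
```

The callback runs on a thread that never had "span-1" attached, yet it observes that context because the source carried it from assembly time.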

One thing reviewers should look at closely: this also touches the shared java-concurrent module. While testing delayed chains I found that RxJava 3 added io.reactivex.rxjava3.internal.schedulers.AbstractDirectTask, whose static initializer builds two sentinel FutureTask instances. When that initializer first runs under an active trace (the first delay or timeout hop inside a span), the executor instrumentation captures a continuation on those static singletons that never gets cancelled, so the trace stays pending and is never reported. RxJava 2 has no equivalent class, which is why the byte-for-byte equivalent rxjava-2.0 code never hit this (correction: RxJava 2.0.9+ seems to have it). The fix disables async propagation while that type initializer runs, following the same pattern already used for Reactor's SchedulerTask and WorkerTask. The matcher is an exact class name, so it stays inert unless RxJava 3 is on the classpath.
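
To make the failure mode concrete, here is a small stdlib-only simulation. It is a sketch under stated assumptions: `asyncPropagation`, `PENDING`, and `newTask` are hypothetical stand-ins for the tracer state and the executor instrumentation, and the two nested classes only mimic a class with sentinel tasks in its static initializer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.FutureTask;

/** Illustration only: every name here is hypothetical, not the agent's real API. */
final class ClinitPropagationSketch {
  /** Stand-in for "async propagation is enabled on the active trace". */
  static boolean asyncPropagation = true;

  /** Stand-in for continuations captured and never cancelled, tagged by owner. */
  static final List<String> PENDING = new ArrayList<>();

  /** Stand-in for the executor instrumentation hooking task construction. */
  static FutureTask<Void> newTask(String owner) {
    FutureTask<Void> task = new FutureTask<Void>(() -> {}, null);
    if (asyncPropagation) {
      PENDING.add(owner); // a continuation is captured for a task that will never run
    }
    return task;
  }

  /** Sentinels created in a static initializer with no guard around it. */
  static final class Unguarded {
    static final FutureTask<Void> FINISHED = newTask("unguarded");
    static final FutureTask<Void> DISPOSED = newTask("unguarded");

    static void touch() {}
  }

  /** Same shape, but its initializer is only triggered with propagation disabled. */
  static final class Guarded {
    static final FutureTask<Void> FINISHED = newTask("guarded");
    static final FutureTask<Void> DISPOSED = newTask("guarded");

    static void touch() {}
  }

  static int leakedWithoutGuard() {
    Unguarded.touch(); // first use runs the static initializer under the "active trace"
    return Collections.frequency(PENDING, "unguarded");
  }

  static int leakedWithGuard() {
    boolean previous = asyncPropagation;
    asyncPropagation = false; // disable propagation while the initializer runs
    try {
      Guarded.touch();
    } finally {
      asyncPropagation = previous; // restore afterwards
    }
    return Collections.frequency(PENDING, "guarded");
  }

  public static void main(String[] args) {
    System.out.println("without guard: " + leakedWithoutGuard()); // without guard: 2
    System.out.println("with guard: " + leakedWithGuard()); // with guard: 0
  }
}
```

In the real agent the guard is applied by instrumenting the type initializer rather than by the caller; the sketch wraps the first use only to keep the example self-contained.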

The Java 8 interop check found no propagation gaps once that fix was in place.

Motivation

Additional Notes

Contributor Checklist

  • Format the title according to the contribution guidelines
  • Assign the type: and (comp: or inst:) labels in addition to any other useful labels
  • Avoid using close, fix, or any linking keywords when referencing an issue
    Use solves instead, and assign the PR milestone to the issue
  • Update the CODEOWNERS file on source file addition, migration, or deletion
  • Update public documentation with any new configuration flags or behaviors
  • Add your completed PR to the merge queue by commenting /merge. You can also:
    • Customize the commit message associated with the merge with /merge --commit-message "..."
    • Remove your PR from the merge queue with /merge -c
    • Skip all merge queue checks with /merge -f --reason "reason"; please use this judiciously, as some checks do not run at the PR-level (note: the PR still needs to be mergeable, this will only skip the pre-merge build)
    • Get more information in this doc

Jira ticket: [PROJ-IDENT]

@ValentinZakharov self-assigned this Jul 2, 2026
@ValentinZakharov requested review from a team as code owners July 2, 2026 22:27
@ValentinZakharov requested review from PerfectSlayer and removed request for a team July 2, 2026 22:27
@ValentinZakharov added the type: enhancement (Enhancements and improvements) label Jul 2, 2026
@ValentinZakharov requested review from ygree and removed request for a team July 2, 2026 22:27
@ValentinZakharov added the inst: others (All other instrumentations) and tag: ai generated (Largely based on code generated by an AI or LLM) labels Jul 2, 2026
@dd-octo-sts

dd-octo-sts Bot commented Jul 2, 2026

Contributor

🟡 Java Benchmark SLOs — Performance SLO warning (near threshold)

| Suite | Status |
| --- | --- |
| Startup | 🟡 warning |

SLO thresholds are defined here based on automatically generated metrics. A warning is raised when results are within 5% of the threshold.

PR vs. master results
| Scenario | Candidate | master | Δ (95% CI of mean) |
| --- | --- | --- | --- |
| startup:insecure-bank:iast:Agent | 13.95 s | 13.95 s | [-0.9%; +0.9%] (no difference) |
| startup:insecure-bank:tracing:Agent | 12.99 s | 13.00 s | [-1.1%; +0.8%] (no difference) |
| startup:petclinic:appsec:Agent | 17.01 s | 16.77 s | [+0.2%; +2.7%] (maybe worse) |
| startup:petclinic:iast:Agent | 16.95 s | 16.90 s | [-0.6%; +1.1%] (no difference) |
| startup:petclinic:profiling:Agent | 16.71 s | 16.86 s | [-1.8%; +0.1%] (no difference) |
| startup:petclinic:sca:Agent | 16.96 s | 16.70 s | [+0.8%; +2.3%] (maybe worse) |
| startup:petclinic:tracing:Agent | 16.03 s | 15.63 s | [-1.8%; +6.9%] (no difference) |

Commit: a6cb915e · CI Pipeline · Benchmarking Platform UI


Load and DaCapo benchmarks can be triggered manually in the GitLab pipeline. Results will appear in the Benchmarking Platform UI after completion.

@PerfectSlayer PerfectSlayer left a comment

Contributor

Reviewed structure and tests. Left a bunch of comments.

Comment on lines +26 to +28
// NOTE: This test lives in the `testdog` package (not `datadog`) on purpose: the agent ignores
// `datadog.*` classes for instrumentation, so `@Trace`-annotated methods declared under `datadog.*`
// would never be instrumented. See RxJava3Test for the same convention.

Contributor

🧹 chore: Please clean up internal comments and LLM reasoning

Contributor Author

Cleared, thanks

SORT_BY_START_TIME,
span().root().operationName("interop-parent").resourceName("interop-parent"),
span()
    .childOf(Worker.parentId)

Contributor

🎯 suggestion: There is a childOfIndex() that would simplify the test case, cleaning up parentId, activeSpan().getSpanId(), etc... This is applicable to multiple parts in the test files.

Contributor Author

Oh, I didn't know that - looks like childOfIndex() method was introduced quite recently
Thanks

import org.junit.jupiter.params.provider.EnumSource;

@WithConfig(key = "trace.otel.enabled", value = "true")
@WithConfig(key = "integration.opentelemetry-annotations-1.20.enabled", value = "true")

Contributor

🧹 chore: This is a duplicate of trace.otel.enabled

Contributor Author

Duplication removed

String method = "traceAsync" + type.type;
assertTraces(
    trace(
        otelSpan("RxJava3TracedMethods." + method)

Contributor

👏 praise: Interesting use of factory method

Comment on lines +43 to +45
// NOTE: This test lives in the `testdog` package (not `datadog`) on purpose: the agent ignores
// `datadog.*` classes for instrumentation, so `@Trace`-annotated methods declared under `datadog.*`
// would never be instrumented. See the java-lang-21 tests for the same convention.

Contributor

Same here

Contributor Author

Removed

Comment on lines +51 to +57
/**
 * RxJava 3's AbstractDirectTask creates FINISHED/DISPOSED sentinel FutureTask instances in its
 * static initializer. If that initializer runs while a trace is active (e.g. the first scheduled
 * delay/timeout under a span), the executor instrumentation captures a continuation on those
 * static singletons that is never cancelled, leaking the pending trace. Disable async propagation
 * while the type initializer runs.
 */

Contributor

🧹 chore: We don't want LLM reasoning here, here is a trimmed version to keep relevant info only:

Suggested change
- /**
-  * RxJava 3's AbstractDirectTask creates FINISHED/DISPOSED sentinel FutureTask instances in its
-  * static initializer. If that initializer runs while a trace is active (e.g. the first scheduled
-  * delay/timeout under a span), the executor instrumentation captures a continuation on those
-  * static singletons that is never cancelled, leaking the pending trace. Disable async propagation
-  * while the type initializer runs.
-  */
+ /**
+  * RxJava 3's AbstractDirectTask creates FINISHED/DISPOSED sentinel FutureTask instances in its
+  * static initializer.
+  */

Contributor Author

The Javadoc was cleaned up and simplified

@Advice.OnMethodExit(suppress = Throwable.class)
public static void onConstruct(@Advice.This final Maybe<?> maybe) {
  Context parentContext = Java8BytecodeBridge.getCurrentContext();
  if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) {

Contributor

🧹 chore: parentContext will never be null.

Contributor Author

Good catch, fixed
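
The simplification requested here can be shown with a stand-in context type. This is a sketch, not the agent's code: `Ctx`, `current()`, and `attach()` are hypothetical, and the only assumption carried over from the review is that the "current context" accessor never returns null.

```java
/** Illustration only: Ctx is a hypothetical stand-in for the tracer's context type. */
final class RootContextCheckSketch {
  static final class Ctx {
    static final Ctx ROOT = new Ctx("root");
    private static final ThreadLocal<Ctx> CURRENT = ThreadLocal.withInitial(() -> ROOT);

    final String name;

    Ctx(String name) {
      this.name = name;
    }

    /** Never returns null: falls back to ROOT when nothing is attached. */
    static Ctx current() {
      return CURRENT.get();
    }

    static void attach(Ctx ctx) {
      CURRENT.set(ctx == null ? ROOT : ctx);
    }
  }

  /** The guard reduces to a single identity comparison against the root context. */
  static boolean shouldCapture() {
    Ctx parent = Ctx.current();
    return parent != Ctx.ROOT; // no null check needed: current() cannot return null
  }

  public static void main(String[] args) {
    System.out.println(shouldCapture()); // false
    Ctx.attach(new Ctx("span-1"));
    System.out.println(shouldCapture()); // true
  }
}
```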

@Advice.OnMethodExit(suppress = Throwable.class)
public static void onConstruct(@Advice.This final Observable<?> observable) {
  Context parentContext = Java8BytecodeBridge.getCurrentContext();
  if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) {

Contributor

🧹 chore: parentContext will never be null.

Contributor Author

Good catch, fixed

@Override
public Map<String, String> contextStore() {
  final Map<String, String> store = new HashMap<>();
  store.put("io.reactivex.rxjava3.core.Flowable", Context.class.getName());

Contributor

🎯 suggestion: Use temporary constant for context class name?

Contributor Author

Agreed, this way is much better
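
One way the suggestion could look, as a sketch only: the nested `Context` class below is a hypothetical stand-in for the agent's context type, and registering all five reactive types in a single store is an assumption based on the types listed in the PR description, not a copy of the merged code.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustration only: not the merged implementation. */
final class ContextStoreSketch {
  /** Hypothetical stand-in for the agent's Context type. */
  static final class Context {}

  static Map<String, String> contextStore() {
    // Resolve the value class name once and reuse it for every key.
    final String contextClass = Context.class.getName();
    final Map<String, String> store = new HashMap<>();
    store.put("io.reactivex.rxjava3.core.Flowable", contextClass);
    store.put("io.reactivex.rxjava3.core.Observable", contextClass);
    store.put("io.reactivex.rxjava3.core.Single", contextClass);
    store.put("io.reactivex.rxjava3.core.Maybe", contextClass);
    store.put("io.reactivex.rxjava3.core.Completable", contextClass);
    return store;
  }

  public static void main(String[] args) {
    System.out.println(contextStore().size()); // 5
  }
}
```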

@Advice.OnMethodExit(suppress = Throwable.class)
public static void onConstruct(@Advice.This final Single<?> single) {
  Context parentContext = Java8BytecodeBridge.getCurrentContext();
  if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) {

Contributor

🧹 chore: parentContext will never be null.

Contributor Author

Fixed

amarziali and others added 16 commits July 3, 2026 10:37
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…izer

RxJava 3 introduced io.reactivex.rxjava3.internal.schedulers.AbstractDirectTask,
whose static initializer constructs FINISHED/DISPOSED sentinel FutureTask
instances. When that initializer first runs under an active trace (e.g. the
first delay/timeout scheduler hop inside a span), the executor instrumentation
captures a ScopeContinuation on those static singletons that is never cancelled,
leaking the pending trace so it is never reported.

Disable async propagation while AbstractDirectTask's type initializer runs,
mirroring the existing reactor.core.scheduler.SchedulerTask/WorkerTask handling.
The matcher is inert unless RxJava 3 is on the classpath. RxJava 2 has no
equivalent class. Restores the delayed-Maybe coverage in RxJava3Test, which
fails without this fix and passes with it.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ava3

Two tests were under datadog.trace.instrumentation.rxjava3 while the @Trace-using
tests must live under testdog.* (the agent ignores datadog.* for instrumentation).
Move all four tests to testdog.* for consistency.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@PerfectSlayer added the tag: concurrency (Virtual Threads, Coroutines, Async, RX, Executors) label Jul 3, 2026

@PerfectSlayer PerfectSlayer left a comment

Contributor

Thanks for the follow up changes!

@ValentinZakharov ValentinZakharov added this pull request to the merge queue Jul 3, 2026
@dd-octo-sts

dd-octo-sts Bot commented Jul 3, 2026

Contributor

/merge

@gh-worker-devflow-routing-ef8351

gh-worker-devflow-routing-ef8351 Bot commented Jul 3, 2026


View all feedbacks in Devflow UI.

2026-07-03 10:08:26 UTC ℹ️ Start processing command /merge


2026-07-03 10:08:31 UTC ℹ️ MergeQueue: pull request added to the queue

The expected merge time in master is approximately 1h (p90).


2026-07-03 12:09:05 UTC MergeQueue: The build pipeline has timed out

The merge request has been interrupted because the build 64026140340904028 took longer than expected. The current limit for the base branch 'master' is 120 minutes.

Possible reasons:

  • some mandatory checkruns are failing:
    • Check pull request labels

@github-merge-queue github-merge-queue Bot removed this pull request from the merge queue due to failed status checks Jul 3, 2026
@ValentinZakharov

Contributor Author

/merge

@gh-worker-devflow-routing-ef8351

gh-worker-devflow-routing-ef8351 Bot commented Jul 3, 2026


View all feedbacks in Devflow UI.

2026-07-03 14:14:40 UTC ℹ️ Start processing command /merge


2026-07-03 14:14:45 UTC ℹ️ MergeQueue: pull request added to the queue

The expected merge time in master is approximately 1h (p90).


2026-07-03 15:46:42 UTC ℹ️ MergeQueue: This merge request was merged

@gh-worker-dd-mergequeue-cf854d gh-worker-dd-mergequeue-cf854d Bot merged commit bb6b14a into master Jul 3, 2026
590 of 591 checks passed
@gh-worker-dd-mergequeue-cf854d gh-worker-dd-mergequeue-cf854d Bot deleted the vzakharov/rxjava3 branch July 3, 2026 15:46
@github-actions github-actions Bot added this to the 1.64.0 milestone Jul 3, 2026
Labels

  • inst: others (All other instrumentations)
  • tag: ai generated (Largely based on code generated by an AI or LLM)
  • tag: concurrency (Virtual Threads, Coroutines, Async, RX, Executors)
  • type: enhancement (Enhancements and improvements)

4 participants