Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
dcdd946
Scaffold rxjava-3.0 instrumentation module
amarziali May 29, 2026
49cf92d
Add rxjava-3.0 Tracing observer/subscriber wrappers
amarziali May 29, 2026
55ad198
Add rxjava-3.0 type instrumentations, module and async result extension
amarziali May 29, 2026
1f65614
Register rxjava3 async result extension for Graal native-image
amarziali May 29, 2026
49c7174
Add rxjava-3.0 SubscriptionTest
amarziali May 29, 2026
c21e594
Add rxjava-3.0 core context propagation test
amarziali May 29, 2026
c6750de
Fix pending-trace leak from RxJava3 AbstractDirectTask static initial…
amarziali May 29, 2026
52d4976
Add rxjava-3.0 @WithSpan async result extension test
amarziali May 29, 2026
3724055
Add rxjava-3.0 Java 8 interop context-propagation investigation test
amarziali May 29, 2026
5d93104
Unify rxjava-3.0 test package under testdog.trace.instrumentation.rxj…
amarziali May 29, 2026
93a4fb2
review
amarziali Jun 1, 2026
e290336
Add config json entry
amarziali Jun 1, 2026
b4cf310
Skip root context capture in Observable and Single CaptureParentSpanA…
ValentinZakharov Jun 22, 2026
23e7443
Add scheduler context propagation and no-spurious-traces tests for Ob…
ValentinZakharov Jun 22, 2026
fcd0e5b
Fix misleading comment in correctParentsFromSubscriptionTime test
ValentinZakharov Jun 22, 2026
9734820
Extend cancel test to cover all five reactive types
ValentinZakharov Jun 22, 2026
af25c55
Ensure publisher-parent span is always finished in cancelUnderTrace
ValentinZakharov Jun 22, 2026
e54fba3
Normalize PropagateParentSpanAdvice guard and add @Nonnull to Tracing…
ValentinZakharov Jun 22, 2026
647452a
spotless
ValentinZakharov Jun 22, 2026
1978e70
spotless
ValentinZakharov Jun 22, 2026
5ad8d82
Extend SubscriptionTest to cover all five RxJava 3 reactive types
ValentinZakharov Jun 26, 2026
7677d50
Add cancelledNever test to cover dispose-path span lifecycle in RxJav…
ValentinZakharov Jun 26, 2026
1ddb984
Remove strictTraceWrites(false) overrides — all tests pass under stri…
ValentinZakharov Jul 2, 2026
0edde12
Remove redundant package comment — testdog convention is implicit acr…
ValentinZakharov Jul 3, 2026
d7f2a8c
Remove redundant comments that describe what the code does rather tha…
ValentinZakharov Jul 3, 2026
14f51b0
Use childOfIndex(0) instead of childOf(parentId) in RxJava3InteropTest
ValentinZakharov Jul 3, 2026
3ea2d88
Remove redundant @WithConfig for opentelemetry-annotations-1.20 integ…
ValentinZakharov Jul 3, 2026
90abb3f
Remove redundant null check on parentContext in CaptureParentSpanAdvice
ValentinZakharov Jul 3, 2026
a6cb915
Extract local variable for repeated Context class name in contextStore()
ValentinZakharov Jul 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public static void onEnter(@Advice.Argument(value = 0, readOnly = false) String[
+ "datadog.trace.instrumentation.reactivestreams.ReactiveStreamsAsyncResultExtension:build_time,"
+ "datadog.trace.instrumentation.reactor.core.ReactorAsyncResultExtension:build_time,"
+ "datadog.trace.instrumentation.rxjava2.RxJavaAsyncResultExtension:build_time,"
+ "datadog.trace.instrumentation.rxjava3.RxJavaAsyncResultExtension:build_time,"
+ "datadog.trace.logging.ddlogger.DDLogger:build_time,"
+ "datadog.trace.logging.ddlogger.DDLoggerFactory:build_time,"
+ "datadog.trace.logging.ddlogger.DDLoggerFactory$HelperWrapper:build_time,"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ public AsyncPropagatingDisableInstrumentation() {
namedOneOf("reactor.core.scheduler.SchedulerTask", "reactor.core.scheduler.WorkerTask");
private static final ElementMatcher<TypeDescription> RXJAVA2_DISABLED_TYPE_INITIALIZERS =
named("io.reactivex.internal.schedulers.AbstractDirectTask");

/**
* RxJava 3's AbstractDirectTask creates FINISHED/DISPOSED sentinel FutureTask instances in its
* static initializer.
*/
Comment on lines +51 to +54

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 chore: We don't want LLM reasoning here, here is a trimmed version to keep relevant info only:‏

Suggested change
/**
* RxJava 3's AbstractDirectTask creates FINISHED/DISPOSED sentinel FutureTask instances in its
* static initializer. If that initializer runs while a trace is active (e.g. the first scheduled
* delay/timeout under a span), the executor instrumentation captures a continuation on those
* static singletons that is never cancelled, leaking the pending trace. Disable async propagation
* while the type initializer runs.
*/
/**
* RxJava 3's AbstractDirectTask creates FINISHED/DISPOSED sentinel FutureTask instances in its
* static initializer.
*/

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java docs was cleared and simplified

private static final ElementMatcher<TypeDescription> RXJAVA3_DISABLED_TYPE_INITIALIZERS =
named("io.reactivex.rxjava3.internal.schedulers.AbstractDirectTask");

private static final ElementMatcher<TypeDescription> NETTY_GLOBAL_EVENT_EXECUTOR =
namedOneOf(
"io.netty.util.concurrent.GlobalEventExecutor",
Expand Down Expand Up @@ -90,6 +98,7 @@ public String[] knownMatchingTypes() {
"org.apache.activemq.broker.TransactionBroker",
"com.mongodb.internal.connection.DefaultConnectionPool$AsyncWorkManager",
"io.reactivex.internal.schedulers.AbstractDirectTask",
"io.reactivex.rxjava3.internal.schedulers.AbstractDirectTask",
"jdk.internal.net.http.HttpClientImpl",
LETTUCE_HANDSHAKE_HANDLER,
"io.netty.util.concurrent.GlobalEventExecutor",
Expand All @@ -110,6 +119,7 @@ public ElementMatcher<TypeDescription> hierarchyMatcher() {
.or(GRPC_MANAGED_CHANNEL)
.or(REACTOR_DISABLED_TYPE_INITIALIZERS)
.or(RXJAVA2_DISABLED_TYPE_INITIALIZERS)
.or(RXJAVA3_DISABLED_TYPE_INITIALIZERS)
.or(JAVA_HTTP_CLIENT);
}

Expand Down Expand Up @@ -196,6 +206,8 @@ public void methodAdvice(MethodTransformer transformer) {
isTypeInitializer().and(isDeclaredBy(REACTOR_DISABLED_TYPE_INITIALIZERS)), advice);
transformer.applyAdvice(
isTypeInitializer().and(isDeclaredBy(RXJAVA2_DISABLED_TYPE_INITIALIZERS)), advice);
transformer.applyAdvice(
isTypeInitializer().and(isDeclaredBy(RXJAVA3_DISABLED_TYPE_INITIALIZERS)), advice);
transformer.applyAdvice(
isTypeInitializer().and(isDeclaredBy(NETTY_GLOBAL_EVENT_EXECUTOR)), advice);
transformer.applyAdvice(namedOneOf("sendAsync").and(isDeclaredBy(JAVA_HTTP_CLIENT)), advice);
Expand Down
36 changes: 36 additions & 0 deletions dd-java-agent/instrumentation/rxjava/rxjava-3.0/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
muzzle {
pass {
group = "io.reactivex.rxjava3"
module = "rxjava"
versions = "[3.0.0,)"
}
// Assert the rxjava3 advice never resolves against rxjava2 — the two namespaces
// must not overlap. rxjava3 references io.reactivex.rxjava3.core.*, absent from the
// rxjava2 artifact, so muzzle must fail to match it.
fail {
name = "rxjava2-must-not-match"
group = "io.reactivex.rxjava2"
module = "rxjava"
versions = "[2.0.0,)"
}
}

apply from: "$rootDir/gradle/java.gradle"

addTestSuiteForDir('latestDepTest', 'test')

dependencies {
compileOnly group: 'org.reactivestreams', name: 'reactive-streams', version: '1.0.3'
compileOnly group: 'io.reactivex.rxjava3', name: 'rxjava', version: '3.0.0'

testImplementation project(':dd-java-agent:instrumentation:datadog:tracing:trace-annotation')
testImplementation project(':dd-java-agent:instrumentation:opentelemetry:opentelemetry-annotations-1.20')
testImplementation group: 'io.reactivex.rxjava3', name: 'rxjava', version: '3.0.0'
testImplementation group: 'io.opentelemetry.instrumentation', name: 'opentelemetry-instrumentation-annotations', version: '1.28.0'

// Load the rxjava2 instrumenter at test runtime to prove the two versions coexist on
// the agent without interference (it stays dormant with only rxjava3 on the classpath).
testRuntimeOnly project(':dd-java-agent:instrumentation:rxjava:rxjava-2.0')

latestDepTestImplementation group: 'io.reactivex.rxjava3', name: 'rxjava', version: '+'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package datadog.trace.instrumentation.rxjava3;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import datadog.context.Context;
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableObserver;
import net.bytebuddy.asm.Advice;

public final class CompletableInstrumentation
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

@Override
public String instrumentedType() {
return "io.reactivex.rxjava3.core.Completable";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice");
transformer.applyAdvice(
isMethod()
.and(named("subscribe"))
.and(takesArguments(1))
.and(takesArgument(0, named("io.reactivex.rxjava3.core.CompletableObserver"))),
getClass().getName() + "$PropagateParentSpanAdvice");
}

public static class CaptureParentSpanAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onConstruct(@Advice.This final Completable completable) {
Context parentContext = Java8BytecodeBridge.getCurrentContext();
if (parentContext != Java8BytecodeBridge.getRootContext()) {
InstrumentationContext.get(Completable.class, Context.class)
.put(completable, parentContext);
}
}
}

public static class PropagateParentSpanAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static ContextScope onSubscribe(
@Advice.This final Completable completable,
@Advice.Argument(value = 0, readOnly = false) CompletableObserver observer) {
if (observer != null) {
Context parentContext =
InstrumentationContext.get(Completable.class, Context.class).get(completable);
if (parentContext != null) {
observer = new TracingCompletableObserver(observer, parentContext);
// attach the context here in case additional observers are created during subscribe
return parentContext.attach();
}
}
return null;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void closeScope(@Advice.Enter final ContextScope scope) {
if (scope != null) {
scope.close();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package datadog.trace.instrumentation.rxjava3;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import datadog.context.Context;
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import net.bytebuddy.asm.Advice;

public final class FlowableInstrumentation
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

@Override
public String instrumentedType() {
return "io.reactivex.rxjava3.core.Flowable";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice");
transformer.applyAdvice(
isMethod()
.and(named("subscribe"))
.and(takesArguments(1))
.and(takesArgument(0, named("io.reactivex.rxjava3.core.FlowableSubscriber"))),
getClass().getName() + "$PropagateParentSpanAdvice");
}

public static class CaptureParentSpanAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onConstruct(@Advice.This final Flowable<?> flowable) {
Context parentContext = Java8BytecodeBridge.getCurrentContext();
if (parentContext != Java8BytecodeBridge.getRootContext()) {
InstrumentationContext.get(Flowable.class, Context.class).put(flowable, parentContext);
}
}
}

public static class PropagateParentSpanAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static ContextScope onSubscribe(
@Advice.This final Flowable<?> flowable,
@Advice.Argument(value = 0, readOnly = false) FlowableSubscriber<?> subscriber) {
if (subscriber != null) {
Context parentContext =
InstrumentationContext.get(Flowable.class, Context.class).get(flowable);
if (parentContext != null) {
subscriber = new TracingSubscriber<>(subscriber, parentContext);
// attach the context here in case additional subscribers are created during subscribe
return parentContext.attach();
}
}
return null;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void closeScope(@Advice.Enter final ContextScope scope) {
if (scope != null) {
scope.close();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package datadog.trace.instrumentation.rxjava3;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import datadog.context.Context;
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.MaybeObserver;
import net.bytebuddy.asm.Advice;

public final class MaybeInstrumentation
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
@Override
public String instrumentedType() {
return "io.reactivex.rxjava3.core.Maybe";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice");
transformer.applyAdvice(
isMethod()
.and(named("subscribe"))
.and(takesArguments(1))
.and(takesArgument(0, named("io.reactivex.rxjava3.core.MaybeObserver"))),
getClass().getName() + "$PropagateParentSpanAdvice");
}

public static class CaptureParentSpanAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onConstruct(@Advice.This final Maybe<?> maybe) {
Context parentContext = Java8BytecodeBridge.getCurrentContext();
if (parentContext != Java8BytecodeBridge.getRootContext()) {
InstrumentationContext.get(Maybe.class, Context.class).put(maybe, parentContext);
}
}
}

public static class PropagateParentSpanAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static ContextScope onSubscribe(
@Advice.This final Maybe<?> maybe,
@Advice.Argument(value = 0, readOnly = false) MaybeObserver<?> observer) {
if (observer != null) {
Context parentContext = InstrumentationContext.get(Maybe.class, Context.class).get(maybe);
if (parentContext != null) {
observer = new TracingMaybeObserver<>(observer, parentContext);
// attach the context here in case additional observers are created during subscribe
return parentContext.attach();
}
}
return null;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void closeScope(@Advice.Enter final ContextScope scope) {
if (scope != null) {
scope.close();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package datadog.trace.instrumentation.rxjava3;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import datadog.context.Context;
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import net.bytebuddy.asm.Advice;

public final class ObservableInstrumentation
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
@Override
public String instrumentedType() {
return "io.reactivex.rxjava3.core.Observable";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice");
transformer.applyAdvice(
isMethod()
.and(named("subscribe"))
.and(takesArguments(1))
.and(takesArgument(0, named("io.reactivex.rxjava3.core.Observer"))),
getClass().getName() + "$PropagateParentSpanAdvice");
}

public static class CaptureParentSpanAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onConstruct(@Advice.This final Observable<?> observable) {
Context parentContext = Java8BytecodeBridge.getCurrentContext();
if (parentContext != Java8BytecodeBridge.getRootContext()) {
InstrumentationContext.get(Observable.class, Context.class).put(observable, parentContext);
}
}
}

public static class PropagateParentSpanAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static ContextScope onSubscribe(
@Advice.This final Observable<?> observable,
@Advice.Argument(value = 0, readOnly = false) Observer<?> observer) {
if (observer != null) {
Context parentContext =
InstrumentationContext.get(Observable.class, Context.class).get(observable);
if (parentContext != null) {
observer = new TracingObserver<>(observer, parentContext);
// attach the context here in case additional observers are created during subscribe
return parentContext.attach();
}
}
return null;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void closeScope(@Advice.Enter final ContextScope scope) {
if (scope != null) {
scope.close();
}
}
}
}
Loading
Loading