Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 0 additions & 22 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1950,28 +1950,6 @@ project(':clients') {
shadowed
}

// Rewire test fixtures dependencies to avoid depending on the shadow JAR:
// java-test-fixtures creates dependencies testImplementation -> testFixturesApi -> (main artifact)
// It prefers the main artifact over the raw main classpath so that dependents on fixtures get the same
// packaged artifact as production consumers. In this module the main artifact is the shadow JAR,
// so this creates a test dependency on createVersionFile breaking AppInfoParserTest, and worse:
// test code is compiled against the original packages, but the shadow JAR contains relocated bytecode,
// causing runtime errors for various tests. To fix this, we rewire the test fixtures dependency to the raw
// source set output, so tests continue to use compiled classes and original dependencies instead of the shadow JAR.
// https://github.com/gradle/gradle/blob/v9.4.1/platforms/jvm/plugins-jvm-test-fixtures/src/main/java/org/gradle/api/plugins/JavaTestFixturesPlugin.java#L89-L90
afterEvaluate {
configurations.testFixturesApi.dependencies.removeIf { dep ->
dep instanceof ProjectDependency && dep.name == project.name
}
dependencies {
testFixturesApi files(sourceSets.main.output.classesDirs)
testFixturesApi files(sourceSets.main.output.resourcesDir)
}
tasks.named('compileTestFixturesJava') {
dependsOn tasks.named('classes')
}
}

dependencies {
implementation libs.zstd
implementation libs.lz4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.common.metrics.MetricsContext;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -135,6 +136,19 @@ Resource resource() {
return resource;
}

/**
* The resource attributes as plain labels.
*
* @return resource attributes keyed by label name, preserving insertion order.
*/
Map<String, String> resourceLabels() {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method could be reused by updateLabels

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be synchronized?

Map<String, String> labels = new LinkedHashMap<>();
for (KeyValue attribute : resource.getAttributesList()) {
labels.put(attribute.getKey(), attribute.getValue().getStringValue());
}
return labels;
}

/**
* Domain of the active provider i.e. specifies prefix to the metrics.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
import java.util.function.Predicate;

import io.opentelemetry.proto.metrics.v1.MetricsData;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
import io.opentelemetry.proto.resource.v1.Resource;

public class ClientTelemetryUtils {

Expand Down Expand Up @@ -214,6 +217,29 @@ public static ByteBuffer compress(MetricsData metrics, CompressionType compressi
}
}

/**
* Assembles and compresses a {@code MetricsData} payload for the given metrics. The io.opentelemetry
* types are relocated in the shaded clients jar, so keeping this assembly in main code lets test code
* build and compress a payload without referencing io.opentelemetry directly. The layout mirrors the
* payload produced by {@link ClientTelemetryReporter} (one resource metric per single point metric).
*
* @param metrics the metrics to assemble into a {@code MetricsData} payload
* @param compressionType the compression to apply
* @return the compressed {@code MetricsData} payload
*/
public static ByteBuffer compressMetrics(List<SinglePointMetric> metrics, CompressionType compressionType) throws IOException {
MetricsData.Builder builder = MetricsData.newBuilder();
for (SinglePointMetric metric : metrics) {
builder.addResourceMetrics(ResourceMetrics.newBuilder()
.setResource(Resource.newBuilder().build())
.addScopeMetrics(ScopeMetrics.newBuilder()
.addMetrics(metric.builder())
.build())
.build());
}
return compress(builder.build(), compressionType);
}

public static ByteBuffer decompress(ByteBuffer metrics, CompressionType compressionType, int maxDecompressedBytes) {
Compression compression = Compression.of(compressionType).build();
try (InputStream in = compression.wrapForInput(metrics, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
Expand All @@ -235,14 +261,6 @@ public static ByteBuffer decompress(ByteBuffer metrics, CompressionType compress
}
}

public static MetricsData deserializeMetricsData(ByteBuffer serializedMetricsData) {
try {
return MetricsData.parseFrom(serializedMetricsData);
} catch (IOException e) {
throw new KafkaException("Unable to parse MetricsData payload", e);
}
}

public static Uuid fetchClientInstanceId(ClientTelemetryReporter clientTelemetryReporter, Duration timeout) {
if (timeout.isNegative()) {
throw new IllegalArgumentException("The timeout cannot be negative.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.common.telemetry.internals;

import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -50,6 +51,60 @@ public Metric.Builder builder() {
return metricBuilder;
}

boolean hasSum() {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add comments for those test-only methods?

return metricBuilder.hasSum();
}

boolean hasGauge() {
return metricBuilder.hasGauge();
}

boolean isMonotonic() {
return metricBuilder.getSum().getIsMonotonic();
}

boolean isDeltaTemporality() {
return metricBuilder.getSum().getAggregationTemporality() == AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA;
}

int dataPointsCount() {
return metricBuilder.hasSum()
? metricBuilder.getSum().getDataPointsCount()
: metricBuilder.getGauge().getDataPointsCount();
}

long timeUnixNano() {
return dataPoint().getTimeUnixNano();
}

long startTimeUnixNano() {
return dataPoint().getStartTimeUnixNano();
}

double doubleValue() {
return dataPoint().getAsDouble();
}

long longValue() {
return dataPoint().getAsInt();
}

int attributesCount() {
return dataPoint().getAttributesCount();
}

Map<String, String> attributes() {
Map<String, String> attributes = new LinkedHashMap<>();
for (KeyValue attribute : dataPoint().getAttributesList()) {
attributes.put(attribute.getKey(), attribute.getValue().getStringValue());
}
return attributes;
}

private NumberDataPoint dataPoint() {
return metricBuilder.hasSum() ? metricBuilder.getSum().getDataPoints(0) : metricBuilder.getGauge().getDataPoints(0);
}

/*
Methods to construct gauge metric type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,6 @@
import java.util.Collections;
import java.util.List;

import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.MetricsData;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
import io.opentelemetry.proto.resource.v1.Resource;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand All @@ -59,24 +53,22 @@ public void testGetErrorResponse() {
@ParameterizedTest
@EnumSource(CompressionType.class)
public void testMetricsDataCompression(CompressionType compressionType) throws IOException {
MetricsData metricsData = getMetricsData();
PushTelemetryRequest req = getPushTelemetryRequest(metricsData, compressionType);
List<SinglePointMetric> metrics = sampleMetrics();
byte[] raw = Utils.toArray(ClientTelemetryUtils.compressMetrics(metrics, CompressionType.NONE));
PushTelemetryRequest req = getPushTelemetryRequest(metrics, raw, compressionType);

ByteBuffer receivedMetricsBuffer = req.metricsData(1024 * 1024);
assertNotNull(receivedMetricsBuffer);
assertTrue(receivedMetricsBuffer.capacity() > 0);

MetricsData receivedData = ClientTelemetryUtils.deserializeMetricsData(receivedMetricsBuffer);
assertEquals(metricsData, receivedData);
assertArrayEquals(raw, Utils.toArray(receivedMetricsBuffer));
}

private PushTelemetryRequest getPushTelemetryRequest(MetricsData metricsData, CompressionType compressionType) throws IOException {
ByteBuffer compressedData = ClientTelemetryUtils.compress(metricsData, compressionType);
byte[] data = metricsData.toByteArray();
private PushTelemetryRequest getPushTelemetryRequest(List<SinglePointMetric> metrics, byte[] raw, CompressionType compressionType) throws IOException {
ByteBuffer compressedData = ClientTelemetryUtils.compressMetrics(metrics, compressionType);
if (compressionType != CompressionType.NONE) {
assertTrue(compressedData.limit() < data.length);
assertTrue(compressedData.limit() < raw.length);
} else {
assertArrayEquals(Utils.toArray(compressedData), data);
assertArrayEquals(Utils.toArray(compressedData), raw);
}

return new PushTelemetryRequest.Builder(
Expand All @@ -85,36 +77,19 @@ private PushTelemetryRequest getPushTelemetryRequest(MetricsData metricsData, Co
.setCompressionType(compressionType.id)).build();
}

private MetricsData getMetricsData() {
List<Metric> metricsList = new ArrayList<>();
metricsList.add(SinglePointMetric.sum(
new MetricKey("metricName"), 1.0, true, Instant.now(), null, Collections.emptySet())
.builder().build());
metricsList.add(SinglePointMetric.sum(
new MetricKey("metricName1"), 100.0, false, Instant.now(), Instant.now(), Collections.emptySet())
.builder().build());
metricsList.add(SinglePointMetric.deltaSum(
new MetricKey("metricName2"), 1.0, true, Instant.now(), Instant.now(), Collections.emptySet())
.builder().build());
metricsList.add(SinglePointMetric.gauge(
new MetricKey("metricName3"), 1.0, Instant.now(), Collections.emptySet())
.builder().build());
metricsList.add(SinglePointMetric.gauge(
new MetricKey("metricName4"), Long.valueOf(100), Instant.now(), Collections.emptySet())
.builder().build());

MetricsData.Builder builder = MetricsData.newBuilder();
for (Metric metric : metricsList) {
ResourceMetrics rm = ResourceMetrics.newBuilder()
.setResource(Resource.newBuilder().build())
.addScopeMetrics(ScopeMetrics.newBuilder()
.addMetrics(metric)
.build()
).build();
builder.addResourceMetrics(rm);
}

return builder.build();
private List<SinglePointMetric> sampleMetrics() {
List<SinglePointMetric> metrics = new ArrayList<>();
metrics.add(SinglePointMetric.sum(
new MetricKey("metricName"), 1.0, true, Instant.now(), null, Collections.emptySet()));
metrics.add(SinglePointMetric.sum(
new MetricKey("metricName1"), 100.0, false, Instant.now(), Instant.now(), Collections.emptySet()));
metrics.add(SinglePointMetric.deltaSum(
new MetricKey("metricName2"), 1.0, true, Instant.now(), Instant.now(), Collections.emptySet()));
metrics.add(SinglePointMetric.gauge(
new MetricKey("metricName3"), 1.0, Instant.now(), Collections.emptySet()));
metrics.add(SinglePointMetric.gauge(
new MetricKey("metricName4"), Long.valueOf(100), Instant.now(), Collections.emptySet()));
return metrics;
}

}
Loading
Loading