From 1a43879e33d1c8ed35990ebbd64783ef73b45846 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Mon, 29 Jun 2026 20:22:31 +0800 Subject: [PATCH 1/5] completed migration --- build.gradle | 22 -- .../internals/ClientTelemetryProvider.java | 14 + .../internals/ClientTelemetryReporter.java | 2 +- .../internals/ClientTelemetryUtils.java | 29 +- .../internals/SinglePointMetric.java | 55 ++++ .../requests/PushTelemetryRequestTest.java | 67 ++--- .../ClientTelemetryReporterTest.java | 74 ++--- .../internals/ClientTelemetryUtilsTest.java | 73 ++--- .../internals/KafkaMetricsCollectorTest.java | 243 +++++++--------- .../internals/SinglePointMetricTest.java | 272 +++++++----------- 10 files changed, 349 insertions(+), 502 deletions(-) diff --git a/build.gradle b/build.gradle index 4d44740d9e631..e92b53b65d2bd 100644 --- a/build.gradle +++ b/build.gradle @@ -1950,28 +1950,6 @@ project(':clients') { shadowed } - // Rewire test fixtures dependencies to avoid depending on the shadow JAR: - // java-test-fixtures creates dependencies testImplementation -> testFixturesApi -> (main artifact) - // It prefers the main artifact over the raw main classpath so that dependents on fixtures get the same - // packaged artifact as production consumers. In this module the main artifact is the shadow JAR, - // so this creates a test dependency on createVersionFile breaking AppInfoParserTest, and worse: - // test code is compiled against the original packages, but the shadow JAR contains relocated bytecode, - // causing runtime errors for various tests. To fix this, we rewire the test fixtures dependency to the raw - // source set output, so tests continue to use compiled classes and original dependencies instead of the shadow JAR. - // https://github.com/gradle/gradle/blob/v9.4.1/platforms/jvm/plugins-jvm-test-fixtures/src/main/java/org/gradle/api/plugins/JavaTestFixturesPlugin.java#L89-L90 - afterEvaluate { - configurations.testFixturesApi.dependencies.removeIf { dep -> - dep instanceof ProjectDependency && dep.name == project.name - } - dependencies { - testFixturesApi files(sourceSets.main.output.classesDirs) - testFixturesApi files(sourceSets.main.output.resourcesDir) - } - tasks.named('compileTestFixturesJava') { - dependsOn tasks.named('classes') - } - } - dependencies { implementation libs.zstd implementation libs.lz4 diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryProvider.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryProvider.java index e6ee5a45cd3a2..6f6db218ae055 100644 --- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryProvider.java +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryProvider.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.metrics.MetricsContext; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -135,6 +136,19 @@ Resource resource() { return resource; } + /** + * The resource attributes as plain labels. + * + * @return resource attributes keyed by label name, preserving insertion order. + */ + Map resourceLabels() { + Map labels = new LinkedHashMap<>(); + for (KeyValue attribute : resource.getAttributesList()) { + labels.put(attribute.getKey(), attribute.getValue().getStringValue()); + } + return labels; + } + /** * Domain of the active provider i.e. specifies prefix to the metrics. * diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java index cee8ff435a912..4bb3da49ae62d 100644 --- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java @@ -725,7 +725,7 @@ private Optional> createPushRequest(ClientTelemetrySubscription local CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes(), unsupportedCompressionTypes); ByteBuffer compressedPayload; try { - compressedPayload = ClientTelemetryUtils.compress(payload, compressionType); + compressedPayload = ClientTelemetryUtils.compress(ByteBuffer.wrap(payload.toByteArray()), compressionType); } catch (Throwable e) { // Distinguish between recoverable errors (NoClassDefFoundError for missing compression libs) // and fatal errors (OutOfMemoryError, etc.) that should terminate telemetry. diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java index d67775e1afbcd..9e1478b17c4fa 100644 --- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.record.internal.CompressionType; import org.apache.kafka.common.record.internal.RecordBatch; import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.internals.BufferSupplier; import org.slf4j.Logger; @@ -43,6 +44,9 @@ import java.util.function.Predicate; import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; +import io.opentelemetry.proto.resource.v1.Resource; public class ClientTelemetryUtils { @@ -203,17 +207,30 @@ public static CompressionType preferredCompressionType(List acc .orElse(CompressionType.NONE); } - public static ByteBuffer compress(MetricsData metrics, CompressionType compressionType) throws IOException { + public static ByteBuffer compress(ByteBuffer serializedMetrics, CompressionType compressionType) throws IOException { try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) { Compression compression = Compression.of(compressionType).build(); try (OutputStream out = compression.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) { - metrics.writeTo(out); + out.write(Utils.toArray(serializedMetrics.duplicate())); } compressedOut.buffer().flip(); return compressedOut.buffer(); } } + public static ByteBuffer serializeMetricsData(List metrics) { + MetricsData.Builder builder = MetricsData.newBuilder(); + for (SinglePointMetric metric : metrics) { + builder.addResourceMetrics(ResourceMetrics.newBuilder() + .setResource(Resource.newBuilder().build()) + .addScopeMetrics(ScopeMetrics.newBuilder() + .addMetrics(metric.builder()) + .build()) + .build()); + } + return ByteBuffer.wrap(builder.build().toByteArray()); + } + public static ByteBuffer decompress(ByteBuffer metrics, CompressionType compressionType, int maxDecompressedBytes) { Compression compression = Compression.of(compressionType).build(); try (InputStream in = compression.wrapForInput(metrics, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create()); @@ -235,14 +252,6 @@ public static ByteBuffer decompress(ByteBuffer metrics, CompressionType compress } } - public static MetricsData deserializeMetricsData(ByteBuffer serializedMetricsData) { - try { - return MetricsData.parseFrom(serializedMetricsData); - } catch (IOException e) { - throw new KafkaException("Unable to parse MetricsData payload", e); - } - } - public static Uuid fetchClientInstanceId(ClientTelemetryReporter clientTelemetryReporter, Duration timeout) { if (timeout.isNegative()) { throw new IllegalArgumentException("The timeout cannot be negative."); diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java index ec02f220fc549..c391865fe36fe 100644 --- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.telemetry.internals; import java.time.Instant; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -50,6 +51,60 @@ public Metric.Builder builder() { return metricBuilder; } + boolean hasSum() { + return metricBuilder.hasSum(); + } + + boolean hasGauge() { + return metricBuilder.hasGauge(); + } + + boolean isMonotonic() { + return metricBuilder.getSum().getIsMonotonic(); + } + + boolean isDeltaTemporality() { + return metricBuilder.getSum().getAggregationTemporality() == AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA; + } + + int dataPointsCount() { + return metricBuilder.hasSum() + ? metricBuilder.getSum().getDataPointsCount() + : metricBuilder.getGauge().getDataPointsCount(); + } + + long timeUnixNano() { + return dataPoint().getTimeUnixNano(); + } + + long startTimeUnixNano() { + return dataPoint().getStartTimeUnixNano(); + } + + double doubleValue() { + return dataPoint().getAsDouble(); + } + + long longValue() { + return dataPoint().getAsInt(); + } + + int attributesCount() { + return dataPoint().getAttributesCount(); + } + + Map attributes() { + Map attributes = new LinkedHashMap<>(); + for (KeyValue attribute : dataPoint().getAttributesList()) { + attributes.put(attribute.getKey(), attribute.getValue().getStringValue()); + } + return attributes; + } + + private NumberDataPoint dataPoint() { + return metricBuilder.hasSum() ? metricBuilder.getSum().getDataPoints(0) : metricBuilder.getGauge().getDataPoints(0); + } + /* Methods to construct gauge metric type. */ diff --git a/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java index 198b8b4340e69..5523148b856f9 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java @@ -36,12 +36,6 @@ import java.util.Collections; import java.util.List; -import io.opentelemetry.proto.metrics.v1.Metric; -import io.opentelemetry.proto.metrics.v1.MetricsData; -import io.opentelemetry.proto.metrics.v1.ResourceMetrics; -import io.opentelemetry.proto.metrics.v1.ScopeMetrics; -import io.opentelemetry.proto.resource.v1.Resource; - import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -59,24 +53,22 @@ public void testGetErrorResponse() { @ParameterizedTest @EnumSource(CompressionType.class) public void testMetricsDataCompression(CompressionType compressionType) throws IOException { - MetricsData metricsData = getMetricsData(); - PushTelemetryRequest req = getPushTelemetryRequest(metricsData, compressionType); + ByteBuffer serialized = ClientTelemetryUtils.serializeMetricsData(sampleMetrics()); + byte[] raw = Utils.toArray(serialized.duplicate()); + PushTelemetryRequest req = getPushTelemetryRequest(serialized, raw, compressionType); ByteBuffer receivedMetricsBuffer = req.metricsData(1024 * 1024); assertNotNull(receivedMetricsBuffer); assertTrue(receivedMetricsBuffer.capacity() > 0); - - MetricsData receivedData = ClientTelemetryUtils.deserializeMetricsData(receivedMetricsBuffer); - assertEquals(metricsData, receivedData); + assertArrayEquals(raw, Utils.toArray(receivedMetricsBuffer)); } - private PushTelemetryRequest getPushTelemetryRequest(MetricsData metricsData, CompressionType compressionType) throws IOException { - ByteBuffer compressedData = ClientTelemetryUtils.compress(metricsData, compressionType); - byte[] data = metricsData.toByteArray(); + private PushTelemetryRequest getPushTelemetryRequest(ByteBuffer serializedMetrics, byte[] raw, CompressionType compressionType) throws IOException { + ByteBuffer compressedData = ClientTelemetryUtils.compress(serializedMetrics, compressionType); if (compressionType != CompressionType.NONE) { - assertTrue(compressedData.limit() < data.length); + assertTrue(compressedData.limit() < raw.length); } else { - assertArrayEquals(Utils.toArray(compressedData), data); + assertArrayEquals(Utils.toArray(compressedData), raw); } return new PushTelemetryRequest.Builder( @@ -85,36 +77,19 @@ private PushTelemetryRequest getPushTelemetryRequest(MetricsData metricsData, Co .setCompressionType(compressionType.id)).build(); } - private MetricsData getMetricsData() { - List metricsList = new ArrayList<>(); - metricsList.add(SinglePointMetric.sum( - new MetricKey("metricName"), 1.0, true, Instant.now(), null, Collections.emptySet()) - .builder().build()); - metricsList.add(SinglePointMetric.sum( - new MetricKey("metricName1"), 100.0, false, Instant.now(), Instant.now(), Collections.emptySet()) - .builder().build()); - metricsList.add(SinglePointMetric.deltaSum( - new MetricKey("metricName2"), 1.0, true, Instant.now(), Instant.now(), Collections.emptySet()) - .builder().build()); - metricsList.add(SinglePointMetric.gauge( - new MetricKey("metricName3"), 1.0, Instant.now(), Collections.emptySet()) - .builder().build()); - metricsList.add(SinglePointMetric.gauge( - new MetricKey("metricName4"), Long.valueOf(100), Instant.now(), Collections.emptySet()) - .builder().build()); - - MetricsData.Builder builder = MetricsData.newBuilder(); - for (Metric metric : metricsList) { - ResourceMetrics rm = ResourceMetrics.newBuilder() - .setResource(Resource.newBuilder().build()) - .addScopeMetrics(ScopeMetrics.newBuilder() - .addMetrics(metric) - .build() - ).build(); - builder.addResourceMetrics(rm); - } - - return builder.build(); + private List sampleMetrics() { + List metrics = new ArrayList<>(); + metrics.add(SinglePointMetric.sum( + new MetricKey("metricName"), 1.0, true, Instant.now(), null, Collections.emptySet())); + metrics.add(SinglePointMetric.sum( + new MetricKey("metricName1"), 100.0, false, Instant.now(), Instant.now(), Collections.emptySet())); + metrics.add(SinglePointMetric.deltaSum( + new MetricKey("metricName2"), 1.0, true, Instant.now(), Instant.now(), Collections.emptySet())); + metrics.add(SinglePointMetric.gauge( + new MetricKey("metricName3"), 1.0, Instant.now(), Collections.emptySet())); + metrics.add(SinglePointMetric.gauge( + new MetricKey("metricName4"), Long.valueOf(100), Instant.now(), Collections.emptySet())); + return metrics; } } diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java index 64697907deadf..192971a820605 100644 --- a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java +++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java @@ -62,8 +62,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import io.opentelemetry.proto.common.v1.KeyValue; - import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -102,11 +100,9 @@ public void testInitTelemetryReporter() { clientTelemetryReporter.configure(configs); clientTelemetryReporter.contextChange(metricsContext); assertNotNull(clientTelemetryReporter.metricsCollector()); - assertNotNull(clientTelemetryReporter.telemetryProvider().resource()); - assertEquals(1, clientTelemetryReporter.telemetryProvider().resource().getAttributesCount()); - assertEquals( - ClientTelemetryProvider.CLIENT_RACK, clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getKey()); - assertEquals("rack", clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getValue().getStringValue()); + Map labels = clientTelemetryReporter.telemetryProvider().resourceLabels(); + assertEquals(1, labels.size()); + assertEquals("rack", labels.get(ClientTelemetryProvider.CLIENT_RACK)); } @Test @@ -130,17 +126,11 @@ public void testProducerLabels() { clientTelemetryReporter.configure(configs); clientTelemetryReporter.contextChange(new KafkaMetricsContext("kafka.producer")); assertNotNull(clientTelemetryReporter.metricsCollector()); - assertNotNull(clientTelemetryReporter.telemetryProvider().resource()); - - List attributes = clientTelemetryReporter.telemetryProvider().resource().getAttributesList(); - assertEquals(2, attributes.size()); - attributes.forEach(attribute -> { - if (attribute.getKey().equals(ClientTelemetryProvider.CLIENT_RACK)) { - assertEquals("rack", attribute.getValue().getStringValue()); - } else if (attribute.getKey().equals(ClientTelemetryProvider.TRANSACTIONAL_ID)) { - assertEquals("transaction-id", attribute.getValue().getStringValue()); - } - }); + + Map labels = clientTelemetryReporter.telemetryProvider().resourceLabels(); + assertEquals(2, labels.size()); + assertEquals("rack", labels.get(ClientTelemetryProvider.CLIENT_RACK)); + assertEquals("transaction-id", labels.get(ClientTelemetryProvider.TRANSACTIONAL_ID)); } @Test @@ -154,19 +144,12 @@ public void testConsumerLabels() { clientTelemetryReporter.configure(configs); clientTelemetryReporter.contextChange(new KafkaMetricsContext("kafka.consumer")); assertNotNull(clientTelemetryReporter.metricsCollector()); - assertNotNull(clientTelemetryReporter.telemetryProvider().resource()); - - List attributes = clientTelemetryReporter.telemetryProvider().resource().getAttributesList(); - assertEquals(3, attributes.size()); - attributes.forEach(attribute -> { - if (attribute.getKey().equals(ClientTelemetryProvider.CLIENT_RACK)) { - assertEquals("rack", attribute.getValue().getStringValue()); - } else if (attribute.getKey().equals(ClientTelemetryProvider.GROUP_ID)) { - assertEquals("group-id", attribute.getValue().getStringValue()); - } else if (attribute.getKey().equals(ClientTelemetryProvider.GROUP_INSTANCE_ID)) { - assertEquals("group-instance-id", attribute.getValue().getStringValue()); - } - }); + + Map labels = clientTelemetryReporter.telemetryProvider().resourceLabels(); + assertEquals(3, labels.size()); + assertEquals("rack", labels.get(ClientTelemetryProvider.CLIENT_RACK)); + assertEquals("group-id", labels.get(ClientTelemetryProvider.GROUP_ID)); + assertEquals("group-instance-id", labels.get(ClientTelemetryProvider.GROUP_INSTANCE_ID)); } @Test @@ -188,34 +171,19 @@ public void testTelemetryReporterCloseMultipleTimesNoException() { public void testUpdateMetricsLabels() { clientTelemetryReporter.configure(configs); clientTelemetryReporter.contextChange(metricsContext); - assertTrue(clientTelemetryReporter.telemetryProvider().resource().getAttributesList().isEmpty()); + assertTrue(clientTelemetryReporter.telemetryProvider().resourceLabels().isEmpty()); clientTelemetryReporter.updateMetricsLabels(Collections.singletonMap("key1", "value1")); - assertEquals(1, clientTelemetryReporter.telemetryProvider().resource().getAttributesList().size()); - assertEquals("key1", clientTelemetryReporter.telemetryProvider().resource().getAttributesList().get(0).getKey()); - assertEquals("value1", clientTelemetryReporter.telemetryProvider().resource().getAttributesList().get(0).getValue().getStringValue()); + assertEquals(Collections.singletonMap("key1", "value1"), + clientTelemetryReporter.telemetryProvider().resourceLabels()); clientTelemetryReporter.updateMetricsLabels(Collections.singletonMap("key2", "value2")); - assertEquals(2, clientTelemetryReporter.telemetryProvider().resource().getAttributesList().size()); - clientTelemetryReporter.telemetryProvider().resource().getAttributesList().forEach(attribute -> { - if (attribute.getKey().equals("key1")) { - assertEquals("value1", attribute.getValue().getStringValue()); - } else { - assertEquals("key2", attribute.getKey()); - assertEquals("value2", attribute.getValue().getStringValue()); - } - }); + assertEquals(Map.of("key1", "value1", "key2", "value2"), + clientTelemetryReporter.telemetryProvider().resourceLabels()); clientTelemetryReporter.updateMetricsLabels(Collections.singletonMap("key2", "valueUpdated")); - assertEquals(2, clientTelemetryReporter.telemetryProvider().resource().getAttributesList().size()); - clientTelemetryReporter.telemetryProvider().resource().getAttributesList().forEach(attribute -> { - if (attribute.getKey().equals("key1")) { - assertEquals("value1", attribute.getValue().getStringValue()); - } else { - assertEquals("key2", attribute.getKey()); - assertEquals("valueUpdated", attribute.getValue().getStringValue()); - } - }); + assertEquals(Map.of("key1", "value1", "key2", "valueUpdated"), + clientTelemetryReporter.telemetryProvider().resourceLabels()); } @Test diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java index 7ca1a41fb4ccb..21bd86110b030 100644 --- a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java @@ -36,12 +36,6 @@ import java.util.Set; import java.util.function.Predicate; -import io.opentelemetry.proto.metrics.v1.Metric; -import io.opentelemetry.proto.metrics.v1.MetricsData; -import io.opentelemetry.proto.metrics.v1.ResourceMetrics; -import io.opentelemetry.proto.metrics.v1.ScopeMetrics; -import io.opentelemetry.proto.resource.v1.Resource; - import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -146,9 +140,9 @@ public void testPreferredCompressionType() { @ParameterizedTest @EnumSource(CompressionType.class) public void testCompressDecompress(CompressionType compressionType) throws IOException { - MetricsData metricsData = getMetricsData(); - byte[] raw = metricsData.toByteArray(); - ByteBuffer compressed = ClientTelemetryUtils.compress(metricsData, compressionType); + ByteBuffer serialized = ClientTelemetryUtils.serializeMetricsData(sampleMetrics()); + byte[] raw = Utils.toArray(serialized.duplicate()); + ByteBuffer compressed = ClientTelemetryUtils.compress(serialized, compressionType); assertNotNull(compressed); if (compressionType != CompressionType.NONE) { assertTrue(compressed.limit() < raw.length); @@ -157,49 +151,30 @@ public void testCompressDecompress(CompressionType compressionType) throws IOExc } ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed, compressionType, 1024 * 1024); assertNotNull(decompressed); - byte[] actualResult = Utils.toArray(decompressed); - assertArrayEquals(raw, actualResult); + assertArrayEquals(raw, Utils.toArray(decompressed)); } - private MetricsData getMetricsData() { - List metricsList = new ArrayList<>(); - metricsList.add(SinglePointMetric.sum( - new MetricKey("metricName"), 1.0, true, Instant.now(), null, Set.of()) - .builder().build()); - metricsList.add(SinglePointMetric.sum( - new MetricKey("metricName1"), 100.0, false, Instant.now(), Instant.now(), Set.of()) - .builder().build()); - metricsList.add(SinglePointMetric.deltaSum( - new MetricKey("metricName2"), 1.0, true, Instant.now(), Instant.now(), Set.of()) - .builder().build()); - metricsList.add(SinglePointMetric.gauge( - new MetricKey("metricName3"), 1.0, Instant.now(), Set.of()) - .builder().build()); - metricsList.add(SinglePointMetric.gauge( - new MetricKey("metricName4"), Long.valueOf(100), Instant.now(), Set.of()) - .builder().build()); - - MetricsData.Builder builder = MetricsData.newBuilder(); - for (Metric metric : metricsList) { - ResourceMetrics rm = ResourceMetrics.newBuilder() - .setResource(Resource.newBuilder().build()) - .addScopeMetrics(ScopeMetrics.newBuilder() - .addMetrics(metric) - .build() - ).build(); - builder.addResourceMetrics(rm); - } - - return builder.build(); + private List sampleMetrics() { + List metrics = new ArrayList<>(); + metrics.add(SinglePointMetric.sum( + new MetricKey("metricName"), 1.0, true, Instant.now(), null, Set.of())); + metrics.add(SinglePointMetric.sum( + new MetricKey("metricName1"), 100.0, false, Instant.now(), Instant.now(), Set.of())); + metrics.add(SinglePointMetric.deltaSum( + new MetricKey("metricName2"), 1.0, true, Instant.now(), Instant.now(), Set.of())); + metrics.add(SinglePointMetric.gauge( + new MetricKey("metricName3"), 1.0, Instant.now(), Set.of())); + metrics.add(SinglePointMetric.gauge( + new MetricKey("metricName4"), Long.valueOf(100), Instant.now(), Set.of())); + return metrics; } @Test public void testDecompressExceedingMaxSizeThrows() throws IOException { - // Compress a large payload using the existing compress API (via MetricsData) - // then verify decompression with a small limit throws - MetricsData metricsData = getMetricsData(); - ByteBuffer compressed = ClientTelemetryUtils.compress(metricsData, CompressionType.GZIP); - byte[] raw = metricsData.toByteArray(); + // Compress a payload then verify decompression with a small limit throws. + ByteBuffer serialized = ClientTelemetryUtils.serializeMetricsData(sampleMetrics()); + byte[] raw = Utils.toArray(serialized.duplicate()); + ByteBuffer compressed = ClientTelemetryUtils.compress(serialized, CompressionType.GZIP); // Set limit smaller than the actual decompressed size int smallLimit = raw.length - 1; @@ -210,9 +185,9 @@ public void testDecompressExceedingMaxSizeThrows() throws IOException { @Test public void testDecompressWithPayloadSizeSucceeds() throws IOException { - MetricsData metricsData = getMetricsData(); - byte[] raw = metricsData.toByteArray(); - ByteBuffer compressed = ClientTelemetryUtils.compress(metricsData, CompressionType.GZIP); + ByteBuffer serialized = ClientTelemetryUtils.serializeMetricsData(sampleMetrics()); + byte[] raw = Utils.toArray(serialized.duplicate()); + ByteBuffer compressed = ClientTelemetryUtils.compress(serialized, CompressionType.GZIP); // Set limit to exact limit prior compression. ByteBuffer result = ClientTelemetryUtils.decompress(compressed, CompressionType.GZIP, raw.length); diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollectorTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollectorTest.java index bc6a9c4ca3080..b23ce1aa0e152 100644 --- a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollectorTest.java @@ -35,13 +35,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import io.opentelemetry.proto.common.v1.KeyValue; -import io.opentelemetry.proto.metrics.v1.AggregationTemporality; -import io.opentelemetry.proto.metrics.v1.Metric; -import io.opentelemetry.proto.metrics.v1.NumberDataPoint; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -99,21 +92,18 @@ public void testMeasurableCounter() { // Should get exactly 2 Kafka measurables since Metrics always includes a count measurable. assertEquals(2, result.size()); - Metric counter = result.stream() - .flatMap(metrics -> Stream.of(metrics.builder().build())) - .filter(metric -> metric.getName().equals("test.domain.group1.name1")).findFirst().get(); + SinglePointMetric counter = metricByName(result, "test.domain.group1.name1"); assertTrue(counter.hasSum()); - assertEquals(tags, getTags(counter.getSum().getDataPoints(0).getAttributesList())); + assertEquals(tags, counter.attributes()); - assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, counter.getSum().getAggregationTemporality()); - assertTrue(counter.getSum().getIsMonotonic()); - NumberDataPoint point = counter.getSum().getDataPoints(0); - assertEquals(2d, point.getAsDouble(), 0.0); + assertFalse(counter.isDeltaTemporality()); + assertTrue(counter.isMonotonic()); + assertEquals(2d, counter.doubleValue(), 0.0); assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(61L).getEpochSecond()) + - Instant.ofEpochSecond(61L).getNano(), point.getTimeUnixNano()); + Instant.ofEpochSecond(61L).getNano(), counter.timeUnixNano()); assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(1L).getEpochSecond()) + - Instant.ofEpochSecond(1L).getNano(), point.getStartTimeUnixNano()); + Instant.ofEpochSecond(1L).getNano(), counter.startTimeUnixNano()); } @Test @@ -134,21 +124,18 @@ public void testMeasurableCounterDeltaMetrics() { // Should get exactly 2 Kafka measurables since Metrics always includes a count measurable. assertEquals(2, result.size()); - Metric counter = result.stream() - .flatMap(metrics -> Stream.of(metrics.builder().build())) - .filter(metric -> metric.getName().equals("test.domain.group1.name1")).findFirst().get(); + SinglePointMetric counter = metricByName(result, "test.domain.group1.name1"); assertTrue(counter.hasSum()); - assertEquals(tags, getTags(counter.getSum().getDataPoints(0).getAttributesList())); + assertEquals(tags, counter.attributes()); - assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, counter.getSum().getAggregationTemporality()); - assertTrue(counter.getSum().getIsMonotonic()); - NumberDataPoint point = counter.getSum().getDataPoints(0); - assertEquals(2d, point.getAsDouble(), 0.0); + assertTrue(counter.isDeltaTemporality()); + assertTrue(counter.isMonotonic()); + assertEquals(2d, counter.doubleValue(), 0.0); assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(61L).getEpochSecond()) + - Instant.ofEpochSecond(61L).getNano(), point.getTimeUnixNano()); + Instant.ofEpochSecond(61L).getNano(), counter.timeUnixNano()); assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(1L).getEpochSecond()) + - Instant.ofEpochSecond(1L).getNano(), point.getStartTimeUnixNano()); + Instant.ofEpochSecond(1L).getNano(), counter.startTimeUnixNano()); } @Test @@ -166,13 +153,11 @@ public void testMeasurableTotal() { // Should get exactly 2 Kafka measurables since Metrics always includes a count measurable. assertEquals(2, result.size()); - Metric counter = result.stream() - .flatMap(metrics -> Stream.of(metrics.builder().build())) - .filter(metric -> metric.getName().equals("test.domain.group1.name1")).findFirst().get(); + SinglePointMetric counter = metricByName(result, "test.domain.group1.name1"); assertTrue(counter.hasSum()); - assertEquals(tags, getTags(counter.getSum().getDataPoints(0).getAttributesList())); - assertEquals(15, counter.getSum().getDataPoints(0).getAsDouble(), 0.0); + assertEquals(tags, counter.attributes()); + assertEquals(15, counter.doubleValue(), 0.0); } @Test @@ -191,13 +176,11 @@ public void testMeasurableTotalDeltaMetrics() { // Should get exactly 2 Kafka measurables since Metrics always includes a count measurable. assertEquals(2, result.size()); - Metric counter = result.stream() - .flatMap(metrics -> Stream.of(metrics.builder().build())) - .filter(metric -> metric.getName().equals("test.domain.group1.name1")).findFirst().get(); + SinglePointMetric counter = metricByName(result, "test.domain.group1.name1"); assertTrue(counter.hasSum()); - assertEquals(tags, getTags(counter.getSum().getDataPoints(0).getAttributesList())); - assertEquals(15, counter.getSum().getDataPoints(0).getAsDouble(), 0.0); + assertEquals(tags, counter.attributes()); + assertEquals(15, counter.doubleValue(), 0.0); } @Test @@ -210,13 +193,11 @@ public void testMeasurableGauge() { // Should get exactly 2 Kafka measurables since Metrics always includes a count measurable. assertEquals(2, result.size()); - Metric counter = result.stream() - .flatMap(metrics -> Stream.of(metrics.builder().build())) - .filter(metric -> metric.getName().equals("test.domain.group1.name1")).findFirst().get(); + SinglePointMetric counter = metricByName(result, "test.domain.group1.name1"); assertTrue(counter.hasGauge()); - assertEquals(tags, getTags(counter.getGauge().getDataPoints(0).getAttributesList())); - assertEquals(100L, counter.getGauge().getDataPoints(0).getAsDouble(), 0.0); + assertEquals(tags, counter.attributes()); + assertEquals(100L, counter.doubleValue(), 0.0); } @Test @@ -233,21 +214,19 @@ public void testNonMeasurable() { assertEquals(5, result.size()); result.stream() - .flatMap(metrics -> Stream.of(metrics.builder().build())) - .filter(metric -> metric.getName().equals("test.domain.group1.(float|double)")).forEach( + .filter(metric -> metric.key().name().equals("test.domain.group1.(float|double)")).forEach( doubleGauge -> { assertTrue(doubleGauge.hasGauge()); - assertEquals(tags, getTags(doubleGauge.getGauge().getDataPoints(0).getAttributesList())); - assertEquals(99d, doubleGauge.getGauge().getDataPoints(0).getAsDouble(), 0.0); + assertEquals(tags, doubleGauge.attributes()); + assertEquals(99d, doubleGauge.doubleValue(), 0.0); }); result.stream() - .flatMap(metrics -> Stream.of(metrics.builder().build())) - .filter(metric -> metric.getName().equals("test.domain.group1.(int|long)")).forEach( + .filter(metric -> metric.key().name().equals("test.domain.group1.(int|long)")).forEach( intGauge -> { assertTrue(intGauge.hasGauge()); - assertEquals(tags, getTags(intGauge.getGauge().getDataPoints(0).getAttributesList())); - assertEquals(100, intGauge.getGauge().getDataPoints(0).getAsDouble(), 0.0); + assertEquals(tags, intGauge.attributes()); + assertEquals(100, intGauge.doubleValue(), 0.0); }); } @@ -263,11 +242,9 @@ public void testMeasurableWithException() { //Verify only the global count of metrics exist assertEquals(1, result.size()); // Group is registered as kafka-metrics-count - assertEquals("test.domain.kafka.count.count", result.get(0).builder().build().getName()); + assertEquals("test.domain.kafka.count.count", result.get(0).key().name()); //Verify metrics with measure() method throw exception is not returned - assertFalse(result.stream() - .flatMap(metrics -> Stream.of(metrics.builder().build())) - .anyMatch(metric -> metric.getName().equals("test.domain.group1.name1"))); + assertFalse(result.stream().anyMatch(metric -> metric.key().name().equals("test.domain.group1.name1"))); } @Test @@ -285,7 +262,7 @@ public void testMetricRemoval() { collector.collect(testEmitter); List collected = testEmitter.emittedMetrics(); assertEquals(1, collected.size()); - assertEquals("test.domain.kafka.count.count", collected.get(0).builder().build().getName()); + assertEquals("test.domain.kafka.count.count", collected.get(0).key().name()); } @Test @@ -314,18 +291,15 @@ public void testSecondCollectCumulative() { assertEquals(2, result.size()); - Metric cumulative = result.stream() - .flatMap(metrics -> Stream.of(metrics.builder().build())) - .filter(metric -> metric.getName().equals("test.domain.group1.name1")).findFirst().get(); + SinglePointMetric cumulative = metricByName(result, "test.domain.group1.name1"); - NumberDataPoint point = cumulative.getSum().getDataPoints(0); - assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, cumulative.getSum().getAggregationTemporality()); - assertTrue(cumulative.getSum().getIsMonotonic()); - assertEquals(7d, point.getAsDouble(), 0.0); + assertFalse(cumulative.isDeltaTemporality()); + assertTrue(cumulative.isMonotonic()); + assertEquals(7d, cumulative.doubleValue(), 0.0); assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(121L).getEpochSecond()) + - Instant.ofEpochSecond(121L).getNano(), point.getTimeUnixNano()); + Instant.ofEpochSecond(121L).getNano(), cumulative.timeUnixNano()); assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(1L).getEpochSecond()) + - Instant.ofEpochSecond(1L).getNano(), point.getStartTimeUnixNano()); + Instant.ofEpochSecond(1L).getNano(), cumulative.startTimeUnixNano()); } @Test @@ -354,18 +328,15 @@ public void testSecondDeltaCollectDouble() { assertEquals(2, result.size()); - Metric cumulative = result.stream() - .flatMap(metrics -> Stream.of(metrics.builder().build())) - .filter(metric -> metric.getName().equals("test.domain.group1.name1")).findFirst().get(); + SinglePointMetric cumulative = metricByName(result, "test.domain.group1.name1"); - NumberDataPoint point = cumulative.getSum().getDataPoints(0); - assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, cumulative.getSum().getAggregationTemporality()); - assertTrue(cumulative.getSum().getIsMonotonic()); - assertEquals(5d, point.getAsDouble(), 0.0); + assertTrue(cumulative.isDeltaTemporality()); + assertTrue(cumulative.isMonotonic()); + assertEquals(5d, cumulative.doubleValue(), 0.0); assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(121L).getEpochSecond()) + - Instant.ofEpochSecond(121L).getNano(), point.getTimeUnixNano()); + Instant.ofEpochSecond(121L).getNano(), cumulative.timeUnixNano()); assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(61L).getEpochSecond()) + - Instant.ofEpochSecond(61L).getNano(), point.getStartTimeUnixNano()); + Instant.ofEpochSecond(61L).getNano(), cumulative.startTimeUnixNano()); } @Test @@ -379,10 +350,10 @@ public void testCollectFilter() { // Should get exactly 1 Kafka measurables because we excluded the count measurable assertEquals(1, result.size()); - Metric counter = result.get(0).builder().build(); + SinglePointMetric counter = result.get(0); assertTrue(counter.hasGauge()); - assertEquals(100L, counter.getGauge().getDataPoints(0).getAsDouble(), 0.0); + assertEquals(100L, counter.doubleValue(), 0.0); } @Test @@ -484,14 +455,13 @@ public void testCollectMetricsWithTemporalityChange() { collector.collect(testEmitter); List result = testEmitter.emittedMetrics(); - Metric counter = result.get(0).builder().build(); - assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, counter.getSum().getAggregationTemporality()); - NumberDataPoint point = counter.getSum().getDataPoints(0); - assertEquals(1d, point.getAsDouble()); + SinglePointMetric counter = result.get(0); + assertFalse(counter.isDeltaTemporality()); + assertEquals(1d, counter.doubleValue()); assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(61L).getEpochSecond()) + - Instant.ofEpochSecond(61L).getNano(), point.getTimeUnixNano()); + Instant.ofEpochSecond(61L).getNano(), counter.timeUnixNano()); assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(1L).getEpochSecond()) + - Instant.ofEpochSecond(1L).getNano(), point.getStartTimeUnixNano()); + Instant.ofEpochSecond(1L).getNano(), counter.startTimeUnixNano()); // Again emit metrics as cumulative, verify the start time is unchanged and current time is // advanced by 60 seconds again. @@ -501,14 +471,13 @@ public void testCollectMetricsWithTemporalityChange() { collector.collect(testEmitter); result = testEmitter.emittedMetrics(); - counter = result.get(0).builder().build(); - assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, counter.getSum().getAggregationTemporality()); - point = counter.getSum().getDataPoints(0); - assertEquals(2d, point.getAsDouble(), 0.0); + counter = result.get(0); + assertFalse(counter.isDeltaTemporality()); + assertEquals(2d, counter.doubleValue(), 0.0); assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(121L).getEpochSecond()) + - Instant.ofEpochSecond(121L).getNano(), point.getTimeUnixNano()); + Instant.ofEpochSecond(121L).getNano(), counter.timeUnixNano()); assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(1L).getEpochSecond()) + - Instant.ofEpochSecond(1L).getNano(), point.getStartTimeUnixNano()); + Instant.ofEpochSecond(1L).getNano(), counter.startTimeUnixNano()); // Change Temporality. Emit metrics as delta, verify the temporality changes to delta and start time is reset to @@ -521,14 +490,13 @@ public void testCollectMetricsWithTemporalityChange() { collector.collect(testEmitter); result = testEmitter.emittedMetrics(); - counter = result.get(0).builder().build(); - assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, counter.getSum().getAggregationTemporality()); - point = counter.getSum().getDataPoints(0); - assertEquals(3d, point.getAsDouble(), 0.0); + counter = result.get(0); + assertTrue(counter.isDeltaTemporality()); + assertEquals(3d, counter.doubleValue(), 0.0); assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(181L).getEpochSecond()) + - Instant.ofEpochSecond(181L).getNano(), point.getTimeUnixNano()); + Instant.ofEpochSecond(181L).getNano(), counter.timeUnixNano()); assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(181L).getEpochSecond()) + - Instant.ofEpochSecond(181L).getNano(), point.getStartTimeUnixNano()); + Instant.ofEpochSecond(181L).getNano(), counter.startTimeUnixNano()); // Again emit metrics as delta, verify the start time is tracked properly and only delta value // is present on response. @@ -538,14 +506,13 @@ public void testCollectMetricsWithTemporalityChange() { collector.collect(testEmitter); result = testEmitter.emittedMetrics(); - counter = result.get(0).builder().build(); - assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, counter.getSum().getAggregationTemporality()); - point = counter.getSum().getDataPoints(0); - assertEquals(1d, point.getAsDouble(), 0.0); + counter = result.get(0); + assertTrue(counter.isDeltaTemporality()); + assertEquals(1d, counter.doubleValue(), 0.0); assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(241L).getEpochSecond()) + - Instant.ofEpochSecond(241L).getNano(), point.getTimeUnixNano()); + Instant.ofEpochSecond(241L).getNano(), counter.timeUnixNano()); assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(181L).getEpochSecond()) + - Instant.ofEpochSecond(181L).getNano(), point.getStartTimeUnixNano()); + Instant.ofEpochSecond(181L).getNano(), counter.startTimeUnixNano()); // Change Temporality. Emit metrics as cumulative, verify the temporality changes to cumulative // and start time is reset to current time. @@ -557,14 +524,13 @@ public void testCollectMetricsWithTemporalityChange() { collector.collect(testEmitter); result = testEmitter.emittedMetrics(); - counter = result.get(0).builder().build(); - assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, counter.getSum().getAggregationTemporality()); - point = counter.getSum().getDataPoints(0); - assertEquals(5d, point.getAsDouble(), 0.0); + counter = result.get(0); + assertFalse(counter.isDeltaTemporality()); + assertEquals(5d, counter.doubleValue(), 0.0); assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(301L).getEpochSecond()) + - Instant.ofEpochSecond(301L).getNano(), point.getTimeUnixNano()); + Instant.ofEpochSecond(301L).getNano(), counter.timeUnixNano()); assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(301L).getEpochSecond()) + - Instant.ofEpochSecond(301L).getNano(), point.getStartTimeUnixNano()); + Instant.ofEpochSecond(301L).getNano(), counter.startTimeUnixNano()); } @Test @@ -592,26 +558,19 @@ public void testCollectMetricsWithExcludeLabels() { // Collect sum metrics collector.collect(testEmitter); List result = testEmitter.emittedMetrics(); - Metric metric = result.stream() - .flatMap(metrics -> Stream.of(metrics.builder().build())) - .filter(m -> m.getName().equals("test.domain.group1.nonmeasurable")).findFirst().get(); - - assertEquals(1, metric.getGauge().getDataPointsCount()); - NumberDataPoint point = metric.getGauge().getDataPoints(0); - assertEquals(1, point.getAttributesCount()); - assertEquals("tag1", point.getAttributes(0).getKey()); - assertEquals("value1", point.getAttributes(0).getValue().getStringValue()); - - metric = result.stream() - .flatMap(metrics -> Stream.of(metrics.builder().build())) - .filter(m -> m.getName().equals("test.domain.group1.counter")).findFirst().get(); - - assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, metric.getSum().getAggregationTemporality()); - assertEquals(1, metric.getSum().getDataPointsCount()); - point = metric.getSum().getDataPoints(0); - assertEquals(1, point.getAttributesCount()); - assertEquals("tag1", point.getAttributes(0).getKey()); - assertEquals("value1", point.getAttributes(0).getValue().getStringValue()); + SinglePointMetric metric = metricByName(result, "test.domain.group1.nonmeasurable"); + + assertTrue(metric.hasGauge()); + assertEquals(1, metric.dataPointsCount()); + assertEquals(1, metric.attributesCount()); + assertEquals(Collections.singletonMap("tag1", "value1"), metric.attributes()); + + metric = metricByName(result, "test.domain.group1.counter"); + + assertFalse(metric.isDeltaTemporality()); + assertEquals(1, metric.dataPointsCount()); + assertEquals(1, metric.attributesCount()); + assertEquals(Collections.singletonMap("tag1", "value1"), metric.attributes()); testEmitter.reset(); testEmitter.onlyDeltaMetrics(true); @@ -619,16 +578,19 @@ public void testCollectMetricsWithExcludeLabels() { result = testEmitter.emittedMetrics(); // Delta metrics. - metric = result.stream() - .flatMap(metrics -> Stream.of(metrics.builder().build())) - .filter(m -> m.getName().equals("test.domain.group1.counter")).findFirst().get(); - - assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, metric.getSum().getAggregationTemporality()); - assertEquals(1, metric.getSum().getDataPointsCount()); - point = metric.getSum().getDataPoints(0); - assertEquals(1, point.getAttributesCount()); - assertEquals("tag1", point.getAttributes(0).getKey()); - assertEquals("value1", point.getAttributes(0).getValue().getStringValue()); + metric = metricByName(result, "test.domain.group1.counter"); + + assertTrue(metric.isDeltaTemporality()); + assertEquals(1, metric.dataPointsCount()); + assertEquals(1, metric.attributesCount()); + assertEquals(Collections.singletonMap("tag1", "value1"), metric.attributes()); + } + + private static SinglePointMetric metricByName(List metrics, String name) { + return metrics.stream() + .filter(metric -> metric.key().name().equals(name)) + .findFirst() + .orElseThrow(); } private MetricsReporter getTestMetricsReporter() { @@ -660,13 +622,4 @@ public void configure(Map configs) { } }; } - - public static Map getTags(List attributes) { - return attributes.stream() - .filter(attr -> attr.getValue().hasStringValue()) - .collect(Collectors.toMap( - KeyValue::getKey, - attr -> attr.getValue().getStringValue() - )); - } -} \ No newline at end of file +} diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/SinglePointMetricTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/SinglePointMetricTest.java index 464d6c3afeba7..033a907a22d8a 100644 --- a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/SinglePointMetricTest.java +++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/SinglePointMetricTest.java @@ -24,10 +24,6 @@ import java.util.HashMap; import java.util.Map; -import io.opentelemetry.proto.metrics.v1.AggregationTemporality; -import io.opentelemetry.proto.metrics.v1.Metric; -import io.opentelemetry.proto.metrics.v1.NumberDataPoint; - import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -38,8 +34,8 @@ public class SinglePointMetricTest { private Instant now; /* - Test compares the metric representation from returned builder to ensure that the metric is - constructed correctly. + Test compares the metric representation from the constructed SinglePointMetric to ensure that the + metric is constructed correctly. For example: Gauge metric with name "name" and double value 1.0 at certain time is represented as: @@ -61,53 +57,42 @@ public void setUp() { @Test public void testGaugeWithNumberValue() { SinglePointMetric gaugeNumber = SinglePointMetric.gauge(metricKey, Long.valueOf(1), now, Collections.emptySet()); - MetricKey metricKey = gaugeNumber.key(); - assertEquals("name", metricKey.name()); - - Metric metric = gaugeNumber.builder().build(); - assertEquals(1, metric.getGauge().getDataPointsCount()); - - NumberDataPoint point = metric.getGauge().getDataPoints(0); - assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getTimeUnixNano()); - assertEquals(0, point.getStartTimeUnixNano()); - assertEquals(1, point.getAsInt()); - assertEquals(0, point.getAttributesCount()); + assertEquals("name", gaugeNumber.key().name()); + + assertTrue(gaugeNumber.hasGauge()); + assertEquals(1, gaugeNumber.dataPointsCount()); + assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), gaugeNumber.timeUnixNano()); + assertEquals(0, gaugeNumber.startTimeUnixNano()); + assertEquals(1, gaugeNumber.longValue()); + assertEquals(0, gaugeNumber.attributesCount()); } @Test public void testGaugeWithDoubleValue() { SinglePointMetric gaugeNumber = SinglePointMetric.gauge(metricKey, 1.0, now, Collections.emptySet()); - MetricKey metricKey = gaugeNumber.key(); - assertEquals("name", metricKey.name()); - - Metric metric = gaugeNumber.builder().build(); - assertEquals(1, metric.getGauge().getDataPointsCount()); - - NumberDataPoint point = metric.getGauge().getDataPoints(0); - assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getTimeUnixNano()); - assertEquals(0, point.getStartTimeUnixNano()); - assertEquals(1.0, point.getAsDouble()); - assertEquals(0, point.getAttributesCount()); + assertEquals("name", gaugeNumber.key().name()); + + assertTrue(gaugeNumber.hasGauge()); + assertEquals(1, gaugeNumber.dataPointsCount()); + assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), gaugeNumber.timeUnixNano()); + assertEquals(0, gaugeNumber.startTimeUnixNano()); + assertEquals(1.0, gaugeNumber.doubleValue()); + assertEquals(0, gaugeNumber.attributesCount()); } @Test public void testGaugeWithMetricTags() { MetricKey metricKey = new MetricKey("name", Collections.singletonMap("tag", "value")); SinglePointMetric gaugeNumber = SinglePointMetric.gauge(metricKey, 1.0, now, Collections.emptySet()); - - MetricKey key = gaugeNumber.key(); - assertEquals("name", key.name()); - - Metric metric = gaugeNumber.builder().build(); - assertEquals(1, metric.getGauge().getDataPointsCount()); - - NumberDataPoint point = metric.getGauge().getDataPoints(0); - assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getTimeUnixNano()); - assertEquals(0, point.getStartTimeUnixNano()); - assertEquals(1.0, point.getAsDouble()); - assertEquals(1, point.getAttributesCount()); - assertEquals("tag", point.getAttributes(0).getKey()); - assertEquals("value", point.getAttributes(0).getValue().getStringValue()); + assertEquals("name", gaugeNumber.key().name()); + + assertTrue(gaugeNumber.hasGauge()); + assertEquals(1, gaugeNumber.dataPointsCount()); + assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), gaugeNumber.timeUnixNano()); + assertEquals(0, gaugeNumber.startTimeUnixNano()); + assertEquals(1.0, gaugeNumber.doubleValue()); + assertEquals(1, gaugeNumber.attributesCount()); + assertEquals(Collections.singletonMap("tag", "value"), gaugeNumber.attributes()); } @Test @@ -118,30 +103,17 @@ public void testGaugeNumberWithExcludeLabels() { MetricKey metricKey = new MetricKey("name", tags); SinglePointMetric gaugeNumber = SinglePointMetric.gauge(metricKey, Long.valueOf(1), now, Collections.singleton("random")); - Metric metric = gaugeNumber.builder().build(); - assertEquals(1, metric.getGauge().getDataPointsCount()); - NumberDataPoint point = metric.getGauge().getDataPoints(0); - assertEquals(2, point.getAttributesCount()); - for (int i = 0; i < point.getAttributesCount(); i++) { - assertTrue( - point.getAttributes(i).getKey().equals("tag1") || point.getAttributes(i).getKey().equals("tag2")); - assertTrue( - point.getAttributes(i).getValue().getStringValue().equals("value1") || point.getAttributes(i).getValue().getStringValue().equals("value2")); - } + assertEquals(1, gaugeNumber.dataPointsCount()); + assertEquals(2, gaugeNumber.attributesCount()); + assertEquals(Map.of("tag1", "value1", "tag2", "value2"), gaugeNumber.attributes()); gaugeNumber = SinglePointMetric.gauge(metricKey, Long.valueOf(1), now, Collections.singleton("tag1")); - metric = gaugeNumber.builder().build(); - assertEquals(1, metric.getGauge().getDataPointsCount()); - point = metric.getGauge().getDataPoints(0); - assertEquals(1, point.getAttributesCount()); - assertEquals("tag2", point.getAttributes(0).getKey()); - assertEquals("value2", point.getAttributes(0).getValue().getStringValue()); + assertEquals(1, gaugeNumber.dataPointsCount()); + assertEquals(Collections.singletonMap("tag2", "value2"), gaugeNumber.attributes()); gaugeNumber = SinglePointMetric.gauge(metricKey, Long.valueOf(1), now, tags.keySet()); - metric = gaugeNumber.builder().build(); - assertEquals(1, metric.getGauge().getDataPointsCount()); - point = metric.getGauge().getDataPoints(0); - assertEquals(0, point.getAttributesCount()); + assertEquals(1, gaugeNumber.dataPointsCount()); + assertEquals(0, gaugeNumber.attributesCount()); } @Test @@ -152,71 +124,49 @@ public void testGaugeDoubleWithExcludeLabels() { MetricKey metricKey = new MetricKey("name", tags); SinglePointMetric gaugeNumber = SinglePointMetric.gauge(metricKey, 1.0, now, Collections.singleton("random")); - Metric metric = gaugeNumber.builder().build(); - assertEquals(1, metric.getGauge().getDataPointsCount()); - NumberDataPoint point = metric.getGauge().getDataPoints(0); - assertEquals(2, point.getAttributesCount()); - for (int i = 0; i < point.getAttributesCount(); i++) { - assertTrue( - point.getAttributes(i).getKey().equals("tag1") || point.getAttributes(i).getKey().equals("tag2")); - assertTrue( - point.getAttributes(i).getValue().getStringValue().equals("value1") || point.getAttributes(i).getValue().getStringValue().equals("value2")); - } + assertEquals(1, gaugeNumber.dataPointsCount()); + assertEquals(2, gaugeNumber.attributesCount()); + assertEquals(Map.of("tag1", "value1", "tag2", "value2"), gaugeNumber.attributes()); gaugeNumber = SinglePointMetric.gauge(metricKey, 1.0, now, Collections.singleton("tag1")); - metric = gaugeNumber.builder().build(); - assertEquals(1, metric.getGauge().getDataPointsCount()); - point = metric.getGauge().getDataPoints(0); - assertEquals(1, point.getAttributesCount()); - assertEquals("tag2", point.getAttributes(0).getKey()); - assertEquals("value2", point.getAttributes(0).getValue().getStringValue()); + assertEquals(1, gaugeNumber.dataPointsCount()); + assertEquals(Collections.singletonMap("tag2", "value2"), gaugeNumber.attributes()); gaugeNumber = SinglePointMetric.gauge(metricKey, 1.0, now, tags.keySet()); - metric = gaugeNumber.builder().build(); - assertEquals(1, metric.getGauge().getDataPointsCount()); - point = metric.getGauge().getDataPoints(0); - assertEquals(0, point.getAttributesCount()); + assertEquals(1, gaugeNumber.dataPointsCount()); + assertEquals(0, gaugeNumber.attributesCount()); } @Test public void testSum() { SinglePointMetric sum = SinglePointMetric.sum(metricKey, 1.0, false, now, null, Collections.emptySet()); - - MetricKey key = sum.key(); - assertEquals("name", key.name()); - - Metric metric = sum.builder().build(); - assertFalse(metric.getSum().getIsMonotonic()); - assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, metric.getSum().getAggregationTemporality()); - assertEquals(1, metric.getSum().getDataPointsCount()); - - NumberDataPoint point = metric.getSum().getDataPoints(0); - assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getTimeUnixNano()); - assertEquals(0, point.getStartTimeUnixNano()); - assertEquals(1.0, point.getAsDouble()); - assertEquals(0, point.getAttributesCount()); + assertEquals("name", sum.key().name()); + + assertTrue(sum.hasSum()); + assertFalse(sum.isMonotonic()); + assertFalse(sum.isDeltaTemporality()); + assertEquals(1, sum.dataPointsCount()); + assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), sum.timeUnixNano()); + assertEquals(0, sum.startTimeUnixNano()); + assertEquals(1.0, sum.doubleValue()); + assertEquals(0, sum.attributesCount()); } @Test public void testSumWithStartTimeAndTags() { MetricKey metricKey = new MetricKey("name", Collections.singletonMap("tag", "value")); SinglePointMetric sum = SinglePointMetric.sum(metricKey, 1.0, true, now, now, Collections.emptySet()); - - MetricKey key = sum.key(); - assertEquals("name", key.name()); - - Metric metric = sum.builder().build(); - assertTrue(metric.getSum().getIsMonotonic()); - assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, metric.getSum().getAggregationTemporality()); - assertEquals(1, metric.getSum().getDataPointsCount()); - - NumberDataPoint point = metric.getSum().getDataPoints(0); - assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getTimeUnixNano()); - assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getStartTimeUnixNano()); - assertEquals(1.0, point.getAsDouble()); - assertEquals(1, point.getAttributesCount()); - assertEquals("tag", point.getAttributes(0).getKey()); - assertEquals("value", point.getAttributes(0).getValue().getStringValue()); + assertEquals("name", sum.key().name()); + + assertTrue(sum.hasSum()); + assertTrue(sum.isMonotonic()); + assertFalse(sum.isDeltaTemporality()); + assertEquals(1, sum.dataPointsCount()); + assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), sum.timeUnixNano()); + assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), sum.startTimeUnixNano()); + assertEquals(1.0, sum.doubleValue()); + assertEquals(1, sum.attributesCount()); + assertEquals(Collections.singletonMap("tag", "value"), sum.attributes()); } @Test @@ -226,50 +176,33 @@ public void testSumWithExcludeLabels() { tags.put("tag2", "value2"); MetricKey metricKey = new MetricKey("name", tags); - SinglePointMetric gaugeNumber = SinglePointMetric.sum(metricKey, 1.0, true, now, Collections.singleton("random")); - Metric metric = gaugeNumber.builder().build(); - assertEquals(1, metric.getSum().getDataPointsCount()); - NumberDataPoint point = metric.getSum().getDataPoints(0); - assertEquals(2, point.getAttributesCount()); - for (int i = 0; i < point.getAttributesCount(); i++) { - assertTrue( - point.getAttributes(i).getKey().equals("tag1") || point.getAttributes(i).getKey().equals("tag2")); - assertTrue( - point.getAttributes(i).getValue().getStringValue().equals("value1") || point.getAttributes(i).getValue().getStringValue().equals("value2")); - } - - gaugeNumber = SinglePointMetric.sum(metricKey, 1.0, true, now, Collections.singleton("tag1")); - metric = gaugeNumber.builder().build(); - assertEquals(1, metric.getSum().getDataPointsCount()); - point = metric.getSum().getDataPoints(0); - assertEquals(1, point.getAttributesCount()); - assertEquals("tag2", point.getAttributes(0).getKey()); - assertEquals("value2", point.getAttributes(0).getValue().getStringValue()); - - gaugeNumber = SinglePointMetric.sum(metricKey, 1.0, true, now, tags.keySet()); - metric = gaugeNumber.builder().build(); - assertEquals(1, metric.getSum().getDataPointsCount()); - point = metric.getSum().getDataPoints(0); - assertEquals(0, point.getAttributesCount()); + SinglePointMetric sum = SinglePointMetric.sum(metricKey, 1.0, true, now, Collections.singleton("random")); + assertEquals(1, sum.dataPointsCount()); + assertEquals(2, sum.attributesCount()); + assertEquals(Map.of("tag1", "value1", "tag2", "value2"), sum.attributes()); + + sum = SinglePointMetric.sum(metricKey, 1.0, true, now, Collections.singleton("tag1")); + assertEquals(1, sum.dataPointsCount()); + assertEquals(Collections.singletonMap("tag2", "value2"), sum.attributes()); + + sum = SinglePointMetric.sum(metricKey, 1.0, true, now, tags.keySet()); + assertEquals(1, sum.dataPointsCount()); + assertEquals(0, sum.attributesCount()); } @Test public void testDeltaSum() { SinglePointMetric sum = SinglePointMetric.deltaSum(metricKey, 1.0, true, now, now, Collections.emptySet()); - - MetricKey key = sum.key(); - assertEquals("name", key.name()); - - Metric metric = sum.builder().build(); - assertTrue(metric.getSum().getIsMonotonic()); - assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, metric.getSum().getAggregationTemporality()); - assertEquals(1, metric.getSum().getDataPointsCount()); - - NumberDataPoint point = metric.getSum().getDataPoints(0); - assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getTimeUnixNano()); - assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getStartTimeUnixNano()); - assertEquals(1.0, point.getAsDouble()); - assertEquals(0, point.getAttributesCount()); + assertEquals("name", sum.key().name()); + + assertTrue(sum.hasSum()); + assertTrue(sum.isMonotonic()); + assertTrue(sum.isDeltaTemporality()); + assertEquals(1, sum.dataPointsCount()); + assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), sum.timeUnixNano()); + assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), sum.startTimeUnixNano()); + assertEquals(1.0, sum.doubleValue()); + assertEquals(0, sum.attributesCount()); } @Test @@ -279,30 +212,17 @@ public void testDeltaSumWithExcludeLabels() { tags.put("tag2", "value2"); MetricKey metricKey = new MetricKey("name", tags); - SinglePointMetric gaugeNumber = SinglePointMetric.deltaSum(metricKey, 1.0, true, now, now, Collections.singleton("random")); - Metric metric = gaugeNumber.builder().build(); - assertEquals(1, metric.getSum().getDataPointsCount()); - NumberDataPoint point = metric.getSum().getDataPoints(0); - assertEquals(2, point.getAttributesCount()); - for (int i = 0; i < point.getAttributesCount(); i++) { - assertTrue( - point.getAttributes(i).getKey().equals("tag1") || point.getAttributes(i).getKey().equals("tag2")); - assertTrue( - point.getAttributes(i).getValue().getStringValue().equals("value1") || point.getAttributes(i).getValue().getStringValue().equals("value2")); - } - - gaugeNumber = SinglePointMetric.deltaSum(metricKey, 1.0, true, now, now, Collections.singleton("tag1")); - metric = gaugeNumber.builder().build(); - assertEquals(1, metric.getSum().getDataPointsCount()); - point = metric.getSum().getDataPoints(0); - assertEquals(1, point.getAttributesCount()); - assertEquals("tag2", point.getAttributes(0).getKey()); - assertEquals("value2", point.getAttributes(0).getValue().getStringValue()); - - gaugeNumber = SinglePointMetric.deltaSum(metricKey, 1.0, true, now, now, tags.keySet()); - metric = gaugeNumber.builder().build(); - assertEquals(1, metric.getSum().getDataPointsCount()); - point = metric.getSum().getDataPoints(0); - assertEquals(0, point.getAttributesCount()); + SinglePointMetric sum = SinglePointMetric.deltaSum(metricKey, 1.0, true, now, now, Collections.singleton("random")); + assertEquals(1, sum.dataPointsCount()); + assertEquals(2, sum.attributesCount()); + assertEquals(Map.of("tag1", "value1", "tag2", "value2"), sum.attributes()); + + sum = SinglePointMetric.deltaSum(metricKey, 1.0, true, now, now, Collections.singleton("tag1")); + assertEquals(1, sum.dataPointsCount()); + assertEquals(Collections.singletonMap("tag2", "value2"), sum.attributes()); + + sum = SinglePointMetric.deltaSum(metricKey, 1.0, true, now, now, tags.keySet()); + assertEquals(1, sum.dataPointsCount()); + assertEquals(0, sum.attributesCount()); } } From 9d5cebe7e2441d44209d78239df20bfcbb5f5e4f Mon Sep 17 00:00:00 2001 From: m1a2st Date: Mon, 29 Jun 2026 21:29:57 +0800 Subject: [PATCH 2/5] addressed by comment --- .../internals/ClientTelemetryReporter.java | 2 +- .../internals/ClientTelemetryUtils.java | 19 ++++++++++++++----- .../requests/PushTelemetryRequestTest.java | 10 +++++----- .../internals/ClientTelemetryUtilsTest.java | 18 +++++++++--------- 4 files changed, 29 insertions(+), 20 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java index 4bb3da49ae62d..554fb40563e9b 100644 --- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java @@ -725,7 +725,7 @@ private Optional> createPushRequest(ClientTelemetrySubscription local CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes(), unsupportedCompressionTypes); ByteBuffer compressedPayload; try { - compressedPayload = ClientTelemetryUtils.compress(ByteBuffer.wrap(payload.toByteArray()), compressionType); + compressedPayload = ClientTelemetryUtils.compress(payload.toByteArray(), compressionType); } catch (Throwable e) { // Distinguish between recoverable errors (NoClassDefFoundError for missing compression libs) // and fatal errors (OutOfMemoryError, etc.) that should terminate telemetry. diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java index 9e1478b17c4fa..43f643d0bbc5d 100644 --- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.record.internal.CompressionType; import org.apache.kafka.common.record.internal.RecordBatch; import org.apache.kafka.common.utils.ByteBufferOutputStream; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.internals.BufferSupplier; import org.slf4j.Logger; @@ -207,18 +206,28 @@ public static CompressionType preferredCompressionType(List acc .orElse(CompressionType.NONE); } - public static ByteBuffer compress(ByteBuffer serializedMetrics, CompressionType compressionType) throws IOException { + public static ByteBuffer compress(byte[] serializedMetrics, CompressionType compressionType) throws IOException { try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) { Compression compression = Compression.of(compressionType).build(); try (OutputStream out = compression.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) { - out.write(Utils.toArray(serializedMetrics.duplicate())); + out.write(serializedMetrics); } compressedOut.buffer().flip(); return compressedOut.buffer(); } } - public static ByteBuffer serializeMetricsData(List metrics) { + /** + * Assembles and compresses a {@code MetricsData} payload for the given metrics. The io.opentelemetry + * types are relocated in the shaded clients jar, so keeping this assembly in main code lets test code + * build and compress a payload without referencing io.opentelemetry directly. The layout mirrors the + * payload produced by {@link ClientTelemetryReporter} (one resource metric per single point metric). + * + * @param metrics the metrics to assemble into a {@code MetricsData} payload + * @param compressionType the compression to apply + * @return the compressed {@code MetricsData} payload + */ + public static ByteBuffer compressMetrics(List metrics, CompressionType compressionType) throws IOException { MetricsData.Builder builder = MetricsData.newBuilder(); for (SinglePointMetric metric : metrics) { builder.addResourceMetrics(ResourceMetrics.newBuilder() @@ -228,7 +237,7 @@ public static ByteBuffer serializeMetricsData(List metrics) { .build()) .build()); } - return ByteBuffer.wrap(builder.build().toByteArray()); + return compress(builder.build().toByteArray(), compressionType); } public static ByteBuffer decompress(ByteBuffer metrics, CompressionType compressionType, int maxDecompressedBytes) { diff --git a/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java index 5523148b856f9..3816e8ba714ea 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java @@ -53,9 +53,9 @@ public void testGetErrorResponse() { @ParameterizedTest @EnumSource(CompressionType.class) public void testMetricsDataCompression(CompressionType compressionType) throws IOException { - ByteBuffer serialized = ClientTelemetryUtils.serializeMetricsData(sampleMetrics()); - byte[] raw = Utils.toArray(serialized.duplicate()); - PushTelemetryRequest req = getPushTelemetryRequest(serialized, raw, compressionType); + List metrics = sampleMetrics(); + byte[] raw = Utils.toArray(ClientTelemetryUtils.compressMetrics(metrics, CompressionType.NONE)); + PushTelemetryRequest req = getPushTelemetryRequest(metrics, raw, compressionType); ByteBuffer receivedMetricsBuffer = req.metricsData(1024 * 1024); assertNotNull(receivedMetricsBuffer); @@ -63,8 +63,8 @@ public void testMetricsDataCompression(CompressionType compressionType) throws I assertArrayEquals(raw, Utils.toArray(receivedMetricsBuffer)); } - private PushTelemetryRequest getPushTelemetryRequest(ByteBuffer serializedMetrics, byte[] raw, CompressionType compressionType) throws IOException { - ByteBuffer compressedData = ClientTelemetryUtils.compress(serializedMetrics, compressionType); + private PushTelemetryRequest getPushTelemetryRequest(List metrics, byte[] raw, CompressionType compressionType) throws IOException { + ByteBuffer compressedData = ClientTelemetryUtils.compressMetrics(metrics, compressionType); if (compressionType != CompressionType.NONE) { assertTrue(compressedData.limit() < raw.length); } else { diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java index 21bd86110b030..90ec715780a6c 100644 --- a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java @@ -140,9 +140,9 @@ public void testPreferredCompressionType() { @ParameterizedTest @EnumSource(CompressionType.class) public void testCompressDecompress(CompressionType compressionType) throws IOException { - ByteBuffer serialized = ClientTelemetryUtils.serializeMetricsData(sampleMetrics()); - byte[] raw = Utils.toArray(serialized.duplicate()); - ByteBuffer compressed = ClientTelemetryUtils.compress(serialized, compressionType); + List metrics = sampleMetrics(); + byte[] raw = Utils.toArray(ClientTelemetryUtils.compressMetrics(metrics, CompressionType.NONE)); + ByteBuffer compressed = ClientTelemetryUtils.compressMetrics(metrics, compressionType); assertNotNull(compressed); if (compressionType != CompressionType.NONE) { assertTrue(compressed.limit() < raw.length); @@ -172,9 +172,9 @@ private List sampleMetrics() { @Test public void testDecompressExceedingMaxSizeThrows() throws IOException { // Compress a payload then verify decompression with a small limit throws. - ByteBuffer serialized = ClientTelemetryUtils.serializeMetricsData(sampleMetrics()); - byte[] raw = Utils.toArray(serialized.duplicate()); - ByteBuffer compressed = ClientTelemetryUtils.compress(serialized, CompressionType.GZIP); + List metrics = sampleMetrics(); + byte[] raw = Utils.toArray(ClientTelemetryUtils.compressMetrics(metrics, CompressionType.NONE)); + ByteBuffer compressed = ClientTelemetryUtils.compressMetrics(metrics, CompressionType.GZIP); // Set limit smaller than the actual decompressed size int smallLimit = raw.length - 1; @@ -185,9 +185,9 @@ public void testDecompressExceedingMaxSizeThrows() throws IOException { @Test public void testDecompressWithPayloadSizeSucceeds() throws IOException { - ByteBuffer serialized = ClientTelemetryUtils.serializeMetricsData(sampleMetrics()); - byte[] raw = Utils.toArray(serialized.duplicate()); - ByteBuffer compressed = ClientTelemetryUtils.compress(serialized, CompressionType.GZIP); + List metrics = sampleMetrics(); + byte[] raw = Utils.toArray(ClientTelemetryUtils.compressMetrics(metrics, CompressionType.NONE)); + ByteBuffer compressed = ClientTelemetryUtils.compressMetrics(metrics, CompressionType.GZIP); // Set limit to exact limit prior compression. ByteBuffer result = ClientTelemetryUtils.decompress(compressed, CompressionType.GZIP, raw.length); From 199356c421b5a229a78a0ca7688761f797c8fb13 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Tue, 30 Jun 2026 01:10:27 +0800 Subject: [PATCH 3/5] fix fail test --- .../kafka/common/utils/internals/AppInfoParserTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/common/utils/internals/AppInfoParserTest.java b/clients/src/test/java/org/apache/kafka/common/utils/internals/AppInfoParserTest.java index 8cc5d204307d1..c5f4a5fb8653e 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/internals/AppInfoParserTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/internals/AppInfoParserTest.java @@ -38,8 +38,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class AppInfoParserTest { - private static final String EXPECTED_COMMIT_VERSION = AppInfoParser.DEFAULT_VALUE; - private static final String EXPECTED_VERSION = AppInfoParser.DEFAULT_VALUE; + private static final String EXPECTED_COMMIT_VERSION = AppInfoParser.getCommitId(); + private static final String EXPECTED_VERSION = AppInfoParser.getVersion(); private static final Long EXPECTED_START_MS = 1552313875722L; private static final String METRICS_PREFIX = "app-info-test"; private static final String METRICS_ID = "test"; From cf7473de186a680faf347d8c2920a8a8da7d1c12 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Tue, 30 Jun 2026 20:56:45 +0800 Subject: [PATCH 4/5] addressed by comment --- .../internals/ClientTelemetryReporter.java | 2 +- .../internals/ClientTelemetryUtils.java | 8 ++-- .../ClientTelemetryReporterTest.java | 44 +++++++++++++------ 3 files changed, 37 insertions(+), 17 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java index 554fb40563e9b..cee8ff435a912 100644 --- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java @@ -725,7 +725,7 @@ private Optional> createPushRequest(ClientTelemetrySubscription local CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes(), unsupportedCompressionTypes); ByteBuffer compressedPayload; try { - compressedPayload = ClientTelemetryUtils.compress(payload.toByteArray(), compressionType); + compressedPayload = ClientTelemetryUtils.compress(payload, compressionType); } catch (Throwable e) { // Distinguish between recoverable errors (NoClassDefFoundError for missing compression libs) // and fatal errors (OutOfMemoryError, etc.) that should terminate telemetry. diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java index 43f643d0bbc5d..398a930875458 100644 --- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java @@ -206,11 +206,13 @@ public static CompressionType preferredCompressionType(List acc .orElse(CompressionType.NONE); } - public static ByteBuffer compress(byte[] serializedMetrics, CompressionType compressionType) throws IOException { + public static ByteBuffer compress(MetricsData metricsData, CompressionType compressionType) throws IOException { try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) { Compression compression = Compression.of(compressionType).build(); try (OutputStream out = compression.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) { - out.write(serializedMetrics); + // Serialize the metrics straight into the compression stream to avoid the intermediate + // byte[] copy that MetricsData#toByteArray would allocate. + metricsData.writeTo(out); } compressedOut.buffer().flip(); return compressedOut.buffer(); @@ -237,7 +239,7 @@ public static ByteBuffer compressMetrics(List metrics, Compre .build()) .build()); } - return compress(builder.build().toByteArray(), compressionType); + return compress(builder.build(), compressionType); } public static ByteBuffer decompress(ByteBuffer metrics, CompressionType compressionType, int maxDecompressedBytes) { diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java index 192971a820605..d293dec79507a 100644 --- a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java +++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.NetworkException; @@ -52,6 +53,7 @@ import org.mockito.internal.stubbing.answers.CallsRealMethods; import java.io.IOException; +import java.io.OutputStream; import java.time.Duration; import java.util.Collections; import java.util.HashMap; @@ -70,7 +72,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.anyByte; public class ClientTelemetryReporterTest { @@ -360,7 +362,7 @@ public void testCreateRequestPushCompression(CompressionType compressionType) { } @Test - public void testCreateRequestPushCompressionException() { + public void testCreateRequestPushCompressionException() throws IOException { clientTelemetryReporter.configure(configs); clientTelemetryReporter.contextChange(metricsContext); @@ -372,8 +374,9 @@ public void testCreateRequestPushCompressionException() { uuid, 1234, 20000, Collections.singletonList(CompressionType.GZIP), true, null); telemetrySender.updateSubscriptionResult(subscription, time.milliseconds()); - try (MockedStatic mockedCompress = Mockito.mockStatic(ClientTelemetryUtils.class, new CallsRealMethods())) { - mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), any())).thenThrow(new IOException()); + Compression.Builder failingCompression = compressionFailingWithIOException(); + try (MockedStatic mockedCompression = Mockito.mockStatic(Compression.class, new CallsRealMethods())) { + mockedCompression.when(() -> Compression.of(CompressionType.GZIP)).thenReturn(failingCompression); Optional> requestOptional = telemetrySender.createRequest(); assertNotNull(requestOptional); @@ -403,9 +406,9 @@ public void testCreateRequestPushCompressionFallbackToNextType() { uuid, 1234, 20000, List.of(CompressionType.GZIP, CompressionType.LZ4, CompressionType.SNAPPY), true, null); telemetrySender.updateSubscriptionResult(subscription, time.milliseconds()); - try (MockedStatic mockedCompress = Mockito.mockStatic(ClientTelemetryUtils.class, new CallsRealMethods())) { + try (MockedStatic mockedCompression = Mockito.mockStatic(Compression.class, new CallsRealMethods())) { // First request: GZIP fails with NoClassDefFoundError, should use NONE for this request - mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.GZIP))).thenThrow(new NoClassDefFoundError("GZIP not available")); + mockedCompression.when(() -> Compression.of(CompressionType.GZIP)).thenThrow(new NoClassDefFoundError("GZIP not available")); Optional> requestOptional = telemetrySender.createRequest(); assertNotNull(requestOptional); @@ -422,7 +425,7 @@ public void testCreateRequestPushCompressionFallbackToNextType() { // Second request: LZ4 is selected (since GZIP is now cached as unsupported), LZ4 fails, should use NONE // Note that some libraries eg. LZ4 return KafkaException with cause as NoClassDefFoundError - mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.LZ4))).thenThrow(new KafkaException(new NoClassDefFoundError("LZ4 not available"))); + mockedCompression.when(() -> Compression.of(CompressionType.LZ4)).thenThrow(new KafkaException(new NoClassDefFoundError("LZ4 not available"))); requestOptional = telemetrySender.createRequest(); assertNotNull(requestOptional); @@ -438,7 +441,7 @@ public void testCreateRequestPushCompressionFallbackToNextType() { assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED)); // Third request: SNAPPY is selected (since GZIP and LZ4 are now cached as unsupported), SNAPPY fails, should use NONE - mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.SNAPPY))).thenThrow(new NoClassDefFoundError("SNAPPY not available")); + mockedCompression.when(() -> Compression.of(CompressionType.SNAPPY)).thenThrow(new NoClassDefFoundError("SNAPPY not available")); requestOptional = telemetrySender.createRequest(); assertNotNull(requestOptional); @@ -480,10 +483,10 @@ public void testCreateRequestPushCompressionFallbackAndTermination() { uuid, 1234, 20000, List.of(CompressionType.ZSTD, CompressionType.LZ4), true, null); telemetrySender.updateSubscriptionResult(subscription, time.milliseconds()); - try (MockedStatic mockedCompress = Mockito.mockStatic(ClientTelemetryUtils.class, new CallsRealMethods())) { - + try (MockedStatic mockedCompression = Mockito.mockStatic(Compression.class, new CallsRealMethods())) { + // === Test 1: NoClassDefFoundError fallback (recoverable) === - mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.ZSTD))) + mockedCompression.when(() -> Compression.of(CompressionType.ZSTD)) .thenThrow(new NoClassDefFoundError("com/github/luben/zstd/BufferPool")); assertEquals(ClientTelemetryState.PUSH_NEEDED, telemetrySender.state()); @@ -500,8 +503,8 @@ public void testCreateRequestPushCompressionFallbackAndTermination() { assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED)); // === Test 2: OutOfMemoryError causes termination (non-recoverable Error) === - mockedCompress.reset(); - mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.LZ4))) + mockedCompression.reset(); + mockedCompression.when(() -> Compression.of(CompressionType.LZ4)) .thenThrow(new OutOfMemoryError("Out of memory during compression")); assertEquals(ClientTelemetryState.PUSH_NEEDED, telemetrySender.state()); @@ -517,6 +520,21 @@ public void testCreateRequestPushCompressionFallbackAndTermination() { } } + /** + * Builds a {@link Compression.Builder} whose stream raises an {@link IOException} while writing the + * payload, so that {@link ClientTelemetryUtils#compress} surfaces a recoverable compression failure. + */ + @SuppressWarnings("unchecked") + private static Compression.Builder compressionFailingWithIOException() throws IOException { + OutputStream out = Mockito.mock(OutputStream.class); + Mockito.doThrow(new IOException()).when(out).close(); + Compression compression = Mockito.mock(Compression.class); + Mockito.when(compression.wrapForOutput(any(), anyByte())).thenReturn(out); + Compression.Builder builder = Mockito.mock(Compression.Builder.class); + Mockito.doReturn(compression).when(builder).build(); + return builder; + } + @Test public void testHandleResponseGetSubscriptions() { ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender(); From a52e5c46dab9b246d26ef6468f990340e830da74 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Tue, 30 Jun 2026 20:57:39 +0800 Subject: [PATCH 5/5] addressed by comment --- .../common/telemetry/internals/ClientTelemetryUtils.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java index 398a930875458..6a4c90f22cec3 100644 --- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java @@ -206,13 +206,11 @@ public static CompressionType preferredCompressionType(List acc .orElse(CompressionType.NONE); } - public static ByteBuffer compress(MetricsData metricsData, CompressionType compressionType) throws IOException { + public static ByteBuffer compress(MetricsData metrics, CompressionType compressionType) throws IOException { try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) { Compression compression = Compression.of(compressionType).build(); try (OutputStream out = compression.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) { - // Serialize the metrics straight into the compression stream to avoid the intermediate - // byte[] copy that MetricsData#toByteArray would allocate. - metricsData.writeTo(out); + metrics.writeTo(out); } compressedOut.buffer().flip(); return compressedOut.buffer();