Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@

package com.automq.opentelemetry;

import org.apache.kafka.common.Reconfigurable;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.types.Password;

import com.automq.opentelemetry.exporter.DelegatingMetricReader;
import com.automq.opentelemetry.exporter.MetricsExportConfig;
import com.automq.opentelemetry.exporter.MetricsExporter;
import com.automq.opentelemetry.exporter.MetricsExporterURI;
Expand All @@ -31,13 +36,14 @@
import org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -68,19 +74,24 @@
* This class is responsible for initializing, configuring, and managing the lifecycle of all
* telemetry components, including the OpenTelemetry SDK, metric exporters, and various metric sources.
*/
public class AutoMQTelemetryManager {
public class AutoMQTelemetryManager implements Reconfigurable {
private static final Logger LOGGER = LoggerFactory.getLogger(AutoMQTelemetryManager.class);

// Reconfigurable config keys
public static final String EXPORTER_URI_CONFIG = "s3.telemetry.metrics.exporter.uri";
public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(EXPORTER_URI_CONFIG);

// Singleton instance support
private static volatile AutoMQTelemetryManager instance;
private static final Object LOCK = new Object();

private final String exporterUri;
private volatile String exporterUri;
private final String serviceName;
private final String instanceId;
private final MetricsExportConfig metricsExportConfig;
private final List<MetricReader> metricReaders = new ArrayList<>();
private DelegatingMetricReader delegatingReader;
private final List<AutoCloseable> autoCloseableList;
private final Object reconfigureLock = new Object();
private OpenTelemetrySdk openTelemetrySdk;
private YammerMetricsReporter yammerReporter;

Expand Down Expand Up @@ -199,12 +210,13 @@ private SdkMeterProvider buildMeterProvider() {

// Configure exporters from URI
MetricsExporterURI exporterURI = buildMetricsExporterURI(exporterUri, metricsExportConfig);
List<MetricReader> readers = new ArrayList<>();
for (MetricsExporter exporter : exporterURI.getMetricsExporters()) {
MetricReader reader = exporter.asMetricReader();
metricReaders.add(reader);
SdkMeterProviderUtil.registerMetricReaderWithCardinalitySelector(meterProviderBuilder, reader,
instrumentType -> metricCardinalityLimit);
readers.add(exporter.asMetricReader());
}
this.delegatingReader = new DelegatingMetricReader(readers);
SdkMeterProviderUtil.registerMetricReaderWithCardinalitySelector(meterProviderBuilder, delegatingReader,
instrumentType -> metricCardinalityLimit);

return meterProviderBuilder.build();
}
Expand Down Expand Up @@ -286,19 +298,40 @@ public void shutdown() {
LOGGER.error("Failed to close auto closeable", e);
}
});
metricReaders.forEach(metricReader -> {
metricReader.forceFlush();
try {
metricReader.close();
} catch (IOException e) {
LOGGER.error("Failed to close metric reader", e);
}
});
if (openTelemetrySdk != null) {
openTelemetrySdk.close();
}
}

public void reconfigure(String newExporterUri) {
synchronized (reconfigureLock) {
if (newExporterUri == null || newExporterUri.equals(this.exporterUri)) {
LOGGER.debug("Exporter URI unchanged, skipping reconfiguration");
return;
}
LOGGER.info("Reconfiguring metrics exporter from {} to {}", this.exporterUri, newExporterUri);
this.exporterUri = newExporterUri;
try {
MetricsExporterURI exporterURIParsed = buildMetricsExporterURI(newExporterUri, metricsExportConfig);
List<MetricsExporter> newExporters = exporterURIParsed.getMetricsExporters();
List<MetricReader> newReaders = new ArrayList<>();
for (MetricsExporter exporter : newExporters) {
newReaders.add(exporter.asMetricReader());
}
if (this.delegatingReader != null) {
this.delegatingReader.setDelegates(newReaders);
}
LOGGER.info("Metrics exporter reconfiguration completed successfully");
} catch (Exception e) {
LOGGER.error("Failed to reconfigure metrics exporter with URI: {}", newExporterUri, e);
}
}
}

public String getExporterUri() {
return this.exporterUri;
}

/**
* get YammerMetricsReporter instance.
*
Expand Down Expand Up @@ -327,4 +360,43 @@ public Meter getMeter() {
}
return this.openTelemetrySdk.getMeter(TelemetryConstants.TELEMETRY_SCOPE_NAME);
}

    /**
     * Returns the set of dynamic config keys this manager reacts to
     * (currently only {@link #EXPORTER_URI_CONFIG}).
     */
    @Override
    public Set<String> reconfigurableConfigs() {
        return RECONFIGURABLE_CONFIGS;
    }

@Override
public void validateReconfiguration(Map<String, ?> configs) throws ConfigException {
if (configs.containsKey(EXPORTER_URI_CONFIG)) {
Object value = configs.get(EXPORTER_URI_CONFIG);
String uri = value instanceof Password ? ((Password) value).value()
: (value != null ? value.toString() : null);
if (uri != null && !uri.isBlank()) {
try {
MetricsExporterURI.parse(uri, metricsExportConfig);
} catch (Exception e) {
throw new ConfigException(EXPORTER_URI_CONFIG, uri, "Invalid exporter URI: " + e.getMessage());
}
}
}
}

@Override
public void reconfigure(Map<String, ?> configs) {
if (configs.containsKey(EXPORTER_URI_CONFIG)) {
Object value = configs.get(EXPORTER_URI_CONFIG);
String newUri;
if (value instanceof Password) {
newUri = ((Password) value).value();
} else {
newUri = value != null ? value.toString() : null;
}
reconfigure(newUri);
}
}

    /**
     * Intentionally a no-op: this manager does not consume initial configuration through the
     * {@code Reconfigurable} callback; dynamic updates arrive via {@link #reconfigure(Map)}.
     * NOTE(review): presumably initial setup happens elsewhere (e.g. at construction) — confirm.
     */
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.opentelemetry.exporter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.CollectionRegistration;
import io.opentelemetry.sdk.metrics.export.MetricReader;

/**
 * A {@link MetricReader} that fans out to a replaceable list of delegate readers.
 *
 * <p>The delegate list can be swapped at runtime via {@link #setDelegates(List)}: the new
 * readers are registered with the current {@link CollectionRegistration} first, then published,
 * and only afterwards are the old readers shut down (bounded by a timeout). All mutating
 * operations are serialized on an internal lock; read paths take a volatile snapshot.
 */
public class DelegatingMetricReader implements MetricReader {
    private static final Logger LOGGER = LoggerFactory.getLogger(DelegatingMetricReader.class);
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 10;

    private final Object lock = new Object();
    // Volatile so read paths (forceFlush, getAggregationTemporality) see swaps without locking.
    private volatile List<MetricReader> delegates;
    private volatile CollectionRegistration collectionRegistration;

    /**
     * @param initialDelegates the readers to fan out to initially; copied defensively
     */
    public DelegatingMetricReader(List<MetricReader> initialDelegates) {
        this.delegates = new ArrayList<>(initialDelegates);
    }

    /**
     * Atomically replaces the delegate readers. New readers are wired to the current
     * registration before being published; the previous readers are then shut down and we wait
     * up to {@link #SHUTDOWN_TIMEOUT_SECONDS} for them to finish.
     *
     * @param newDelegates the replacement readers; copied defensively
     */
    public void setDelegates(List<MetricReader> newDelegates) {
        synchronized (lock) {
            List<MetricReader> previous = this.delegates;
            // Register replacements first so they can collect as soon as they are published.
            if (collectionRegistration != null) {
                for (MetricReader replacement : newDelegates) {
                    replacement.register(collectionRegistration);
                }
            }
            this.delegates = new ArrayList<>(newDelegates);
            if (previous != null && !previous.isEmpty()) {
                List<CompletableResultCode> results = new ArrayList<>();
                for (MetricReader retired : previous) {
                    try {
                        results.add(retired.shutdown());
                    } catch (Exception e) {
                        LOGGER.warn("Error shutting down old delegate", e);
                    }
                }
                CompletableResultCode.ofAll(results)
                    .join(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            }
        }
    }

    /**
     * @return an unmodifiable view of the current delegate readers
     */
    public List<MetricReader> getDelegates() {
        return Collections.unmodifiableList(delegates);
    }

    @Override
    public void register(CollectionRegistration registration) {
        synchronized (lock) {
            // Remember the registration so future delegate swaps can wire themselves up.
            this.collectionRegistration = registration;
            for (MetricReader delegate : delegates) {
                delegate.register(registration);
            }
        }
    }

    @Override
    public CompletableResultCode forceFlush() {
        List<MetricReader> snapshot = delegates;
        if (snapshot.isEmpty()) {
            return CompletableResultCode.ofSuccess();
        }
        List<CompletableResultCode> results = new ArrayList<>();
        for (MetricReader delegate : snapshot) {
            results.add(delegate.forceFlush());
        }
        return CompletableResultCode.ofAll(results);
    }

    @Override
    public CompletableResultCode shutdown() {
        synchronized (lock) {
            List<MetricReader> snapshot = delegates;
            if (snapshot.isEmpty()) {
                return CompletableResultCode.ofSuccess();
            }
            List<CompletableResultCode> results = new ArrayList<>();
            for (MetricReader delegate : snapshot) {
                results.add(delegate.shutdown());
            }
            return CompletableResultCode.ofAll(results);
        }
    }

    @Override
    public void close() throws IOException {
        // Bound the wait so close() cannot hang indefinitely on a stuck exporter.
        shutdown().join(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    @Override
    public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
        List<MetricReader> snapshot = delegates;
        if (snapshot.isEmpty()) {
            return AggregationTemporality.CUMULATIVE;
        }
        // Assumed consistent across all delegates; the first one answers for the group.
        return snapshot.get(0).getAggregationTemporality(instrumentType);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package kafka.server

import kafka.autobalancer.config.{AutoBalancerControllerConfig, AutoBalancerMetricsReporterConfig}
import kafka.automq.backpressure.BackPressureConfig
import com.automq.opentelemetry.AutoMQTelemetryManager

import java.util
import java.util.{Collections, Properties}
Expand Down Expand Up @@ -103,7 +104,8 @@ object DynamicBrokerConfig {
DynamicRemoteLogConfig.ReconfigurableConfigs ++
AutoBalancerControllerConfig.RECONFIGURABLE_CONFIGS.asScala ++
AutoBalancerMetricsReporterConfig.RECONFIGURABLE_CONFIGS.asScala ++
BackPressureConfig.RECONFIGURABLE_CONFIGS.asScala
BackPressureConfig.RECONFIGURABLE_CONFIGS.asScala ++
AutoMQTelemetryManager.RECONFIGURABLE_CONFIGS.asScala

private val ClusterLevelListenerConfigs = Set(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG)
private val PerBrokerConfigs = (DynamicSecurityConfigs ++ DynamicListenerConfig.ReconfigurableConfigs).diff(
Expand Down Expand Up @@ -275,6 +277,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
kafkaServer match {
case brokerServer: BrokerServer =>
addReconfigurable(brokerServer.backPressureManager)
if (brokerServer.sharedServer.telemetryManager != null) {
addReconfigurable(brokerServer.sharedServer.telemetryManager)
}
case _ =>
}
// AutoMQ inject end
Expand Down
37 changes: 37 additions & 0 deletions tests/kafkatest/tests/core/automq_telemetry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,40 @@ def _logs_uploaded():
finally:
self.num_brokers = original_num_brokers
self._stop_kafka()

@cluster(num_nodes=4)
def test_dynamic_prometheus_exporter_reconfiguration(self):
    """Verify that the Prometheus metrics exporter can be dynamically reconfigured to a new port."""
    initial_port = 9464
    new_port = 9465

    # Start the cluster with the Prometheus exporter bound to the initial port.
    server_overrides = [
        ["s3.telemetry.metrics.exporter.uri", f"prometheus://0.0.0.0:{initial_port}"]
    ]

    self._start_kafka(server_overrides=server_overrides)

    try:
        # Baseline: every broker must serve valid metrics on the initial port
        # before we attempt the dynamic swap.
        self._wait_for_metrics_available(port=initial_port)

        for node in self.kafka.nodes:
            output = self._fetch_metrics(node, port=initial_port)
            self._assert_prometheus_metrics(output)
        self.logger.info("Verified metrics on initial port %d", initial_port)

        # Push the new exporter URI as a cluster-wide dynamic default via kafka-configs.sh.
        self.logger.info("Reconfiguring telemetry exporter URI to port %d", new_port)
        config_command = (
            f"{self.kafka.path.script('kafka-configs.sh', self.kafka.nodes[0])} "
            f"--bootstrap-server {self.kafka.bootstrap_servers()} "
            f"--entity-type brokers --entity-default "
            f"--alter --add-config s3.telemetry.metrics.exporter.uri=prometheus://0.0.0.0:{new_port}"
        )
        self.kafka.nodes[0].account.ssh(config_command)
        # Extended timeout: brokers need time to propagate the config change and
        # rebind the exporter on the new port.
        self._wait_for_metrics_available(port=new_port, timeout_sec=60)
        for node in self.kafka.nodes:
            output = self._fetch_metrics(node, port=new_port)
            self._assert_prometheus_metrics(output)
        self.logger.info("Verified metrics on new port %d after dynamic reconfiguration", new_port)

    finally:
        # Always tear the cluster down, even if verification failed.
        self._stop_kafka()
Loading