diff --git a/automq-metrics/src/main/java/com/automq/opentelemetry/AutoMQTelemetryManager.java b/automq-metrics/src/main/java/com/automq/opentelemetry/AutoMQTelemetryManager.java index 38beb126e3..905591b781 100644 --- a/automq-metrics/src/main/java/com/automq/opentelemetry/AutoMQTelemetryManager.java +++ b/automq-metrics/src/main/java/com/automq/opentelemetry/AutoMQTelemetryManager.java @@ -19,6 +19,11 @@ package com.automq.opentelemetry; +import org.apache.kafka.common.Reconfigurable; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.types.Password; + +import com.automq.opentelemetry.exporter.DelegatingMetricReader; import com.automq.opentelemetry.exporter.MetricsExportConfig; import com.automq.opentelemetry.exporter.MetricsExporter; import com.automq.opentelemetry.exporter.MetricsExporterURI; @@ -31,13 +36,14 @@ import org.slf4j.LoggerFactory; import org.slf4j.bridge.SLF4JBridgeHandler; -import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -68,19 +74,24 @@ * This class is responsible for initializing, configuring, and managing the lifecycle of all * telemetry components, including the OpenTelemetry SDK, metric exporters, and various metric sources. */ -public class AutoMQTelemetryManager { +public class AutoMQTelemetryManager implements Reconfigurable { private static final Logger LOGGER = LoggerFactory.getLogger(AutoMQTelemetryManager.class); + // Reconfigurable config keys + public static final String EXPORTER_URI_CONFIG = "s3.telemetry.metrics.exporter.uri"; + public static final Set RECONFIGURABLE_CONFIGS = Set.of(EXPORTER_URI_CONFIG); + // Singleton instance support private static volatile AutoMQTelemetryManager instance; private static final Object LOCK = new Object(); - private final String exporterUri; + private volatile String exporterUri; private final String serviceName; private final String instanceId; private final MetricsExportConfig metricsExportConfig; - private final List metricReaders = new ArrayList<>(); + private DelegatingMetricReader delegatingReader; private final List autoCloseableList; + private final Object reconfigureLock = new Object(); private OpenTelemetrySdk openTelemetrySdk; private YammerMetricsReporter yammerReporter; @@ -199,12 +210,13 @@ private SdkMeterProvider buildMeterProvider() { // Configure exporters from URI MetricsExporterURI exporterURI = buildMetricsExporterURI(exporterUri, metricsExportConfig); + List readers = new ArrayList<>(); for (MetricsExporter exporter : exporterURI.getMetricsExporters()) { - MetricReader reader = exporter.asMetricReader(); - metricReaders.add(reader); - SdkMeterProviderUtil.registerMetricReaderWithCardinalitySelector(meterProviderBuilder, reader, - instrumentType -> metricCardinalityLimit); + readers.add(exporter.asMetricReader()); } + this.delegatingReader = new DelegatingMetricReader(readers); + SdkMeterProviderUtil.registerMetricReaderWithCardinalitySelector(meterProviderBuilder, delegatingReader, + instrumentType -> metricCardinalityLimit); return meterProviderBuilder.build(); } @@ -286,19 +298,40 @@ public void shutdown() { LOGGER.error("Failed to close auto closeable", e); } }); - metricReaders.forEach(metricReader -> { - metricReader.forceFlush(); - try { - metricReader.close(); - } catch (IOException e) { - LOGGER.error("Failed to close metric reader", e); - } - }); if (openTelemetrySdk != null) { openTelemetrySdk.close(); } } + public void reconfigure(String newExporterUri) { + synchronized (reconfigureLock) { + if (newExporterUri == null || newExporterUri.equals(this.exporterUri)) { + LOGGER.debug("Exporter URI unchanged, skipping reconfiguration"); + return; + } + LOGGER.info("Reconfiguring metrics exporter from {} to {}", this.exporterUri, newExporterUri); + this.exporterUri = newExporterUri; + try { + MetricsExporterURI exporterURIParsed = buildMetricsExporterURI(newExporterUri, metricsExportConfig); + List newExporters = exporterURIParsed.getMetricsExporters(); + List newReaders = new ArrayList<>(); + for (MetricsExporter exporter : newExporters) { + newReaders.add(exporter.asMetricReader()); + } + if (this.delegatingReader != null) { + this.delegatingReader.setDelegates(newReaders); + } + LOGGER.info("Metrics exporter reconfiguration completed successfully"); + } catch (Exception e) { + LOGGER.error("Failed to reconfigure metrics exporter with URI: {}", newExporterUri, e); + } + } + } + + public String getExporterUri() { + return this.exporterUri; + } + /** * get YammerMetricsReporter instance. * @@ -327,4 +360,43 @@ public Meter getMeter() { } return this.openTelemetrySdk.getMeter(TelemetryConstants.TELEMETRY_SCOPE_NAME); } + + @Override + public Set reconfigurableConfigs() { + return RECONFIGURABLE_CONFIGS; + } + + @Override + public void validateReconfiguration(Map configs) throws ConfigException { + if (configs.containsKey(EXPORTER_URI_CONFIG)) { + Object value = configs.get(EXPORTER_URI_CONFIG); + String uri = value instanceof Password ? ((Password) value).value() + : (value != null ? value.toString() : null); + if (uri != null && !uri.isBlank()) { + try { + MetricsExporterURI.parse(uri, metricsExportConfig); + } catch (Exception e) { + throw new ConfigException(EXPORTER_URI_CONFIG, uri, "Invalid exporter URI: " + e.getMessage()); + } + } + } + } + + @Override + public void reconfigure(Map configs) { + if (configs.containsKey(EXPORTER_URI_CONFIG)) { + Object value = configs.get(EXPORTER_URI_CONFIG); + String newUri; + if (value instanceof Password) { + newUri = ((Password) value).value(); + } else { + newUri = value != null ? value.toString() : null; + } + reconfigure(newUri); + } + } + + @Override + public void configure(Map configs) { + } } diff --git a/automq-metrics/src/main/java/com/automq/opentelemetry/exporter/DelegatingMetricReader.java b/automq-metrics/src/main/java/com/automq/opentelemetry/exporter/DelegatingMetricReader.java new file mode 100644 index 0000000000..6ef6bfe727 --- /dev/null +++ b/automq-metrics/src/main/java/com/automq/opentelemetry/exporter/DelegatingMetricReader.java @@ -0,0 +1,129 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.opentelemetry.exporter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.export.CollectionRegistration; +import io.opentelemetry.sdk.metrics.export.MetricReader; + +public class DelegatingMetricReader implements MetricReader { + private static final Logger LOGGER = LoggerFactory.getLogger(DelegatingMetricReader.class); + private static final long SHUTDOWN_TIMEOUT_SECONDS = 10; + + private volatile List delegates; + private volatile CollectionRegistration collectionRegistration; + private final Object lock = new Object(); + + public DelegatingMetricReader(List initialDelegates) { + this.delegates = new ArrayList<>(initialDelegates); + } + + public void setDelegates(List newDelegates) { + synchronized (lock) { + List oldDelegates = this.delegates; + if (collectionRegistration != null) { + for (MetricReader reader : newDelegates) { + reader.register(collectionRegistration); + } + } + this.delegates = new ArrayList<>(newDelegates); + if (oldDelegates != null && !oldDelegates.isEmpty()) { + List shutdownResults = new ArrayList<>(); + for (MetricReader oldDelegate : oldDelegates) { + try { + shutdownResults.add(oldDelegate.shutdown()); + } catch (Exception e) { + LOGGER.warn("Error shutting down old delegate", e); + } + } + CompletableResultCode.ofAll(shutdownResults) + .join(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + } + } + + public List getDelegates() { + return Collections.unmodifiableList(delegates); + } + + @Override + public void register(CollectionRegistration registration) { + synchronized (lock) { + this.collectionRegistration = registration; + for (MetricReader delegate : delegates) { + delegate.register(registration); + } + } + } + + @Override + public CompletableResultCode forceFlush() { + List current = delegates; + if (current.isEmpty()) { + return CompletableResultCode.ofSuccess(); + } + List codes = new ArrayList<>(); + for (MetricReader reader : current) { + codes.add(reader.forceFlush()); + } + return CompletableResultCode.ofAll(codes); + } + + @Override + public CompletableResultCode shutdown() { + synchronized (lock) { + List current = delegates; + if (current.isEmpty()) { + return CompletableResultCode.ofSuccess(); + } + List codes = new ArrayList<>(); + for (MetricReader reader : current) { + codes.add(reader.shutdown()); + } + return CompletableResultCode.ofAll(codes); + } + } + + @Override + public void close() throws IOException { + shutdown().join(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + + @Override + public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) { + List current = delegates; + if (!current.isEmpty()) { + // should be consistent for all the MetricReader + return current.get(0).getAggregationTemporality(instrumentType); + } + return AggregationTemporality.CUMULATIVE; + } +} diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index f2b84572b2..cfb20efb3f 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -19,6 +19,7 @@ package kafka.server import kafka.autobalancer.config.{AutoBalancerControllerConfig, AutoBalancerMetricsReporterConfig} import kafka.automq.backpressure.BackPressureConfig +import com.automq.opentelemetry.AutoMQTelemetryManager import java.util import java.util.{Collections, Properties} @@ -103,7 +104,8 @@ object DynamicBrokerConfig { DynamicRemoteLogConfig.ReconfigurableConfigs ++ AutoBalancerControllerConfig.RECONFIGURABLE_CONFIGS.asScala ++ AutoBalancerMetricsReporterConfig.RECONFIGURABLE_CONFIGS.asScala ++ - BackPressureConfig.RECONFIGURABLE_CONFIGS.asScala + BackPressureConfig.RECONFIGURABLE_CONFIGS.asScala ++ + AutoMQTelemetryManager.RECONFIGURABLE_CONFIGS.asScala private val ClusterLevelListenerConfigs = Set(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG) private val PerBrokerConfigs = (DynamicSecurityConfigs ++ DynamicListenerConfig.ReconfigurableConfigs).diff( @@ -275,6 +277,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging kafkaServer match { case brokerServer: BrokerServer => addReconfigurable(brokerServer.backPressureManager) + if (brokerServer.sharedServer.telemetryManager != null) { + addReconfigurable(brokerServer.sharedServer.telemetryManager) + } case _ => } // AutoMQ inject end diff --git a/tests/kafkatest/tests/core/automq_telemetry_test.py b/tests/kafkatest/tests/core/automq_telemetry_test.py index 1b7ea6942d..c0bf8bf207 100644 --- a/tests/kafkatest/tests/core/automq_telemetry_test.py +++ b/tests/kafkatest/tests/core/automq_telemetry_test.py @@ -278,3 +278,40 @@ def _logs_uploaded(): finally: self.num_brokers = original_num_brokers self._stop_kafka() + + @cluster(num_nodes=4) + def test_dynamic_prometheus_exporter_reconfiguration(self): + """Verify that the Prometheus metrics exporter can be dynamically reconfigured to a new port.""" + initial_port = 9464 + new_port = 9465 + + server_overrides = [ + ["s3.telemetry.metrics.exporter.uri", f"prometheus://0.0.0.0:{initial_port}"] + ] + + self._start_kafka(server_overrides=server_overrides) + + try: + self._wait_for_metrics_available(port=initial_port) + + for node in self.kafka.nodes: + output = self._fetch_metrics(node, port=initial_port) + self._assert_prometheus_metrics(output) + self.logger.info("Verified metrics on initial port %d", initial_port) + + self.logger.info("Reconfiguring telemetry exporter URI to port %d", new_port) + config_command = ( + f"{self.kafka.path.script('kafka-configs.sh', self.kafka.nodes[0])} " + f"--bootstrap-server {self.kafka.bootstrap_servers()} " + f"--entity-type brokers --entity-default " + f"--alter --add-config s3.telemetry.metrics.exporter.uri=prometheus://0.0.0.0:{new_port}" + ) + self.kafka.nodes[0].account.ssh(config_command) + self._wait_for_metrics_available(port=new_port, timeout_sec=60) + for node in self.kafka.nodes: + output = self._fetch_metrics(node, port=new_port) + self._assert_prometheus_metrics(output) + self.logger.info("Verified metrics on new port %d after dynamic reconfiguration", new_port) + + finally: + self._stop_kafka()