Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ a data-centric view on Debezium components.
**Disclaimer**: This project is still in early development stage and should not be used in production.

## Platform Architecture
The platform is composed of two main components:
The platform is composed of the following main components:

1. Conductor: The back-end component which provides a set of APIs to orchestrate and control Debezium deployments.
2. Stage: The front-end component which provides a user interface to interact with the Conductor.
3. Monitoring: Built-in pipeline monitoring powered by OpenTelemetry and Prometheus. Debezium Server instances export metrics via OpenTelemetry to an OTel Collector, which exposes them to Prometheus.


### Conductor Architecture
Expand Down Expand Up @@ -83,6 +84,25 @@ If you are using minikube on Mac, you need also to run the `minikube tunnel` com
> **_NOTE:_**
If you are using Windows, add `127.0.0.1 platform.debezium.io` to `C:\Windows\System32\drivers\etc\hosts`.

Install the [OpenTelemetry Operator](https://github.com/open-telemetry/opentelemetry-operator) for pipeline monitoring:

```shell
helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts
helm install opentelemetry-operator open-telemetry/opentelemetry-operator \
-n opentelemetry-operator-system --create-namespace \
--set admissionWebhooks.certManager.enabled=false \
--set admissionWebhooks.autoGenerateCert.enabled=true
```

> **_NOTE:_** The above command uses Helm auto-generated self-signed certificates for the operator webhooks, which is suitable for development but not for production (certificates expire after 365 days without auto-renewal). For production environments, use a proper certificate management solution such as [cert-manager](https://cert-manager.io/) or provide your own certificates. See the [OTel Operator Helm chart documentation](https://github.com/open-telemetry/opentelemetry-helm-charts/tree/main/charts/opentelemetry-operator) for all available options.

Install the [Prometheus Operator](https://github.com/prometheus-operator/prometheus-operator) for metrics scraping:

```shell
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm install kube-prometheus-stack prometheus-community/kube-prometheus-stack -n monitoring --create-namespace
```

Create a dedicated namespace

```shell
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.platform.config;

import java.util.Optional;

/**
* Configuration group for monitoring and observability features.
* <p>
* This interface defines the configuration structure for enabling and configuring
* monitoring capabilities in the Debezium Platform, including OpenTelemetry integration
* for metrics collection and export.
* </p>
*
* @author Debezium Authors
*/
public interface MonitoringConfigGroup {
Comment thread
Naros marked this conversation as resolved.

/**
* Returns the OpenTelemetry configuration group.
*
* @return the OpenTelemetry configuration settings
*/
OtelConfigGroup otel();

/**
* Configuration group for OpenTelemetry (OTel) integration.
* <p>
* Defines settings for enabling and configuring OpenTelemetry metrics collection
* and export. When enabled, the platform will expose metrics that can be sent to an
* OpenTelemetry Collector endpoint.
* </p>
*/
interface OtelConfigGroup {
/**
* Indicates whether OpenTelemetry metrics collection is enabled.
*
* @return {@code true} if OpenTelemetry is enabled, {@code false} otherwise
*/
boolean enabled();

/**
* Returns the OpenTelemetry Collector endpoint URL.
* <p>
* This endpoint is used to export metrics to an OpenTelemetry Collector.
* </p>
*
* @return an {@link Optional} containing the endpoint URL if configured, or empty if not set
*/
Optional<String> endpoint();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ public interface PipelineConfigGroup {
ServerConfigGroup server();

Map<String, String> labels();

MonitoringConfigGroup monitoring();
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@
import io.debezium.operator.api.model.runtime.Runtime;
import io.debezium.operator.api.model.runtime.RuntimeApiBuilder;
import io.debezium.operator.api.model.runtime.RuntimeBuilder;
import io.debezium.operator.api.model.runtime.metrics.JmxExporterBuilder;
import io.debezium.operator.api.model.runtime.metrics.MetricsBuilder;
import io.debezium.operator.api.model.runtime.metrics.Metrics;
import io.debezium.operator.api.model.source.Offset;
import io.debezium.operator.api.model.source.OffsetBuilder;
import io.debezium.operator.api.model.source.SchemaHistory;
Expand Down Expand Up @@ -81,10 +80,14 @@ public class PipelineMapper {

final PipelineConfigGroup pipelineConfigGroup;
final TableNameResolver tableNameResolver;
final Metrics metrics;

public PipelineMapper(PipelineConfigGroup pipelineConfigGroup, TableNameResolver tableNameResolver) {
public PipelineMapper(PipelineConfigGroup pipelineConfigGroup,
TableNameResolver tableNameResolver,
Metrics metrics) {
this.pipelineConfigGroup = pipelineConfigGroup;
this.tableNameResolver = tableNameResolver;
this.metrics = metrics;
}

public DebeziumServer map(PipelineFlat pipeline) {
Expand Down Expand Up @@ -151,11 +154,7 @@ private Quarkus createQuarkus(PipelineFlat pipeline) {
private Runtime createRuntime() {
return new RuntimeBuilder()
.withApi(new RuntimeApiBuilder().withEnabled().build())
.withMetrics(new MetricsBuilder()
.withJmxExporter(new JmxExporterBuilder()
.withEnabled()
.build())
.build())
.withMetrics(metrics)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.platform.environment.operator.metrics;

import jakarta.enterprise.context.ApplicationScoped;

import io.debezium.operator.api.model.runtime.metrics.JmxExporterBuilder;
import io.debezium.operator.api.model.runtime.metrics.MetricsBuilder;
import io.debezium.platform.config.PipelineConfigGroup;

/**
* Strategy for configuring JMX metrics exporter.
* JMX exporter is typically always enabled for monitoring Java applications.
*/
@ApplicationScoped
public class JmxExporterStrategy implements MetricsExporterStrategy {

@Override
public boolean isApplicable(PipelineConfigGroup config) {
// JMX is always enabled by default for backward compatibility
// Remove once the Monitor feature is completed e2e
return true;
}

@Override
public void apply(MetricsBuilder metricsBuilder, PipelineConfigGroup config) {
metricsBuilder.withJmxExporter(
new JmxExporterBuilder()
.withEnabled()
.build());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.platform.environment.operator.metrics;

import io.debezium.operator.api.model.runtime.metrics.MetricsBuilder;
import io.debezium.platform.config.PipelineConfigGroup;

/**
* Strategy interface for configuring different metrics exporters.
* Implementations of this interface encapsulate the logic for determining
* whether a specific metrics exporter should be enabled and how to configure it.
*/
public interface MetricsExporterStrategy {

/**
* Determines if this metrics exporter should be enabled based on the pipeline configuration.
*
* @param config the pipeline configuration
* @return true if this exporter should be applied, false otherwise
*/
boolean isApplicable(PipelineConfigGroup config);

/**
* Applies this exporter's configuration to the metrics builder.
*
* @param metricsBuilder the metrics builder to configure
* @param config the pipeline configuration containing exporter-specific settings
*/
void apply(MetricsBuilder metricsBuilder, PipelineConfigGroup config);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.platform.environment.operator.metrics;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;

import io.debezium.operator.api.model.runtime.metrics.Metrics;
import io.debezium.operator.api.model.runtime.metrics.MetricsBuilder;
import io.debezium.platform.config.PipelineConfigGroup;

@ApplicationScoped
public class MetricsProducer {

private final Instance<MetricsExporterStrategy> strategies;
private final PipelineConfigGroup config;

@Inject
public MetricsProducer(Instance<MetricsExporterStrategy> strategies, PipelineConfigGroup config) {
this.strategies = strategies;
this.config = config;
}

@Produces
Metrics metrics() {
var metricsBuilder = new MetricsBuilder();

strategies.stream()
.filter(strategy -> strategy.isApplicable(config))
.forEach(strategy -> strategy.apply(metricsBuilder, config));

return metricsBuilder.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.platform.environment.operator.metrics;

import jakarta.enterprise.context.ApplicationScoped;

import io.debezium.operator.api.model.runtime.metrics.MetricsBuilder;
import io.debezium.operator.api.model.runtime.metrics.OpenTelemetryBuilder;
import io.debezium.operator.api.model.runtime.metrics.OtelCollectorBuilder;
import io.debezium.platform.config.PipelineConfigGroup;

/**
* Strategy for configuring OpenTelemetry metrics exporter.
* This exporter is enabled based on configuration and can optionally
* include a collector endpoint.
*/
@ApplicationScoped
public class OpenTelemetryExporterStrategy implements MetricsExporterStrategy {

@Override
public boolean isApplicable(PipelineConfigGroup config) {
return config.monitoring().otel().enabled();
}

@Override
public void apply(MetricsBuilder metricsBuilder, PipelineConfigGroup config) {
var otelBuilder = new OpenTelemetryBuilder()
.withEnabled();

config.monitoring().otel().endpoint()
.filter(e -> !e.isBlank())
.ifPresent(endpoint -> otelBuilder.withCollector(
new OtelCollectorBuilder()
.withEndpoint(endpoint)
.build()));

metricsBuilder.withOpenTelemetry(otelBuilder.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ pipeline:
labels: {}
server:
image: ${PIPELINE_SERVER_IMAGE:}
monitoring:
otel:
enabled: ${PIPELINE_MONITORING_OTEL_ENABLED:false}
endpoint: ${PIPELINE_MONITORING_OTEL_ENDPOINT:http://localhost:4318}
sources:
database:
connection:
Expand Down
Loading
Loading