diff --git a/docs/io-cdc-debezium.md b/docs/io-cdc-debezium.md index 04e356664f4f..f26fa2598d62 100644 --- a/docs/io-cdc-debezium.md +++ b/docs/io-cdc-debezium.md @@ -16,28 +16,28 @@ The Debezium source connector pulls messages from MySQL or PostgreSQL and persis The configuration of the Debezium source connector has the following properties. -| Name | Required | Default | Description | -|------|----------|---------|-------------| -| `task.class` | true | null | A source task class that is implemented in Debezium. | -| `database.hostname` | true | null | The address of a database server. | -| `database.port` | true | null | The port number of a database server.| -| `database.user` | true | null | The name of a database user that has the required privileges. | -| `database.password` | true | null | The password for a database user that has the required privileges. | -| `database.server.id` | true | null | The connector's identifier that must be unique within a database cluster and similar to the database's server-id configuration property. | -| `database.server.name` | true | null | The logical name of a database server/cluster, which forms a namespace and is used in all the names of Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used. | -| `database.whitelist` | false | null | A list of all databases hosted by this server that is monitored by the connector.

This is optional, and there are other properties for listing databases and tables to include or exclude from monitoring. | -| `key.converter` | true | null | The converter provided by Kafka Connect to convert the record key. | -| `value.converter` | true | null | The converter provided by Kafka Connect to convert the record value. | -| `database.history` | true | null | The name of the database history class. | -| `database.history.pulsar.topic` | true | null | The name of the database history topic where the connector writes and recovers DDL statements.

**Note: this topic is for internal use only and should not be used by consumers.** | +| Name | Required | Default | Description | +|---------------------------------------|----------|---------|-------------| +| `task.class` | true | null | A source task class that is implemented in Debezium. | +| `database.hostname` | true | null | The address of a database server. | +| `database.port` | true | null | The port number of a database server.| +| `database.user` | true | null | The name of a database user that has the required privileges. | +| `database.password` | true | null | The password for a database user that has the required privileges. | +| `database.server.id` | true | null | The connector's identifier that must be unique within a database cluster and similar to the database's server-id configuration property. | +| `topic.prefix` | true | null | The logical name of a database server/cluster, which forms a namespace and is used in all the names of Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used. | +| `database.include.list` | false | null | A list of all databases hosted by this server that is monitored by the connector.

This is optional, and there are other properties for listing databases and tables to include or exclude from monitoring. | +| `key.converter` | true | null | The converter provided by Kafka Connect to convert the record key. | +| `value.converter` | true | null | The converter provided by Kafka Connect to convert the record value. | +| `database.history` | true | null | The name of the database history class. | +| `database.history.pulsar.topic` | true | null | The name of the database history topic where the connector writes and recovers DDL statements.

**Note: this topic is for internal use only and should not be used by consumers.** | | `database.history.pulsar.service.url` | false| null | Pulsar cluster service URL for history topic.

**Note**: If `database.history.pulsar.service.url` is not set, then the database history Pulsar client will use the same client settings as those of the source connector, such as `client_auth_plugin` and `client_auth_params`.| -| `pulsar.service.url` | true | null | Pulsar cluster service URL for the offset topic used in Debezium. You can use the `bin/pulsar-admin --admin-url http://pulsar:8080 sources localrun --source-config-file $PWD/configs/pg-pulsar-config.yaml` command to point to the target Pulsar cluster.| -| `offset.storage.topic` | true | null | Record the last committed offsets that the connector successfully completes. | -| `mongodb.hosts` | true | null | The comma-separated list of hostname and port pairs (in the form 'host' or 'host:port') of the MongoDB servers in the replica set. The list contains a single hostname and a port pair. If mongodb.members.auto.discover is set to false, the host and port pair are prefixed with the replica set name (e.g., rs0/localhost:27017). | -| `mongodb.name` | true | null | A unique name that identifies the connector and/or MongoDB replica set or shared cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster. | -| `mongodb.user` | true | null | Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. | -| `mongodb.password` | true | null | Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. | -| `mongodb.task.id` | true | null | The taskId of the MongoDB connector that attempts to use a separate task for each replica set. | +| `pulsar.service.url` | true | null | Pulsar cluster service URL for the offset topic used in Debezium. You can use the `bin/pulsar-admin --admin-url http://pulsar:8080 sources localrun --source-config-file $PWD/configs/pg-pulsar-config.yaml` command to point to the target Pulsar cluster.| +| `offset.storage.topic` | true | null | Record the last committed offsets that the connector successfully completes. | +| `mongodb.hosts` | true | null | The comma-separated list of hostname and port pairs (in the form 'host' or 'host:port') of the MongoDB servers in the replica set. The list contains a single hostname and a port pair. If mongodb.members.auto.discover is set to false, the host and port pair are prefixed with the replica set name (e.g., rs0/localhost:27017). | +| `mongodb.name` | true | null | A unique name that identifies the connector and/or MongoDB replica set or shared cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster. | +| `mongodb.user` | true | null | Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. | +| `mongodb.password` | true | null | Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. | +| `mongodb.task.id` | true | null | The taskId of the MongoDB connector that attempts to use a separate task for each replica set. | @@ -59,13 +59,13 @@ You can use one of the following methods to create a configuration file. "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", - "database.server.name": "dbserver1", - "database.whitelist": "inventory", + "topic.prefix": "dbserver1", + "database.include.list": "inventory", "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory", "database.history.pulsar.topic": "history-topic", "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650", - "key.converter": "org.apache.kafka.connect.json.JsonConverter", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.storage.StringConverter", "pulsar.service.url": "pulsar://127.0.0.1:6650", "offset.storage.topic": "offset-topic" } @@ -92,15 +92,15 @@ You can use one of the following methods to create a configuration file. database.user: "debezium" database.password: "dbz" database.server.id: "184054" - database.server.name: "dbserver1" - database.whitelist: "inventory" + topic.prefix: "dbserver1" + database.include.list: "inventory" database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory" database.history.pulsar.topic: "history-topic" database.history.pulsar.service.url: "pulsar://127.0.0.1:6650" ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG - key.converter: "org.apache.kafka.connect.json.JsonConverter" - value.converter: "org.apache.kafka.connect.json.JsonConverter" + key.converter: "org.apache.kafka.connect.storage.StringConverter" + value.converter: "org.apache.kafka.connect.storage.StringConverter" ## PULSAR_SERVICE_URL_CONFIG pulsar.service.url: "pulsar://127.0.0.1:6650" @@ -142,7 +142,7 @@ This example shows how to change the data of a MySQL table using the Pulsar Debe --name debezium-mysql-source \ --tenant public \ --namespace default \ - --source-config '{"database.hostname": "localhost","database.port": "3306","database.user": "debezium","database.password": "dbz","database.server.id": "184054","database.server.name": "dbserver1","database.whitelist": "inventory","database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory","database.history.pulsar.topic": "history-topic","database.history.pulsar.service.url": "pulsar://127.0.0.1:6650","key.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter": "org.apache.kafka.connect.json.JsonConverter","pulsar.service.url": "pulsar://127.0.0.1:6650","offset.storage.topic": "offset-topic"}' + --source-config '{"database.hostname": "localhost","database.port": "3306","database.user": "debezium","database.password": "dbz","database.server.id": "184054","topic.prefix": "dbserver1","database.include.list": "inventory","database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory","database.history.pulsar.topic": "history-topic","database.history.pulsar.service.url": "pulsar://127.0.0.1:6650","key.converter": "org.apache.kafka.connect.storage.StringConverter","value.converter": "org.apache.kafka.connect.storage.StringConverter","pulsar.service.url": "pulsar://127.0.0.1:6650","offset.storage.topic": "offset-topic"}' ``` * Use the **YAML** configuration file as shown previously. @@ -229,8 +229,8 @@ You can use one of the following methods to create a configuration file. "database.user": "postgres", "database.password": "postgres", "database.dbname": "postgres", - "database.server.name": "dbserver1", - "schema.whitelist": "inventory", + "topic.prefix": "dbserver1", + "schema.include.list": "inventory", "pulsar.service.url": "pulsar://127.0.0.1:6650" } } @@ -256,8 +256,8 @@ You can use one of the following methods to create a configuration file. database.user: "postgres" database.password: "postgres" database.dbname: "postgres" - database.server.name: "dbserver1" - schema.whitelist: "inventory" + topic.prefix: "dbserver1" + schema.include.list: "inventory" ## PULSAR_SERVICE_URL_CONFIG pulsar.service.url: "pulsar://127.0.0.1:6650" @@ -293,7 +293,7 @@ This example shows how to change the data of a PostgreSQL table using the Pulsar --name debezium-postgres-source \ --tenant public \ --namespace default \ - --source-config '{"database.hostname": "localhost","database.port": "5432","database.user": "postgres","database.password": "postgres","database.dbname": "postgres","database.server.name": "dbserver1","plugin.name": "pgoutput","schema.whitelist": "inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}' + --source-config '{"database.hostname": "localhost","database.port": "5432","database.user": "postgres","database.password": "postgres","database.dbname": "postgres","topic.prefix": "dbserver1","plugin.name": "pgoutput","schema.include.list": "inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}' ``` * Use the **YAML** configuration file as shown previously. @@ -364,7 +364,7 @@ You need to create a configuration file before using the Pulsar Debezium connect "mongodb.user": "debezium", "mongodb.password": "dbz", "mongodb.task.id": "1", - "database.whitelist": "inventory", + "database.include.list": "inventory", "pulsar.service.url": "pulsar://127.0.0.1:6650" } } @@ -390,7 +390,7 @@ You need to create a configuration file before using the Pulsar Debezium connect mongodb.user: "debezium" mongodb.password: "dbz" mongodb.task.id: "1" - database.whitelist: "inventory" + database.include.list: "inventory" ## PULSAR_SERVICE_URL_CONFIG pulsar.service.url: "pulsar://127.0.0.1:6650" @@ -435,7 +435,7 @@ This example shows how to change the data of a MongoDB table using the Pulsar De --name debezium-mongodb-source \ --tenant public \ --namespace default \ - --source-config '{"mongodb.hosts": "rs0/mongodb:27017","mongodb.name": "dbserver1","mongodb.user": "debezium","mongodb.password": "dbz","mongodb.task.id": "1","database.whitelist": "inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}' + --source-config '{"mongodb.hosts": "rs0/mongodb:27017","mongodb.name": "dbserver1","mongodb.user": "debezium","mongodb.password": "dbz","mongodb.task.id": "1","database.include.list": "inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}' ``` * Use the **YAML** configuration file as shown previously. diff --git a/docs/io-debezium-source.md b/docs/io-debezium-source.md index f5fdce1a4bee..9c60c5586e92 100644 --- a/docs/io-debezium-source.md +++ b/docs/io-debezium-source.md @@ -16,25 +16,25 @@ The Debezium source connector pulls messages from MySQL or PostgreSQL and persis The configuration of the Debezium source connector has the following properties. -| Name | Required | Default | Description | -|------|----------|---------|-------------| -| `task.class` | true | null | A source task class that implemented in Debezium. | -| `database.hostname` | true | null | The address of a database server. | -| `database.port` | true | null | The port number of a database server.| -| `database.user` | true | null | The name of a database user that has the required privileges. | -| `database.password` | true | null | The password for a database user that has the required privileges. | -| `database.server.id` | true | null | The connector's identifier that must be unique within a database cluster and similar to the database's server-id configuration property. | -| `database.server.name` | true | null | The logical name of a database server/cluster, which forms a namespace and it is used in all the names of Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used. | -| `database.whitelist` | false | null | A list of all databases hosted by this server which is monitored by the connector.

This is optional, and there are other properties for listing databases and tables to include or exclude from monitoring. | -| `key.converter` | true | null | The converter provided by Kafka Connect to convert record key. | -| `value.converter` | true | null | The converter provided by Kafka Connect to convert record value. | -| `database.history` | true | null | The name of the database history class. | -| `database.history.pulsar.topic` | true | null | The name of the database history topic where the connector writes and recovers DDL statements.

**Note: this topic is for internal use only and should not be used by consumers.** | -| `database.history.pulsar.service.url` | false| null | Pulsar cluster service URL for history topic.

**Note**: If `database.history.pulsar.service.url` is not set, then the database history Pulsar client will use the same client settings as those of the source connector, such as `client_auth_plugin` and `client_auth_params`.| -| `offset.storage.topic` | true | null | Record the last committed offsets that the connector successfully completes. | -| `json-with-envelope` | false | false | Present the message that only consists of payload. | +| Name | Required | Default | Description | +|-----------------------------------------|----------|---------|-------------| +| `task.class` | true | null | A source task class that implemented in Debezium. | +| `database.hostname` | true | null | The address of a database server. | +| `database.port` | true | null | The port number of a database server.| +| `database.user` | true | null | The name of a database user that has the required privileges. | +| `database.password` | true | null | The password for a database user that has the required privileges. | +| `database.server.id` | true | null | The connector's identifier that must be unique within a database cluster and similar to the database's server-id configuration property. | +| `topic.prefix` | true | null | The logical name of a database server/cluster, which forms a namespace and it is used in all the names of Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used. | +| `database.include.list` | false | null | A list of all databases hosted by this server which is monitored by the connector.

This is optional, and there are other properties for listing databases and tables to include or exclude from monitoring. | +| `key.converter` | true | null | The converter provided by Kafka Connect to convert record key. | +| `value.converter` | true | null | The converter provided by Kafka Connect to convert record value. | +| `database.history` | true | null | The name of the database history class. | +| `database.history.pulsar.topic` | true | null | The name of the database history topic where the connector writes and recovers DDL statements.

**Note: this topic is for internal use only and should not be used by consumers.** | +| `database.history.pulsar.service.url` | false| null | Pulsar cluster service URL for history topic.

**Note**: If `database.history.pulsar.service.url` is not set, then the database history Pulsar client will use the same client settings as those of the source connector, such as `client_auth_plugin` and `client_auth_params`.| +| `offset.storage.topic` | true | null | Record the last committed offsets that the connector successfully completes. | +| `json-with-envelope` | false | false | Present the message that only consists of payload. | | `database.history.pulsar.reader.config` | false | null | The configs of the reader for the database schema history topic, in the form of a JSON string with key-value pairs. | -| `offset.storage.reader.config` | false | null | The configs of the reader for the kafka connector offsets topic, in the form of a JSON string with key-value pairs. | +| `offset.storage.reader.config` | false | null | The configs of the reader for the kafka connector offsets topic, in the form of a JSON string with key-value pairs. | ### Converter Options @@ -100,13 +100,13 @@ You can use one of the following methods to create a configuration file. "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", - "database.server.name": "dbserver1", - "database.whitelist": "inventory", + "topic.prefix": "dbserver1", + "table.include.list": "inventory", "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory", "database.history.pulsar.topic": "history-topic", "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650", - "key.converter": "org.apache.kafka.connect.json.JsonConverter", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.storage.StringConverter", "offset.storage.topic": "offset-topic" } } @@ -132,15 +132,15 @@ You can use one of the following methods to create a configuration file. database.user: "debezium" database.password: "dbz" database.server.id: "184054" - database.server.name: "dbserver1" - database.whitelist: "inventory" + topic.prefix: "dbserver1" + database.include.list: "inventory" database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory" database.history.pulsar.topic: "history-topic" database.history.pulsar.service.url: "pulsar://127.0.0.1:6650" ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG - key.converter: "org.apache.kafka.connect.json.JsonConverter" - value.converter: "org.apache.kafka.connect.json.JsonConverter" + key.converter: "org.apache.kafka.connect.storage.StringConverter" + value.converter: "org.apache.kafka.connect.storage.StringConverter" ## OFFSET_STORAGE_TOPIC_CONFIG offset.storage.topic: "offset-topic" @@ -179,7 +179,7 @@ This example shows how to change the data of a MySQL table using the Pulsar Debe --name debezium-mysql-source \ --tenant public \ --namespace default \ - --source-config '{"database.hostname": "localhost","database.port": "3306","database.user": "debezium","database.password": "dbz","database.server.id": "184054","database.server.name": "dbserver1","database.whitelist": "inventory","database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory","database.history.pulsar.topic": "history-topic","database.history.pulsar.service.url": "pulsar://127.0.0.1:6650","key.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter": "org.apache.kafka.connect.json.JsonConverter","pulsar.service.url": "pulsar://127.0.0.1:6650","offset.storage.topic": "offset-topic"}' + --source-config '{"database.hostname": "localhost","database.port": "3306","database.user": "debezium","database.password": "dbz","database.server.id": "184054","topic.prefix": "dbserver1","database.include.list": "inventory","database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory","database.history.pulsar.topic": "history-topic","database.history.pulsar.service.url": "pulsar://127.0.0.1:6650","key.converter": "org.apache.kafka.connect.storage.StringConverter","value.converter": "org.apache.kafka.connect.storage.StringConverter","pulsar.service.url": "pulsar://127.0.0.1:6650","offset.storage.topic": "offset-topic"}' ``` :::note @@ -278,10 +278,10 @@ You can use one of the following methods to create a configuration file. "database.user": "postgres", "database.password": "changeme", "database.dbname": "postgres", - "database.server.name": "dbserver1", + "topic.prefix": "dbserver1", "plugin.name": "pgoutput", - "schema.whitelist": "public", - "table.whitelist": "public.users", + "schema.include.list": "public", + "table.include.list": "public.users", "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650" } ``` @@ -305,10 +305,12 @@ You can use one of the following methods to create a configuration file. database.user: "postgres" database.password: "changeme" database.dbname: "postgres" - database.server.name: "dbserver1" + topic.prefix: "dbserver1" plugin.name: "pgoutput" - schema.whitelist: "public" - table.whitelist: "public.users" + schema.include.list: "public" + table.include.list: "public.users" + key.converter: "org.apache.kafka.connect.storage.StringConverter" + value.converter: "org.apache.kafka.connect.storage.StringConverter" ## PULSAR_SERVICE_URL_CONFIG database.history.pulsar.service.url: "pulsar://127.0.0.1:6650" @@ -349,7 +351,7 @@ This example shows how to change the data of a PostgreSQL table using the Pulsar --name debezium-postgres-source \ --tenant public \ --namespace default \ - --source-config '{"database.hostname": "localhost","database.port": "5432","database.user": "postgres","database.password": "changeme","database.dbname": "postgres","database.server.name": "dbserver1","plugin.name": "pgoutput","schema.whitelist": "public","table.whitelist": "public.users","pulsar.service.url": "pulsar://127.0.0.1:6650"}' + --source-config '{"database.hostname": "localhost","database.port": "5432","database.user": "postgres","database.password": "changeme","database.dbname": "postgres","topic.prefix": "dbserver1","plugin.name": "pgoutput","schema.include.list": "public","table.include.list": "public.users","key.converter": "org.apache.kafka.connect.storage.StringConverter","value.converter": "org.apache.kafka.connect.storage.StringConverter","pulsar.service.url": "pulsar://127.0.0.1:6650"}' ``` :::note @@ -441,7 +443,7 @@ You can use one of the following methods to create a configuration file. "mongodb.user": "debezium", "mongodb.password": "dbz", "mongodb.task.id": "1", - "database.whitelist": "inventory", + "database.include.list": "inventory", "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650" } ``` @@ -466,7 +468,7 @@ You can use one of the following methods to create a configuration file. mongodb.user: "debezium" mongodb.password: "dbz" mongodb.task.id: "1" - database.whitelist: "inventory" + database.include.list: "inventory" database.history.pulsar.service.url: "pulsar://127.0.0.1:6650" ``` @@ -509,7 +511,7 @@ This example shows how to change the data of a MongoDB table using the Pulsar De --name debezium-mongodb-source \ --tenant public \ --namespace default \ - --source-config '{"mongodb.hosts": "rs0/mongodb:27017","mongodb.name": "dbserver1","mongodb.user": "debezium","mongodb.password": "dbz","mongodb.task.id": "1","database.whitelist": "inventory","database.history.pulsar.service.url": "pulsar://127.0.0.1:6650"}' + --source-config '{"mongodb.hosts": "rs0/mongodb:27017","mongodb.name": "dbserver1","mongodb.user": "debezium","mongodb.password": "dbz","mongodb.task.id": "1","database.include.list": "inventory","database.history.pulsar.service.url": "pulsar://127.0.0.1:6650"}' ``` :::note @@ -585,14 +587,13 @@ Using YAML as an example, you can create a `debezium-oracle-source-config.yaml` "database.user": "dbzuser", "database.password": "dbz", "database.dbname": "XE", - "database.server.name": "XE", + "topic.prefix": "XE", "schema.exclude.list": "system,dbzuser", "snapshot.mode": "initial", "topic.namespace": "public/default", "task.class": "io.debezium.connector.oracle.OracleConnectorTask", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "key.converter": "org.apache.kafka.connect.json.JsonConverter", - "typeClassName": "org.apache.pulsar.common.schema.KeyValue", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.storage.StringConverter", "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory", "database.tcpKeepAlive": "true", "decimal.handling.mode": "double", @@ -619,14 +620,13 @@ configs: database.user: "dbzuser" database.password: "dbz" database.dbname: "XE" - database.server.name: "XE" + topic.prefix: "XE" schema.exclude.list: "system,dbzuser" snapshot.mode: "initial" topic.namespace: "public/default" task.class: "io.debezium.connector.oracle.OracleConnectorTask" - value.converter: "org.apache.kafka.connect.json.JsonConverter" - key.converter: "org.apache.kafka.connect.json.JsonConverter" - typeClassName: "org.apache.pulsar.common.schema.KeyValue" + key.converter: "org.apache.kafka.connect.storage.StringConverter", + value.converter: "org.apache.kafka.connect.storage.StringConverter", database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory" database.tcpKeepAlive: "true" decimal.handling.mode: "double" @@ -655,13 +655,12 @@ Similarly to other connectors, you can use JSON or YAML to configure the connect "database.user": "sa", "database.password": "MyP@ssw0rd!", "database.dbname": "MyTestDB", - "database.server.name": "mssql", + "topic.prefix": "mssql", "snapshot.mode": "schema_only", "topic.namespace": "public/default", "task.class": "io.debezium.connector.sqlserver.SqlServerConnectorTask", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "key.converter": "org.apache.kafka.connect.json.JsonConverter", - "typeClassName": "org.apache.pulsar.common.schema.KeyValue", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.storage.StringConverter", "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory", "database.tcpKeepAlive": "true", "decimal.handling.mode": "double", @@ -688,13 +687,12 @@ configs: database.user: "sa" database.password: "MyP@ssw0rd!" database.dbname: "MyTestDB" - database.server.name: "mssql" + topic.prefix: "mssql" snapshot.mode: "schema_only" topic.namespace: "public/default" task.class: "io.debezium.connector.sqlserver.SqlServerConnectorTask" - value.converter: "org.apache.kafka.connect.json.JsonConverter" - key.converter: "org.apache.kafka.connect.json.JsonConverter" - typeClassName: "org.apache.pulsar.common.schema.KeyValue" + key.converter: "org.apache.kafka.connect.storage.StringConverter" + value.converter: "org.apache.kafka.connect.storage.StringConverter" database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory" database.tcpKeepAlive: "true" decimal.handling.mode: "double" diff --git a/versioned_docs/version-4.2.x/io-cdc-debezium.md b/versioned_docs/version-4.2.x/io-cdc-debezium.md index 04e356664f4f..f26fa2598d62 100644 --- a/versioned_docs/version-4.2.x/io-cdc-debezium.md +++ b/versioned_docs/version-4.2.x/io-cdc-debezium.md @@ -16,28 +16,28 @@ The Debezium source connector pulls messages from MySQL or PostgreSQL and persis The configuration of the Debezium source connector has the following properties. -| Name | Required | Default | Description | -|------|----------|---------|-------------| -| `task.class` | true | null | A source task class that is implemented in Debezium. | -| `database.hostname` | true | null | The address of a database server. | -| `database.port` | true | null | The port number of a database server.| -| `database.user` | true | null | The name of a database user that has the required privileges. | -| `database.password` | true | null | The password for a database user that has the required privileges. | -| `database.server.id` | true | null | The connector's identifier that must be unique within a database cluster and similar to the database's server-id configuration property. | -| `database.server.name` | true | null | The logical name of a database server/cluster, which forms a namespace and is used in all the names of Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used. | -| `database.whitelist` | false | null | A list of all databases hosted by this server that is monitored by the connector.

This is optional, and there are other properties for listing databases and tables to include or exclude from monitoring. | -| `key.converter` | true | null | The converter provided by Kafka Connect to convert the record key. | -| `value.converter` | true | null | The converter provided by Kafka Connect to convert the record value. | -| `database.history` | true | null | The name of the database history class. | -| `database.history.pulsar.topic` | true | null | The name of the database history topic where the connector writes and recovers DDL statements.

**Note: this topic is for internal use only and should not be used by consumers.** | +| Name | Required | Default | Description | +|---------------------------------------|----------|---------|-------------| +| `task.class` | true | null | A source task class that is implemented in Debezium. | +| `database.hostname` | true | null | The address of a database server. | +| `database.port` | true | null | The port number of a database server.| +| `database.user` | true | null | The name of a database user that has the required privileges. | +| `database.password` | true | null | The password for a database user that has the required privileges. | +| `database.server.id` | true | null | The connector's identifier that must be unique within a database cluster and similar to the database's server-id configuration property. | +| `topic.prefix` | true | null | The logical name of a database server/cluster, which forms a namespace and is used in all the names of Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used. | +| `database.include.list` | false | null | A list of all databases hosted by this server that is monitored by the connector.

This is optional, and there are other properties for listing databases and tables to include or exclude from monitoring. | +| `key.converter` | true | null | The converter provided by Kafka Connect to convert the record key. | +| `value.converter` | true | null | The converter provided by Kafka Connect to convert the record value. | +| `database.history` | true | null | The name of the database history class. | +| `database.history.pulsar.topic` | true | null | The name of the database history topic where the connector writes and recovers DDL statements.

**Note: this topic is for internal use only and should not be used by consumers.** | | `database.history.pulsar.service.url` | false| null | Pulsar cluster service URL for history topic.

**Note**: If `database.history.pulsar.service.url` is not set, then the database history Pulsar client will use the same client settings as those of the source connector, such as `client_auth_plugin` and `client_auth_params`.| -| `pulsar.service.url` | true | null | Pulsar cluster service URL for the offset topic used in Debezium. You can use the `bin/pulsar-admin --admin-url http://pulsar:8080 sources localrun --source-config-file $PWD/configs/pg-pulsar-config.yaml` command to point to the target Pulsar cluster.| -| `offset.storage.topic` | true | null | Record the last committed offsets that the connector successfully completes. | -| `mongodb.hosts` | true | null | The comma-separated list of hostname and port pairs (in the form 'host' or 'host:port') of the MongoDB servers in the replica set. The list contains a single hostname and a port pair. If mongodb.members.auto.discover is set to false, the host and port pair are prefixed with the replica set name (e.g., rs0/localhost:27017). | -| `mongodb.name` | true | null | A unique name that identifies the connector and/or MongoDB replica set or shared cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster. | -| `mongodb.user` | true | null | Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. | -| `mongodb.password` | true | null | Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. | -| `mongodb.task.id` | true | null | The taskId of the MongoDB connector that attempts to use a separate task for each replica set. | +| `pulsar.service.url` | true | null | Pulsar cluster service URL for the offset topic used in Debezium. You can use the `bin/pulsar-admin --admin-url http://pulsar:8080 sources localrun --source-config-file $PWD/configs/pg-pulsar-config.yaml` command to point to the target Pulsar cluster.| +| `offset.storage.topic` | true | null | Record the last committed offsets that the connector successfully completes. | +| `mongodb.hosts` | true | null | The comma-separated list of hostname and port pairs (in the form 'host' or 'host:port') of the MongoDB servers in the replica set. The list contains a single hostname and a port pair. If mongodb.members.auto.discover is set to false, the host and port pair are prefixed with the replica set name (e.g., rs0/localhost:27017). | +| `mongodb.name` | true | null | A unique name that identifies the connector and/or MongoDB replica set or shared cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster. | +| `mongodb.user` | true | null | Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. | +| `mongodb.password` | true | null | Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. | +| `mongodb.task.id` | true | null | The taskId of the MongoDB connector that attempts to use a separate task for each replica set. | @@ -59,13 +59,13 @@ You can use one of the following methods to create a configuration file. "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", - "database.server.name": "dbserver1", - "database.whitelist": "inventory", + "topic.prefix": "dbserver1", + "database.include.list": "inventory", "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory", "database.history.pulsar.topic": "history-topic", "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650", - "key.converter": "org.apache.kafka.connect.json.JsonConverter", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.storage.StringConverter", "pulsar.service.url": "pulsar://127.0.0.1:6650", "offset.storage.topic": "offset-topic" } @@ -92,15 +92,15 @@ You can use one of the following methods to create a configuration file. database.user: "debezium" database.password: "dbz" database.server.id: "184054" - database.server.name: "dbserver1" - database.whitelist: "inventory" + topic.prefix: "dbserver1" + database.include.list: "inventory" database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory" database.history.pulsar.topic: "history-topic" database.history.pulsar.service.url: "pulsar://127.0.0.1:6650" ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG - key.converter: "org.apache.kafka.connect.json.JsonConverter" - value.converter: "org.apache.kafka.connect.json.JsonConverter" + key.converter: "org.apache.kafka.connect.storage.StringConverter" + value.converter: "org.apache.kafka.connect.storage.StringConverter" ## PULSAR_SERVICE_URL_CONFIG pulsar.service.url: "pulsar://127.0.0.1:6650" @@ -142,7 +142,7 @@ This example shows how to change the data of a MySQL table using the Pulsar Debe --name debezium-mysql-source \ --tenant public \ --namespace default \ - --source-config '{"database.hostname": "localhost","database.port": "3306","database.user": "debezium","database.password": "dbz","database.server.id": "184054","database.server.name": "dbserver1","database.whitelist": "inventory","database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory","database.history.pulsar.topic": "history-topic","database.history.pulsar.service.url": "pulsar://127.0.0.1:6650","key.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter": "org.apache.kafka.connect.json.JsonConverter","pulsar.service.url": "pulsar://127.0.0.1:6650","offset.storage.topic": "offset-topic"}' + --source-config '{"database.hostname": "localhost","database.port": "3306","database.user": "debezium","database.password": "dbz","database.server.id": "184054","topic.prefix": "dbserver1","database.include.list": "inventory","database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory","database.history.pulsar.topic": "history-topic","database.history.pulsar.service.url": "pulsar://127.0.0.1:6650","key.converter": "org.apache.kafka.connect.storage.StringConverter","value.converter": "org.apache.kafka.connect.storage.StringConverter","pulsar.service.url": "pulsar://127.0.0.1:6650","offset.storage.topic": "offset-topic"}' ``` * Use the **YAML** configuration file as shown previously. @@ -229,8 +229,8 @@ You can use one of the following methods to create a configuration file. "database.user": "postgres", "database.password": "postgres", "database.dbname": "postgres", - "database.server.name": "dbserver1", - "schema.whitelist": "inventory", + "topic.prefix": "dbserver1", + "schema.include.list": "inventory", "pulsar.service.url": "pulsar://127.0.0.1:6650" } } @@ -256,8 +256,8 @@ You can use one of the following methods to create a configuration file. database.user: "postgres" database.password: "postgres" database.dbname: "postgres" - database.server.name: "dbserver1" - schema.whitelist: "inventory" + topic.prefix: "dbserver1" + schema.include.list: "inventory" ## PULSAR_SERVICE_URL_CONFIG pulsar.service.url: "pulsar://127.0.0.1:6650" @@ -293,7 +293,7 @@ This example shows how to change the data of a PostgreSQL table using the Pulsar --name debezium-postgres-source \ --tenant public \ --namespace default \ - --source-config '{"database.hostname": "localhost","database.port": "5432","database.user": "postgres","database.password": "postgres","database.dbname": "postgres","database.server.name": "dbserver1","plugin.name": "pgoutput","schema.whitelist": "inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}' + --source-config '{"database.hostname": "localhost","database.port": "5432","database.user": "postgres","database.password": "postgres","database.dbname": "postgres","topic.prefix": "dbserver1","plugin.name": "pgoutput","schema.include.list": "inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}' ``` * Use the **YAML** configuration file as shown previously. @@ -364,7 +364,7 @@ You need to create a configuration file before using the Pulsar Debezium connect "mongodb.user": "debezium", "mongodb.password": "dbz", "mongodb.task.id": "1", - "database.whitelist": "inventory", + "database.include.list": "inventory", "pulsar.service.url": "pulsar://127.0.0.1:6650" } } @@ -390,7 +390,7 @@ You need to create a configuration file before using the Pulsar Debezium connect mongodb.user: "debezium" mongodb.password: "dbz" mongodb.task.id: "1" - database.whitelist: "inventory" + database.include.list: "inventory" ## PULSAR_SERVICE_URL_CONFIG pulsar.service.url: "pulsar://127.0.0.1:6650" @@ -435,7 +435,7 @@ This example shows how to change the data of a MongoDB table using the Pulsar De --name debezium-mongodb-source \ --tenant public \ --namespace default \ - --source-config '{"mongodb.hosts": "rs0/mongodb:27017","mongodb.name": "dbserver1","mongodb.user": "debezium","mongodb.password": "dbz","mongodb.task.id": "1","database.whitelist": "inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}' + --source-config '{"mongodb.hosts": "rs0/mongodb:27017","mongodb.name": "dbserver1","mongodb.user": "debezium","mongodb.password": "dbz","mongodb.task.id": "1","database.include.list": "inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}' ``` * Use the **YAML** configuration file as shown previously. diff --git a/versioned_docs/version-4.2.x/io-debezium-source.md b/versioned_docs/version-4.2.x/io-debezium-source.md index f5fdce1a4bee..9c60c5586e92 100644 --- a/versioned_docs/version-4.2.x/io-debezium-source.md +++ b/versioned_docs/version-4.2.x/io-debezium-source.md @@ -16,25 +16,25 @@ The Debezium source connector pulls messages from MySQL or PostgreSQL and persis The configuration of the Debezium source connector has the following properties. -| Name | Required | Default | Description | -|------|----------|---------|-------------| -| `task.class` | true | null | A source task class that implemented in Debezium. | -| `database.hostname` | true | null | The address of a database server. | -| `database.port` | true | null | The port number of a database server.| -| `database.user` | true | null | The name of a database user that has the required privileges. | -| `database.password` | true | null | The password for a database user that has the required privileges. | -| `database.server.id` | true | null | The connector's identifier that must be unique within a database cluster and similar to the database's server-id configuration property. | -| `database.server.name` | true | null | The logical name of a database server/cluster, which forms a namespace and it is used in all the names of Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used. | -| `database.whitelist` | false | null | A list of all databases hosted by this server which is monitored by the connector.

This is optional, and there are other properties for listing databases and tables to include or exclude from monitoring. | -| `key.converter` | true | null | The converter provided by Kafka Connect to convert record key. | -| `value.converter` | true | null | The converter provided by Kafka Connect to convert record value. | -| `database.history` | true | null | The name of the database history class. | -| `database.history.pulsar.topic` | true | null | The name of the database history topic where the connector writes and recovers DDL statements.

**Note: this topic is for internal use only and should not be used by consumers.** | -| `database.history.pulsar.service.url` | false| null | Pulsar cluster service URL for history topic.

**Note**: If `database.history.pulsar.service.url` is not set, then the database history Pulsar client will use the same client settings as those of the source connector, such as `client_auth_plugin` and `client_auth_params`.| -| `offset.storage.topic` | true | null | Record the last committed offsets that the connector successfully completes. | -| `json-with-envelope` | false | false | Present the message that only consists of payload. | +| Name | Required | Default | Description | +|-----------------------------------------|----------|---------|-------------| +| `task.class` | true | null | A source task class that implemented in Debezium. | +| `database.hostname` | true | null | The address of a database server. | +| `database.port` | true | null | The port number of a database server.| +| `database.user` | true | null | The name of a database user that has the required privileges. | +| `database.password` | true | null | The password for a database user that has the required privileges. | +| `database.server.id` | true | null | The connector's identifier that must be unique within a database cluster and similar to the database's server-id configuration property. | +| `topic.prefix` | true | null | The logical name of a database server/cluster, which forms a namespace and it is used in all the names of Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used. | +| `database.include.list` | false | null | A list of all databases hosted by this server which is monitored by the connector.

This is optional, and there are other properties for listing databases and tables to include or exclude from monitoring. | +| `key.converter` | true | null | The converter provided by Kafka Connect to convert record key. | +| `value.converter` | true | null | The converter provided by Kafka Connect to convert record value. | +| `database.history` | true | null | The name of the database history class. | +| `database.history.pulsar.topic` | true | null | The name of the database history topic where the connector writes and recovers DDL statements.

**Note: this topic is for internal use only and should not be used by consumers.** | +| `database.history.pulsar.service.url` | false| null | Pulsar cluster service URL for history topic.

**Note**: If `database.history.pulsar.service.url` is not set, then the database history Pulsar client will use the same client settings as those of the source connector, such as `client_auth_plugin` and `client_auth_params`.| +| `offset.storage.topic` | true | null | Record the last committed offsets that the connector successfully completes. | +| `json-with-envelope` | false | false | Present the message that only consists of payload. | | `database.history.pulsar.reader.config` | false | null | The configs of the reader for the database schema history topic, in the form of a JSON string with key-value pairs. | -| `offset.storage.reader.config` | false | null | The configs of the reader for the kafka connector offsets topic, in the form of a JSON string with key-value pairs. | +| `offset.storage.reader.config` | false | null | The configs of the reader for the kafka connector offsets topic, in the form of a JSON string with key-value pairs. | ### Converter Options @@ -100,13 +100,13 @@ You can use one of the following methods to create a configuration file. "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", - "database.server.name": "dbserver1", - "database.whitelist": "inventory", + "topic.prefix": "dbserver1", + "table.include.list": "inventory", "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory", "database.history.pulsar.topic": "history-topic", "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650", - "key.converter": "org.apache.kafka.connect.json.JsonConverter", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.storage.StringConverter", "offset.storage.topic": "offset-topic" } } @@ -132,15 +132,15 @@ You can use one of the following methods to create a configuration file. database.user: "debezium" database.password: "dbz" database.server.id: "184054" - database.server.name: "dbserver1" - database.whitelist: "inventory" + topic.prefix: "dbserver1" + database.include.list: "inventory" database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory" database.history.pulsar.topic: "history-topic" database.history.pulsar.service.url: "pulsar://127.0.0.1:6650" ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG - key.converter: "org.apache.kafka.connect.json.JsonConverter" - value.converter: "org.apache.kafka.connect.json.JsonConverter" + key.converter: "org.apache.kafka.connect.storage.StringConverter" + value.converter: "org.apache.kafka.connect.storage.StringConverter" ## OFFSET_STORAGE_TOPIC_CONFIG offset.storage.topic: "offset-topic" @@ -179,7 +179,7 @@ This example shows how to change the data of a MySQL table using the Pulsar Debe --name debezium-mysql-source \ --tenant public \ --namespace default \ - --source-config '{"database.hostname": "localhost","database.port": "3306","database.user": "debezium","database.password": "dbz","database.server.id": "184054","database.server.name": "dbserver1","database.whitelist": "inventory","database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory","database.history.pulsar.topic": "history-topic","database.history.pulsar.service.url": "pulsar://127.0.0.1:6650","key.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter": "org.apache.kafka.connect.json.JsonConverter","pulsar.service.url": "pulsar://127.0.0.1:6650","offset.storage.topic": "offset-topic"}' + --source-config '{"database.hostname": "localhost","database.port": "3306","database.user": "debezium","database.password": "dbz","database.server.id": "184054","topic.prefix": "dbserver1","database.include.list": "inventory","database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory","database.history.pulsar.topic": "history-topic","database.history.pulsar.service.url": "pulsar://127.0.0.1:6650","key.converter": "org.apache.kafka.connect.storage.StringConverter","value.converter": "org.apache.kafka.connect.storage.StringConverter","pulsar.service.url": "pulsar://127.0.0.1:6650","offset.storage.topic": "offset-topic"}' ``` :::note @@ -278,10 +278,10 @@ You can use one of the following methods to create a configuration file. "database.user": "postgres", "database.password": "changeme", "database.dbname": "postgres", - "database.server.name": "dbserver1", + "topic.prefix": "dbserver1", "plugin.name": "pgoutput", - "schema.whitelist": "public", - "table.whitelist": "public.users", + "schema.include.list": "public", + "table.include.list": "public.users", "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650" } ``` @@ -305,10 +305,12 @@ You can use one of the following methods to create a configuration file. database.user: "postgres" database.password: "changeme" database.dbname: "postgres" - database.server.name: "dbserver1" + topic.prefix: "dbserver1" plugin.name: "pgoutput" - schema.whitelist: "public" - table.whitelist: "public.users" + schema.include.list: "public" + table.include.list: "public.users" + key.converter: "org.apache.kafka.connect.storage.StringConverter" + value.converter: "org.apache.kafka.connect.storage.StringConverter" ## PULSAR_SERVICE_URL_CONFIG database.history.pulsar.service.url: "pulsar://127.0.0.1:6650" @@ -349,7 +351,7 @@ This example shows how to change the data of a PostgreSQL table using the Pulsar --name debezium-postgres-source \ --tenant public \ --namespace default \ - --source-config '{"database.hostname": "localhost","database.port": "5432","database.user": "postgres","database.password": "changeme","database.dbname": "postgres","database.server.name": "dbserver1","plugin.name": "pgoutput","schema.whitelist": "public","table.whitelist": "public.users","pulsar.service.url": "pulsar://127.0.0.1:6650"}' + --source-config '{"database.hostname": "localhost","database.port": "5432","database.user": "postgres","database.password": "changeme","database.dbname": "postgres","topic.prefix": "dbserver1","plugin.name": "pgoutput","schema.include.list": "public","table.include.list": "public.users","key.converter": "org.apache.kafka.connect.storage.StringConverter","value.converter": "org.apache.kafka.connect.storage.StringConverter","pulsar.service.url": "pulsar://127.0.0.1:6650"}' ``` :::note @@ -441,7 +443,7 @@ You can use one of the following methods to create a configuration file. "mongodb.user": "debezium", "mongodb.password": "dbz", "mongodb.task.id": "1", - "database.whitelist": "inventory", + "database.include.list": "inventory", "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650" } ``` @@ -466,7 +468,7 @@ You can use one of the following methods to create a configuration file. mongodb.user: "debezium" mongodb.password: "dbz" mongodb.task.id: "1" - database.whitelist: "inventory" + database.include.list: "inventory" database.history.pulsar.service.url: "pulsar://127.0.0.1:6650" ``` @@ -509,7 +511,7 @@ This example shows how to change the data of a MongoDB table using the Pulsar De --name debezium-mongodb-source \ --tenant public \ --namespace default \ - --source-config '{"mongodb.hosts": "rs0/mongodb:27017","mongodb.name": "dbserver1","mongodb.user": "debezium","mongodb.password": "dbz","mongodb.task.id": "1","database.whitelist": "inventory","database.history.pulsar.service.url": "pulsar://127.0.0.1:6650"}' + --source-config '{"mongodb.hosts": "rs0/mongodb:27017","mongodb.name": "dbserver1","mongodb.user": "debezium","mongodb.password": "dbz","mongodb.task.id": "1","database.include.list": "inventory","database.history.pulsar.service.url": "pulsar://127.0.0.1:6650"}' ``` :::note @@ -585,14 +587,13 @@ Using YAML as an example, you can create a `debezium-oracle-source-config.yaml` "database.user": "dbzuser", "database.password": "dbz", "database.dbname": "XE", - "database.server.name": "XE", + "topic.prefix": "XE", "schema.exclude.list": "system,dbzuser", "snapshot.mode": "initial", "topic.namespace": "public/default", "task.class": "io.debezium.connector.oracle.OracleConnectorTask", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "key.converter": "org.apache.kafka.connect.json.JsonConverter", - "typeClassName": "org.apache.pulsar.common.schema.KeyValue", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.storage.StringConverter", "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory", "database.tcpKeepAlive": "true", "decimal.handling.mode": "double", @@ -619,14 +620,13 @@ configs: database.user: "dbzuser" database.password: "dbz" database.dbname: "XE" - database.server.name: "XE" + topic.prefix: "XE" schema.exclude.list: "system,dbzuser" snapshot.mode: "initial" topic.namespace: "public/default" task.class: "io.debezium.connector.oracle.OracleConnectorTask" - value.converter: "org.apache.kafka.connect.json.JsonConverter" - key.converter: "org.apache.kafka.connect.json.JsonConverter" - typeClassName: "org.apache.pulsar.common.schema.KeyValue" + key.converter: "org.apache.kafka.connect.storage.StringConverter", + value.converter: "org.apache.kafka.connect.storage.StringConverter", database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory" database.tcpKeepAlive: "true" decimal.handling.mode: "double" @@ -655,13 +655,12 @@ Similarly to other connectors, you can use JSON or YAML to configure the connect "database.user": "sa", "database.password": "MyP@ssw0rd!", "database.dbname": "MyTestDB", - "database.server.name": "mssql", + "topic.prefix": "mssql", "snapshot.mode": "schema_only", "topic.namespace": "public/default", "task.class": "io.debezium.connector.sqlserver.SqlServerConnectorTask", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "key.converter": "org.apache.kafka.connect.json.JsonConverter", - "typeClassName": "org.apache.pulsar.common.schema.KeyValue", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.storage.StringConverter", "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory", "database.tcpKeepAlive": "true", "decimal.handling.mode": "double", @@ -688,13 +687,12 @@ configs: database.user: "sa" database.password: "MyP@ssw0rd!" database.dbname: "MyTestDB" - database.server.name: "mssql" + topic.prefix: "mssql" snapshot.mode: "schema_only" topic.namespace: "public/default" task.class: "io.debezium.connector.sqlserver.SqlServerConnectorTask" - value.converter: "org.apache.kafka.connect.json.JsonConverter" - key.converter: "org.apache.kafka.connect.json.JsonConverter" - typeClassName: "org.apache.pulsar.common.schema.KeyValue" + key.converter: "org.apache.kafka.connect.storage.StringConverter" + value.converter: "org.apache.kafka.connect.storage.StringConverter" database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory" database.tcpKeepAlive: "true" decimal.handling.mode: "double"