Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 38 additions & 38 deletions docs/io-cdc-debezium.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,28 @@ The Debezium source connector pulls messages from MySQL or PostgreSQL and persis

The configuration of the Debezium source connector has the following properties.

| Name | Required | Default | Description |
|------|----------|---------|-------------|
| `task.class` | true | null | A source task class that is implemented in Debezium. |
| `database.hostname` | true | null | The address of a database server. |
| `database.port` | true | null | The port number of a database server.|
| `database.user` | true | null | The name of a database user that has the required privileges. |
| `database.password` | true | null | The password for a database user that has the required privileges. |
| `database.server.id` | true | null | The connector's identifier that must be unique within a database cluster and similar to the database's server-id configuration property. |
| `database.server.name` | true | null | The logical name of a database server/cluster, which forms a namespace and is used in all the names of Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used. |
| `database.whitelist` | false | null | A list of all databases hosted by this server that is monitored by the connector.<br /><br /> This is optional, and there are other properties for listing databases and tables to include or exclude from monitoring. |
| `key.converter` | true | null | The converter provided by Kafka Connect to convert the record key. |
| `value.converter` | true | null | The converter provided by Kafka Connect to convert the record value. |
| `database.history` | true | null | The name of the database history class. |
| `database.history.pulsar.topic` | true | null | The name of the database history topic where the connector writes and recovers DDL statements. <br /><br />**Note: this topic is for internal use only and should not be used by consumers.** |
| Name | Required | Default | Description |
|---------------------------------------|----------|---------|-------------|
| `task.class` | true | null | A source task class that is implemented in Debezium. |
| `database.hostname` | true | null | The address of a database server. |
| `database.port` | true | null | The port number of a database server.|
| `database.user` | true | null | The name of a database user that has the required privileges. |
| `database.password` | true | null | The password for a database user that has the required privileges. |
| `database.server.id` | true | null | The connector's identifier that must be unique within a database cluster and similar to the database's server-id configuration property. |
| `topic.prefix` | true | null | The logical name of a database server/cluster, which forms a namespace and is used in all the names of Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used. |
| `database.include.list` | false | null | A list of all databases hosted by this server that is monitored by the connector.<br /><br /> This is optional, and there are other properties for listing databases and tables to include or exclude from monitoring. |
| `key.converter` | true | null | The converter provided by Kafka Connect to convert the record key. |
| `value.converter` | true | null | The converter provided by Kafka Connect to convert the record value. |
| `database.history` | true | null | The name of the database history class. |
| `database.history.pulsar.topic` | true | null | The name of the database history topic where the connector writes and recovers DDL statements. <br /><br />**Note: this topic is for internal use only and should not be used by consumers.** |
| `database.history.pulsar.service.url` | false| null | Pulsar cluster service URL for history topic. <br /><br />**Note**: If `database.history.pulsar.service.url` is not set, then the database history Pulsar client will use the same client settings as those of the source connector, such as `client_auth_plugin` and `client_auth_params`.|
| `pulsar.service.url` | true | null | Pulsar cluster service URL for the offset topic used in Debezium. You can use the `bin/pulsar-admin --admin-url http://pulsar:8080 sources localrun --source-config-file $PWD/configs/pg-pulsar-config.yaml` command to point to the target Pulsar cluster.|
| `offset.storage.topic` | true | null | Record the last committed offsets that the connector successfully completes. |
| `mongodb.hosts` | true | null | The comma-separated list of hostname and port pairs (in the form 'host' or 'host:port') of the MongoDB servers in the replica set. The list contains a single hostname and a port pair. If mongodb.members.auto.discover is set to false, the host and port pair are prefixed with the replica set name (e.g., rs0/localhost:27017). |
| `mongodb.name` | true | null | A unique name that identifies the connector and/or MongoDB replica set or shared cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster. |
| `mongodb.user` | true | null | Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. |
| `mongodb.password` | true | null | Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. |
| `mongodb.task.id` | true | null | The taskId of the MongoDB connector that attempts to use a separate task for each replica set. |
| `pulsar.service.url` | true | null | Pulsar cluster service URL for the offset topic used in Debezium. You can use the `bin/pulsar-admin --admin-url http://pulsar:8080 sources localrun --source-config-file $PWD/configs/pg-pulsar-config.yaml` command to point to the target Pulsar cluster.|
| `offset.storage.topic` | true | null | Record the last committed offsets that the connector successfully completes. |
| `mongodb.hosts` | true | null | The comma-separated list of hostname and port pairs (in the form 'host' or 'host:port') of the MongoDB servers in the replica set. The list contains a single hostname and a port pair. If mongodb.members.auto.discover is set to false, the host and port pair are prefixed with the replica set name (e.g., rs0/localhost:27017). |
| `mongodb.name` | true | null | A unique name that identifies the connector and/or MongoDB replica set or shared cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster. |
| `mongodb.user` | true | null | Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. |
| `mongodb.password` | true | null | Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. |
| `mongodb.task.id` | true | null | The taskId of the MongoDB connector that attempts to use a separate task for each replica set. |



Expand All @@ -59,13 +59,13 @@ You can use one of the following methods to create a configuration file.
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"topic.prefix": "dbserver1",
"database.include.list": "inventory",
"database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
"database.history.pulsar.topic": "history-topic",
"database.history.pulsar.service.url": "pulsar://127.0.0.1:6650",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"pulsar.service.url": "pulsar://127.0.0.1:6650",
"offset.storage.topic": "offset-topic"
}
Expand All @@ -92,15 +92,15 @@ You can use one of the following methods to create a configuration file.
database.user: "debezium"
database.password: "dbz"
database.server.id: "184054"
database.server.name: "dbserver1"
database.whitelist: "inventory"
topic.prefix: "dbserver1"
database.include.list: "inventory"
database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
database.history.pulsar.topic: "history-topic"
database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"

## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG
key.converter: "org.apache.kafka.connect.json.JsonConverter"
value.converter: "org.apache.kafka.connect.json.JsonConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
value.converter: "org.apache.kafka.connect.storage.StringConverter"

## PULSAR_SERVICE_URL_CONFIG
pulsar.service.url: "pulsar://127.0.0.1:6650"
Expand Down Expand Up @@ -142,7 +142,7 @@ This example shows how to change the data of a MySQL table using the Pulsar Debe
--name debezium-mysql-source \
--tenant public \
--namespace default \
--source-config '{"database.hostname": "localhost","database.port": "3306","database.user": "debezium","database.password": "dbz","database.server.id": "184054","database.server.name": "dbserver1","database.whitelist": "inventory","database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory","database.history.pulsar.topic": "history-topic","database.history.pulsar.service.url": "pulsar://127.0.0.1:6650","key.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter": "org.apache.kafka.connect.json.JsonConverter","pulsar.service.url": "pulsar://127.0.0.1:6650","offset.storage.topic": "offset-topic"}'
--source-config '{"database.hostname": "localhost","database.port": "3306","database.user": "debezium","database.password": "dbz","database.server.id": "184054","topic.prefix": "dbserver1","database.include.list": "inventory","database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory","database.history.pulsar.topic": "history-topic","database.history.pulsar.service.url": "pulsar://127.0.0.1:6650","key.converter": "org.apache.kafka.connect.storage.StringConverter","value.converter": "org.apache.kafka.connect.storage.StringConverter","pulsar.service.url": "pulsar://127.0.0.1:6650","offset.storage.topic": "offset-topic"}'
```

* Use the **YAML** configuration file as shown previously.
Expand Down Expand Up @@ -229,8 +229,8 @@ You can use one of the following methods to create a configuration file.
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "postgres",
"database.server.name": "dbserver1",
"schema.whitelist": "inventory",
"topic.prefix": "dbserver1",
"schema.include.list": "inventory",
"pulsar.service.url": "pulsar://127.0.0.1:6650"
}
}
Expand All @@ -256,8 +256,8 @@ You can use one of the following methods to create a configuration file.
database.user: "postgres"
database.password: "postgres"
database.dbname: "postgres"
database.server.name: "dbserver1"
schema.whitelist: "inventory"
topic.prefix: "dbserver1"
schema.include.list: "inventory"

## PULSAR_SERVICE_URL_CONFIG
pulsar.service.url: "pulsar://127.0.0.1:6650"
Expand Down Expand Up @@ -293,7 +293,7 @@ This example shows how to change the data of a PostgreSQL table using the Pulsar
--name debezium-postgres-source \
--tenant public \
--namespace default \
--source-config '{"database.hostname": "localhost","database.port": "5432","database.user": "postgres","database.password": "postgres","database.dbname": "postgres","database.server.name": "dbserver1","plugin.name": "pgoutput","schema.whitelist": "inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}'
--source-config '{"database.hostname": "localhost","database.port": "5432","database.user": "postgres","database.password": "postgres","database.dbname": "postgres","topic.prefix": "dbserver1","plugin.name": "pgoutput","schema.include.list": "inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}'
```

* Use the **YAML** configuration file as shown previously.
Expand Down Expand Up @@ -364,7 +364,7 @@ You need to create a configuration file before using the Pulsar Debezium connect
"mongodb.user": "debezium",
"mongodb.password": "dbz",
"mongodb.task.id": "1",
"database.whitelist": "inventory",
"database.include.list": "inventory",
"pulsar.service.url": "pulsar://127.0.0.1:6650"
}
}
Expand All @@ -390,7 +390,7 @@ You need to create a configuration file before using the Pulsar Debezium connect
mongodb.user: "debezium"
mongodb.password: "dbz"
mongodb.task.id: "1"
database.whitelist: "inventory"
database.include.list: "inventory"

## PULSAR_SERVICE_URL_CONFIG
pulsar.service.url: "pulsar://127.0.0.1:6650"
Expand Down Expand Up @@ -435,7 +435,7 @@ This example shows how to change the data of a MongoDB table using the Pulsar De
--name debezium-mongodb-source \
--tenant public \
--namespace default \
--source-config '{"mongodb.hosts": "rs0/mongodb:27017","mongodb.name": "dbserver1","mongodb.user": "debezium","mongodb.password": "dbz","mongodb.task.id": "1","database.whitelist": "inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}'
--source-config '{"mongodb.hosts": "rs0/mongodb:27017","mongodb.name": "dbserver1","mongodb.user": "debezium","mongodb.password": "dbz","mongodb.task.id": "1","database.include.list": "inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}'
```

* Use the **YAML** configuration file as shown previously.
Expand Down
Loading