Transactional pg_logical_emit_message() events are emitted twice after connector restart #2004

@atorik

What Debezium connector do you use and what version?

main(HEAD)


What is the connector configuration?

% cat docker-min-no-table.json
{
  "name": "test-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "topic.prefix": "example",
    "slot.name": "emit_log_message",
    "plugin.name": "pgoutput",
    "publication.autocreate.mode": "no_tables",
    "publication.name": "notable",
    "snapshot.mode": "no_data"
  }
}

What is the captured database version and mode of deployment?

E.g. on-premises, with a specific cloud provider, etc.

On-premises


What behavior do you expect?

When a message is emitted using pg_logical_emit_message() with transactional = true before the connector is stopped, the message should be delivered exactly once after the connector is restarted.

According to the Debezium documentation, the PostgreSQL source connector supports Exactly Once Delivery:

https://debezium.io/documentation/reference/stable/configuration/eos.html


What behavior do you see?

After restarting the connector, the same transactional logical message is delivered twice.

For example, a message emitted via:

=# BEGIN;
=# SELECT * FROM pg_logical_emit_message(true, 'test1', 'aaa');
=# COMMIT;

is published twice to the Kafka topic after connector restart.


Do you see the same behaviour using the latest released Debezium version?

Yes.


Do you have the connector logs, ideally from start till finish?

You might be asked later to provide DEBUG/TRACE level log.

Yes. I can provide them if needed.


How to reproduce the issue using our tutorial deployment?

Start the PostgreSQL connector using the configuration above:

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @docker-min-no-table.json

Emit a transactional logical message:

=# BEGIN; SELECT * FROM pg_logical_emit_message(true, 'test1', 'aaa'); COMMIT;

Stop the connector.

Restart the connector.

Consume from the message topic:

$ docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic example.message

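Observe that two records with prefix test1 are present in the topic, although only one message was emitted. Both records carry the same source.txId, source.lsn, and source.sequence; they differ only in the top-level ts_ms: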

{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"prefix"}],"optional":false,"name":"io.debezium.connector.postgresql.MessageKey","version":1},"payload":{"prefix":"test1"}}      {"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,first,first_in_data_collection,last_in_data_collection,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"},{"type":"string","optional":true,"field":"origin"},{"type":"int64","optional":true,"field":"origin_lsn"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","version":1,"field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"prefix"},{"type":"bytes","optional":true,"field":"content"}],"optional":false,"name":"io.debezium.connector.postgresql.Message","version":1,"field":"message"}],"optional":false,"name":"io.debezium.connector.postgresql.MessageValue","version":1},"payload":{"op":"m","ts_ms":1780131651282,"source":{"version":"3.5.1.Final","connector":"postgresql","name":"example","ts_ms":1780131650920,"snapshot":"false","db":"postgres","sequence":"[\"5549461504\",\"5549461624\"]","ts_us":1780131650920271,"ts_ns":1780131650920271000,"schema":"","table":"","txId":33926,"lsn":5549461624,"xmin":null,"origin":null,"origin_lsn":null},"transaction":null,"message":{"prefix":"test1","content":"YWFh"}}}
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"prefix"}],"optional":false,"name":"io.debezium.connector.postgresql.MessageKey","version":1},"payload":{"prefix":"test1"}}      {"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,first,first_in_data_collection,last_in_data_collection,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"},{"type":"string","optional":true,"field":"origin"},{"type":"int64","optional":true,"field":"origin_lsn"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","version":1,"field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"prefix"},{"type":"bytes","optional":true,"field":"content"}],"optional":false,"name":"io.debezium.connector.postgresql.Message","version":1,"field":"message"}],"optional":false,"name":"io.debezium.connector.postgresql.MessageValue","version":1},"payload":{"op":"m","ts_ms":1780131741234,"source":{"version":"3.5.1.Final","connector":"postgresql","name":"example","ts_ms":1780131650920,"snapshot":"false","db":"postgres","sequence":"[\"5549461504\",\"5549461624\"]","ts_us":1780131650920271,"ts_ns":1780131650920271000,"schema":"","table":"","txId":33926,"lsn":5549461624,"xmin":null,"origin":null,"origin_lsn":null},"transaction":null,"message":{"prefix":"test1","content":"YWFh"}}}
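
To check that the two records describe the same logical message rather than two separate emissions, the payloads can be compared on their source position. The following is a minimal sketch for verification only, not connector code: the payloads are abbreviated copies of the two records above (the schema and most source fields are omitted), and the helper names dedup_key, find_duplicates, and decode_content are hypothetical.

```python
import base64
import json
from collections import Counter

# Abbreviated payloads copied from the two consumed records above.
# Only the fields needed for the comparison are kept.
RECORDS = [
    '{"payload":{"op":"m","ts_ms":1780131651282,'
    '"source":{"txId":33926,"lsn":5549461624},'
    '"message":{"prefix":"test1","content":"YWFh"}}}',
    '{"payload":{"op":"m","ts_ms":1780131741234,'
    '"source":{"txId":33926,"lsn":5549461624},'
    '"message":{"prefix":"test1","content":"YWFh"}}}',
]


def dedup_key(value_json):
    """Hypothetical helper: identify a message by transaction id, LSN and prefix."""
    payload = json.loads(value_json)["payload"]
    source = payload["source"]
    return (source["txId"], source["lsn"], payload["message"]["prefix"])


def find_duplicates(values):
    """Return every key that occurs more than once, with its count."""
    counts = Counter(dedup_key(v) for v in values)
    return {key: n for key, n in counts.items() if n > 1}


def decode_content(value_json):
    """Decode the base64-encoded message content into text."""
    content = json.loads(value_json)["payload"]["message"]["content"]
    return base64.b64decode(content).decode("utf-8")


if __name__ == "__main__":
    print(find_duplicates(RECORDS))    # {(33926, 5549461624, 'test1'): 2}
    print(decode_content(RECORDS[0]))  # aaa
```

Both records map to the same (txId, lsn, prefix) key, and the content decodes to the single 'aaa' payload that was emitted, which is consistent with one message being delivered twice.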

I plan to submit a PR.
