Description
Search before asking
- I searched in the issues and found nothing similar.
Paimon version
PaimonVersion: 0.9
Storage: LocalFile
Compute Engine
FlinkVersion: 1.17.2
Minimal reproduce step
When using Paimon for partial-update merges, we mistakenly included a primary key column in a sequence group. This caused some interesting behavior: if the sequence group's version column is null and an update arrives, a ParquetDecodingException is thrown on read. Below are the steps we used to reproduce it locally.
- Create Table DDL
CREATE TABLE if not exists test.paimon_test_partical
(
    order_id BIGINT,
    order_key STRING,
    send_order_name STRING,
    get_order_name STRING,
    send_order_version BIGINT,
    get_order_version BIGINT,
    PRIMARY KEY (order_id, order_key) NOT ENFORCED
)
WITH
(
    'connector' = 'paimon',
    'bucket' = '1',
    'file.format' = 'parquet',
    'file.compression' = 'zstd',
    'merge-engine' = 'partial-update',
    'fields.send_order_version.sequence-group' = 'send_order_name,order_key',
    'fields.get_order_version.sequence-group' = 'get_order_name',
    'snapshot.num-retained.min' = '1',
    'snapshot.num-retained.max' = '1',
    'full-compaction.delta-commits' = '10'
);
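For comparison, here is how we believe the DDL was meant to look, with the primary key column order_key not listed in any sequence group. This is a sketch of the configuration we assume is correct, not a confirmed fix; the table name paimon_test_partical_fixed is hypothetical:

```sql
-- Sketch of the assumed-correct DDL: order_key (a primary key column)
-- is no longer part of the send_order_version sequence group.
CREATE TABLE if not exists test.paimon_test_partical_fixed
(
    order_id BIGINT,
    order_key STRING,
    send_order_name STRING,
    get_order_name STRING,
    send_order_version BIGINT,
    get_order_version BIGINT,
    PRIMARY KEY (order_id, order_key) NOT ENFORCED
)
WITH
(
    'connector' = 'paimon',
    'bucket' = '1',
    'file.format' = 'parquet',
    'file.compression' = 'zstd',
    'merge-engine' = 'partial-update',
    'fields.send_order_version.sequence-group' = 'send_order_name',
    'fields.get_order_version.sequence-group' = 'get_order_name',
    'snapshot.num-retained.min' = '1',
    'snapshot.num-retained.max' = '1',
    'full-compaction.delta-commits' = '10'
);
```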
- Batch Insert SQL
- First, execute this statement to write the initial data (note that send_order_version is not written, so it remains null):
insert into test.paimon_test_partical(order_id,order_key,send_order_name,get_order_name,get_order_version)
values
(1,'a','test_send','test_get',1766632140),
(2,'a','test_send','test_get',1766632140),
(3,'a','test_send','test_get',1766632140),
(1,'b','test_send','test_get',1766632140),
(2,'b','test_send','test_get',1766632140),
(3,'b','test_send','test_get',1766632140),
(1,'c','test_send','test_get',1766632140),
(2,'c','test_send','test_get',1766632140),
(3,'c','test_send','test_get',1766632140)
Query result: (screenshot omitted)
- Then write new values for the same primary keys to trigger an update:
insert into test.paimon_test_partical(order_id,order_key,send_order_name,get_order_name,get_order_version)
values
(1,'a','test_send1','test_get2',1766632143),
(2,'a','test_send1','test_get2',1766632143),
(3,'a','test_send1','test_get2',1766632143),
(1,'b','test_send1','test_get2',1766632143),
(2,'b','test_send1','test_get2',1766632143),
(3,'b','test_send1','test_get2',1766632143),
(1,'c','test_send1','test_get2',1766632143),
(2,'c','test_send1','test_get2',1766632143),
(3,'c','test_send1','test_get2',1766632143)
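As a possible workaround sketch (our assumption, not verified against the Paimon documentation), always supplying a non-null value for send_order_version should keep the sequence group's version column from being null on update:

```sql
-- Hypothetical workaround: explicitly populate the sequence-group
-- version column (send_order_version) so it is never null on update.
insert into test.paimon_test_partical(order_id,order_key,send_order_name,get_order_name,send_order_version,get_order_version)
values
(1,'a','test_send1','test_get2',1766632143,1766632143);
```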
Query result: (screenshot omitted)
- Exception info
Caused by: org.apache.paimon.shade.org.apache.parquet.io.ParquetDecodingException: Failed to read 4 bytes
at org.apache.paimon.format.parquet.reader.AbstractColumnReader.readDataBuffer(AbstractColumnReader.java:276) ~[paimon-flink-1.17-0.9.0.jar:0.9.0]
at org.apache.paimon.format.parquet.reader.BytesColumnReader.readBinary(BytesColumnReader.java:86) ~[paimon-flink-1.17-0.9.0.jar:0.9.0]
at org.apache.paimon.format.parquet.reader.BytesColumnReader.readBatch(BytesColumnReader.java:51) ~[paimon-flink-1.17-0.9.0.jar:0.9.0]
at org.apache.paimon.format.parquet.reader.BytesColumnReader.readBatch(BytesColumnReader.java:32) ~[paimon-flink-1.17-0.9.0.jar:0.9.0]
at org.apache.paimon.format.parquet.reader.AbstractColumnReader.readToVector(AbstractColumnReader.java:189) ~[paimon-flink-1.17-0.9.0.jar:0.9.0]
at org.apache.paimon.format.parquet.ParquetReaderFactory$ParquetReader.nextBatch(ParquetReaderFactory.java:338) ~[paimon-flink-1.17-0.9.0.jar:0.9.0]
at org.apache.paimon.format.parquet.ParquetReaderFactory$ParquetReader.readBatch(ParquetReaderFactory.java:309) ~[paimon-flink-1.17-0.9.0.jar:0.9.0]
at org.apache.paimon.io.FileRecordReader.readBatch(FileRecordReader.java:47) ~[paimon-flink-1.17-0.9.0.jar:0.9.0]
at org.apache.paimon.flink.source.FileStoreSourceSplitReader.fetch(FileStoreSourceSplitReader.java:115) ~[paimon-flink-1.17-0.9.0.jar:0.9.0]
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.17.2.jar:1.17.2]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162) ~[flink-connector-files-1.17.2.jar:1.17.2]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114) ~[flink-connector-files-1.17.2.jar:1.17.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_471]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_471]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_471]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_471]
... 1 more
What doesn't meet your expectations?
- If a primary key column is not allowed in a sequence group, should DDL validation be added to reject such configurations up front?
- After removing order_key from the sequence-group, the data merges successfully, but the merge sets send_order_name to an empty value. Why? Like this:
Anything else?
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!