diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index bbce5fad82f4..3a69d1177f4a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -1977,11 +1977,6 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
+ changeStreamDatabaseId
+ " has dialect "
+ changeStreamDatabaseDialect);
- LOG.info(
- "The Spanner database "
- + fullPartitionMetadataDatabaseId
- + " has dialect "
- + metadataDatabaseDialect);
PartitionMetadataTableNames partitionMetadataTableNames =
Optional.ofNullable(getMetadataTable())
.map(
@@ -2005,6 +2000,7 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
final boolean isMutableChangeStream =
isMutableChangeStream(
spannerAccessor.getDatabaseClient(), changeStreamDatabaseDialect, changeStreamName);
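+ // Whether the change stream is mutable determines how its records are delivered
+ // (see ChangeStreamRecordMapper: protos for mutable key ranges, JSON otherwise).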
+ LOG.info("The change stream " + changeStreamName + " is mutable: " + isMutableChangeStream);
final DaoFactory daoFactory =
new DaoFactory(
changeStreamSpannerConfig,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java
index 1268c739164f..04aac06caf9c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java
@@ -20,6 +20,7 @@
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Struct;
+import com.google.protobuf.InvalidProtocolBufferException;
import org.joda.time.Duration;
/**
@@ -128,6 +129,33 @@ public boolean isProtoChangeRecord() {
&& resultSet.getColumnType(0).getCode() == com.google.cloud.spanner.Type.Code.PROTO;
}
+ /**
+ * Returns the change stream record at the current pointer by parsing the bytes column. It also
+ * updates the timestamp at which the record was read.
+ *
+ *
+ * <p>Should only be used for PostgreSQL databases when the change stream record is delivered as
+ * proto bytes.
+ *
+ * @return the change stream record parsed from the proto bytes column
+ */
+ public com.google.spanner.v1.ChangeStreamRecord getProtoChangeStreamRecordFromBytes() {
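+ // Record the time at which this row was read; it is surfaced through the result set metadata.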
+ recordReadAt = Timestamp.now();
+ try {
+ // Use getBytes(0) for the BYTES column returned by the read_proto_bytes_<change_stream_name> TVF
+ return com.google.spanner.v1.ChangeStreamRecord.parseFrom(
+ resultSet.getBytes(0).toByteArray());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException("Failed to parse proto bytes into a ChangeStreamRecord", e);
+ }
+ }
+
+ /** Returns true if the result set at the current pointer contains a single BYTES change record. */
+ public boolean isProtoBytesChangeRecord() {
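+ // A proto-bytes change record arrives as a single non-null BYTES column, unlike the JSONB
+ // column used when records are delivered as JsonB.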
+ return resultSet.getColumnCount() == 1
+ && !resultSet.isNull(0)
+ && resultSet.getColumnType(0).getCode() == com.google.cloud.spanner.Type.Code.BYTES;
+ }
+
/**
* Returns the record at the current pointer as {@link JsonB}. It also updates the timestamp at
* which the record was read.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
index 631538646669..79df18c78a9f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
@@ -218,18 +218,28 @@ public class ChangeStreamRecordMapper {
* @param resultSet the change stream result set
* @param resultSetMetadata the metadata generated when reading the change stream row
* @return a {@link List} of {@link ChangeStreamRecord} subclasses
*/
public List<ChangeStreamRecord> toChangeStreamRecords(
PartitionMetadata partition,
ChangeStreamResultSet resultSet,
ChangeStreamResultSetMetadata resultSetMetadata) {
if (this.isPostgres()) {
- // In PostgresQL, change stream records are returned as JsonB.
+ // For `MUTABLE_KEY_RANGE` option, change stream records are returned as protos.
+ if (resultSet.isProtoBytesChangeRecord()) {
+ return Arrays.asList(
+ toChangeStreamRecord(
+ partition, resultSet.getProtoChangeStreamRecordFromBytes(), resultSetMetadata));
+ }
+
+ // For `IMMUTABLE_KEY_RANGE` option, change stream records are returned as JsonB.
return Collections.singletonList(
toChangeStreamRecordJson(partition, resultSet.getPgJsonb(0), resultSetMetadata));
}
// In GoogleSQL, for `MUTABLE_KEY_RANGE` option, change stream records are returned as Protos.
if (resultSet.isProtoChangeRecord()) {
return Arrays.asList(
toChangeStreamRecord(
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSetTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSetTest.java
new file mode 100644
index 000000000000..f0c537756851
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSetTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
+
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestProtoMapper.recordToProto;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.ResultSet;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
+import org.junit.Test;
+
+public class ChangeStreamResultSetTest {
+
+ @Test
+ public void testGetProtoChangeStreamRecordFromBytes() throws Exception {
+ // 1. Create an expected ChangeStreamRecord proto
+ Timestamp now = Timestamp.now();
+ final HeartbeatRecord heartbeatRecord =
+ new HeartbeatRecord(Timestamp.ofTimeSecondsAndNanos(10L, 20), null);
+ com.google.spanner.v1.ChangeStreamRecord expectedRecord = recordToProto(heartbeatRecord);
+ assertNotNull(expectedRecord);
+
+ // 2. Convert it to bytes (simulating how Spanner PostgreSQL returns it)
+ byte[] protoBytes = expectedRecord.toByteArray();
+
+ // 3. Mock the underlying Spanner ResultSet
+ ResultSet mockResultSet = mock(ResultSet.class);
+ // Simulate column 0 containing the BYTES representation of the proto
+ when(mockResultSet.getBytes(0)).thenReturn(ByteArray.copyFrom(protoBytes));
+
+ // 4. Initialize ChangeStreamResultSet with the mock
+ ChangeStreamResultSet changeStreamResultSet = new ChangeStreamResultSet(mockResultSet);
+
+ // 5. Call the new method and assert it parses correctly
+ com.google.spanner.v1.ChangeStreamRecord actualRecord =
+ changeStreamResultSet.getProtoChangeStreamRecordFromBytes();
+
+ assertEquals(expectedRecord, actualRecord);
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTablePostgresIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTablePostgresIT.java
new file mode 100644
index 000000000000..573ac8259101
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTablePostgresIT.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.it;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.ErrorCode;
+import com.google.cloud.spanner.Key;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Options;
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.SpannerException;
+import com.google.cloud.spanner.Statement;
+import com.google.gson.Gson;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.lang3.tuple.Pair;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** End-to-end test of the Cloud Spanner change stream source over a PostgreSQL placement table. */
+@RunWith(JUnit4.class)
+public class SpannerChangeStreamPlacementTablePostgresIT {
+
+ @Rule public transient Timeout globalTimeout = Timeout.seconds(3600);
+
+ @ClassRule
+ public static final IntegrationTestEnv ENV =
+ new IntegrationTestEnv(
+ /*isPostgres=*/ true,
+ /*isPlacementTableBasedChangeStream=*/ true,
+ /*host=*/ Optional.empty());
+
+ @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+ private static String instanceId;
+ private static String projectId;
+ private static String databaseId;
+ private static String metadataTableName;
+ private static String changeStreamTableName;
+ private static String changeStreamName;
+ private static DatabaseClient databaseClient;
+ private static String host = "https://spanner.googleapis.com";
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ projectId = ENV.getProjectId();
+ instanceId = ENV.getInstanceId();
+ databaseId = ENV.getDatabaseId();
+
+ metadataTableName = ENV.getMetadataTableName();
+ changeStreamTableName = ENV.createSingersTable();
+ changeStreamName = ENV.createChangeStreamFor(changeStreamTableName);
+ databaseClient = ENV.getDatabaseClient();
+ }
+
+ @Before
+ public void before() {
+ pipeline.getOptions().as(ChangeStreamTestPipelineOptions.class).setStreaming(true);
+ pipeline.getOptions().as(ChangeStreamTestPipelineOptions.class).setBlockOnRun(false);
+ }
+
+ @Test
+ public void testReadSpannerChangeStream() {
+ // Defines how many rows are going to be inserted / updated / deleted in the test
+ final int numRows = 5;
+ // Inserts numRows rows and uses the first commit timestamp as the startAt for reading the
+ // change stream
+ final Pair<Timestamp, Timestamp> insertTimestamps = insertRows(numRows);
+ final Timestamp startAt = insertTimestamps.getLeft();
+ // Updates the created rows
+ updateRows(numRows);
+ // Deletes the created rows and uses the last commit timestamp as the endAt for reading the
+ // change stream
+ final Pair<Timestamp, Timestamp> deleteTimestamps = deleteRows(numRows);
+ final Timestamp endAt = deleteTimestamps.getRight();
+
+ final SpannerConfig spannerConfig =
+ SpannerConfig.create()
+ .withProjectId(projectId)
+ .withInstanceId(instanceId)
+ .withDatabaseId(databaseId)
+ .withHost(ValueProvider.StaticValueProvider.of(host));
+
+ final PCollection<String> tokens =
+ pipeline
+ .apply(
+ SpannerIO.readChangeStream()
+ .withSpannerConfig(spannerConfig)
+ .withChangeStreamName(changeStreamName)
+ .withMetadataDatabase(databaseId)
+ .withMetadataTable(metadataTableName)
+ .withInclusiveStartAt(startAt)
+ .withInclusiveEndAt(endAt))
+ .apply(ParDo.of(new ModsToString()));
+
+ // Each row is composed of the following data:
+ // <mod type, singer id, old first name, old last name, new first name, new last name>
+ PAssert.that(tokens)
+ .containsInAnyOrder(
+ "INSERT,1,null,null,First Name 1,Last Name 1",
+ "INSERT,2,null,null,First Name 2,Last Name 2",
+ "INSERT,3,null,null,First Name 3,Last Name 3",
+ "INSERT,4,null,null,First Name 4,Last Name 4",
+ "INSERT,5,null,null,First Name 5,Last Name 5",
+ "UPDATE,1,First Name 1,Last Name 1,Updated First Name 1,Updated Last Name 1",
+ "UPDATE,2,First Name 2,Last Name 2,Updated First Name 2,Updated Last Name 2",
+ "UPDATE,3,First Name 3,Last Name 3,Updated First Name 3,Updated Last Name 3",
+ "UPDATE,4,First Name 4,Last Name 4,Updated First Name 4,Updated Last Name 4",
+ "UPDATE,5,First Name 5,Last Name 5,Updated First Name 5,Updated Last Name 5",
+ "DELETE,1,Updated First Name 1,Updated Last Name 1,null,null",
+ "DELETE,2,Updated First Name 2,Updated Last Name 2,null,null",
+ "DELETE,3,Updated First Name 3,Updated Last Name 3,null,null",
+ "DELETE,4,Updated First Name 4,Updated Last Name 4,null,null",
+ "DELETE,5,Updated First Name 5,Updated Last Name 5,null,null");
+ pipeline.run().waitUntilFinish();
+
+ assertMetadataTableHasBeenDropped();
+ }
+
+ private static void assertMetadataTableHasBeenDropped() {
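+ // Querying a dropped table should fail; PostgreSQL reports INVALID_ARGUMENT with a
+ // "relation ... does not exist" message, which is asserted below.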
+ try (ResultSet resultSet =
+ databaseClient
+ .singleUse()
+ .executeQuery(Statement.of("SELECT * FROM \"" + metadataTableName + "\""))) {
+ resultSet.next();
+ fail(
+ "The metadata table "
+ + metadataTableName
+ + " should had been dropped, but it still exists");
+ } catch (SpannerException e) {
+ assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode());
+ assertTrue(
+ "Error message must contain \"Table not found\"",
+ e.getMessage().contains("relation \"" + metadataTableName + "\" does not exist"));
+ }
+ }
+
+ private static Pair<Timestamp, Timestamp> insertRows(int n) {
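+ // Returns the first and last commit timestamps; the first one bounds the change stream read.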
+ final Timestamp firstCommitTimestamp = insertRow(1);
+ for (int i = 2; i < n; i++) {
+ insertRow(i);
+ }
+ final Timestamp lastCommitTimestamp = insertRow(n);
+ return Pair.of(firstCommitTimestamp, lastCommitTimestamp);
+ }
+
+ private static Pair<Timestamp, Timestamp> updateRows(int n) {
+ final Timestamp firstCommitTimestamp = updateRow(1);
+ for (int i = 2; i < n; i++) {
+ updateRow(i);
+ }
+ final Timestamp lastCommitTimestamp = updateRow(n);
+ return Pair.of(firstCommitTimestamp, lastCommitTimestamp);
+ }
+
+ private static Pair<Timestamp, Timestamp> deleteRows(int n) {
+ final Timestamp firstCommitTimestamp = deleteRow(1);
+ for (int i = 2; i < n; i++) {
+ deleteRow(i);
+ }
+ final Timestamp lastCommitTimestamp = deleteRow(n);
+ return Pair.of(firstCommitTimestamp, lastCommitTimestamp);
+ }
+
+ private static Timestamp insertRow(int singerId) {
+ return databaseClient
+ .writeWithOptions(
+ Collections.singletonList(
+ Mutation.newInsertBuilder(changeStreamTableName)
+ .set("SingerId")
+ .to(singerId)
+ .set("FirstName")
+ .to("First Name " + singerId)
+ .set("LastName")
+ .to("Last Name " + singerId)
+ .build()))
+ .getCommitTimestamp();
+ }
+
+ private static Timestamp updateRow(int singerId) {
+ return databaseClient
+ .writeWithOptions(
+ Collections.singletonList(
+ Mutation.newUpdateBuilder(changeStreamTableName)
+ .set("SingerId")
+ .to(singerId)
+ .set("FirstName")
+ .to("Updated First Name " + singerId)
+ .set("LastName")
+ .to("Updated Last Name " + singerId)
+ .build()),
+ Options.tag("app=beam;action=update"))
+ .getCommitTimestamp();
+ }
+
+ private static Timestamp deleteRow(int singerId) {
+ return databaseClient
+ .writeWithOptions(
+ Collections.singletonList(Mutation.delete(changeStreamTableName, Key.of(singerId))),
+ Options.tag("app=beam;action=delete"))
+ .getCommitTimestamp();
+ }
+
+ private static class ModsToString extends DoFn<DataChangeRecord, String> {
+
+ private transient Gson gson;
+
+ @Setup
+ public void setup() {
+ gson = new Gson();
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element DataChangeRecord record, OutputReceiver<String> outputReceiver) {
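+ // Each test transaction mutates a single row, so only the first Mod is inspected.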
+ final Mod mod = record.getMods().get(0);
+ final Map<String, String> keys = gson.fromJson(mod.getKeysJson(), Map.class);
+ final Map<String, String> oldValues =
+ Optional.ofNullable(mod.getOldValuesJson())
+ .map(nonNullValues -> gson.fromJson(nonNullValues, Map.class))
+ .orElseGet(Collections::emptyMap);
+ final Map<String, String> newValues =
+ Optional.ofNullable(mod.getNewValuesJson())
+ .map(nonNullValues -> gson.fromJson(nonNullValues, Map.class))
+ .orElseGet(Collections::emptyMap);
+
+ final String modsAsString =
+ String.join(
+ ",",
+ record.getModType().toString(),
+ keys.get("SingerId"),
+ oldValues.get("FirstName"),
+ oldValues.get("LastName"),
+ newValues.get("FirstName"),
+ newValues.get("LastName"));
+ final Instant timestamp = new Instant(record.getRecordTimestamp().toSqlTimestamp());
+
+ outputReceiver.outputWithTimestamp(modsAsString, timestamp);
+ }
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
index b3dd1bef049f..eefa125a1617 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
@@ -1039,4 +1039,109 @@ public void testMappingProtoRowToDataChangeRecord() {
Collections.singletonList(dataChangeRecord),
mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
}
+
+ @Test
+ public void testMappingProtoBytesRowToPartitionStartRecord() {
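+ // Build a model record, convert it to its proto form, and stub the result set so the
+ // PostgreSQL mapper exercises the proto-bytes decoding path.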
+ final PartitionStartRecord partitionStartRecord =
+ new PartitionStartRecord(
+ Timestamp.MIN_VALUE,
+ "fakeRecordSequence",
+ Arrays.asList("partitionToken1", "partitionToken2"),
+ null);
+ com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
+ recordToProto(partitionStartRecord);
+ assertNotNull(changeStreamRecordProto);
+ ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+
+ when(resultSet.isProtoBytesChangeRecord()).thenReturn(true);
+ when(resultSet.getProtoChangeStreamRecordFromBytes()).thenReturn(changeStreamRecordProto);
+ assertEquals(
+ Collections.singletonList(partitionStartRecord),
+ mapperPostgres.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
+ }
+
+ @Test
+ public void testMappingProtoBytesRowToPartitionEndRecord() {
+ final PartitionEndRecord partitionEndChange =
+ new PartitionEndRecord(Timestamp.MIN_VALUE, "fakeRecordSequence", null);
+ com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
+ recordToProto(partitionEndChange);
+ assertNotNull(changeStreamRecordProto);
+ ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+
+ when(resultSet.isProtoBytesChangeRecord()).thenReturn(true);
+ when(resultSet.getProtoChangeStreamRecordFromBytes()).thenReturn(changeStreamRecordProto);
+ assertEquals(
+ Collections.singletonList(partitionEndChange),
+ mapperPostgres.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
+ }
+
+ @Test
+ public void testMappingProtoBytesRowToPartitionEventRecord() {
+ final PartitionEventRecord partitionEventRecord =
+ new PartitionEventRecord(Timestamp.MIN_VALUE, "fakeRecordSequence", null);
+ com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
+ recordToProto(partitionEventRecord);
+ assertNotNull(changeStreamRecordProto);
+ ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+
+ when(resultSet.isProtoBytesChangeRecord()).thenReturn(true);
+ when(resultSet.getProtoChangeStreamRecordFromBytes()).thenReturn(changeStreamRecordProto);
+ assertEquals(
+ Collections.singletonList(partitionEventRecord),
+ mapperPostgres.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
+ }
+
+ @Test
+ public void testMappingProtoBytesRowToHeartbeatRecord() {
+ final HeartbeatRecord heartbeatRecord =
+ new HeartbeatRecord(Timestamp.ofTimeSecondsAndNanos(10L, 20), null);
+ com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
+ recordToProto(heartbeatRecord);
+ assertNotNull(changeStreamRecordProto);
+ ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+
+ when(resultSet.isProtoBytesChangeRecord()).thenReturn(true);
+ when(resultSet.getProtoChangeStreamRecordFromBytes()).thenReturn(changeStreamRecordProto);
+ assertEquals(
+ Collections.singletonList(heartbeatRecord),
+ mapperPostgres.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
+ }
+
+ @Test
+ public void testMappingProtoBytesRowToDataChangeRecord() {
+ final DataChangeRecord dataChangeRecord =
+ new DataChangeRecord(
+ "partitionToken",
+ Timestamp.ofTimeSecondsAndNanos(10L, 20),
+ "serverTransactionId",
+ true,
+ "1",
+ "tableName",
+ Arrays.asList(
+ new ColumnType("column1", new TypeCode("{\"code\":\"INT64\"}"), true, 1L),
+ new ColumnType("column2", new TypeCode("{\"code\":\"BYTES\"}"), false, 2L)),
+ Collections.singletonList(
+ new Mod(
+ "{\"column1\":\"value1\"}",
+ "{\"column2\":\"oldValue2\"}",
+ "{\"column2\":\"newValue2\"}")),
+ ModType.UPDATE,
+ ValueCaptureType.OLD_AND_NEW_VALUES,
+ 10L,
+ 2L,
+ "transactionTag",
+ true,
+ null);
+ com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
+ recordToProto(dataChangeRecord);
+ assertNotNull(changeStreamRecordProto);
+ ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+
+ when(resultSet.isProtoBytesChangeRecord()).thenReturn(true);
+ when(resultSet.getProtoChangeStreamRecordFromBytes()).thenReturn(changeStreamRecordProto);
+ assertEquals(
+ Collections.singletonList(dataChangeRecord),
+ mapperPostgres.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
+ }
}