Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ class BeamModulePlugin implements Plugin<Project> {
def google_clients_version = "2.0.0"
def google_cloud_bigdataoss_version = "2.2.26"
// [bomupgrader] determined by: com.google.cloud:google-cloud-spanner, consistent with: google_cloud_platform_libraries_bom
def google_cloud_spanner_version = "6.104.0"
def google_cloud_spanner_version = "6.107.0"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this version to be tracked at all now or can we remove this line?

def google_code_gson_version = "2.10.1"
def google_oauth_clients_version = "1.34.1"
// [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom
Expand Down Expand Up @@ -763,8 +763,6 @@ class BeamModulePlugin implements Plugin<Project> {
// libraries-bom version on sdks/java/container/license_scripts/dep_urls_java.yaml
google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:26.74.0",
google_cloud_secret_manager : "com.google.cloud:google-cloud-secretmanager", // google_cloud_platform_libraries_bom sets version
// TODO(#35868) remove pinned google_cloud_spanner_bom after tests or upstream fixed
google_cloud_spanner_bom : "com.google.cloud:google-cloud-spanner-bom:$google_cloud_spanner_version",
google_cloud_spanner : "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version
google_cloud_spanner_test : "com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests",
google_cloud_tink : "com.google.crypto.tink:tink:1.19.0",
Expand Down
14 changes: 2 additions & 12 deletions sdks/java/bom/gcp/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,7 @@ apply from: '../common.gradle'

dependencies {
api platform(project(":sdks:java:bom"))
api platform(project.library.java.google_cloud_spanner_bom)
api platform(project.library.java.google_cloud_platform_libraries_bom) {
// TODO(https://github.com/apache/beam/issues/37328) remove exclude and google_cloud_spanner_bom after upstream and/or tests fixed
exclude group: "com.google.cloud", module: "google-cloud-spanner"
exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-v1"
exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-admin-instance-v1"
exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-admin-database-v1"
exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-v1"
exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-admin-instance-v1"
exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-admin-database-v1"
}
api platform(project.library.java.google_cloud_platform_libraries_bom)
constraints {
api project.library.java.guava
}
Expand All @@ -42,4 +32,4 @@ publishing {
artifactId = 'beam-sdks-java-google-cloud-platform-bom'
}
}
}
}
14 changes: 2 additions & 12 deletions sdks/java/io/google-cloud-platform/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,7 @@ description = "Apache Beam :: SDKs :: Java :: IO :: Google Cloud Platform"
ext.summary = "IO library to read and write Google Cloud Platform systems from Beam."

dependencies {
implementation(enforcedPlatform(library.java.google_cloud_platform_libraries_bom)) {
// TODO(https://github.com/apache/beam/issues/35868) remove exclude after upstream and/or tests fixed
exclude group: "com.google.cloud", module: "google-cloud-spanner"
exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-v1"
exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-admin-instance-v1"
exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-admin-database-v1"
exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-v1"
exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-admin-instance-v1"
exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-admin-database-v1"
}
implementation(enforcedPlatform(library.java.google_cloud_spanner_bom))
implementation(enforcedPlatform(library.java.google_cloud_platform_libraries_bom))
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(":runners:core-java")
implementation project(path: ":sdks:java:core", configuration: "shadow")
Expand Down Expand Up @@ -358,4 +348,4 @@ task postCommit {
description = "Integration tests of GCP connectors using the DirectRunner."
dependsOn integrationTest
dependsOn integrationTestKms
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.cloud.spanner.DatabaseAdminClient;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.SessionPoolOptions;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
Expand All @@ -38,6 +39,7 @@
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.PartialResultSet;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.sdk.options.ValueProvider;
Expand All @@ -61,6 +63,9 @@ public class SpannerAccessor implements AutoCloseable {
*/
private static final String USER_AGENT_PREFIX = "Apache_Beam_Java";

private static final java.time.Duration DEFAULT_SESSION_WAIT_DURATION =
java.time.Duration.ofMinutes(5);

/** Instance ID to use when connecting to an experimental host. */
public static final String EXPERIMENTAL_HOST_INSTANCE_ID = "default";

Expand Down Expand Up @@ -113,6 +118,11 @@ public static SpannerAccessor getOrCreate(SpannerConfig spannerConfig) {
static SpannerOptions buildSpannerOptions(SpannerConfig spannerConfig) {
SpannerOptions.Builder builder = SpannerOptions.newBuilder();

// TODO(https://github.com/apache/beam/issues/37451) Disable gRPC gcp extension which was
// causing the application thread to stall.
// Remove this once Spanner fixes the hanging issue
builder.disableGrpcGcpExtension();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the root cause known/is there a timeline for us to remove this? If so, could you please file a Beam issue tracking the fix and reference it in a TODO here?


Set<Code> retryableCodes = new HashSet<>();
if (spannerConfig.getRetryableCodes() != null) {
retryableCodes.addAll(spannerConfig.getRetryableCodes());
Expand Down Expand Up @@ -265,6 +275,15 @@ static SpannerOptions buildSpannerOptions(SpannerConfig spannerConfig) {
builder.setCredentials(credentials.get());
}

ValueProvider<java.time.Duration> waitForSessionCreationDuration =
spannerConfig.getWaitForSessionCreationDuration();
java.time.Duration waitDuration =
Optional.ofNullable(waitForSessionCreationDuration)
.map(ValueProvider::get)
.orElse(DEFAULT_SESSION_WAIT_DURATION);
builder.setSessionPoolOption(
SessionPoolOptions.newBuilder().setWaitForMinSessionsDuration(waitDuration).build());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change related to the hangs?

Copy link
Author

@sakthivelmanii sakthivelmanii Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. This is related to DEADLINE EXCEEDED in dataflow workers. b/462499883


return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ public String getHostValue() {

public abstract @Nullable ValueProvider<Credentials> getCredentials();

public abstract @Nullable ValueProvider<java.time.Duration> getWaitForSessionCreationDuration();

abstract Builder toBuilder();

public static SpannerConfig create() {
Expand Down Expand Up @@ -189,6 +191,9 @@ abstract Builder setExecuteStreamingSqlRetrySettings(

abstract Builder setPlainText(ValueProvider<Boolean> plainText);

abstract Builder setWaitForSessionCreationDuration(
ValueProvider<java.time.Duration> waitForSessionCreationDuration);

public abstract SpannerConfig build();
}

Expand Down Expand Up @@ -389,4 +394,22 @@ public SpannerConfig withUsingPlainTextChannel(ValueProvider<Boolean> plainText)
public SpannerConfig withUsingPlainTextChannel(boolean plainText) {
return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
}

/**
* @param waitForSessionCreationDuration
* @return {@link SpannerConfig}
* <p>Sets the wait time for multiplexed session to be available while creating a database
* client. Setting this will block the {@link com.google.cloud.spanner.DatabaseClient}
* creation. By default, We will be setting 5 mins as minimum wait time.
*/
public SpannerConfig withWaitForSessionCreationDuration(
ValueProvider<java.time.Duration> waitForSessionCreationDuration) {
return toBuilder().setWaitForSessionCreationDuration(waitForSessionCreationDuration).build();
}

public SpannerConfig withWaitForSessionCreationDuration(
java.time.Duration waitForSessionCreationDuration) {
return withWaitForSessionCreationDuration(
ValueProvider.StaticValueProvider.of(waitForSessionCreationDuration));
}
}
Loading