From c30e3ca1409d6f6308c72b82f65c4f4fd4e03000 Mon Sep 17 00:00:00 2001 From: Gisli Magnusson Date: Mon, 12 May 2025 16:27:11 +0000 Subject: [PATCH 1/3] fix(ENGKNOW-2388): Fix S3 retry on openRequest and create meta data. Minor refactorings, better error messages. --- .../java/org/gorpipe/s3/driver/S3RetryHandler.java | 3 +-- .../main/java/org/gorpipe/s3/driver/S3Source.java | 8 ++++---- .../driver/utils/RetryHandlerWithFixedRetries.java | 14 ++++++++++---- .../driver/utils/RetryHandlerWithFixedWait.java | 10 ++++++++-- .../model/gor/iterators/RefSeqFromConfig.scala | 6 +++--- 5 files changed, 26 insertions(+), 15 deletions(-) diff --git a/drivers/src/main/java/org/gorpipe/s3/driver/S3RetryHandler.java b/drivers/src/main/java/org/gorpipe/s3/driver/S3RetryHandler.java index cf3f4d9e..1e0298c5 100644 --- a/drivers/src/main/java/org/gorpipe/s3/driver/S3RetryHandler.java +++ b/drivers/src/main/java/org/gorpipe/s3/driver/S3RetryHandler.java @@ -26,8 +26,7 @@ protected void checkIfShouldRetryException(GorException e) { var cause = ExceptionUtilities.getUnderlyingCause(e); - if (cause instanceof FileNotFoundException - || cause instanceof FileSystemException) { + if (cause instanceof FileNotFoundException || cause instanceof FileSystemException) { throw e; } else if (cause instanceof S3Exception awsException) { var detail = awsException.getMessage(); diff --git a/drivers/src/main/java/org/gorpipe/s3/driver/S3Source.java b/drivers/src/main/java/org/gorpipe/s3/driver/S3Source.java index e7f09aeb..5683fa78 100644 --- a/drivers/src/main/java/org/gorpipe/s3/driver/S3Source.java +++ b/drivers/src/main/java/org/gorpipe/s3/driver/S3Source.java @@ -41,7 +41,7 @@ import org.gorpipe.gor.driver.providers.stream.sources.StreamSource; import org.gorpipe.gor.table.util.PathUtils; import software.amazon.awssdk.core.async.AsyncResponseTransformer; -import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; @@ -186,7 +186,7 @@ private InputStream openRequest(GetObjectRequest request) { try { return new AbortingInputStream(client.getObject(request), request); - } catch (SdkClientException e) { + } catch (SdkException e) { throw new GorResourceException("Failed to open S3 object: " + sourceReference.getUrl(), getPath().toString(), e).retry(); } } @@ -211,7 +211,7 @@ private S3SourceMetadata createMetaData(String bucket, String key) { } else { try { objectMetaResponse = client.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build()); - } catch (SdkClientException e) { + } catch (SdkException e) { throw new GorResourceException("Failed to load metadata for " + bucket + "/" + key, getPath().toString(), e).retry(); } } @@ -302,7 +302,7 @@ public String createDirectory(FileAttribute... attrs) { public String createDirectories(FileAttribute... attrs) { // Files.createDirectory needs elevated access to list all buckets. // try { -// return PathUtils.formatUri(Files.createDirectories(getPath()).toUri()); +// œ1 return PathUtils.formatUri(Files.createDirectories(getPath()).toUri()); // } catch (IOException e) { // throw GorResourceException.fromIOException(e, getPath()).retry(); // } diff --git a/model/src/main/java/org/gorpipe/gor/driver/utils/RetryHandlerWithFixedRetries.java b/model/src/main/java/org/gorpipe/gor/driver/utils/RetryHandlerWithFixedRetries.java index d7149667..6b274385 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/utils/RetryHandlerWithFixedRetries.java +++ b/model/src/main/java/org/gorpipe/gor/driver/utils/RetryHandlerWithFixedRetries.java @@ -63,10 +63,13 @@ public T perform(Action action) { return action.perform(); } catch (GorRetryException e) { if (!e.isRetry()) throw e; + checkIfShouldRetryException(e); + tries++; lastException = e; - checkIfShouldRetryException(e); sleepMs = sleep(retries, tries, sleepMs, e); + + log.warn("Retrying gor action after " + sleepMs + "ms, retry " + tries, e); } } throw new GorSystemException("Giving up after " + tries + " retries", lastException); @@ -75,7 +78,7 @@ public T perform(Action action) { @Override public void perform(ActionVoid action) { int tries = 0; - long lastSleepMs = 0; + long sleepMs = 0; Throwable lastException = null; while (tries <= retries) { try { @@ -83,10 +86,13 @@ public void perform(ActionVoid action) { return; } catch (GorRetryException e) { if (!e.isRetry()) throw e; + checkIfShouldRetryException(e); + tries++; lastException = e; - checkIfShouldRetryException(e); - lastSleepMs = sleep(retries, tries, lastSleepMs, e); + sleepMs = sleep(retries, tries, sleepMs, e); + + log.warn("Retrying gor action after " + sleepMs + "ms, retry " + tries, e); } } throw new GorSystemException("Giving up after " + tries + " tries.", lastException); diff --git a/model/src/main/java/org/gorpipe/gor/driver/utils/RetryHandlerWithFixedWait.java b/model/src/main/java/org/gorpipe/gor/driver/utils/RetryHandlerWithFixedWait.java index 46ad3166..34495e8e 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/utils/RetryHandlerWithFixedWait.java +++ b/model/src/main/java/org/gorpipe/gor/driver/utils/RetryHandlerWithFixedWait.java @@ -33,10 +33,13 @@ public T perform(Action action) { return action.perform(); } catch (GorRetryException e) { if (!e.isRetry()) throw e; + checkIfShouldRetryException(e); + tries++; lastException = e; - checkIfShouldRetryException(e); accumulatedDuration += sleep(e, tries, initialDuration); + + log.warn("Retrying gor action after " + accumulatedDuration + "ms, retry " + tries, e); } } @@ -59,10 +62,13 @@ public void perform(ActionVoid action) { return; } catch (GorRetryException e) { if (!e.isRetry()) throw e; + checkIfShouldRetryException(e); + tries++; lastException = e; - checkIfShouldRetryException(e); accumulatedDuration += sleep(e, tries, initialDuration); + + log.warn("Retrying gor action after " + accumulatedDuration + "ms, retry " + tries, e); } } diff --git a/model/src/main/scala/org/gorpipe/model/gor/iterators/RefSeqFromConfig.scala b/model/src/main/scala/org/gorpipe/model/gor/iterators/RefSeqFromConfig.scala index 4d48d671..e38c43fd 100644 --- a/model/src/main/scala/org/gorpipe/model/gor/iterators/RefSeqFromConfig.scala +++ b/model/src/main/scala/org/gorpipe/model/gor/iterators/RefSeqFromConfig.scala @@ -131,7 +131,7 @@ class RefSeqFromConfig(ipath : String, fileReader : FileReader) extends RefSeq { if( !f.isPresent ) { if (!notfoundmap.contains(chrFilePath)) { notfoundmap.add(chrFilePath) - log.info("Warning: Reference build " + path + "\n\nReference file "+chrFilePath+" does not exist", chrFilePath) + log.warn("Reference build " + path + "\n\nReference file "+chrFilePath+" does not exist", chrFilePath) } 'N' } else { @@ -140,7 +140,7 @@ class RefSeqFromConfig(ipath : String, fileReader : FileReader) extends RefSeq { val l = f.get().read(buff, 0, buffLength) lufo.addObject(buffKey, buff) if( l == -1 ) { - log.info("Trying to read "+chr+":"+pos+" from reference file " + chrFilePath + " of length "+f.get.length()+" from offset " + offset) + log.warn("Trying to read "+chr+":"+pos+" from reference file " + chrFilePath + " of length "+f.get.length()+" from offset " + offset) return 'N' } refByteToChar(buff(pos - offset - 1)) @@ -150,7 +150,7 @@ class RefSeqFromConfig(ipath : String, fileReader : FileReader) extends RefSeq { case ioex: IOException => throw new GorResourceException("Reference build " + path + " inaccessible", path, ioex) case ex: Exception => { - log.warn("Warning: Reference build " + path + "\n\n"+ex.getMessage) + log.warn(String.format("Returning 'N' for reference build %s (%s:%d)", path, chr, pos), ex) } 'N' } From 0180f5d0a76c3d91eb551761e8e3ef29a7172e98 Mon Sep 17 00:00:00 2001 From: Gisli Magnusson Date: Mon, 12 May 2025 21:50:01 +0000 Subject: [PATCH 2/3] fix(ENGKNOW-2388): Fix S3 retry on openRequest and create meta data. Minor refactorings, better error messages. --- .../src/main/java/org/gorpipe/s3/driver/S3RetryHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/drivers/src/main/java/org/gorpipe/s3/driver/S3RetryHandler.java b/drivers/src/main/java/org/gorpipe/s3/driver/S3RetryHandler.java index 1e0298c5..e304c4a2 100644 --- a/drivers/src/main/java/org/gorpipe/s3/driver/S3RetryHandler.java +++ b/drivers/src/main/java/org/gorpipe/s3/driver/S3RetryHandler.java @@ -36,8 +36,8 @@ protected void checkIfShouldRetryException(GorException e) { throw new GorResourceException(String.format("Unauthorized. Detail: %s. Original message: %s", detail, e.getMessage()), path, e); } else if (awsException.statusCode() == 403) { throw new GorResourceException(String.format("Access Denied. Detail: %s. Original message: %s", detail, e.getMessage()), path, e); - } else if (awsException.statusCode() == 404) { - throw new GorResourceException(String.format("Not Found. Detail: %s. Original message: %s", detail, e.getMessage()), path, e); +// } else if (awsException.statusCode() == 404) { +// throw new GorResourceException(String.format("Not Found. Detail: %s. Original message: %s", detail, e.getMessage()), path, e); } } else if (cause instanceof SdkClientException) { throw new GorResourceException("Amazon SDK client exception", path, e); From 5b6dce026ea4f378acfc54aa0d6601c4d94a86fc Mon Sep 17 00:00:00 2001 From: Gisli Magnusson Date: Tue, 1 Jul 2025 09:47:58 +0000 Subject: [PATCH 3/3] fix(ENGKNOW-2388): Tune S3Source exception handling. --- .../exceptions/GorSystemException.java | 31 ------------------- .../gorpipe/s3/driver/S3Configuration.java | 2 +- .../org/gorpipe/s3/driver/S3RetryHandler.java | 4 +-- .../java/org/gorpipe/s3/driver/S3Source.java | 9 +++--- .../gor/driver/PluggableGorDriver.java | 9 +++--- 5 files changed, 12 insertions(+), 43 deletions(-) diff --git a/base/src/main/java/org/gorpipe/exceptions/GorSystemException.java b/base/src/main/java/org/gorpipe/exceptions/GorSystemException.java index c0da4dd0..4115406a 100644 --- a/base/src/main/java/org/gorpipe/exceptions/GorSystemException.java +++ b/base/src/main/java/org/gorpipe/exceptions/GorSystemException.java @@ -64,35 +64,4 @@ public GorSystemException(String message, Throwable cause) { } - @Override - public String toString() { - return this.getCause() != null ? this.getCause().toString() : super.toString(); - } - - @Override - public void printStackTrace(PrintStream s) { - if (this.getCause() != null) { - this.getCause().printStackTrace(s); - } else { - super.printStackTrace(s); - } - } - - @Override - public void printStackTrace(PrintWriter s) { - if (this.getCause() != null) { - this.getCause().printStackTrace(s); - } else { - super.printStackTrace(s); - } - } - - @Override - public StackTraceElement[] getStackTrace() { - return this.getCause() != null ? this.getCause().getStackTrace() : super.getStackTrace(); - } - - // Note: Will not overwrite setStackTrace and fillInStackTrace, and calling those will not have any - // affect on the reported stacktrace (which will always be from the underlying exception). - // If you wish to change the stacktrace, change the underlying exception. } diff --git a/drivers/src/main/java/org/gorpipe/s3/driver/S3Configuration.java b/drivers/src/main/java/org/gorpipe/s3/driver/S3Configuration.java index dc47bb37..aa9fff35 100644 --- a/drivers/src/main/java/org/gorpipe/s3/driver/S3Configuration.java +++ b/drivers/src/main/java/org/gorpipe/s3/driver/S3Configuration.java @@ -49,7 +49,7 @@ public interface S3Configuration extends Config { @Documentation("S3 max driver retry") @Key("gor.s3.conn.retries") - @DefaultValue("3") + @DefaultValue("0") int connectionRetries(); @Documentation("S3 validate after inactivity (millis)") diff --git a/drivers/src/main/java/org/gorpipe/s3/driver/S3RetryHandler.java b/drivers/src/main/java/org/gorpipe/s3/driver/S3RetryHandler.java index e304c4a2..1e0298c5 100644 --- a/drivers/src/main/java/org/gorpipe/s3/driver/S3RetryHandler.java +++ b/drivers/src/main/java/org/gorpipe/s3/driver/S3RetryHandler.java @@ -36,8 +36,8 @@ protected void checkIfShouldRetryException(GorException e) { throw new GorResourceException(String.format("Unauthorized. Detail: %s. Original message: %s", detail, e.getMessage()), path, e); } else if (awsException.statusCode() == 403) { throw new GorResourceException(String.format("Access Denied. Detail: %s. Original message: %s", detail, e.getMessage()), path, e); -// } else if (awsException.statusCode() == 404) { -// throw new GorResourceException(String.format("Not Found. Detail: %s. Original message: %s", detail, e.getMessage()), path, e); + } else if (awsException.statusCode() == 404) { + throw new GorResourceException(String.format("Not Found. Detail: %s. Original message: %s", detail, e.getMessage()), path, e); } } else if (cause instanceof SdkClientException) { throw new GorResourceException("Amazon SDK client exception", path, e); diff --git a/drivers/src/main/java/org/gorpipe/s3/driver/S3Source.java b/drivers/src/main/java/org/gorpipe/s3/driver/S3Source.java index 5683fa78..827cc70e 100644 --- a/drivers/src/main/java/org/gorpipe/s3/driver/S3Source.java +++ b/drivers/src/main/java/org/gorpipe/s3/driver/S3Source.java @@ -180,11 +180,10 @@ private InputStream openWithFileSystem(RequestRange range) { } private InputStream openRequest(GetObjectRequest request) { - if (asyncClient != null) { - return new AbortingInputStream(asyncClient.getObject(request, AsyncResponseTransformer.toBlockingInputStream()).join(), request); - } - try { + if (asyncClient != null) { + return new AbortingInputStream(asyncClient.getObject(request, AsyncResponseTransformer.toBlockingInputStream()).join(), request); + } return new AbortingInputStream(client.getObject(request), request); } catch (SdkException e) { throw new GorResourceException("Failed to open S3 object: " + sourceReference.getUrl(), getPath().toString(), e).retry(); @@ -302,7 +301,7 @@ public String createDirectory(FileAttribute... attrs) { public String createDirectories(FileAttribute... attrs) { // Files.createDirectory needs elevated access to list all buckets. // try { -// œ1 return PathUtils.formatUri(Files.createDirectories(getPath()).toUri()); +// return PathUtils.formatUri(Files.createDirectories(getPath()).toUri()); // } catch (IOException e) { // throw GorResourceException.fromIOException(e, getPath()).retry(); // } diff --git a/model/src/main/java/org/gorpipe/gor/driver/PluggableGorDriver.java b/model/src/main/java/org/gorpipe/gor/driver/PluggableGorDriver.java index 5db135af..bd69f40b 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/PluggableGorDriver.java +++ b/model/src/main/java/org/gorpipe/gor/driver/PluggableGorDriver.java @@ -315,16 +315,17 @@ public GorDriverConfig config() { } private void throwWithSourceName(Exception e, String sourcename) { + + if (e instanceof GorException) { + throw (GorException)e; + } + if (sourcename == null) { throw new GorResourceException("Gor driver sourcename is null", "", e); } else if (e.getMessage() == null) { throw new GorSystemException("Gor driver - message is null", e); } - if (e instanceof GorException) { - throw (GorException)e; - } - throw new GorResourceException("Cannot create iterator for datasource: " + sourcename + " Cause: " + e.getMessage(), sourcename, e); }