diff --git a/model/src/main/java/org/gorpipe/gor/driver/providers/stream/sources/file/FileSourceRetryHandler.java b/model/src/main/java/org/gorpipe/gor/driver/providers/stream/sources/file/FileSourceRetryHandler.java index e35a7f88..4d307e0f 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/providers/stream/sources/file/FileSourceRetryHandler.java +++ b/model/src/main/java/org/gorpipe/gor/driver/providers/stream/sources/file/FileSourceRetryHandler.java @@ -15,13 +15,11 @@ public FileSourceRetryHandler(long initialDuration, long totalDuration) { @Override protected void checkIfShouldRetryException(GorException e) { if (e.getMessage().contains("Stale file handle")) { - // Stale file handle errors generally require retries on a higher level as the - // file needs to be reopened. - throw new GorResourceException("Stale file handle", "", e); + // We generally use RetryInputStream (which reopens the stream on retry), } else if (e.getCause() instanceof FileNotFoundException fe) { - throw GorResourceException.fromIOException(fe, ""); + throw e; } else if (e.getCause() instanceof FileSystemException fe) { - throw GorResourceException.fromIOException(fe, ""); + throw e; } } } diff --git a/model/src/main/java/org/gorpipe/gor/driver/providers/stream/sources/wrappers/RetryStreamSourceWrapper.java b/model/src/main/java/org/gorpipe/gor/driver/providers/stream/sources/wrappers/RetryStreamSourceWrapper.java index 69c952ef..0eb1c1da 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/providers/stream/sources/wrappers/RetryStreamSourceWrapper.java +++ b/model/src/main/java/org/gorpipe/gor/driver/providers/stream/sources/wrappers/RetryStreamSourceWrapper.java @@ -170,11 +170,9 @@ public int read(byte[] b, int off, int len) throws IOException { try { return super.read(b, off, len); } catch (IOException e) { - StreamUtils.tryClose(in); - in = reopen(); - throw GorResourceException.fromIOException(e, "").retry(); + throw GorResourceException.fromIOException(e, getPath()).retry(); } - }); + }, this::reopen); } @Override @@ -183,22 +181,21 @@ public long skip(long n) throws IOException { try { return super.skip(n); } catch (IOException e) { - StreamUtils.tryClose(in); - in = reopen(); - throw GorResourceException.fromIOException(e, "").retry(); + throw GorResourceException.fromIOException(e, getPath()).retry(); } - }); + }, this::reopen); } /** * NB: If reopening the stream fails - it is not retried. */ - private InputStream reopen() { + private void reopen() { + StreamUtils.tryClose(in); // Need to open it using the outer super class open (and be careful NOT to warp it again) if (length == null) { - return RetryStreamSourceWrapper.super.open(start + getPosition()); + in = RetryStreamSourceWrapper.super.open(start + getPosition()); } else { - return RetryStreamSourceWrapper.super.open(start + getPosition(), length - getPosition()); + in = RetryStreamSourceWrapper.super.open(start + getPosition(), length - getPosition()); } } diff --git a/model/src/main/java/org/gorpipe/gor/driver/utils/RetryHandlerBase.java b/model/src/main/java/org/gorpipe/gor/driver/utils/RetryHandlerBase.java index 85bb7e24..c1cbf1b7 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/utils/RetryHandlerBase.java +++ b/model/src/main/java/org/gorpipe/gor/driver/utils/RetryHandlerBase.java @@ -9,9 +9,17 @@ public abstract class RetryHandlerBase { - public abstract T perform(Action action); + public T perform(Action action) { + return perform(action, null); + } + + public void perform(ActionVoid action) { + perform(action, null); + } + + public abstract T perform(Action action, ActionVoid preRetryOp); - public abstract void perform(ActionVoid action); + public abstract void perform(ActionVoid action, ActionVoid preRetryOp); public interface Action { T perform(); diff --git a/model/src/main/java/org/gorpipe/gor/driver/utils/RetryHandlerWithFixedRetries.java b/model/src/main/java/org/gorpipe/gor/driver/utils/RetryHandlerWithFixedRetries.java index 6b274385..e7d48fdc 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/utils/RetryHandlerWithFixedRetries.java +++ b/model/src/main/java/org/gorpipe/gor/driver/utils/RetryHandlerWithFixedRetries.java @@ -54,7 +54,7 @@ public RetryHandlerWithFixedRetries(long retryInitialSleepMs, long retryMaxSleep } @Override - public T perform(Action action) { + public T perform(Action action, ActionVoid preRetryOp) { int tries = 0; long sleepMs = 0; Throwable lastException = null; @@ -70,13 +70,17 @@ public T perform(Action action) { sleepMs = sleep(retries, tries, sleepMs, e); log.warn("Retrying gor action after " + sleepMs + "ms, retry " + tries, e); + + if (preRetryOp != null) { + preRetryOp.perform(); + } } } throw new GorSystemException("Giving up after " + tries + " retries", lastException); } @Override - public void perform(ActionVoid action) { + public void perform(ActionVoid action, ActionVoid preRetryOp) { int tries = 0; long sleepMs = 0; Throwable lastException = null; @@ -93,6 +97,10 @@ public void perform(ActionVoid action) { sleepMs = sleep(retries, tries, sleepMs, e); log.warn("Retrying gor action after " + sleepMs + "ms, retry " + tries, e); + + if (preRetryOp != null) { + preRetryOp.perform(); + } } } throw new GorSystemException("Giving up after " + tries + " tries.", lastException); diff --git a/model/src/main/java/org/gorpipe/gor/driver/utils/RetryHandlerWithFixedWait.java b/model/src/main/java/org/gorpipe/gor/driver/utils/RetryHandlerWithFixedWait.java index 34495e8e..70da70f3 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/utils/RetryHandlerWithFixedWait.java +++ b/model/src/main/java/org/gorpipe/gor/driver/utils/RetryHandlerWithFixedWait.java @@ -21,7 +21,7 @@ public RetryHandlerWithFixedWait(long initialDuration, long totalDuration) { protected Random rand = new Random(); - public T perform(Action action) { + public T perform(Action action, ActionVoid preRetryOp) { assert initialDuration <= totalDuration; int tries = 0; @@ -40,6 +40,10 @@ public T perform(Action action) { accumulatedDuration += sleep(e, tries, initialDuration); log.warn("Retrying gor action after " + accumulatedDuration + "ms, retry " + tries, e); + + if (preRetryOp != null) { + preRetryOp.perform(); + } } } @@ -49,7 +53,7 @@ public T perform(Action action) { ); } - public void perform(ActionVoid action) { + public void perform(ActionVoid action, ActionVoid preRetryOp) { assert initialDuration <= totalDuration; int tries = 0; @@ -69,6 +73,10 @@ public void perform(ActionVoid action) { accumulatedDuration += sleep(e, tries, initialDuration); log.warn("Retrying gor action after " + accumulatedDuration + "ms, retry " + tries, e); + + if (preRetryOp != null) { + preRetryOp.perform(); + } } }