Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@ public FileSourceRetryHandler(long initialDuration, long totalDuration) {
@Override
protected void checkIfShouldRetryException(GorException e) {
if (e.getMessage().contains("Stale file handle")) {
// Stale file handle errors generally require retries on a higher level as the
// file needs to be reopened.
throw new GorResourceException("Stale file handle", "", e);
// We generally use RetryInputStream (which reopens the stream on retry),
} else if (e.getCause() instanceof FileNotFoundException fe) {
throw GorResourceException.fromIOException(fe, "");
throw e;
} else if (e.getCause() instanceof FileSystemException fe) {
throw GorResourceException.fromIOException(fe, "");
throw e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,9 @@ public int read(byte[] b, int off, int len) throws IOException {
try {
return super.read(b, off, len);
} catch (IOException e) {
StreamUtils.tryClose(in);
in = reopen();
throw GorResourceException.fromIOException(e, "").retry();
throw GorResourceException.fromIOException(e, getPath()).retry();
}
});
}, this::reopen);
}

@Override
Expand All @@ -183,22 +181,21 @@ public long skip(long n) throws IOException {
try {
return super.skip(n);
} catch (IOException e) {
StreamUtils.tryClose(in);
in = reopen();
throw GorResourceException.fromIOException(e, "").retry();
throw GorResourceException.fromIOException(e, getPath()).retry();
}
});
}, this::reopen);
}

/**
* NB: If reopening the stream fails - it is not retried.
*/
private InputStream reopen() {
private void reopen() {
StreamUtils.tryClose(in);
// Need to open it using the outer super class open (and be careful NOT to warp it again)
if (length == null) {
return RetryStreamSourceWrapper.super.open(start + getPosition());
in = RetryStreamSourceWrapper.super.open(start + getPosition());
} else {
return RetryStreamSourceWrapper.super.open(start + getPosition(), length - getPosition());
in = RetryStreamSourceWrapper.super.open(start + getPosition(), length - getPosition());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,17 @@

public abstract class RetryHandlerBase {

public abstract <T> T perform(Action<T> action);
public <T> T perform(Action<T> action) {
return perform(action, null);
}

public void perform(ActionVoid action) {
perform(action, null);
}

public abstract <T> T perform(Action<T> action, ActionVoid preRetryOp);

public abstract void perform(ActionVoid action);
public abstract void perform(ActionVoid action, ActionVoid preRetryOp);

public interface Action<T> {
T perform();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public RetryHandlerWithFixedRetries(long retryInitialSleepMs, long retryMaxSleep
}

@Override
public <T> T perform(Action<T> action) {
public <T> T perform(Action<T> action, ActionVoid preRetryOp) {
int tries = 0;
long sleepMs = 0;
Throwable lastException = null;
Expand All @@ -70,13 +70,17 @@ public <T> T perform(Action<T> action) {
sleepMs = sleep(retries, tries, sleepMs, e);

log.warn("Retrying gor action after " + sleepMs + "ms, retry " + tries, e);

if (preRetryOp != null) {
preRetryOp.perform();
}
}
}
throw new GorSystemException("Giving up after " + tries + " retries", lastException);
}

@Override
public void perform(ActionVoid action) {
public void perform(ActionVoid action, ActionVoid preRetryOp) {
int tries = 0;
long sleepMs = 0;
Throwable lastException = null;
Expand All @@ -93,6 +97,10 @@ public void perform(ActionVoid action) {
sleepMs = sleep(retries, tries, sleepMs, e);

log.warn("Retrying gor action after " + sleepMs + "ms, retry " + tries, e);

if (preRetryOp != null) {
preRetryOp.perform();
}
}
}
throw new GorSystemException("Giving up after " + tries + " tries.", lastException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public RetryHandlerWithFixedWait(long initialDuration, long totalDuration) {

protected Random rand = new Random();

public <T> T perform(Action<T> action) {
public <T> T perform(Action<T> action, ActionVoid preRetryOp) {
assert initialDuration <= totalDuration;

int tries = 0;
Expand All @@ -40,6 +40,10 @@ public <T> T perform(Action<T> action) {
accumulatedDuration += sleep(e, tries, initialDuration);

log.warn("Retrying gor action after " + accumulatedDuration + "ms, retry " + tries, e);

if (preRetryOp != null) {
preRetryOp.perform();
}
}
}

Expand All @@ -49,7 +53,7 @@ public <T> T perform(Action<T> action) {
);
}

public void perform(ActionVoid action) {
public void perform(ActionVoid action, ActionVoid preRetryOp) {
assert initialDuration <= totalDuration;

int tries = 0;
Expand All @@ -69,6 +73,10 @@ public void perform(ActionVoid action) {
accumulatedDuration += sleep(e, tries, initialDuration);

log.warn("Retrying gor action after " + accumulatedDuration + "ms, retry " + tries, e);

if (preRetryOp != null) {
preRetryOp.perform();
}
}
}

Expand Down
Loading