Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 0 additions & 31 deletions base/src/main/java/org/gorpipe/exceptions/GorSystemException.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,35 +64,4 @@ public GorSystemException(String message, Throwable cause) {
}


@Override
public String toString() {
return this.getCause() != null ? this.getCause().toString() : super.toString();
}

@Override
public void printStackTrace(PrintStream s) {
if (this.getCause() != null) {
this.getCause().printStackTrace(s);
} else {
super.printStackTrace(s);
}
}

@Override
public void printStackTrace(PrintWriter s) {
if (this.getCause() != null) {
this.getCause().printStackTrace(s);
} else {
super.printStackTrace(s);
}
}

@Override
public StackTraceElement[] getStackTrace() {
return this.getCause() != null ? this.getCause().getStackTrace() : super.getStackTrace();
}

// Note: Will not overwrite setStackTrace and fillInStackTrace, and calling those will not have any
// affect on the reported stacktrace (which will always be from the underlying exception).
// If you wish to change the stacktrace, change the underlying exception.
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public interface S3Configuration extends Config {

@Documentation("S3 max driver retry")
@Key("gor.s3.conn.retries")
@DefaultValue("3")
@DefaultValue("0")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why no retry ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we are retrying using the GOR driver at least 3 times, so if we include the S3 driver retries, it is just too much. Using the GOR Driver retry gives better control over how and when we retry.

int connectionRetries();

@Documentation("S3 validate after inactivity (millis)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ protected void checkIfShouldRetryException(GorException e) {

var cause = ExceptionUtilities.getUnderlyingCause(e);

if (cause instanceof FileNotFoundException
|| cause instanceof FileSystemException) {
if (cause instanceof FileNotFoundException || cause instanceof FileSystemException) {
throw e;
} else if (cause instanceof S3Exception awsException) {
var detail = awsException.getMessage();
Expand Down
13 changes: 6 additions & 7 deletions drivers/src/main/java/org/gorpipe/s3/driver/S3Source.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import org.gorpipe.gor.driver.providers.stream.sources.StreamSource;
import org.gorpipe.gor.table.util.PathUtils;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
Expand Down Expand Up @@ -180,13 +180,12 @@ private InputStream openWithFileSystem(RequestRange range) {
}

private InputStream openRequest(GetObjectRequest request) {
if (asyncClient != null) {
return new AbortingInputStream(asyncClient.getObject(request, AsyncResponseTransformer.toBlockingInputStream()).join(), request);
}

try {
if (asyncClient != null) {
return new AbortingInputStream(asyncClient.getObject(request, AsyncResponseTransformer.toBlockingInputStream()).join(), request);
}
return new AbortingInputStream(client.getObject(request), request);
} catch (SdkClientException e) {
} catch (SdkException e) {
throw new GorResourceException("Failed to open S3 object: " + sourceReference.getUrl(), getPath().toString(), e).retry();
}
}
Expand All @@ -211,7 +210,7 @@ private S3SourceMetadata createMetaData(String bucket, String key) {
} else {
try {
objectMetaResponse = client.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build());
} catch (SdkClientException e) {
} catch (SdkException e) {
throw new GorResourceException("Failed to load metadata for " + bucket + "/" + key, getPath().toString(), e).retry();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,16 +315,17 @@ public GorDriverConfig config() {
}

private void throwWithSourceName(Exception e, String sourcename) {

if (e instanceof GorException) {
throw (GorException)e;
}

if (sourcename == null) {
throw new GorResourceException("Gor driver sourcename is null", "", e);
} else if (e.getMessage() == null) {
throw new GorSystemException("Gor driver - message is null", e);
}

if (e instanceof GorException) {
throw (GorException)e;
}

throw new GorResourceException("Cannot create iterator for datasource: " + sourcename + " Cause: "
+ e.getMessage(), sourcename, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,13 @@ public <T> T perform(Action<T> action) {
return action.perform();
} catch (GorRetryException e) {
if (!e.isRetry()) throw e;
checkIfShouldRetryException(e);

tries++;
lastException = e;
checkIfShouldRetryException(e);
sleepMs = sleep(retries, tries, sleepMs, e);

log.warn("Retrying gor action after " + sleepMs + "ms, retry " + tries, e);
}
}
throw new GorSystemException("Giving up after " + tries + " retries", lastException);
Expand All @@ -75,18 +78,21 @@ public <T> T perform(Action<T> action) {
@Override
public void perform(ActionVoid action) {
int tries = 0;
long lastSleepMs = 0;
long sleepMs = 0;
Throwable lastException = null;
while (tries <= retries) {
try {
action.perform();
return;
} catch (GorRetryException e) {
if (!e.isRetry()) throw e;
checkIfShouldRetryException(e);

tries++;
lastException = e;
checkIfShouldRetryException(e);
lastSleepMs = sleep(retries, tries, lastSleepMs, e);
sleepMs = sleep(retries, tries, sleepMs, e);

log.warn("Retrying gor action after " + sleepMs + "ms, retry " + tries, e);
}
}
throw new GorSystemException("Giving up after " + tries + " tries.", lastException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ public <T> T perform(Action<T> action) {
return action.perform();
} catch (GorRetryException e) {
if (!e.isRetry()) throw e;
checkIfShouldRetryException(e);

tries++;
lastException = e;
checkIfShouldRetryException(e);
accumulatedDuration += sleep(e, tries, initialDuration);

log.warn("Retrying gor action after " + accumulatedDuration + "ms, retry " + tries, e);
}
}

Expand All @@ -59,10 +62,13 @@ public void perform(ActionVoid action) {
return;
} catch (GorRetryException e) {
if (!e.isRetry()) throw e;
checkIfShouldRetryException(e);

tries++;
lastException = e;
checkIfShouldRetryException(e);
accumulatedDuration += sleep(e, tries, initialDuration);

log.warn("Retrying gor action after " + accumulatedDuration + "ms, retry " + tries, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class RefSeqFromConfig(ipath : String, fileReader : FileReader) extends RefSeq {
if( !f.isPresent ) {
if (!notfoundmap.contains(chrFilePath)) {
notfoundmap.add(chrFilePath)
log.info("Warning: Reference build " + path + "\n\nReference file "+chrFilePath+" does not exist", chrFilePath)
log.warn("Reference build " + path + "\n\nReference file "+chrFilePath+" does not exist", chrFilePath)
}
'N'
} else {
Expand All @@ -140,7 +140,7 @@ class RefSeqFromConfig(ipath : String, fileReader : FileReader) extends RefSeq {
val l = f.get().read(buff, 0, buffLength)
lufo.addObject(buffKey, buff)
if( l == -1 ) {
log.info("Trying to read "+chr+":"+pos+" from reference file " + chrFilePath + " of length "+f.get.length()+" from offset " + offset)
log.warn("Trying to read "+chr+":"+pos+" from reference file " + chrFilePath + " of length "+f.get.length()+" from offset " + offset)
return 'N'
}
refByteToChar(buff(pos - offset - 1))
Expand All @@ -150,7 +150,7 @@ class RefSeqFromConfig(ipath : String, fileReader : FileReader) extends RefSeq {
case ioex: IOException =>
throw new GorResourceException("Reference build " + path + " inaccessible", path, ioex)
case ex: Exception => {
log.warn("Warning: Reference build " + path + "\n\n"+ex.getMessage)
log.warn(String.format("Returning 'N' for reference build %s (%s:%d)", path, chr, pos), ex)
}
'N'
}
Expand Down
Loading