Fix object is already being uploaded error #441
ggVGc
left a comment
Looks fine to me for solving the immediate concern, but as mentioned in comments there are maybe some things we could improve in the future.
A more general comment:
I think the implementation using a lock and exceptions for communicating expected errors like a duplicate upload is quite messy/wrong. Without having looked too closely at the rest of the code, my expectation would be that an upload is attempted, and the result is either an error (because of an already existing upload, or something else) or a Future representing the expected success of the upload. The result of the Future could also be a type containing an error case, but unless it is coming from Java, I don't think there should be exceptions or Try involved.
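A minimal sketch of the shape described above, assuming Scala 3 and plain `String` ids. `UploadError`, `Uploader` and `doUpload` are hypothetical names for illustration, not types from the codebase, and `Future[Unit]` stands in for `Future[Done]`.

```scala
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical error type: the expected ways in which an upload may not start.
enum UploadError:
  case AlreadyInProgress(objectId: String)
  case Rejected(reason: String)

// Hypothetical stand-in for the real uploader; `doUpload` is the side-effecting part.
final class Uploader(doUpload: String => Future[Unit])(using ExecutionContext):
  private val inProgress = mutable.Set.empty[String]

  // Left: the upload was not started. Right: the upload is running.
  def upload(objectId: String): Either[UploadError, Future[Unit]] =
    // mutable.Set#add returns false when the element was already present.
    val acquired = synchronized(inProgress.add(objectId))
    if !acquired then Left(UploadError.AlreadyInProgress(objectId))
    else
      // Future.unit.flatMap also captures exceptions thrown synchronously by doUpload.
      val result = Future.unit.flatMap(_ => doUpload(objectId))
      // Release the slot whether the upload succeeded or failed.
      result.onComplete(_ => synchronized(inProgress.remove(objectId)))
      Right(result)
```

With this shape a duplicate upload is an ordinary value that the caller has to handle, and exceptions are left for failures that are genuinely unexpected.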
private def uploadDataObject(srcPath: Path, station: StationId, hash: Sha256Sum): Future[Done] = upload
private def uploadDataObject(srcPath: Path, station: StationId, hash: Sha256Sum, source: String): Future[Done] =
  log.info(s"ETC facade internal upload attempt source=$source station=${station.id} hash=${hash.id} path=${srcPath.getFileName}")
Looks like a good opportunity to add structured logging, as we did here: ICOS-Carbon-Portal/meta#338 (comment)
val srcPath = getObjectSource(fn.station, etcMeta.hashSum)
Files.move(file, srcPath, REPLACE_EXISTING)
uploadDataObject(srcPath, fn.station, etcMeta.hashSum)
uploadDataObject(srcPath, fn.station, etcMeta.hashSum, "live")
I would prefer to use an enum instead of raw strings, mainly to easily get an overview of what sources exist from a single point of truth. Something like:
private enum UploadSource(val name: String) {
  case Live extends UploadSource("live")
  case Retry extends UploadSource("retry")
}
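As a sketch of how such an enum could be used, assuming the Scala 3 syntax seen elsewhere in this PR: `attemptMessage` and `knownSources` are hypothetical names for illustration, and the enum is declared without `private` only to keep the example self-contained.

```scala
// Same shape as the suggested enum, written with Scala 3 indentation syntax.
enum UploadSource(val name: String):
  case Live extends UploadSource("live")
  case Retry extends UploadSource("retry")

// Hypothetical helper: builds the log line from a typed source instead of a raw string.
def attemptMessage(source: UploadSource, stationId: String, hashId: String): String =
  s"ETC facade internal upload attempt source=${source.name} station=$stationId hash=$hashId"

// Every known source, derived from the enum itself rather than from scattered literals.
val knownSources: Seq[String] = UploadSource.values.toSeq.map(_.name)
```

A typo such as `"retyr"` then fails to compile instead of silently producing a new source value in the logs.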
def lock(hash: Sha256Sum): Try[Done] = synchronized{
  if(locked.contains(hash)) Failure(new CpDataException(s"Object ${hash.id} $msg"))
  if(locked.contains(hash)) Failure(new UploadAlreadyInProgress(s"Object ${hash.id} $msg"))
The $msg part here is weird to me, and I don't see why it should be coming from the outside (and we only set it to one value in one place). The whole point of this lock is to prevent duplicate uploads, so I think it could just be this instead:
private[upload] class ObjectLock() {
  // Assumes scala.collection.mutable.Set is in scope; add/remove need a mutable set.
  private[this] val locked = Set.empty[Sha256Sum]

  def lock(hash: Sha256Sum): Try[Done] = synchronized{
    if(locked.contains(hash)) Failure(UploadAlreadyInProgress(hash.id))
    else{
      locked.add(hash)
      Success(Done)
    }
  }

  def unlock(hash: Sha256Sum): Done = synchronized{
    locked.remove(hash)
    Done
  }
}

final case class UploadAlreadyInProgress(objectId: String)
  extends Exception(s"Object ${objectId} is currently already being uploaded")
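For illustration, a lock of this kind could be paired with a helper that always releases it once the upload completes. This is only a sketch under simplifying assumptions: ids are plain `String`s instead of `Sha256Sum`, `Unit` stands in for `Done`, and `withLock` is a hypothetical helper that does not exist in the codebase.

```scala
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

final case class UploadAlreadyInProgress(objectId: String)
  extends Exception(s"Object $objectId is currently already being uploaded")

// Simplified lock keyed by a String id.
final class ObjectLock:
  private val locked = mutable.Set.empty[String]

  // mutable.Set#add returns false when the id is already locked.
  def lock(id: String): Try[Unit] = synchronized {
    if locked.add(id) then Success(()) else Failure(UploadAlreadyInProgress(id))
  }

  def unlock(id: String): Unit = synchronized { locked.remove(id); () }

// Hypothetical helper: runs `work` under the lock and releases it on completion.
def withLock[T](lock: ObjectLock, id: String)(work: => Future[T])(using ExecutionContext): Future[T] =
  // The lock is taken synchronously; a failed lock short-circuits without running `work`.
  Future.fromTry(lock.lock(id)).flatMap { _ =>
    Future.unit.flatMap(_ => work).andThen { case _ => lock.unlock(id) }
  }
```

Because the release happens in `andThen`, the lock is freed on both success and failure of `work`, and only when this call actually acquired it.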
class FacadeServiceTests extends AnyFunSpec:
  describe("isAlreadyUploading"):
In my opinion, these tests are testing too specific implementation details. Preferably we would test the actual upload path, but as it looks now, that includes quite a lot of non-abstracted side-effects unfortunately, so would require some work.
def isAlreadyUploading(err: Throwable): Boolean = err match
  case null => false
  case _: UploadAlreadyInProgress => true
  case _ => isAlreadyUploading(err.getCause)
In this case, we are throwing away the information about the initial exception, I think? That's probably fine, but seems like a situation that should not happen, so would preferably not even be represented as a case that can be produced.
Yes, that was unnecessarily complex.
it("returns false for null"):
  assert(!FacadeService.isAlreadyUploading(null))
Is this testing a case that can happen, or is expected?
appendError(s"Error while uploading $uploadedObj : " + UploadResult.extractMessage(err))
log.error(err, s"ETC facade error while uploading $uploadedObj")
if FacadeService.isAlreadyUploading(err) then
  log.info(s"ETC facade upload for $uploadedObj is already in progress, skipping duplicate attempt")
When/why does a duplicate upload actually happen? Is any error response reported?
I don't know, but I hope that the improved log format will help us better understand the issue.
This error is logged in the ETC facade log, which ETC uses to monitor upload issues caused by the data and metadata they provide us. However, this error comes from our code and ETC cannot do anything about it, so this change stops printing it in that log, where it is only noise to them. It will still be logged as INFO in the cpdata log.
I also added source information to the log so we can tell whether an issue comes from an initial upload ("live") or a retry ("retry").