From bd62efb752eac5a2afcf2ae3373b311b39bd1da4 Mon Sep 17 00:00:00 2001 From: Jonathan Thiry Date: Thu, 7 May 2026 15:45:52 +0200 Subject: [PATCH 1/2] Fix object is already being uploaded error --- .../services/etcfacade/FacadeService.scala | 23 ++++++++++++++----- .../cp/data/services/upload/ObjectLock.scala | 5 ++-- .../etcfacade/FacadeServiceTests.scala | 22 ++++++++++++++++++ 3 files changed, 42 insertions(+), 8 deletions(-) create mode 100644 src/test/scala/se/lu/nateko/cp/data/test/services/etcfacade/FacadeServiceTests.scala diff --git a/src/main/scala/se/lu/nateko/cp/data/services/etcfacade/FacadeService.scala b/src/main/scala/se/lu/nateko/cp/data/services/etcfacade/FacadeService.scala index f165bde6c..228c4adb5 100644 --- a/src/main/scala/se/lu/nateko/cp/data/services/etcfacade/FacadeService.scala +++ b/src/main/scala/se/lu/nateko/cp/data/services/etcfacade/FacadeService.scala @@ -18,6 +18,7 @@ import se.lu.nateko.cp.data.api.dataFail import se.lu.nateko.cp.data.formats.TimeSeriesStreams import se.lu.nateko.cp.data.formats.zip import se.lu.nateko.cp.data.services.upload.UploadResult +import se.lu.nateko.cp.data.services.upload.UploadAlreadyInProgress import se.lu.nateko.cp.data.services.upload.UploadService import se.lu.nateko.cp.data.streams.DigestFlow import se.lu.nateko.cp.data.streams.ZipEntryFlow @@ -202,10 +203,12 @@ class FacadeService(val config: EtcFacadeConfig, upload: UploadService)(using ma .flatMap{etcMeta => val srcPath = getObjectSource(fn.station, etcMeta.hashSum) Files.move(file, srcPath, REPLACE_EXISTING) - uploadDataObject(srcPath, fn.station, etcMeta.hashSum) + uploadDataObject(srcPath, fn.station, etcMeta.hashSum, "live") } - private def uploadDataObject(srcPath: Path, station: StationId, hash: Sha256Sum): Future[Done] = upload + private def uploadDataObject(srcPath: Path, station: StationId, hash: Sha256Sum, source: String): Future[Done] = + log.info(s"ETC facade internal upload attempt source=$source station=${station.id} hash=${hash.id} path=${srcPath.getFileName}") + upload .getEtcSink(hash) .flatMap(FileIO.fromPath(srcPath).runWith) .flatMap{res => @@ -216,12 +219,12 @@ class FacadeService(val config: EtcFacadeConfig, upload: UploadService)(using ma } .transform( ok => {Files.delete(srcPath); ok}, - err => new Exception(s"ETC facade failure during internal object upload. Station $station, object $hash", err) + err => new Exception(s"ETC facade failure during internal object upload. source=$source station=$station, object $hash", err) ) private[etcfacade] def uploadDataObjectHandleErrors(station: StationId, hash: Sha256Sum): Future[Done] = val srcPath = getObjectSource(station, hash) - uploadDataObject(srcPath, station, hash).andThen( + uploadDataObject(srcPath, station, hash, "retry").andThen( handleErrors(hash.base64Url) ) @@ -259,8 +262,11 @@ class FacadeService(val config: EtcFacadeConfig, upload: UploadService)(using ma private def handleErrors(uploadedObj: String): PartialFunction[Try[Done], Unit] = case Failure(err) => - appendError(s"Error while uploading $uploadedObj : " + UploadResult.extractMessage(err)) - log.error(err, s"ETC facade error while uploading $uploadedObj") + if FacadeService.isAlreadyUploading(err) then + log.info(s"ETC facade upload for $uploadedObj is already in progress, skipping duplicate attempt") + else + appendError(s"Error while uploading $uploadedObj : " + UploadResult.extractMessage(err)) + log.error(err, s"ETC facade error while uploading $uploadedObj") end FacadeService @@ -355,4 +361,9 @@ object FacadeService: private def packageIsComplete(pack: DailyPackage): Boolean = pack.keysIterator.flatMap(_.slot).toSet.size == 48 + def isAlreadyUploading(err: Throwable): Boolean = err match + case null => false + case _: UploadAlreadyInProgress => true + case _ => isAlreadyUploading(err.getCause) + end FacadeService diff --git a/src/main/scala/se/lu/nateko/cp/data/services/upload/ObjectLock.scala b/src/main/scala/se/lu/nateko/cp/data/services/upload/ObjectLock.scala index 00c55bef0..a1410fa01 100644 --- a/src/main/scala/se/lu/nateko/cp/data/services/upload/ObjectLock.scala +++ b/src/main/scala/se/lu/nateko/cp/data/services/upload/ObjectLock.scala @@ -1,7 +1,6 @@ package se.lu.nateko.cp.data.services.upload import akka.Done -import se.lu.nateko.cp.data.api.CpDataException import se.lu.nateko.cp.meta.core.crypto.Sha256Sum import scala.collection.mutable.Set @@ -14,7 +13,7 @@ private[upload] class ObjectLock(msg: String) { private[this] val locked = Set.empty[Sha256Sum] def lock(hash: Sha256Sum): Try[Done] = synchronized{ - if(locked.contains(hash)) Failure(new CpDataException(s"Object ${hash.id} $msg")) + if(locked.contains(hash)) Failure(new UploadAlreadyInProgress(s"Object ${hash.id} $msg")) else{ locked.add(hash) Success(Done) @@ -26,3 +25,5 @@ private[upload] class ObjectLock(msg: String) { Done } } + +class UploadAlreadyInProgress(message: String) extends Exception(message) diff --git a/src/test/scala/se/lu/nateko/cp/data/test/services/etcfacade/FacadeServiceTests.scala b/src/test/scala/se/lu/nateko/cp/data/test/services/etcfacade/FacadeServiceTests.scala new file mode 100644 index 000000000..81cdfe87f --- /dev/null +++ b/src/test/scala/se/lu/nateko/cp/data/test/services/etcfacade/FacadeServiceTests.scala @@ -0,0 +1,22 @@ +package se.lu.nateko.cp.data.test.services.etcfacade + +import org.scalatest.funspec.AnyFunSpec +import se.lu.nateko.cp.data.services.etcfacade.FacadeService +import se.lu.nateko.cp.data.services.upload.UploadAlreadyInProgress + +class FacadeServiceTests extends AnyFunSpec: + + describe("isAlreadyUploading"): + + it("returns true for direct UploadAlreadyInProgress"): + assert(FacadeService.isAlreadyUploading(new UploadAlreadyInProgress("x"))) + + it("returns true for wrapped UploadAlreadyInProgress"): + val err = new Exception("outer", new Exception("inner", new UploadAlreadyInProgress("x"))) + assert(FacadeService.isAlreadyUploading(err)) + + it("returns false for unrelated exception"): + assert(!FacadeService.isAlreadyUploading(new RuntimeException("boom"))) + + it("returns false for null"): + assert(!FacadeService.isAlreadyUploading(null)) From 32cce00d83500163a7bc43c9c7805c175cf50934 Mon Sep 17 00:00:00 2001 From: Jonathan Thiry Date: Thu, 21 May 2026 13:46:58 +0200 Subject: [PATCH 2/2] Apply suggested improvements to ETC facade uploads --- build.sbt | 2 +- src/main/resources/logback.xml | 2 +- .../services/etcfacade/FacadeService.scala | 38 +++++++++++-------- .../cp/data/services/upload/ObjectLock.scala | 7 ++-- .../data/services/upload/UploadService.scala | 2 +- .../etcfacade/FacadeServiceTests.scala | 22 ----------- 6 files changed, 30 insertions(+), 43 deletions(-) delete mode 100644 src/test/scala/se/lu/nateko/cp/data/test/services/etcfacade/FacadeServiceTests.scala diff --git a/build.sbt b/build.sbt index ad136b4e6..ef8f1a191 100644 --- a/build.sbt +++ b/build.sbt @@ -82,7 +82,7 @@ lazy val data = (project in file(".")) libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-slf4j" % akkaVersion cross CrossVersion.for3Use2_13, - "ch.qos.logback" % "logback-classic" % "1.1.3", + "ch.qos.logback" % "logback-classic" % "1.4.14", "io.sentry" % "sentry" % sentryVersion, "io.sentry" % "sentry-logback" % sentryVersion, "eu.icoscp" %% "georestheart" % "0.1.1", diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 46d8fd1bb..a41d0e74a 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -3,7 +3,7 @@ - %date{ISO8601} %-5level [%thread] %X{akkaSource} - %msg%n + %date{ISO8601} %-5level [%thread] %X{akkaSource} - %msg %replace(%kvp{}){'(.+)', '[$1]'}%n diff --git a/src/main/scala/se/lu/nateko/cp/data/services/etcfacade/FacadeService.scala b/src/main/scala/se/lu/nateko/cp/data/services/etcfacade/FacadeService.scala index 228c4adb5..22585f1f8 100644 --- a/src/main/scala/se/lu/nateko/cp/data/services/etcfacade/FacadeService.scala +++ b/src/main/scala/se/lu/nateko/cp/data/services/etcfacade/FacadeService.scala @@ -10,6 +10,7 @@ import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source import akka.util.ByteString +import org.slf4j.LoggerFactory import se.lu.nateko.cp.data.EtcFacadeConfig import se.lu.nateko.cp.data.api.ChecksumError import se.lu.nateko.cp.data.api.CpDataException @@ -83,6 +84,7 @@ class FacadeService(val config: EtcFacadeConfig, upload: UploadService)(using ma private val metaClient = upload.meta private val log = upload.log + private val structuredLog = LoggerFactory.getLogger(getClass) Files.createDirectories(Paths.get(config.folder)) @@ -203,11 +205,16 @@ class FacadeService(val config: EtcFacadeConfig, upload: UploadService)(using ma .flatMap{etcMeta => val srcPath = getObjectSource(fn.station, etcMeta.hashSum) Files.move(file, srcPath, REPLACE_EXISTING) - uploadDataObject(srcPath, fn.station, etcMeta.hashSum, "live") + uploadDataObject(srcPath, fn.station, etcMeta.hashSum, UploadSource.Live) } - private def uploadDataObject(srcPath: Path, station: StationId, hash: Sha256Sum, source: String): Future[Done] = - log.info(s"ETC facade internal upload attempt source=$source station=${station.id} hash=${hash.id} path=${srcPath.getFileName}") + private def uploadDataObject(srcPath: Path, station: StationId, hash: Sha256Sum, source: UploadSource): Future[Done] = + structuredLog.atInfo() + .addKeyValue("source", source.name) + .addKeyValue("station", station.id) + .addKeyValue("hash", hash.id) + .addKeyValue("path", srcPath.getFileName) + .log("ETC facade internal upload attempt") upload .getEtcSink(hash) .flatMap(FileIO.fromPath(srcPath).runWith) @@ -219,12 +226,15 @@ class FacadeService(val config: EtcFacadeConfig, upload: UploadService)(using ma } .transform( ok => {Files.delete(srcPath); ok}, - err => new Exception(s"ETC facade failure during internal object upload. source=$source station=$station, object $hash", err) + { + case err: UploadAlreadyInProgress => err + case err => new Exception(s"ETC facade failure during internal object upload. source=${source.name} station=$station, object $hash", err) + } ) private[etcfacade] def uploadDataObjectHandleErrors(station: StationId, hash: Sha256Sum): Future[Done] = val srcPath = getObjectSource(station, hash) - uploadDataObject(srcPath, station, hash, "retry").andThen( + uploadDataObject(srcPath, station, hash, UploadSource.Retry).andThen( handleErrors(hash.base64Url) ) @@ -261,12 +271,11 @@ class FacadeService(val config: EtcFacadeConfig, upload: UploadService)(using ma } private def handleErrors(uploadedObj: String): PartialFunction[Try[Done], Unit] = + case Failure(_: UploadAlreadyInProgress) => + log.info(s"ETC facade upload for $uploadedObj is already in progress, skipping duplicate attempt") case Failure(err) => - if FacadeService.isAlreadyUploading(err) then - log.info(s"ETC facade upload for $uploadedObj is already in progress, skipping duplicate attempt") - else - appendError(s"Error while uploading $uploadedObj : " + UploadResult.extractMessage(err)) - log.error(err, s"ETC facade error while uploading $uploadedObj") + appendError(s"Error while uploading $uploadedObj : " + UploadResult.extractMessage(err)) + log.error(err, s"ETC facade error while uploading $uploadedObj") end FacadeService @@ -279,6 +288,10 @@ object FacadeService: val ForceEcUploadTime = LocalTime.of(4, 0) //is to be interpreted as UTC time val OldFileMaxAge = Duration.ofDays(30) + private enum UploadSource(val name: String): + case Live extends UploadSource("live") + case Retry extends UploadSource("retry") + def getUploadMeta(file: EtcFilename, hashSum: Sha256Sum) = EtcUploadMetadata( hashSum = hashSum, fileName = file.toString, @@ -361,9 +374,4 @@ object FacadeService: private def packageIsComplete(pack: DailyPackage): Boolean = pack.keysIterator.flatMap(_.slot).toSet.size == 48 - def isAlreadyUploading(err: Throwable): Boolean = err match - case null => false - case _: UploadAlreadyInProgress => true - case _ => isAlreadyUploading(err.getCause) - end FacadeService diff --git a/src/main/scala/se/lu/nateko/cp/data/services/upload/ObjectLock.scala b/src/main/scala/se/lu/nateko/cp/data/services/upload/ObjectLock.scala index a1410fa01..0642d8ae4 100644 --- a/src/main/scala/se/lu/nateko/cp/data/services/upload/ObjectLock.scala +++ b/src/main/scala/se/lu/nateko/cp/data/services/upload/ObjectLock.scala @@ -8,12 +8,12 @@ import scala.util.Failure import scala.util.Success import scala.util.Try -private[upload] class ObjectLock(msg: String) { +private[upload] class ObjectLock { private[this] val locked = Set.empty[Sha256Sum] def lock(hash: Sha256Sum): Try[Done] = synchronized{ - if(locked.contains(hash)) Failure(new UploadAlreadyInProgress(s"Object ${hash.id} $msg")) + if(locked.contains(hash)) Failure(UploadAlreadyInProgress(hash.id)) else{ locked.add(hash) Success(Done) @@ -26,4 +26,5 @@ private[upload] class ObjectLock(msg: String) { } } -class UploadAlreadyInProgress(message: String) extends Exception(message) +final case class UploadAlreadyInProgress(objectId: String) + extends Exception(s"Object $objectId is currently already being uploaded") diff --git a/src/main/scala/se/lu/nateko/cp/data/services/upload/UploadService.scala b/src/main/scala/se/lu/nateko/cp/data/services/upload/UploadService.scala index 78709d628..7f0eab706 100644 --- a/src/main/scala/se/lu/nateko/cp/data/services/upload/UploadService.scala +++ b/src/main/scala/se/lu/nateko/cp/data/services/upload/UploadService.scala @@ -44,7 +44,7 @@ class UploadService(config: UploadConfig, netcdfConf: NetCdfConfig, val meta: Me val folder = File(config.folder) private val readonlyFolder: Option[File] = config.readonlyFolder.map(new File(_)) - private[this] val objLock = new ObjectLock("is currently already being uploaded") + private[this] val objLock = new ObjectLock if(!folder.exists) { assert(folder.mkdirs(), "Failed to create directory " + folder.getAbsolutePath) diff --git a/src/test/scala/se/lu/nateko/cp/data/test/services/etcfacade/FacadeServiceTests.scala b/src/test/scala/se/lu/nateko/cp/data/test/services/etcfacade/FacadeServiceTests.scala deleted file mode 100644 index 81cdfe87f..000000000 --- a/src/test/scala/se/lu/nateko/cp/data/test/services/etcfacade/FacadeServiceTests.scala +++ /dev/null @@ -1,22 +0,0 @@ -package se.lu.nateko.cp.data.test.services.etcfacade - -import org.scalatest.funspec.AnyFunSpec -import se.lu.nateko.cp.data.services.etcfacade.FacadeService -import se.lu.nateko.cp.data.services.upload.UploadAlreadyInProgress - -class FacadeServiceTests extends AnyFunSpec: - - describe("isAlreadyUploading"): - - it("returns true for direct UploadAlreadyInProgress"): - assert(FacadeService.isAlreadyUploading(new UploadAlreadyInProgress("x"))) - - it("returns true for wrapped UploadAlreadyInProgress"): - val err = new Exception("outer", new Exception("inner", new UploadAlreadyInProgress("x"))) - assert(FacadeService.isAlreadyUploading(err)) - - it("returns false for unrelated exception"): - assert(!FacadeService.isAlreadyUploading(new RuntimeException("boom"))) - - it("returns false for null"): - assert(!FacadeService.isAlreadyUploading(null))