Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ lazy val data = (project in file("."))

libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion cross CrossVersion.for3Use2_13,
"ch.qos.logback" % "logback-classic" % "1.1.3",
"ch.qos.logback" % "logback-classic" % "1.4.14",
"io.sentry" % "sentry" % sentryVersion,
"io.sentry" % "sentry-logback" % sentryVersion,
"eu.icoscp" %% "georestheart" % "0.1.1",
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<filter class="se.lu.nateko.cp.data.RestheartEtagWarningFilter"/>
<encoder>
<pattern>%date{ISO8601} %-5level [%thread] %X{akkaSource} - %msg%n</pattern>
<pattern>%date{ISO8601} %-5level [%thread] %X{akkaSource} - %msg %replace(%kvp{}){'(.+)', '[$1]'}%n</pattern>
</encoder>
</appender>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.ByteString
import org.slf4j.LoggerFactory
import se.lu.nateko.cp.data.EtcFacadeConfig
import se.lu.nateko.cp.data.api.ChecksumError
import se.lu.nateko.cp.data.api.CpDataException
Expand All @@ -18,6 +19,7 @@ import se.lu.nateko.cp.data.api.dataFail
import se.lu.nateko.cp.data.formats.TimeSeriesStreams
import se.lu.nateko.cp.data.formats.zip
import se.lu.nateko.cp.data.services.upload.UploadResult
import se.lu.nateko.cp.data.services.upload.UploadAlreadyInProgress
import se.lu.nateko.cp.data.services.upload.UploadService
import se.lu.nateko.cp.data.streams.DigestFlow
import se.lu.nateko.cp.data.streams.ZipEntryFlow
Expand Down Expand Up @@ -82,6 +84,7 @@ class FacadeService(val config: EtcFacadeConfig, upload: UploadService)(using ma

private val metaClient = upload.meta
private val log = upload.log
private val structuredLog = LoggerFactory.getLogger(getClass)

Files.createDirectories(Paths.get(config.folder))

Expand Down Expand Up @@ -202,10 +205,17 @@ class FacadeService(val config: EtcFacadeConfig, upload: UploadService)(using ma
.flatMap{etcMeta =>
val srcPath = getObjectSource(fn.station, etcMeta.hashSum)
Files.move(file, srcPath, REPLACE_EXISTING)
uploadDataObject(srcPath, fn.station, etcMeta.hashSum)
uploadDataObject(srcPath, fn.station, etcMeta.hashSum, UploadSource.Live)
}

private def uploadDataObject(srcPath: Path, station: StationId, hash: Sha256Sum): Future[Done] = upload
private def uploadDataObject(srcPath: Path, station: StationId, hash: Sha256Sum, source: UploadSource): Future[Done] =
structuredLog.atInfo()
.addKeyValue("source", source.name)
.addKeyValue("station", station.id)
.addKeyValue("hash", hash.id)
.addKeyValue("path", srcPath.getFileName)
.log("ETC facade internal upload attempt")
upload
.getEtcSink(hash)
.flatMap(FileIO.fromPath(srcPath).runWith)
.flatMap{res =>
Expand All @@ -216,12 +226,15 @@ class FacadeService(val config: EtcFacadeConfig, upload: UploadService)(using ma
}
.transform(
ok => {Files.delete(srcPath); ok},
err => new Exception(s"ETC facade failure during internal object upload. Station $station, object $hash", err)
{
case err: UploadAlreadyInProgress => err
case err => new Exception(s"ETC facade failure during internal object upload. source=${source.name} station=$station, object $hash", err)
}
)

private[etcfacade] def uploadDataObjectHandleErrors(station: StationId, hash: Sha256Sum): Future[Done] =
val srcPath = getObjectSource(station, hash)
uploadDataObject(srcPath, station, hash).andThen(
uploadDataObject(srcPath, station, hash, UploadSource.Retry).andThen(
handleErrors(hash.base64Url)
)

Expand Down Expand Up @@ -258,6 +271,8 @@ class FacadeService(val config: EtcFacadeConfig, upload: UploadService)(using ma
}

private def handleErrors(uploadedObj: String): PartialFunction[Try[Done], Unit] =
case Failure(_: UploadAlreadyInProgress) =>
log.info(s"ETC facade upload for $uploadedObj is already in progress, skipping duplicate attempt")
case Failure(err) =>
appendError(s"Error while uploading $uploadedObj : " + UploadResult.extractMessage(err))
log.error(err, s"ETC facade error while uploading $uploadedObj")
Expand All @@ -273,6 +288,10 @@ object FacadeService:
val ForceEcUploadTime = LocalTime.of(4, 0) //is to be interpreted as UTC time
val OldFileMaxAge = Duration.ofDays(30)

private enum UploadSource(val name: String):
case Live extends UploadSource("live")
case Retry extends UploadSource("retry")

def getUploadMeta(file: EtcFilename, hashSum: Sha256Sum) = EtcUploadMetadata(
hashSum = hashSum,
fileName = file.toString,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
package se.lu.nateko.cp.data.services.upload

import akka.Done
import se.lu.nateko.cp.data.api.CpDataException
import se.lu.nateko.cp.meta.core.crypto.Sha256Sum

import scala.collection.mutable.Set
import scala.util.Failure
import scala.util.Success
import scala.util.Try

private[upload] class ObjectLock(msg: String) {
private[upload] class ObjectLock {

private[this] val locked = Set.empty[Sha256Sum]

def lock(hash: Sha256Sum): Try[Done] = synchronized{
if(locked.contains(hash)) Failure(new CpDataException(s"Object ${hash.id} $msg"))
if(locked.contains(hash)) Failure(UploadAlreadyInProgress(hash.id))
else{
locked.add(hash)
Success(Done)
Expand All @@ -26,3 +25,6 @@ private[upload] class ObjectLock(msg: String) {
Done
}
}

final case class UploadAlreadyInProgress(objectId: String)
extends Exception(s"Object $objectId is currently already being uploaded")
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class UploadService(config: UploadConfig, netcdfConf: NetCdfConfig, val meta: Me
val folder = File(config.folder)
private val readonlyFolder: Option[File] = config.readonlyFolder.map(new File(_))

private[this] val objLock = new ObjectLock("is currently already being uploaded")
private[this] val objLock = new ObjectLock

if(!folder.exists) {
assert(folder.mkdirs(), "Failed to create directory " + folder.getAbsolutePath)
Expand Down
Loading