Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package se.lu.nateko.cp.meta.services.sparql.magic

import scala.language.unsafeNulls

import org.eclipse.rdf4j.model.IRI
import org.eclipse.rdf4j.model.ValueFactory
import org.eclipse.rdf4j.sail.Sail
Expand Down Expand Up @@ -36,10 +34,10 @@ trait ObjSpecific{
}

trait ObjInfo extends ObjSpecific{
def spec: IRI
def submitter: IRI
def station: IRI
def site: IRI
def spec: IRI | Null
def submitter: IRI | Null
def station: IRI | Null
def site: IRI | Null
Comment thread
jonathanthiry marked this conversation as resolved.
def fileName: Option[String]
def sizeInBytes: Option[Long]
def samplingHeightMeters: Option[Float]
Expand All @@ -51,7 +49,7 @@ trait ObjInfo extends ObjSpecific{
}

class CpIndex(sail: Sail, geo: Future[GeoIndex], data: IndexData)(using EnvriConfigs) extends ReadWriteLocking:
private val log = LoggerFactory.getLogger(getClass())
private val log = LoggerFactory.getLogger(getClass()).nn
private val filtering = Filtering(data, geo)

import data.{contMap, stats, objs, initOk, idLookup}
Expand Down Expand Up @@ -89,7 +87,7 @@ class CpIndex(sail: Sail, geo: Future[GeoIndex], data: IndexData)(using EnvriCon
log.info("CpIndex got initialized with non-empty index data to use")
reportDebugInfo()

given factory: ValueFactory = sail.getValueFactory
given factory: ValueFactory = sail.getValueFactory.nn
val vocab = new CpmetaVocab(factory)

private val queue = new ArrayBlockingQueue[RdfUpdate](UpdateQueueSize)
Expand All @@ -100,11 +98,11 @@ class CpIndex(sail: Sail, geo: Future[GeoIndex], data: IndexData)(using EnvriCon
def fetch(req: DataObjectFetch): Iterator[ObjInfo] = readLocked{
//val start = System.currentTimeMillis

val filter = filtering(req.filter).fold(initOk)(BufferFastAggregation.and(_, initOk))
val filter = filtering(req.filter).fold(initOk)(BufferFastAggregation.and(_, initOk).nn)

val idxIter: Iterator[Int] = req.sort match{
case None =>
filter.iterator.asScala.drop(req.offset).map(_.intValue)
filter.iterator.nn.asScala.drop(req.offset).map(_.intValue)
case Some(SortBy(prop, descending)) =>
data.bitmap(prop).iterateSorted(Some(filter), req.offset, descending)
}
Expand All @@ -131,7 +129,7 @@ class CpIndex(sail: Sail, geo: Future[GeoIndex], data: IndexData)(using EnvriCon
}

def getUniqueKeywords(req: DataObjectFetch): Iterable[String] = readLocked {
val objectIds = filtering(req.filter).fold(initOk)(BufferFastAggregation.and(_, initOk))
val objectIds = filtering(req.filter).fold(initOk)(BufferFastAggregation.and(_, initOk).nn)
data.getObjectKeywords(objectIds)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package se.lu.nateko.cp.meta.services.sparql.magic

import scala.language.unsafeNulls

import org.eclipse.rdf4j.model.IRI
import org.roaringbitmap.buffer.{BufferFastAggregation, ImmutableRoaringBitmap, MutableRoaringBitmap}
import se.lu.nateko.cp.meta.services.sparql.index.*
Expand Down Expand Up @@ -34,7 +32,7 @@ class Filtering(data: IndexData, geo: Future[GeoIndex]) {

case Exists(prop) => prop match {
case cp: ContProp => Some(data.bitmap(cp).all)
case optUriProp: OptUriProperty =>
case optUriProp: OptUriProperty =>
// TODO: Not covered by tests, so unsure if it's correct right now.
data.categoryBitmap(optUriProp, Seq(None)) match {
case bm if bm.isEmpty => None
Expand All @@ -53,7 +51,7 @@ class Filtering(data: IndexData, geo: Future[GeoIndex]) {
.collect { case iri: IRI => iri }
.collect { case CpVocab.DataObject(hash, _) => idLookup.get(hash) }
.flatten
Some(ImmutableRoaringBitmap.bitmapOf(objIndices*))
Some(ImmutableRoaringBitmap.bitmapOf(objIndices*).nn)

case CategFilter(category, values) =>
Some(data.categoryBitmap(category, values))
Expand Down Expand Up @@ -94,8 +92,8 @@ class Filtering(data: IndexData, geo: Future[GeoIndex]) {
case Some(Failure(exc)) =>
throw Exception("Geo indexing failed", exc)

private def negate(bm: ImmutableRoaringBitmap) =
if objs.length == 0 then emptyBitmap else ImmutableRoaringBitmap.flip(bm, 0, objs.length.toLong)
private def negate(bm: ImmutableRoaringBitmap): MutableRoaringBitmap =
if objs.length == 0 then emptyBitmap else ImmutableRoaringBitmap.flip(bm, 0, objs.length.toLong).nn

private def collectUnless[T](iter: Iterator[T])(cond: T => Boolean): Option[Seq[T]] = {
var condHappened = false
Expand All @@ -107,8 +105,8 @@ class Filtering(data: IndexData, geo: Future[GeoIndex]) {
}

private def or(bms: Seq[ImmutableRoaringBitmap]): Option[MutableRoaringBitmap] =
if (bms.isEmpty) Some(emptyBitmap) else Some(BufferFastAggregation.or(bms*))
if (bms.isEmpty) Some(emptyBitmap) else Some(BufferFastAggregation.or(bms*).nn)

private def and(bms: Seq[ImmutableRoaringBitmap]): Option[MutableRoaringBitmap] =
if (bms.isEmpty) None else Some(BufferFastAggregation.and(bms*))
if (bms.isEmpty) None else Some(BufferFastAggregation.and(bms*).nn)
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package se.lu.nateko.cp.meta.services.sparql.magic.index

import scala.language.unsafeNulls

import org.eclipse.rdf4j.model.IRI
import org.eclipse.rdf4j.model.Literal
import org.eclipse.rdf4j.model.Statement
Expand All @@ -27,7 +25,6 @@ import se.lu.nateko.cp.meta.services.sparql.index.StringHierarchicalBitmap.Strin
import se.lu.nateko.cp.meta.services.sparql.magic.ObjInfo
import se.lu.nateko.cp.meta.utils.parseCommaSepList
import se.lu.nateko.cp.meta.utils.parseJsonStringArray
import se.lu.nateko.cp.meta.utils.rdf4j.===
import se.lu.nateko.cp.meta.utils.rdf4j.Rdf4jStatement
import se.lu.nateko.cp.meta.utils.rdf4j.asString
import se.lu.nateko.cp.meta.utils.rdf4j.toJava
Expand All @@ -46,12 +43,12 @@ final class DataStartGeo(objs: IndSeq[ObjEntry]) extends DateTimeGeo(objs(_).dat
final class DataEndGeo(objs: IndSeq[ObjEntry]) extends DateTimeGeo(objs(_).dataEnd)
final class SubmStartGeo(objs: IndSeq[ObjEntry]) extends DateTimeGeo(objs(_).submissionStart)
final class SubmEndGeo(objs: IndSeq[ObjEntry]) extends DateTimeGeo(objs(_).submissionEnd)
final class FileNameGeo(objs: IndSeq[ObjEntry]) extends StringGeo(objs.apply(_).fName)
final class FileNameGeo(objs: IndSeq[ObjEntry]) extends StringGeo(objs.apply(_).nn.fName.nn)

final case class StatKey(spec: IRI, submitter: IRI, station: Option[IRI], site: Option[IRI])
final case class StatEntry(key: StatKey, count: Int)

def emptyBitmap = MutableRoaringBitmap.bitmapOf()
def emptyBitmap: MutableRoaringBitmap = MutableRoaringBitmap.bitmapOf().nn

final class IndexData(nObjects: Int)(
// These members are public only because of serialization, and should not be accessed directly.
Expand All @@ -65,7 +62,7 @@ final class IndexData(nObjects: Int)(
val stats: AnyRefMap[StatKey, MutableRoaringBitmap] = AnyRefMap.empty,
val initOk: MutableRoaringBitmap = emptyBitmap
) extends Serializable:
private val log = LoggerFactory.getLogger(getClass())
private val log = LoggerFactory.getLogger(getClass()).nn

private def dataStartBm = DatetimeHierarchicalBitmap(DataStartGeo(objs))
private def dataEndBm = DatetimeHierarchicalBitmap(DataEndGeo(objs))
Expand All @@ -83,7 +80,7 @@ final class IndexData(nObjects: Int)(

def getObjectKeywords(objectIds: ImmutableRoaringBitmap): Iterable[String] = {
categoryKeys(Keyword).collect {
case keyword if !BufferFastAggregation.and(objectIds, keywordBitmap(Seq(keyword))).isEmpty() =>
case keyword if !BufferFastAggregation.and(objectIds, keywordBitmap(Seq(keyword))).nn.isEmpty() =>
keyword
}
}
Expand All @@ -109,7 +106,7 @@ final class IndexData(nObjects: Int)(
keywordBitmap(values.asInstanceOf[Iterable[Keyword.ValueType]])
case _ => {
val category = categMap(prop)
BufferFastAggregation.or(values.map(v => category.getOrElse(v, emptyBitmap)).toSeq*)
BufferFastAggregation.or(values.map(v => category.getOrElse(v, emptyBitmap)).toSeq*).nn
}
}
}
Expand Down Expand Up @@ -141,7 +138,7 @@ final class IndexData(nObjects: Int)(
val objectMap = categMap(Keyword)
val objects = keywords.flatMap(objectMap.get)

BufferFastAggregation.or(LazyList(specObjects, objects).flatten*)
BufferFastAggregation.or(LazyList(specObjects, objects).flatten*).nn
}

def processUpdate(
Expand All @@ -164,10 +161,13 @@ final class IndexData(nObjects: Int)(
if (filterByEnvri) EnvriResolver.infer(subj.toJava).foreach: envri =>
updateCategSet(EnvriProp, envri, oe.idx, isAssertion)
if (isAssertion) {
if (oe.spec != null) removeStat(oe, initOk)
if (oe.spec != null) {
removeStat(oe, initOk)
}

oe.spec = spec
addStat(oe, initOk)
} else if (spec === oe.spec) {
} else if (spec == oe.spec) {
removeStat(oe, initOk)
oe.spec = null
}
Expand All @@ -183,7 +183,7 @@ final class IndexData(nObjects: Int)(

case `hasName` =>
getDataObject(subj).foreach { oe =>
val fName = obj.stringValue
val fName: String = obj.stringValue.nn
if (isAssertion) oe.fName = fName
else if (oe.fName == fName) { oe.fName = null }
handleContinuousPropUpdate(FileName, fName, oe.idx, isAssertion)
Expand Down Expand Up @@ -310,7 +310,7 @@ final class IndexData(nObjects: Int)(

val directPrevVers: IndexedSeq[Int] =
StatementSource.getStatements(subj, isNextVersionOf, null)
.flatMap(st => getDataObject(st.getObject).map(_.idx))
.flatMap(st => getDataObject(st.getObject.nn).map(_.idx))
.toIndexedSeq

directPrevVers.foreach { oldIdx =>
Expand Down Expand Up @@ -437,7 +437,6 @@ final class IndexData(nObjects: Int)(
val _ = mappings.remove(categ)
}
}

private def getSpecProjectKeywords(spec: IRI)(using CpmetaVocab, StatementSource): Set[String] = {
StatementSource
.getUriValues(spec, summon[CpmetaVocab].hasAssociatedProject)
Expand Down Expand Up @@ -541,7 +540,7 @@ final class IndexData(nObjects: Int)(
case CpVocab.DataObject(hash, prefix) =>
val entry = getObjEntry(hash)
if (entry.prefix == "") {
entry.prefix = prefix.intern()
entry.prefix = prefix.nn.intern().nn
}
Some(entry)

Expand Down Expand Up @@ -575,27 +574,41 @@ private def targetUri(obj: Value, isAssertion: Boolean) =
then obj.asInstanceOf[IRI]
else null

// TODO: Option.scala isn't currently written for explicit-nulls. Maybe changed in later scalac versions?
private def makeOption[A](arg: A | Null) = {
if (arg == null) { None }
else { Some(arg.nn) }
}
Comment thread
jonathanthiry marked this conversation as resolved.

private def keyForDobj(obj: ObjEntry): Option[StatKey] =
if obj.spec == null || obj.submitter == null then None
else
Some(
StatKey(obj.spec, obj.submitter, Option(obj.station), Option(obj.site))
)
(obj.spec, obj.submitter) match {
case (spec: IRI, submitter: IRI) => {
Some(
StatKey(
spec,
submitter,
makeOption(obj.station),
makeOption(obj.site)
)
)
}
case _ => None
}

private def ifDateTime(dt: Value)(mod: Long => Unit): Unit = dt match
case lit: Literal if lit.getDatatype === XSD.DATETIME =>
try mod(Instant.parse(lit.stringValue).toEpochMilli)
case lit: Literal if lit.getDatatype == XSD.DATETIME =>
try mod(Instant.parse(lit.stringValue).nn.toEpochMilli)
catch case _: Throwable => () // ignoring wrong dateTimes
case _ =>

private def ifLong(dt: Value)(mod: Long => Unit): Unit = dt match
case lit: Literal if lit.getDatatype === XSD.LONG =>
case lit: Literal if lit.getDatatype == XSD.LONG =>
try mod(lit.longValue)
catch case _: Throwable => () // ignoring wrong longs
case _ =>

private def ifFloat(dt: Value)(mod: Float => Unit): Unit = dt match
case lit: Literal if lit.getDatatype === XSD.FLOAT =>
case lit: Literal if lit.getDatatype == XSD.FLOAT =>
try mod(lit.floatValue)
catch case _: Throwable => () // ignoring wrong floats
case _ =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package se.lu.nateko.cp.meta.services.sparql.magic.index

import scala.language.unsafeNulls

import org.eclipse.rdf4j.model.{IRI, ValueFactory}
import se.lu.nateko.cp.meta.core.crypto.Sha256Sum
import se.lu.nateko.cp.meta.services.sparql.magic.ObjInfo
Expand All @@ -10,12 +8,12 @@ import java.time.Instant
import scala.compiletime.uninitialized

final class ObjEntry(val hash: Sha256Sum, val idx: Int, var prefix: String) extends ObjInfo with Serializable {
var spec: IRI = uninitialized
var submitter: IRI = uninitialized
var station: IRI = uninitialized
var site: IRI = uninitialized
var spec: IRI | Null = uninitialized
var submitter: IRI | Null = uninitialized
var station: IRI | Null = uninitialized
var site: IRI | Null = uninitialized
var size: Long = -1
var fName: String = ""
var fName: String | Null = ""
var samplingHeight: Float = Float.NaN
var dataStart: Long = Long.MinValue
var dataEnd: Long = Long.MinValue
Expand All @@ -25,15 +23,20 @@ final class ObjEntry(val hash: Sha256Sum, val idx: Int, var prefix: String) exte

private final def dateTimeFromLong(dt: Long): Option[Instant] =
if (dt == Long.MinValue) None
else Some(Instant.ofEpochMilli(dt))
else Some(Instant.ofEpochMilli(dt).nn)

def sizeInBytes: Option[Long] = if (size >= 0) Some(size) else None
def fileName: Option[String] = Option(fName)
def fileName: Option[String] = makeOption(fName)
def samplingHeightMeters: Option[Float] = if (samplingHeight == Float.NaN) None else Some(samplingHeight)
def dataStartTime: Option[Instant] = dateTimeFromLong(dataStart)
def dataEndTime: Option[Instant] = dateTimeFromLong(dataEnd)
def submissionStartTime: Option[Instant] = dateTimeFromLong(submissionStart)
def submissionEndTime: Option[Instant] = dateTimeFromLong(submissionEnd)

def uri(factory: ValueFactory): IRI = factory.createIRI(prefix + hash.base64Url)
def uri(factory: ValueFactory): IRI = factory.createIRI(prefix + hash.base64Url).nn

private def makeOption[T](arg: T | Null) : Option[T] = {
if (arg == null) { None }
else { Some(arg.nn) }
}
}