From 322444ae59c742305cb2c75a1870da91e7c73472 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Valter=20Sundstr=C3=B6m?= Date: Mon, 3 Feb 2025 16:53:24 +0100 Subject: [PATCH] Add StatementSource trait to reduce input surface of IndexData.processTriple --- .../meta/instanceserver/InstanceServer.scala | 18 ++++++++++-------- .../meta/services/sparql/magic/CpIndex.scala | 6 +----- .../sparql/magic/index/IndexData.scala | 17 +++++++++-------- .../services/sparql/index/IndexDataTest.scala | 16 ++++++---------- 4 files changed, 26 insertions(+), 31 deletions(-) diff --git a/src/main/scala/se/lu/nateko/cp/meta/instanceserver/InstanceServer.scala b/src/main/scala/se/lu/nateko/cp/meta/instanceserver/InstanceServer.scala index 195686eee..7f0b53be4 100644 --- a/src/main/scala/se/lu/nateko/cp/meta/instanceserver/InstanceServer.scala +++ b/src/main/scala/se/lu/nateko/cp/meta/instanceserver/InstanceServer.scala @@ -72,13 +72,19 @@ trait InstanceServer extends AutoCloseable: end InstanceServer -trait TriplestoreConnection extends AutoCloseable: +trait StatementSource { + def getStatements(subject: IRI | Null, predicate: IRI | Null, obj: Value | Null): CloseableIterator[Statement] + def hasStatement(subject: IRI | Null, predicate: IRI | Null, obj: Value | Null): Boolean + + final def hasStatement(st: Statement): Boolean = st.getSubject() match + case subj: IRI => hasStatement(subj, st.getPredicate(), st.getObject()) + case _ => false +} + +trait TriplestoreConnection extends AutoCloseable, StatementSource: def primaryContext: IRI def readContexts: Seq[IRI] def factory: ValueFactory - - def getStatements(subject: IRI | Null, predicate: IRI | Null, obj: Value | Null): CloseableIterator[Statement] - def hasStatement(subject: IRI | Null, predicate: IRI | Null, obj: Value | Null): Boolean def withContexts(primary: IRI, read: Seq[IRI]): TriplestoreConnection final def withReadContexts(read: Seq[IRI]): TriplestoreConnection = @@ -89,10 +95,6 @@ trait TriplestoreConnection extends AutoCloseable: if readContexts.length == 1 && readContexts.head == primaryContext then this else withContexts(primaryContext, Seq(primaryContext)) - final def hasStatement(st: Statement): Boolean = st.getSubject() match - case subj: IRI => hasStatement(subj, st.getPredicate(), st.getObject()) - case _ => false - object TriplestoreConnection: import Validated.CardinalityExpectation.{AtLeastOne, AtMostOne, ExactlyOne} diff --git a/src/main/scala/se/lu/nateko/cp/meta/services/sparql/magic/CpIndex.scala b/src/main/scala/se/lu/nateko/cp/meta/services/sparql/magic/CpIndex.scala index b4e5f962f..79bb038d5 100644 --- a/src/main/scala/se/lu/nateko/cp/meta/services/sparql/magic/CpIndex.scala +++ b/src/main/scala/se/lu/nateko/cp/meta/services/sparql/magic/CpIndex.scala @@ -1,7 +1,6 @@ package se.lu.nateko.cp.meta.services.sparql.magic import org.eclipse.rdf4j.model.IRI -import org.eclipse.rdf4j.model.Value import org.eclipse.rdf4j.model.ValueFactory import org.eclipse.rdf4j.sail.Sail import org.roaringbitmap.buffer.BufferFastAggregation @@ -240,13 +239,10 @@ class CpIndex(sail: Sail, geo: Future[GeoIndex], data: IndexData) extends ReadWr sail.accessEagerly: list.forEach: case RdfUpdate(Rdf4jStatement(subj, pred, obj), isAssertion) => - processUpdate(subj, pred, obj, isAssertion) + data.processTriple(subj, pred, obj, isAssertion, vocab) case _ => () list.clear() - private def processUpdate(subj: IRI, pred: IRI, obj: Value, isAssertion: Boolean)(using GlobConn): Unit = - data.processTriple(subj, pred, obj, isAssertion, vocab) - end CpIndex diff --git a/src/main/scala/se/lu/nateko/cp/meta/services/sparql/magic/index/IndexData.scala b/src/main/scala/se/lu/nateko/cp/meta/services/sparql/magic/index/IndexData.scala index dedf33bfa..50aec546f 100644 --- a/src/main/scala/se/lu/nateko/cp/meta/services/sparql/magic/index/IndexData.scala +++ b/src/main/scala/se/lu/nateko/cp/meta/services/sparql/magic/index/IndexData.scala @@ -6,7 +6,7 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap import se.lu.nateko.cp.meta.core.algo.DatetimeHierarchicalBitmap.DateTimeGeo import se.lu.nateko.cp.meta.core.algo.{DatetimeHierarchicalBitmap, HierarchicalBitmap} import se.lu.nateko.cp.meta.core.crypto.Sha256Sum -import se.lu.nateko.cp.meta.instanceserver.{TriplestoreConnection => TSC} +import se.lu.nateko.cp.meta.instanceserver.StatementSource import se.lu.nateko.cp.meta.services.linkeddata.UriSerializer.Hash import se.lu.nateko.cp.meta.services.sparql.index.StringHierarchicalBitmap.StringGeo import se.lu.nateko.cp.meta.services.sparql.index.{FileSizeHierarchicalBitmap, SamplingHeightHierarchicalBitmap, *} @@ -18,6 +18,7 @@ import java.time.Instant import scala.collection.IndexedSeq as IndSeq import scala.collection.mutable.{AnyRefMap, ArrayBuffer} import org.slf4j.LoggerFactory +import se.lu.nateko.cp.meta.api.CloseableIterator final class DataStartGeo(objs: IndSeq[ObjEntry]) extends DateTimeGeo(objs(_).dataStart) final class DataEndGeo(objs: IndSeq[ObjEntry]) extends DateTimeGeo(objs(_).dataEnd) @@ -69,7 +70,7 @@ class IndexData(nObjects: Int)( .getOrElseUpdate(prop, new AnyRefMap[prop.ValueType, MutableRoaringBitmap]) .asInstanceOf[AnyRefMap[prop.ValueType, MutableRoaringBitmap]] - def processTriple(subj: IRI, pred: IRI, obj: Value, isAssertion: Boolean, vocab: CpmetaVocab)(using TSC): Unit = { + def processTriple(subj: IRI, pred: IRI, obj: Value, isAssertion: Boolean, vocab: CpmetaVocab)(using statements: StatementSource): Unit = { import vocab.* import vocab.prov.{wasAssociatedWith, startedAtTime, endedAtTime} import vocab.dcterms.hasPart @@ -204,7 +205,7 @@ class IndexData(nObjects: Int)( case _ => else if deprecated.contains(oe.idx) && // this was to prevent needless repo access - !TSC.hasStatement(null, isNextVersionOf, obj) + !statements.hasStatement(null, isNextVersionOf, obj) then deprecated.remove(oe.idx) } @@ -220,7 +221,7 @@ class IndexData(nObjects: Int)( val deprecated = boolBitmap(DeprecationFlag) val directPrevVers: IndexedSeq[Int] = - TSC.getStatements(subj, isNextVersionOf, null) + statements.getStatements(subj, isNextVersionOf, null) .flatMap(st => modForDobj(st.getObject)(_.idx)) .toIndexedSeq @@ -324,8 +325,8 @@ class IndexData(nObjects: Int)( if (isAssertion) hasVarsBm.add(idx) else hasVarsBm.remove(idx) } - private def nextVersCollIsComplete(obj: IRI, vocab: CpmetaVocab)(using TSC): Boolean = - TSC.getStatements(obj, vocab.dcterms.hasPart, null) + private def nextVersCollIsComplete(obj: IRI, vocab: CpmetaVocab)(using statements: StatementSource): Boolean = + statements.getStatements(obj, vocab.dcterms.hasPart, null) .collect: case Rdf4jStatement(_, _, member: IRI) => modForDobj(member): oe => oe.isNextVersion = true @@ -334,8 +335,8 @@ class IndexData(nObjects: Int)( .toIndexedSeq .exists(identity) - private def getIdxsOfPrevVersThroughColl(deprecator: IRI, vocab: CpmetaVocab)(using TSC): Option[Int] = - TSC.getStatements(null, vocab.dcterms.hasPart, deprecator) + private def getIdxsOfPrevVersThroughColl(deprecator: IRI, vocab: CpmetaVocab)(using statements: StatementSource): Option[Int] = + statements.getStatements(null, vocab.dcterms.hasPart, deprecator) .collect { case Rdf4jStatement(CpVocab.NextVersColl(oldHash), _, _) => getObjEntry(oldHash).idx } .toIndexedSeq .headOption diff --git a/src/test/scala/se/lu/nateko/cp/meta/test/services/sparql/index/IndexDataTest.scala b/src/test/scala/se/lu/nateko/cp/meta/test/services/sparql/index/IndexDataTest.scala index 21fcee8f2..0e4e90db3 100644 --- a/src/test/scala/se/lu/nateko/cp/meta/test/services/sparql/index/IndexDataTest.scala +++ b/src/test/scala/se/lu/nateko/cp/meta/test/services/sparql/index/IndexDataTest.scala @@ -1,23 +1,19 @@ package se.lu.nateko.cp.meta.test.services.sparql.index -import org.eclipse.rdf4j.model.{IRI, Statement, Value, ValueFactory} +import org.eclipse.rdf4j.model.{IRI, Statement, Value} import org.scalatest.funspec.AnyFunSpec import se.lu.nateko.cp.meta.api.CloseableIterator import se.lu.nateko.cp.meta.core.crypto.Sha256Sum -import se.lu.nateko.cp.meta.instanceserver.TriplestoreConnection +import se.lu.nateko.cp.meta.instanceserver.StatementSource import se.lu.nateko.cp.meta.services.sparql.magic.index.IndexData import se.lu.nateko.cp.meta.services.{CpVocab, CpmetaVocab} import se.lu.nateko.cp.meta.utils.rdf4j.Loading -// IndexData requires a TriplestoreConnection but in current tests it is not actually used. -private class DummyTSC extends TriplestoreConnection { - override def close(): Unit = ??? - override def primaryContext: IRI = ??? - override def readContexts: Seq[IRI] = ??? - override def factory: ValueFactory = ??? +// IndexData requires a StatementSource but in current tests it is not actually used, +// hence we can leave things unimplemented. +private class DummyStatements extends StatementSource { override def getStatements(subject: IRI | Null, predicate: IRI | Null, obj: Value | Null): CloseableIterator[Statement] = ??? override def hasStatement(subject: IRI | Null, predicate: IRI | Null, obj: Value | Null): Boolean = ??? - override def withContexts(primary: IRI, read: Seq[IRI]): TriplestoreConnection = ??? } class IndexDataTest extends AnyFunSpec { @@ -36,7 +32,7 @@ class IndexDataTest extends AnyFunSpec { } val data = IndexData(100)() - given TriplestoreConnection = DummyTSC() + given StatementSource = DummyStatements() // Insert hasName triple data.processTriple(subject, vocab.hasName, factory.createIRI("test:name"), true, vocab)