Add StatementSource trait to reduce input surface of IndexData.processTriple
ggVGc committed Feb 3, 2025
1 parent 4e09cee commit 322444a
Showing 4 changed files with 26 additions and 31 deletions.
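
In brief: after this commit, IndexData.processTriple no longer needs a full TriplestoreConnection, only the read-only StatementSource subset that TriplestoreConnection now extends, so production call sites keep compiling while tests can supply a lightweight stub (see the updated test below). A minimal sketch of code depending on the narrowed contract — the object and method names here are illustrative, not part of the commit:

import org.eclipse.rdf4j.model.IRI
import se.lu.nateko.cp.meta.instanceserver.StatementSource

object StatementSourceSketch:
  // A read-only lookup can now ask for just a StatementSource
  // instead of a full TriplestoreConnection (which still satisfies it).
  def hasDeprecator(obj: IRI, isNextVersionOf: IRI)(using src: StatementSource): Boolean =
    src.hasStatement(null, isNextVersionOf, obj)
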
@@ -72,13 +72,19 @@ trait InstanceServer extends AutoCloseable:

end InstanceServer

trait TriplestoreConnection extends AutoCloseable:
trait StatementSource {
def getStatements(subject: IRI | Null, predicate: IRI | Null, obj: Value | Null): CloseableIterator[Statement]
def hasStatement(subject: IRI | Null, predicate: IRI | Null, obj: Value | Null): Boolean

final def hasStatement(st: Statement): Boolean = st.getSubject() match
case subj: IRI => hasStatement(subj, st.getPredicate(), st.getObject())
case _ => false
}

trait TriplestoreConnection extends AutoCloseable, StatementSource:
def primaryContext: IRI
def readContexts: Seq[IRI]
def factory: ValueFactory

def getStatements(subject: IRI | Null, predicate: IRI | Null, obj: Value | Null): CloseableIterator[Statement]
def hasStatement(subject: IRI | Null, predicate: IRI | Null, obj: Value | Null): Boolean
def withContexts(primary: IRI, read: Seq[IRI]): TriplestoreConnection

final def withReadContexts(read: Seq[IRI]): TriplestoreConnection =
@@ -89,10 +95,6 @@ trait TriplestoreConnection extends AutoCloseable:
if readContexts.length == 1 && readContexts.head == primaryContext then this
else withContexts(primaryContext, Seq(primaryContext))

final def hasStatement(st: Statement): Boolean = st.getSubject() match
case subj: IRI => hasStatement(subj, st.getPredicate(), st.getObject())
case _ => false


object TriplestoreConnection:
import Validated.CardinalityExpectation.{AtLeastOne, AtMostOne, ExactlyOne}
@@ -1,7 +1,6 @@
package se.lu.nateko.cp.meta.services.sparql.magic

import org.eclipse.rdf4j.model.IRI
import org.eclipse.rdf4j.model.Value
import org.eclipse.rdf4j.model.ValueFactory
import org.eclipse.rdf4j.sail.Sail
import org.roaringbitmap.buffer.BufferFastAggregation
@@ -240,13 +239,10 @@ class CpIndex(sail: Sail, geo: Future[GeoIndex], data: IndexData) extends ReadWr
sail.accessEagerly:
list.forEach:
case RdfUpdate(Rdf4jStatement(subj, pred, obj), isAssertion) =>
processUpdate(subj, pred, obj, isAssertion)
data.processTriple(subj, pred, obj, isAssertion, vocab)
case _ => ()
list.clear()

private def processUpdate(subj: IRI, pred: IRI, obj: Value, isAssertion: Boolean)(using GlobConn): Unit =
data.processTriple(subj, pred, obj, isAssertion, vocab)

end CpIndex


@@ -6,7 +6,7 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap
import se.lu.nateko.cp.meta.core.algo.DatetimeHierarchicalBitmap.DateTimeGeo
import se.lu.nateko.cp.meta.core.algo.{DatetimeHierarchicalBitmap, HierarchicalBitmap}
import se.lu.nateko.cp.meta.core.crypto.Sha256Sum
import se.lu.nateko.cp.meta.instanceserver.{TriplestoreConnection => TSC}
import se.lu.nateko.cp.meta.instanceserver.StatementSource
import se.lu.nateko.cp.meta.services.linkeddata.UriSerializer.Hash
import se.lu.nateko.cp.meta.services.sparql.index.StringHierarchicalBitmap.StringGeo
import se.lu.nateko.cp.meta.services.sparql.index.{FileSizeHierarchicalBitmap, SamplingHeightHierarchicalBitmap, *}
@@ -18,6 +18,7 @@ import java.time.Instant
import scala.collection.IndexedSeq as IndSeq
import scala.collection.mutable.{AnyRefMap, ArrayBuffer}
import org.slf4j.LoggerFactory
import se.lu.nateko.cp.meta.api.CloseableIterator

final class DataStartGeo(objs: IndSeq[ObjEntry]) extends DateTimeGeo(objs(_).dataStart)
final class DataEndGeo(objs: IndSeq[ObjEntry]) extends DateTimeGeo(objs(_).dataEnd)
@@ -69,7 +70,7 @@ class IndexData(nObjects: Int)(
.getOrElseUpdate(prop, new AnyRefMap[prop.ValueType, MutableRoaringBitmap])
.asInstanceOf[AnyRefMap[prop.ValueType, MutableRoaringBitmap]]

def processTriple(subj: IRI, pred: IRI, obj: Value, isAssertion: Boolean, vocab: CpmetaVocab)(using TSC): Unit = {
def processTriple(subj: IRI, pred: IRI, obj: Value, isAssertion: Boolean, vocab: CpmetaVocab)(using statements: StatementSource): Unit = {
import vocab.*
import vocab.prov.{wasAssociatedWith, startedAtTime, endedAtTime}
import vocab.dcterms.hasPart
@@ -204,7 +205,7 @@
case _ =>
else if
deprecated.contains(oe.idx) && // this was to prevent needless repo access
!TSC.hasStatement(null, isNextVersionOf, obj)
!statements.hasStatement(null, isNextVersionOf, obj)
then deprecated.remove(oe.idx)
}

@@ -220,7 +221,7 @@
val deprecated = boolBitmap(DeprecationFlag)

val directPrevVers: IndexedSeq[Int] =
TSC.getStatements(subj, isNextVersionOf, null)
statements.getStatements(subj, isNextVersionOf, null)
.flatMap(st => modForDobj(st.getObject)(_.idx))
.toIndexedSeq

@@ -324,8 +325,8 @@
if (isAssertion) hasVarsBm.add(idx) else hasVarsBm.remove(idx)
}

private def nextVersCollIsComplete(obj: IRI, vocab: CpmetaVocab)(using TSC): Boolean =
TSC.getStatements(obj, vocab.dcterms.hasPart, null)
private def nextVersCollIsComplete(obj: IRI, vocab: CpmetaVocab)(using statements: StatementSource): Boolean =
statements.getStatements(obj, vocab.dcterms.hasPart, null)
.collect:
case Rdf4jStatement(_, _, member: IRI) => modForDobj(member): oe =>
oe.isNextVersion = true
@@ -334,8 +335,8 @@
.toIndexedSeq
.exists(identity)

private def getIdxsOfPrevVersThroughColl(deprecator: IRI, vocab: CpmetaVocab)(using TSC): Option[Int] =
TSC.getStatements(null, vocab.dcterms.hasPart, deprecator)
private def getIdxsOfPrevVersThroughColl(deprecator: IRI, vocab: CpmetaVocab)(using statements: StatementSource): Option[Int] =
statements.getStatements(null, vocab.dcterms.hasPart, deprecator)
.collect { case Rdf4jStatement(CpVocab.NextVersColl(oldHash), _, _) => getObjEntry(oldHash).idx }
.toIndexedSeq
.headOption
@@ -1,23 +1,19 @@
package se.lu.nateko.cp.meta.test.services.sparql.index

import org.eclipse.rdf4j.model.{IRI, Statement, Value, ValueFactory}
import org.eclipse.rdf4j.model.{IRI, Statement, Value}
import org.scalatest.funspec.AnyFunSpec
import se.lu.nateko.cp.meta.api.CloseableIterator
import se.lu.nateko.cp.meta.core.crypto.Sha256Sum
import se.lu.nateko.cp.meta.instanceserver.TriplestoreConnection
import se.lu.nateko.cp.meta.instanceserver.StatementSource
import se.lu.nateko.cp.meta.services.sparql.magic.index.IndexData
import se.lu.nateko.cp.meta.services.{CpVocab, CpmetaVocab}
import se.lu.nateko.cp.meta.utils.rdf4j.Loading

// IndexData requires a TriplestoreConnection but in current tests it is not actually used.
private class DummyTSC extends TriplestoreConnection {
override def close(): Unit = ???
override def primaryContext: IRI = ???
override def readContexts: Seq[IRI] = ???
override def factory: ValueFactory = ???
// IndexData requires a StatementSource but in current tests it is not actually used,
// hence we can leave things unimplemented.
private class DummyStatements extends StatementSource {
override def getStatements(subject: IRI | Null, predicate: IRI | Null, obj: Value | Null): CloseableIterator[Statement] = ???
override def hasStatement(subject: IRI | Null, predicate: IRI | Null, obj: Value | Null): Boolean = ???
override def withContexts(primary: IRI, read: Seq[IRI]): TriplestoreConnection = ???
}

class IndexDataTest extends AnyFunSpec {
@@ -36,7 +32,7 @@ class IndexDataTest extends AnyFunSpec {
}

val data = IndexData(100)()
given TriplestoreConnection = DummyTSC()
given StatementSource = DummyStatements()

// Insert hasName triple
data.processTriple(subject, vocab.hasName, factory.createIRI("test:name"), true, vocab)
