diff --git a/core/src/main/scala/swaydb/core/build/BuildValidator.scala b/core/src/main/scala/swaydb/core/build/BuildValidator.scala index 807682286..6eb30b10b 100644 --- a/core/src/main/scala/swaydb/core/build/BuildValidator.scala +++ b/core/src/main/scala/swaydb/core/build/BuildValidator.scala @@ -71,7 +71,7 @@ object BuildValidator { IO.failed(Exception.MissingBuildInfo(Build.fileName, thisVersion.version)) case previous @ Build.Info(previousVersion, previousDataType) => - val isValid = previousVersion.major >= 0 && previousVersion.minor >= 16 && previousVersion.revision >= 0 + val isValid = previousVersion.major >= 0 && previousVersion.minor >= 17 && previousVersion.revision >= 0 if (!isValid) IO.failed(IncompatibleVersions(previous.version.version, thisVersion.version)) else if (previousDataType != dataType) diff --git a/core/src/main/scala/swaydb/core/map/serializer/ValueSerializer.scala b/core/src/main/scala/swaydb/core/map/serializer/ValueSerializer.scala index 208b41d83..29861740d 100644 --- a/core/src/main/scala/swaydb/core/map/serializer/ValueSerializer.scala +++ b/core/src/main/scala/swaydb/core/map/serializer/ValueSerializer.scala @@ -27,10 +27,11 @@ package swaydb.core.map.serializer import swaydb.IO import swaydb.core.data.{Time, Value} import swaydb.core.io.reader.Reader -import swaydb.core.util.Bytes +import swaydb.core.util.{Bytes, MinMax} import swaydb.core.util.Times._ import swaydb.data.slice.{ReaderBase, Slice, SliceOption} import swaydb.data.util.ByteSizeOf +import swaydb.data.util.Options.OptionsImplicits import scala.annotation.implicitNotFound import scala.collection.mutable @@ -405,6 +406,50 @@ private[core] object ValueSerializer { } } + implicit object MinMaxSerialiser extends ValueSerializer[Option[MinMax[Slice[Byte]]]] { + override def write(minMax: Option[MinMax[Slice[Byte]]], bytes: Slice[Byte]): Unit = + minMax match { + case Some(minMax) => + bytes addUnsignedInt minMax.min.size + bytes addAll minMax.min + minMax.max match { + case Some(max) => + bytes addUnsignedInt max.size + bytes addAll max + + case None => + bytes addUnsignedInt 0 + } + + case None => + bytes addUnsignedInt 0 + } + + override def read(reader: ReaderBase[Byte]): Option[MinMax[Slice[Byte]]] = { + val minIdSize = reader.readUnsignedInt() + if (minIdSize == 0) + None + else { + val minId = reader.read(minIdSize) + val maxIdSize = reader.readUnsignedInt() + val maxId = if (maxIdSize == 0) None else Some(reader.read(maxIdSize)) + Some(MinMax(minId, maxId)) + } + } + + override def bytesRequired(minMax: Option[MinMax[Slice[Byte]]]): Int = + minMax match { + case Some(minMax) => + Bytes.sizeOfUnsignedInt(minMax.min.size) + + minMax.min.size + + Bytes.sizeOfUnsignedInt(minMax.max.valueOrElse(_.size, 0)) + + minMax.max.valueOrElse(_.size, 0) + + case None => + 1 + } + } + def writeBytes[T](value: T)(implicit serializer: ValueSerializer[T]): Slice[Byte] = { val bytesRequired = ValueSerializer.bytesRequired(value) val bytes = Slice.of[Byte](bytesRequired) diff --git a/core/src/main/scala/swaydb/core/segment/PersistentSegmentMany.scala b/core/src/main/scala/swaydb/core/segment/PersistentSegmentMany.scala index 7f5f723f7..60d4c88a0 100644 --- a/core/src/main/scala/swaydb/core/segment/PersistentSegmentMany.scala +++ b/core/src/main/scala/swaydb/core/segment/PersistentSegmentMany.scala @@ -104,6 +104,7 @@ protected case object PersistentSegmentMany { minKey = singleton.minKey, maxKey = singleton.maxKey, nearestPutDeadline = singleton.nearestPutDeadline, + minMaxFunctionId = singleton.minMaxFunctionId, blockRef = blockRef, segmentIO = segmentIO, valuesReaderCacheable = singleton.valuesUnblockedReader, @@ -304,6 +305,7 @@ protected case object PersistentSegmentMany { minKey = minKey, maxKey = maxKey, nearestPutDeadline = None, + minMaxFunctionId = None, blockRef = listSegmentRef, segmentIO = segmentIO, valuesReaderCacheable = None, @@ -405,6 +407,7 @@ protected case object PersistentSegmentMany { minKey = minKey, maxKey = maxKey, //ListSegment does not store deadline. This is stored at the higher Level. + minMaxFunctionId = None, nearestPutDeadline = None, blockRef = listSegmentRef, segmentIO = segmentIO, @@ -593,12 +596,14 @@ protected case class PersistentSegmentMany(file: DBFile, .floor(key) .existsC(_.mightContainKey(key)) - /** - * [[PersistentSegmentMany]] is not aware of [[minMaxFunctionId]]. - * It should be deferred to [[SegmentRef]]. - */ override def mightContainFunction(key: Slice[Byte]): Boolean = - segmentRefs exists (_.mightContainKey(key)) + minMaxFunctionId exists { + minMaxFunctionId => + MinMax.contains( + key = key, + minMax = minMaxFunctionId + )(FunctionStore.order) + } def get(key: Slice[Byte], threadState: ThreadReadState): PersistentOption = segments diff --git a/core/src/main/scala/swaydb/core/segment/PersistentSegmentOne.scala b/core/src/main/scala/swaydb/core/segment/PersistentSegmentOne.scala index 5f16ed788..67d4382f6 100644 --- a/core/src/main/scala/swaydb/core/segment/PersistentSegmentOne.scala +++ b/core/src/main/scala/swaydb/core/segment/PersistentSegmentOne.scala @@ -76,14 +76,7 @@ protected object PersistentSegmentOne { createdInLevel = createdInLevel, minKey = segment.minKey, maxKey = segment.maxKey, - minMaxFunctionId = - segment match { - case _: TransientSegment.Remote => - None - - case one: TransientSegment.One => - one.minMaxFunctionId - }, + minMaxFunctionId = segment.minMaxFunctionId, segmentSize = segment.segmentSize, nearestExpiryDeadline = segment.nearestPutDeadline, valuesReaderCacheable = segment.valuesUnblockedReader, @@ -131,6 +124,7 @@ protected object PersistentSegmentOne { minKey = minKey, maxKey = maxKey, nearestPutDeadline = nearestExpiryDeadline, + minMaxFunctionId = minMaxFunctionId, blockRef = segmentBlockRef, segmentIO = segmentIO, valuesReaderCacheable = valuesReaderCacheable, diff --git a/core/src/main/scala/swaydb/core/segment/SegmentRef.scala b/core/src/main/scala/swaydb/core/segment/SegmentRef.scala index a50458128..2b1b9f9d2 100644 --- a/core/src/main/scala/swaydb/core/segment/SegmentRef.scala +++ b/core/src/main/scala/swaydb/core/segment/SegmentRef.scala @@ -76,6 +76,7 @@ private[core] case object SegmentRef extends LazyLogging { minKey: Slice[Byte], maxKey: MaxKey[Slice[Byte]], nearestPutDeadline: Option[Deadline], + minMaxFunctionId: Option[MinMax[Slice[Byte]]], blockRef: BlockRefReader[SegmentBlock.Offset], segmentIO: SegmentIO, valuesReaderCacheable: Option[UnblockedReader[ValuesBlock.Offset, ValuesBlock]], @@ -123,6 +124,7 @@ private[core] case object SegmentRef extends LazyLogging { maxKey = maxKey, minKey = minKey, nearestPutDeadline = nearestPutDeadline, + minMaxFunctionId = minMaxFunctionId, skipList = skipList, segmentBlockCache = segmentBlockCache ) @@ -1485,6 +1487,7 @@ private[core] class SegmentRef(val path: Path, val maxKey: MaxKey[Slice[Byte]], val minKey: Slice[Byte], val nearestPutDeadline: Option[Deadline], + val minMaxFunctionId: Option[MinMax[Slice[Byte]]], val skipList: Option[SkipList[SliceOption[Byte], PersistentOption, Slice[Byte], Persistent]], val segmentBlockCache: SegmentBlockCache)(implicit keyValueMemorySweeper: Option[MemorySweeper.KeyValue], keyOrder: KeyOrder[Slice[Byte]]) extends SegmentRefOption with LazyLogging { diff --git a/core/src/main/scala/swaydb/core/segment/SegmentSerialiser.scala b/core/src/main/scala/swaydb/core/segment/SegmentSerialiser.scala index 03bc51e80..eb8f7408c 100644 --- a/core/src/main/scala/swaydb/core/segment/SegmentSerialiser.scala +++ b/core/src/main/scala/swaydb/core/segment/SegmentSerialiser.scala @@ -29,11 +29,11 @@ import java.nio.file.Paths import java.util.concurrent.TimeUnit import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor -import swaydb.core.actor.FileSweeper -import swaydb.core.actor.MemorySweeper +import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.function.FunctionStore import swaydb.core.io.file.{BlockCache, Effect, ForceSaveApplier} -import swaydb.core.util.{BlockCacheFileIDGenerator, Bytes, Extension, MinMax} +import swaydb.core.map.serializer.ValueSerializer.MinMaxSerialiser +import swaydb.core.util.{BlockCacheFileIDGenerator, Bytes, Extension} import swaydb.data.MaxKey import swaydb.data.config.MMAP import swaydb.data.order.{KeyOrder, TimeOrder} @@ -96,22 +96,7 @@ private[core] object SegmentSerialiser { .addAll(maxKeyBytes) .addUnsignedLong(segment.nearestPutDeadline.valueOrElse(_.time.toNanos, 0L)) - segment.minMaxFunctionId match { - case Some(minMaxFunctionId) => - bytes addUnsignedInt minMaxFunctionId.min.size - bytes addAll minMaxFunctionId.min - minMaxFunctionId.max match { - case Some(max) => - bytes addUnsignedInt max.size - bytes addAll max - - case None => - bytes addUnsignedInt 0 - } - - case None => - bytes addUnsignedInt 0 - } + MinMaxSerialiser.write(segment.minMaxFunctionId, bytes) } def read(reader: ReaderBase[Byte], @@ -160,17 +145,7 @@ private[core] object SegmentSerialiser { Some(Deadline((deadlineNanos, TimeUnit.NANOSECONDS))) } - val minMaxFunctionId = { - val minIdSize = reader.readUnsignedInt() - if (minIdSize == 0) - None - else { - val minId = reader.read(minIdSize) - val maxIdSize = reader.readUnsignedInt() - val maxId = if (maxIdSize == 0) None else Some(reader.read(maxIdSize)) - Some(MinMax(minId, maxId)) - } - } + val minMaxFunctionId = MinMaxSerialiser.read(reader) val fileType = Effect.numberFileId(segmentPath)._2 @@ -205,16 +180,7 @@ private[core] object SegmentSerialiser { } val minMaxFunctionIdBytesRequires = - segment.minMaxFunctionId match { - case Some(minMax) => - Bytes.sizeOfUnsignedInt(minMax.min.size) + - minMax.min.size + - Bytes.sizeOfUnsignedInt(minMax.max.valueOrElse(_.size, 0)) + - minMax.max.valueOrElse(_.size, 0) - - case None => - 1 - } + MinMaxSerialiser.bytesRequired(segment.minMaxFunctionId) ByteSizeOf.byte + //formatId ByteSizeOf.byte + //segmentFormatId diff --git a/core/src/main/scala/swaydb/core/segment/format/a/block/segment/SegmentBlock.scala b/core/src/main/scala/swaydb/core/segment/format/a/block/segment/SegmentBlock.scala index 0974d8052..f8f2a3648 100644 --- a/core/src/main/scala/swaydb/core/segment/format/a/block/segment/SegmentBlock.scala +++ b/core/src/main/scala/swaydb/core/segment/format/a/block/segment/SegmentBlock.scala @@ -38,8 +38,8 @@ import swaydb.core.segment.format.a.block.sortedindex.SortedIndexBlock import swaydb.core.segment.format.a.block.values.ValuesBlock import swaydb.core.segment.merge.MergeStats import swaydb.core.segment.merge.MergeStats.Persistent -import swaydb.core.segment.{PersistentSegmentMany, PersistentSegmentOne, SegmentRef} -import swaydb.core.util.{Bytes, Collections} +import swaydb.core.segment.{PersistentSegmentMany, PersistentSegmentOne} +import swaydb.core.util.{Bytes, Collections, MinMax} import swaydb.data.config._ import swaydb.data.order.KeyOrder import swaydb.data.slice.Slice @@ -216,8 +216,12 @@ private[core] case object SegmentBlock extends LazyLogging { val listKeyValue: Persistent.Builder[Memory, Slice] = MergeStats.persistent(Slice.newAggregator(segments.size * 2)) + var minMaxFunctionId = Option.empty[MinMax[Slice[Byte]]] + segments.foldLeft(0) { case (offset, segment) => + minMaxFunctionId = MinMax.minMaxFunctionOption(segment.minMaxFunctionId, minMaxFunctionId) + val segmentSize = segment.segmentSize listKeyValue addAll segment.toKeyValue(offset, segmentSize) offset + segmentSize @@ -262,9 +266,7 @@ private[core] case object SegmentBlock extends LazyLogging { TransientSegment.Many( minKey = segments.head.minKey, maxKey = segments.last.maxKey, - //minMaxFunctionId is not stored in Many. All functionId request should be deferred - //onto the SegmentRefs itself. - minMaxFunctionId = None, + minMaxFunctionId = minMaxFunctionId, fileHeader = fileHeader, nearestPutDeadline = listSegment.nearestPutDeadline, listSegment = listSegment, diff --git a/core/src/main/scala/swaydb/core/segment/format/a/block/segment/data/TransientSegment.scala b/core/src/main/scala/swaydb/core/segment/format/a/block/segment/data/TransientSegment.scala index 62379edb8..1639e7c22 100644 --- a/core/src/main/scala/swaydb/core/segment/format/a/block/segment/data/TransientSegment.scala +++ b/core/src/main/scala/swaydb/core/segment/format/a/block/segment/data/TransientSegment.scala @@ -44,6 +44,7 @@ sealed trait TransientSegment { def maxKey: MaxKey[Slice[Byte]] def hasEmptyByteSlice: Boolean def nearestPutDeadline: Option[Deadline] + def minMaxFunctionId: Option[MinMax[Slice[Byte]]] def segmentSize: Int } @@ -80,6 +81,9 @@ object TransientSegment { override def nearestPutDeadline: Option[Deadline] = segmentRef.nearestPutDeadline + override def minMaxFunctionId: Option[MinMax[Slice[Byte]]] = + segmentRef.minMaxFunctionId + override def hasEmptyByteSlice: Boolean = fileHeader.isEmpty || hasEmptyByteSliceIgnoreHeader diff --git a/core/src/main/scala/swaydb/core/segment/format/a/block/segment/data/TransientSegmentSerialiser.scala b/core/src/main/scala/swaydb/core/segment/format/a/block/segment/data/TransientSegmentSerialiser.scala index 0d48bce8a..6e28e2a37 100644 --- a/core/src/main/scala/swaydb/core/segment/format/a/block/segment/data/TransientSegmentSerialiser.scala +++ b/core/src/main/scala/swaydb/core/segment/format/a/block/segment/data/TransientSegmentSerialiser.scala @@ -29,6 +29,7 @@ import java.nio.file.Path import swaydb.core.actor.MemorySweeper import swaydb.core.data.{Memory, Persistent, Time, Value} import swaydb.core.io.reader.Reader +import swaydb.core.map.serializer.ValueSerializer.MinMaxSerialiser import swaydb.core.segment.format.a.block.binarysearch.BinarySearchIndexBlock import swaydb.core.segment.format.a.block.bloomfilter.BloomFilterBlock import swaydb.core.segment.format.a.block.hashindex.HashIndexBlock @@ -50,11 +51,15 @@ object TransientSegmentSerialiser { size: Int): Slice[Memory] = singleton.maxKey match { case MaxKey.Fixed(maxKey) => - val value = Slice.of[Byte](ByteSizeOf.byte + (ByteSizeOf.varInt * 2)) + val minMaxFunctionBytesSize = MinMaxSerialiser.bytesRequired(singleton.minMaxFunctionId) + + val value = Slice.of[Byte](ByteSizeOf.byte + (ByteSizeOf.varInt * 2) + minMaxFunctionBytesSize) value add 0 //fixed maxKey id value addUnsignedInt offset value addUnsignedInt size + MinMaxSerialiser.write(singleton.minMaxFunctionId, value) + /** * - nearestDeadline is stored so that the parent many segment knows which segment to refresh. * - minMaxFunctionIds are not stored here. All request for mightContainFunction are deferred onto the SegmentRef itself. @@ -69,10 +74,13 @@ object TransientSegmentSerialiser { ) case MaxKey.Range(fromKey, maxKey) => - val value = Slice.of[Byte](ByteSizeOf.byte + (ByteSizeOf.varInt * 2) + fromKey.size) + val minMaxFunctionBytesSize = MinMaxSerialiser.bytesRequired(singleton.minMaxFunctionId) + + val value = Slice.of[Byte](ByteSizeOf.byte + (ByteSizeOf.varInt * 2) + minMaxFunctionBytesSize + fromKey.size) value add 1 //range maxKey id value addUnsignedInt offset value addUnsignedInt size + MinMaxSerialiser.write(singleton.minMaxFunctionId, value) value addAll fromKey if (singleton.minKey equals maxKey) { @@ -107,11 +115,13 @@ object TransientSegmentSerialiser { if (maxKeyId == 0) { val segmentOffset = valueReader.readUnsignedInt() val segmentSize = valueReader.readUnsignedInt() + val minMaxFunctionId = MinMaxSerialiser.read(valueReader) SegmentRef( path = path.resolve(s".ref.$segmentOffset"), minKey = range.fromKey.unslice(), maxKey = MaxKey.Fixed(range.toKey.unslice()), nearestPutDeadline = deadline, + minMaxFunctionId = minMaxFunctionId, blockRef = BlockRefReader( ref = reader, @@ -129,12 +139,14 @@ object TransientSegmentSerialiser { } else if (maxKeyId == 1) { val segmentOffset = valueReader.readUnsignedInt() val segmentSize = valueReader.readUnsignedInt() + val minMaxFunctionId = MinMaxSerialiser.read(valueReader) val maxKeyMinKey = valueReader.readRemaining() SegmentRef( path = path.resolve(s".ref.$segmentOffset"), minKey = range.fromKey.unslice(), maxKey = MaxKey.Range(maxKeyMinKey.unslice(), range.toKey.unslice()), nearestPutDeadline = deadline, + minMaxFunctionId = minMaxFunctionId, blockRef = BlockRefReader( ref = reader, @@ -174,11 +186,13 @@ object TransientSegmentSerialiser { if (maxKeyId == 0) { val segmentOffset = valueReader.readUnsignedInt() val segmentSize = valueReader.readUnsignedInt() + val minMaxFunctionId = MinMaxSerialiser.read(valueReader) SegmentRef( path = path.resolve(s".ref.$segmentOffset"), minKey = put.key, maxKey = MaxKey.Fixed(put.key.unslice()), nearestPutDeadline = put.deadline, + minMaxFunctionId = minMaxFunctionId, blockRef = BlockRefReader( ref = reader, @@ -196,12 +210,14 @@ object TransientSegmentSerialiser { } else if (maxKeyId == 1) { val segmentOffset = valueReader.readUnsignedInt() val segmentSize = valueReader.readUnsignedInt() + val minMaxFunctionId = MinMaxSerialiser.read(valueReader) val maxKeyMinKey = valueReader.readRemaining() SegmentRef( path = path.resolve(s".ref.$segmentOffset"), minKey = put.key.unslice(), maxKey = MaxKey.Range(maxKeyMinKey.unslice(), put.key.unslice()), nearestPutDeadline = put.deadline, + minMaxFunctionId = minMaxFunctionId, blockRef = BlockRefReader( ref = reader, diff --git a/core/src/test/scala/swaydb/core/segment/SegmentRefGetBehaviorSpec.scala b/core/src/test/scala/swaydb/core/segment/SegmentRefGetBehaviorSpec.scala index 03a60e9ff..9b8697ae8 100644 --- a/core/src/test/scala/swaydb/core/segment/SegmentRefGetBehaviorSpec.scala +++ b/core/src/test/scala/swaydb/core/segment/SegmentRefGetBehaviorSpec.scala @@ -69,6 +69,7 @@ class SegmentRefGetBehaviorSpec extends TestBase with MockFactory { maxKey = MaxKey.Fixed[Slice[Byte]](100), minKey = 0, nearestPutDeadline = None, + minMaxFunctionId = None, skipList = None, segmentBlockCache = null ) @@ -87,6 +88,7 @@ class SegmentRefGetBehaviorSpec extends TestBase with MockFactory { maxKey = MaxKey.Range[Slice[Byte]](90, 100), minKey = 0, nearestPutDeadline = None, + minMaxFunctionId = None, skipList = None, segmentBlockCache = null ) @@ -117,6 +119,7 @@ class SegmentRefGetBehaviorSpec extends TestBase with MockFactory { maxKey = MaxKey.Fixed[Slice[Byte]](Int.MaxValue), minKey = 0, nearestPutDeadline = None, + minMaxFunctionId = None, skipList = None, segmentBlockCache = segmentBlockCache.head ) diff --git a/version.sbt b/version.sbt index ef1f15555..e012c90b6 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "0.16.3-SNAPSHOT" +version in ThisBuild := "0.17-SNAPSHOT"