Skip to content

Commit

Permalink
storing minMaxFunctionId in SegmentRef and bumped version to 0.17
Browse files Browse the repository at this point in the history
  • Loading branch information
simerplaha committed Nov 4, 2020
1 parent 68a81da commit e23fb0d
Show file tree
Hide file tree
Showing 11 changed files with 101 additions and 63 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/swaydb/core/build/BuildValidator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ object BuildValidator {
IO.failed(Exception.MissingBuildInfo(Build.fileName, thisVersion.version))

case previous @ Build.Info(previousVersion, previousDataType) =>
val isValid = previousVersion.major >= 0 && previousVersion.minor >= 16 && previousVersion.revision >= 0
val isValid = previousVersion.major >= 0 && previousVersion.minor >= 17 && previousVersion.revision >= 0
if (!isValid)
IO.failed(IncompatibleVersions(previous.version.version, thisVersion.version))
else if (previousDataType != dataType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ package swaydb.core.map.serializer
import swaydb.IO
import swaydb.core.data.{Time, Value}
import swaydb.core.io.reader.Reader
import swaydb.core.util.Bytes
import swaydb.core.util.{Bytes, MinMax}
import swaydb.core.util.Times._
import swaydb.data.slice.{ReaderBase, Slice, SliceOption}
import swaydb.data.util.ByteSizeOf
import swaydb.data.util.Options.OptionsImplicits

import scala.annotation.implicitNotFound
import scala.collection.mutable
Expand Down Expand Up @@ -405,6 +406,50 @@ private[core] object ValueSerializer {
}
}

/**
 * [[ValueSerializer]] for an optional [[MinMax]] of byte slices.
 *
 * Layout produced by [[write]] and consumed by [[read]]:
 *  - unsigned-int size of `min` followed by the `min` bytes
 *  - unsigned-int size of `max` followed by the `max` bytes
 *
 * A size of 0 in the `min` position stands for an absent [[MinMax]];
 * a size of 0 in the `max` position stands for an absent `max`.
 */
implicit object MinMaxSerialiser extends ValueSerializer[Option[MinMax[Slice[Byte]]]] {

  /** Appends the serialised form of `minMax` to `bytes`. */
  override def write(minMax: Option[MinMax[Slice[Byte]]], bytes: Slice[Byte]): Unit =
    minMax match {
      case None =>
        //zero size marks a missing MinMax.
        bytes.addUnsignedInt(0)

      case Some(range) =>
        bytes.addUnsignedInt(range.min.size)
        bytes.addAll(range.min)

        range.max match {
          case None =>
            //zero size marks a missing max.
            bytes.addUnsignedInt(0)

          case Some(maxId) =>
            bytes.addUnsignedInt(maxId.size)
            bytes.addAll(maxId)
        }
    }

  /** Reads back what [[write]] produced, starting at the reader's current position. */
  override def read(reader: ReaderBase[Byte]): Option[MinMax[Slice[Byte]]] = {
    val minSize = reader.readUnsignedInt()

    if (minSize != 0) {
      val min = reader.read(minSize)
      val maxSize = reader.readUnsignedInt()
      val max = if (maxSize != 0) Some(reader.read(maxSize)) else None
      Some(MinMax(min, max))
    } else {
      None
    }
  }

  /** Number of bytes [[write]] adds for the given `minMax`. */
  override def bytesRequired(minMax: Option[MinMax[Slice[Byte]]]): Int =
    minMax match {
      case None =>
        //accounts for the zero marker that write emits for a missing MinMax.
        1

      case Some(range) =>
        val minSize = range.min.size
        val maxSize = range.max.valueOrElse(_.size, 0)

        Bytes.sizeOfUnsignedInt(minSize) +
          minSize +
          Bytes.sizeOfUnsignedInt(maxSize) +
          maxSize
    }
}

def writeBytes[T](value: T)(implicit serializer: ValueSerializer[T]): Slice[Byte] = {
val bytesRequired = ValueSerializer.bytesRequired(value)
val bytes = Slice.of[Byte](bytesRequired)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ protected case object PersistentSegmentMany {
minKey = singleton.minKey,
maxKey = singleton.maxKey,
nearestPutDeadline = singleton.nearestPutDeadline,
minMaxFunctionId = singleton.minMaxFunctionId,
blockRef = blockRef,
segmentIO = segmentIO,
valuesReaderCacheable = singleton.valuesUnblockedReader,
Expand Down Expand Up @@ -304,6 +305,7 @@ protected case object PersistentSegmentMany {
minKey = minKey,
maxKey = maxKey,
nearestPutDeadline = None,
minMaxFunctionId = None,
blockRef = listSegmentRef,
segmentIO = segmentIO,
valuesReaderCacheable = None,
Expand Down Expand Up @@ -405,6 +407,7 @@ protected case object PersistentSegmentMany {
minKey = minKey,
maxKey = maxKey,
//ListSegment does not store deadline. This is stored at the higher Level.
minMaxFunctionId = None,
nearestPutDeadline = None,
blockRef = listSegmentRef,
segmentIO = segmentIO,
Expand Down Expand Up @@ -593,12 +596,14 @@ protected case class PersistentSegmentMany(file: DBFile,
.floor(key)
.existsC(_.mightContainKey(key))

/**
* [[PersistentSegmentMany]] is not aware of [[minMaxFunctionId]].
* It should be deferred to [[SegmentRef]].
*/
override def mightContainFunction(key: Slice[Byte]): Boolean =
segmentRefs exists (_.mightContainKey(key))
minMaxFunctionId exists {
minMaxFunctionId =>
MinMax.contains(
key = key,
minMax = minMaxFunctionId
)(FunctionStore.order)
}

def get(key: Slice[Byte], threadState: ThreadReadState): PersistentOption =
segments
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,7 @@ protected object PersistentSegmentOne {
createdInLevel = createdInLevel,
minKey = segment.minKey,
maxKey = segment.maxKey,
minMaxFunctionId =
segment match {
case _: TransientSegment.Remote =>
None

case one: TransientSegment.One =>
one.minMaxFunctionId
},
minMaxFunctionId = segment.minMaxFunctionId,
segmentSize = segment.segmentSize,
nearestExpiryDeadline = segment.nearestPutDeadline,
valuesReaderCacheable = segment.valuesUnblockedReader,
Expand Down Expand Up @@ -131,6 +124,7 @@ protected object PersistentSegmentOne {
minKey = minKey,
maxKey = maxKey,
nearestPutDeadline = nearestExpiryDeadline,
minMaxFunctionId = minMaxFunctionId,
blockRef = segmentBlockRef,
segmentIO = segmentIO,
valuesReaderCacheable = valuesReaderCacheable,
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/swaydb/core/segment/SegmentRef.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ private[core] case object SegmentRef extends LazyLogging {
minKey: Slice[Byte],
maxKey: MaxKey[Slice[Byte]],
nearestPutDeadline: Option[Deadline],
minMaxFunctionId: Option[MinMax[Slice[Byte]]],
blockRef: BlockRefReader[SegmentBlock.Offset],
segmentIO: SegmentIO,
valuesReaderCacheable: Option[UnblockedReader[ValuesBlock.Offset, ValuesBlock]],
Expand Down Expand Up @@ -123,6 +124,7 @@ private[core] case object SegmentRef extends LazyLogging {
maxKey = maxKey,
minKey = minKey,
nearestPutDeadline = nearestPutDeadline,
minMaxFunctionId = minMaxFunctionId,
skipList = skipList,
segmentBlockCache = segmentBlockCache
)
Expand Down Expand Up @@ -1485,6 +1487,7 @@ private[core] class SegmentRef(val path: Path,
val maxKey: MaxKey[Slice[Byte]],
val minKey: Slice[Byte],
val nearestPutDeadline: Option[Deadline],
val minMaxFunctionId: Option[MinMax[Slice[Byte]]],
val skipList: Option[SkipList[SliceOption[Byte], PersistentOption, Slice[Byte], Persistent]],
val segmentBlockCache: SegmentBlockCache)(implicit keyValueMemorySweeper: Option[MemorySweeper.KeyValue],
keyOrder: KeyOrder[Slice[Byte]]) extends SegmentRefOption with LazyLogging {
Expand Down
46 changes: 6 additions & 40 deletions core/src/main/scala/swaydb/core/segment/SegmentSerialiser.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ import java.nio.file.Paths
import java.util.concurrent.TimeUnit

import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor
import swaydb.core.actor.FileSweeper
import swaydb.core.actor.MemorySweeper
import swaydb.core.actor.{FileSweeper, MemorySweeper}
import swaydb.core.function.FunctionStore
import swaydb.core.io.file.{BlockCache, Effect, ForceSaveApplier}
import swaydb.core.util.{BlockCacheFileIDGenerator, Bytes, Extension, MinMax}
import swaydb.core.map.serializer.ValueSerializer.MinMaxSerialiser
import swaydb.core.util.{BlockCacheFileIDGenerator, Bytes, Extension}
import swaydb.data.MaxKey
import swaydb.data.config.MMAP
import swaydb.data.order.{KeyOrder, TimeOrder}
Expand Down Expand Up @@ -96,22 +96,7 @@ private[core] object SegmentSerialiser {
.addAll(maxKeyBytes)
.addUnsignedLong(segment.nearestPutDeadline.valueOrElse(_.time.toNanos, 0L))

segment.minMaxFunctionId match {
case Some(minMaxFunctionId) =>
bytes addUnsignedInt minMaxFunctionId.min.size
bytes addAll minMaxFunctionId.min
minMaxFunctionId.max match {
case Some(max) =>
bytes addUnsignedInt max.size
bytes addAll max

case None =>
bytes addUnsignedInt 0
}

case None =>
bytes addUnsignedInt 0
}
MinMaxSerialiser.write(segment.minMaxFunctionId, bytes)
}

def read(reader: ReaderBase[Byte],
Expand Down Expand Up @@ -160,17 +145,7 @@ private[core] object SegmentSerialiser {
Some(Deadline((deadlineNanos, TimeUnit.NANOSECONDS)))
}

val minMaxFunctionId = {
val minIdSize = reader.readUnsignedInt()
if (minIdSize == 0)
None
else {
val minId = reader.read(minIdSize)
val maxIdSize = reader.readUnsignedInt()
val maxId = if (maxIdSize == 0) None else Some(reader.read(maxIdSize))
Some(MinMax(minId, maxId))
}
}
val minMaxFunctionId = MinMaxSerialiser.read(reader)

val fileType = Effect.numberFileId(segmentPath)._2

Expand Down Expand Up @@ -205,16 +180,7 @@ private[core] object SegmentSerialiser {
}

val minMaxFunctionIdBytesRequires =
segment.minMaxFunctionId match {
case Some(minMax) =>
Bytes.sizeOfUnsignedInt(minMax.min.size) +
minMax.min.size +
Bytes.sizeOfUnsignedInt(minMax.max.valueOrElse(_.size, 0)) +
minMax.max.valueOrElse(_.size, 0)

case None =>
1
}
MinMaxSerialiser.bytesRequired(segment.minMaxFunctionId)

ByteSizeOf.byte + //formatId
ByteSizeOf.byte + //segmentFormatId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ import swaydb.core.segment.format.a.block.sortedindex.SortedIndexBlock
import swaydb.core.segment.format.a.block.values.ValuesBlock
import swaydb.core.segment.merge.MergeStats
import swaydb.core.segment.merge.MergeStats.Persistent
import swaydb.core.segment.{PersistentSegmentMany, PersistentSegmentOne, SegmentRef}
import swaydb.core.util.{Bytes, Collections}
import swaydb.core.segment.{PersistentSegmentMany, PersistentSegmentOne}
import swaydb.core.util.{Bytes, Collections, MinMax}
import swaydb.data.config._
import swaydb.data.order.KeyOrder
import swaydb.data.slice.Slice
Expand Down Expand Up @@ -216,8 +216,12 @@ private[core] case object SegmentBlock extends LazyLogging {
val listKeyValue: Persistent.Builder[Memory, Slice] =
MergeStats.persistent(Slice.newAggregator(segments.size * 2))

var minMaxFunctionId = Option.empty[MinMax[Slice[Byte]]]

segments.foldLeft(0) {
case (offset, segment) =>
minMaxFunctionId = MinMax.minMaxFunctionOption(segment.minMaxFunctionId, minMaxFunctionId)

val segmentSize = segment.segmentSize
listKeyValue addAll segment.toKeyValue(offset, segmentSize)
offset + segmentSize
Expand Down Expand Up @@ -262,9 +266,7 @@ private[core] case object SegmentBlock extends LazyLogging {
TransientSegment.Many(
minKey = segments.head.minKey,
maxKey = segments.last.maxKey,
//minMaxFunctionId is not stored in Many. All functionId request should be deferred
//onto the SegmentRefs itself.
minMaxFunctionId = None,
minMaxFunctionId = minMaxFunctionId,
fileHeader = fileHeader,
nearestPutDeadline = listSegment.nearestPutDeadline,
listSegment = listSegment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ sealed trait TransientSegment {
def maxKey: MaxKey[Slice[Byte]]
def hasEmptyByteSlice: Boolean
def nearestPutDeadline: Option[Deadline]
def minMaxFunctionId: Option[MinMax[Slice[Byte]]]
def segmentSize: Int
}

Expand Down Expand Up @@ -80,6 +81,9 @@ object TransientSegment {
override def nearestPutDeadline: Option[Deadline] =
segmentRef.nearestPutDeadline

override def minMaxFunctionId: Option[MinMax[Slice[Byte]]] =
segmentRef.minMaxFunctionId

override def hasEmptyByteSlice: Boolean =
fileHeader.isEmpty || hasEmptyByteSliceIgnoreHeader

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import java.nio.file.Path
import swaydb.core.actor.MemorySweeper
import swaydb.core.data.{Memory, Persistent, Time, Value}
import swaydb.core.io.reader.Reader
import swaydb.core.map.serializer.ValueSerializer.MinMaxSerialiser
import swaydb.core.segment.format.a.block.binarysearch.BinarySearchIndexBlock
import swaydb.core.segment.format.a.block.bloomfilter.BloomFilterBlock
import swaydb.core.segment.format.a.block.hashindex.HashIndexBlock
Expand All @@ -50,11 +51,15 @@ object TransientSegmentSerialiser {
size: Int): Slice[Memory] =
singleton.maxKey match {
case MaxKey.Fixed(maxKey) =>
val value = Slice.of[Byte](ByteSizeOf.byte + (ByteSizeOf.varInt * 2))
val minMaxFunctionBytesSize = MinMaxSerialiser.bytesRequired(singleton.minMaxFunctionId)

val value = Slice.of[Byte](ByteSizeOf.byte + (ByteSizeOf.varInt * 2) + minMaxFunctionBytesSize)
value add 0 //fixed maxKey id
value addUnsignedInt offset
value addUnsignedInt size

MinMaxSerialiser.write(singleton.minMaxFunctionId, value)

/**
* - nearestDeadline is stored so that the parent many segment knows which segment to refresh.
* - minMaxFunctionIds are stored here as well: they are written into the value via [[MinMaxSerialiser]].
Expand All @@ -69,10 +74,13 @@ object TransientSegmentSerialiser {
)

case MaxKey.Range(fromKey, maxKey) =>
val value = Slice.of[Byte](ByteSizeOf.byte + (ByteSizeOf.varInt * 2) + fromKey.size)
val minMaxFunctionBytesSize = MinMaxSerialiser.bytesRequired(singleton.minMaxFunctionId)

val value = Slice.of[Byte](ByteSizeOf.byte + (ByteSizeOf.varInt * 2) + minMaxFunctionBytesSize + fromKey.size)
value add 1 //range maxKey id
value addUnsignedInt offset
value addUnsignedInt size
MinMaxSerialiser.write(singleton.minMaxFunctionId, value)
value addAll fromKey

if (singleton.minKey equals maxKey) {
Expand Down Expand Up @@ -107,11 +115,13 @@ object TransientSegmentSerialiser {
if (maxKeyId == 0) {
val segmentOffset = valueReader.readUnsignedInt()
val segmentSize = valueReader.readUnsignedInt()
val minMaxFunctionId = MinMaxSerialiser.read(valueReader)
SegmentRef(
path = path.resolve(s".ref.$segmentOffset"),
minKey = range.fromKey.unslice(),
maxKey = MaxKey.Fixed(range.toKey.unslice()),
nearestPutDeadline = deadline,
minMaxFunctionId = minMaxFunctionId,
blockRef =
BlockRefReader(
ref = reader,
Expand All @@ -129,12 +139,14 @@ object TransientSegmentSerialiser {
} else if (maxKeyId == 1) {
val segmentOffset = valueReader.readUnsignedInt()
val segmentSize = valueReader.readUnsignedInt()
val minMaxFunctionId = MinMaxSerialiser.read(valueReader)
val maxKeyMinKey = valueReader.readRemaining()
SegmentRef(
path = path.resolve(s".ref.$segmentOffset"),
minKey = range.fromKey.unslice(),
maxKey = MaxKey.Range(maxKeyMinKey.unslice(), range.toKey.unslice()),
nearestPutDeadline = deadline,
minMaxFunctionId = minMaxFunctionId,
blockRef =
BlockRefReader(
ref = reader,
Expand Down Expand Up @@ -174,11 +186,13 @@ object TransientSegmentSerialiser {
if (maxKeyId == 0) {
val segmentOffset = valueReader.readUnsignedInt()
val segmentSize = valueReader.readUnsignedInt()
val minMaxFunctionId = MinMaxSerialiser.read(valueReader)
SegmentRef(
path = path.resolve(s".ref.$segmentOffset"),
minKey = put.key,
maxKey = MaxKey.Fixed(put.key.unslice()),
nearestPutDeadline = put.deadline,
minMaxFunctionId = minMaxFunctionId,
blockRef =
BlockRefReader(
ref = reader,
Expand All @@ -196,12 +210,14 @@ object TransientSegmentSerialiser {
} else if (maxKeyId == 1) {
val segmentOffset = valueReader.readUnsignedInt()
val segmentSize = valueReader.readUnsignedInt()
val minMaxFunctionId = MinMaxSerialiser.read(valueReader)
val maxKeyMinKey = valueReader.readRemaining()
SegmentRef(
path = path.resolve(s".ref.$segmentOffset"),
minKey = put.key.unslice(),
maxKey = MaxKey.Range(maxKeyMinKey.unslice(), put.key.unslice()),
nearestPutDeadline = put.deadline,
minMaxFunctionId = minMaxFunctionId,
blockRef =
BlockRefReader(
ref = reader,
Expand Down
Loading

0 comments on commit e23fb0d

Please sign in to comment.