
Commit

removed Future.boundedTraverse() & moved common test functions to TestKit
simerplaha committed Nov 19, 2021
1 parent 2e093fe commit 5d1b402
Showing 155 changed files with 383 additions and 608 deletions.
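The substantive change behind most of these diffs: SwayDB's bounded-concurrency helper (called as Futures.traverseBounded(parallelism, items)(f) at the call sites below) is replaced everywhere by the standard library's Future.traverse, which starts every future eagerly. As a rough, hypothetical sketch of what the removed helper may have looked like, reconstructed only from those call sites, processing work in sequential batches:

import scala.concurrent.{ExecutionContext, Future}

// Hypothetical reconstruction, not SwayDB's actual implementation:
// run `f` over `items` at most `parallelism` elements at a time by
// completing one batch before starting the next.
def traverseBounded[A, B](parallelism: Int, items: Iterable[A])(f: A => Future[B])(
    implicit ec: ExecutionContext): Future[Vector[B]] =
  items.grouped(parallelism).foldLeft(Future.successful(Vector.empty[B])) {
    (acc, batch) =>
      acc.flatMap(done => Future.traverse(batch.toVector)(f).map(done ++ _))
  }

// After this commit, call sites use the unbounded standard version instead:
// Future.traverse(items)(f)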
@@ -20,9 +20,9 @@ import org.scalamock.scalatest.MockFactory
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import swaydb.Error.Segment.ExceptionHandler
import swaydb.effect.Base._
import swaydb.effect.{IOStrategy, Reserve}
import swaydb.testkit.RunThis._
import swaydb.testkit.TestKit._
import swaydb.{Error, IO}

import scala.annotation.tailrec
2 changes: 1 addition & 1 deletion core-cache/src/test/scala/swaydb/core/cache/LazySpec.scala
@@ -20,8 +20,8 @@ import org.scalamock.scalatest.MockFactory
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import swaydb.IO
import swaydb.effect.Base._
import swaydb.testkit.RunThis._
import swaydb.testkit.TestKit._

import scala.collection.parallel.CollectionConverters._
import scala.concurrent.duration._
@@ -16,8 +16,6 @@

package swaydb.config.compaction

import swaydb.config.compaction.CompactionConfig.CompactionParallelism

import java.util.concurrent.ExecutorService
import scala.concurrent.ExecutionContext

@@ -26,166 +24,27 @@ object CompactionConfig {
def create(resetCompactionPriorityAtInterval: Int,
actorExecutionContext: ExecutorService,
compactionExecutionContext: ExecutorService,
levelZeroFlattenParallelism: Int,
levelZeroMergeParallelism: Int,
multiLevelTaskParallelism: Int,
levelSegmentAssignmentParallelism: Int,
groupedSegmentDefragParallelism: Int,
defragmentedSegmentParallelism: Int,
pushStrategy: PushStrategy): CompactionConfig =
CompactionConfig(
resetCompactionPriorityAtInterval = resetCompactionPriorityAtInterval,
actorExecutionContext = ExecutionContext.fromExecutorService(actorExecutionContext),
compactionExecutionContext = ExecutionContext.fromExecutorService(compactionExecutionContext),
levelZeroFlattenParallelism = levelZeroFlattenParallelism,
levelZeroMergeParallelism = levelZeroMergeParallelism,
multiLevelTaskParallelism = multiLevelTaskParallelism,
levelSegmentAssignmentParallelism = levelSegmentAssignmentParallelism,
groupedSegmentDefragParallelism = groupedSegmentDefragParallelism,
defragmentedSegmentParallelism = defragmentedSegmentParallelism,
pushStrategy = pushStrategy
)

def apply(resetCompactionPriorityAtInterval: Int,
actorExecutionContext: ExecutionContext,
compactionExecutionContext: ExecutionContext,
levelZeroFlattenParallelism: Int,
levelZeroMergeParallelism: Int,
multiLevelTaskParallelism: Int,
levelSegmentAssignmentParallelism: Int,
groupedSegmentDefragParallelism: Int,
defragmentedSegmentParallelism: Int,
pushStrategy: PushStrategy): CompactionConfig =
if (resetCompactionPriorityAtInterval <= 0)
throw new Exception(s"Invalid resetCompactionPriorityAtInterval $resetCompactionPriorityAtInterval. Should be greater than zero.")
else if (levelZeroFlattenParallelism <= 0)
throw new Exception(s"Invalid levelZeroFlattenParallelism $levelZeroFlattenParallelism. Should be greater than zero.")
else if (levelZeroMergeParallelism <= 0)
throw new Exception(s"Invalid levelZeroMergeParallelism $levelZeroMergeParallelism. Should be greater than zero.")
else if (multiLevelTaskParallelism <= 0)
throw new Exception(s"Invalid multiLevelTaskParallelism $multiLevelTaskParallelism. Should be greater than zero.")
else if (levelSegmentAssignmentParallelism <= 0)
throw new Exception(s"Invalid levelSegmentAssignmentParallelism $levelSegmentAssignmentParallelism. Should be greater than zero.")
else if (groupedSegmentDefragParallelism <= 0)
throw new Exception(s"Invalid groupedSegmentDefragParallelism $groupedSegmentDefragParallelism. Should be greater than zero.")
else if (defragmentedSegmentParallelism <= 0)
throw new Exception(s"Invalid defragmentedSegmentParallelism $defragmentedSegmentParallelism. Should be greater than zero.")
else
new CompactionConfig(
resetCompactionPriorityAtInterval = resetCompactionPriorityAtInterval,
actorExecutionContext = actorExecutionContext,
compactionExecutionContext = compactionExecutionContext,
levelZeroFlattenParallelism = levelZeroFlattenParallelism,
levelZeroMergeParallelism = levelZeroMergeParallelism,
multiLevelTaskParallelism = multiLevelTaskParallelism,
levelSegmentAssignmentParallelism = levelSegmentAssignmentParallelism,
groupedSegmentDefragParallelism = groupedSegmentDefragParallelism,
defragmentedSegmentParallelism = defragmentedSegmentParallelism,
pushStrategy = pushStrategy
)

object CompactionParallelism {
def availableProcessors(): CompactionParallelism =
new CompactionParallelism {
val cores = Runtime.getRuntime.availableProcessors()

override def levelZeroFlattenParallelism: Int =
cores

override def levelZeroMergeParallelism: Int =
cores

override def levelSegmentAssignmentParallelism: Int =
cores

override def groupedSegmentDefragParallelism: Int =
cores

override def defragmentedSegmentParallelism: Int =
cores

override def multiLevelTaskParallelism: Int =
cores
}
}

trait CompactionParallelism {
/**
* LevelZero can have multiple overlapping log files. This sets
* the parallelism across groups of overlapping key-values.
*
* Eg: if there are logs with the following keys
* Log1 - 10 - 20
* Log2 - 30 - 40
* Log3 - 1 - 40
*
* The above, when flattened/assigned, will create two groups of overlapping key-values.
* Group1 - 10 - 20
* - 1 - 20
*
* Group2 - 30 - 40
* - 21 - 40
*
* This configuration sets how many groups execute their merge in parallel.
* The merge parallelism within each group is set via [[levelZeroMergeParallelism]].
*/
def levelZeroFlattenParallelism: Int

/**
* Each group (as mentioned in [[levelZeroFlattenParallelism]]) can contain
* multiple stacks of overlapping key-values. For example if there are 4 sets
* of overlapping key-values
*
* key-values1 - 10 - 20
* key-values2 - 1 - 20
* key-values3 - 3 - 5
* key-values4 - 2 - 10
*
* This will perform merges in groups of 2.
*
* The parallel execution proceeds as follows:
* key-values-1 & key-values-2 ----
*                                 | ---- merged-key-values-1-2
*                                            | --------> final-merged-key-values-1-2-3-4
*                                 | ---- merged-key-values-3-4
* key-values-3 & key-values-4 ----
*
*/
def levelZeroMergeParallelism: Int

/**
* Compaction can assign merge tasks to multiple Levels.
*
* Eg: if a flattened log file from LevelZero ([[levelZeroFlattenParallelism]])
* results in multiple groups that can be compacted into lower levels Level1, Level2 & Level3,
* then this configuration controls that multi-level concurrency.
*/
def multiLevelTaskParallelism: Int

/**
* Compaction can submit merge tasks where multiple Segments
* overlap the new key-values. This sets the number of Segments
* to merge concurrently.
*/
def levelSegmentAssignmentParallelism: Int

/**
* Applies to Segments of format [[swaydb.config.SegmentFormat.Grouped]]
* where each Segment stores a group of key-values per Segment file.
*
* Each group can be defragmented & merged in parallel after the new key-values
* are assigned to their respective groups.
*/
def groupedSegmentDefragParallelism: Int

/**
* The above [[groupedSegmentDefragParallelism]] can result in multiple groups
* of new key-values and remote Segment instances, which can be grouped to create
* new [[swaydb.config.SegmentFormat.Grouped]] Segments.
*/
def defragmentedSegmentParallelism: Int

}
}
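The two LevelZero settings above describe a two-stage fan-out: overlapping groups are flattened in parallel, and within each group the stacks are merged two at a time until one remains. An illustrative-only sketch of that pairwise merge tree in plain Scala (the merge function is a stand-in, not SwayDB's KeyValueMerger):

import scala.concurrent.{ExecutionContext, Future}

// Merge lists pairwise, level by level, until a single list remains.
def mergePairwise[A](stacks: List[List[A]])(merge: (List[A], List[A]) => List[A])(
    implicit ec: ExecutionContext): Future[List[A]] =
  stacks match {
    case Nil           => Future.successful(Nil)
    case single :: Nil => Future.successful(single)
    case many =>
      Future
        .traverse(many.grouped(2).toList) {
          case left :: right :: Nil => Future(merge(left, right)) // merge one pair
          case odd                  => Future.successful(odd.head) // odd stack carries over
        }
        .flatMap(next => mergePairwise(next)(merge)) // recurse on the merged level
  }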
/**
* Configures Compaction strategy.
@@ -199,10 +58,4 @@ object CompactionConfig {
case class CompactionConfig private(resetCompactionPriorityAtInterval: Int,
actorExecutionContext: ExecutionContext,
compactionExecutionContext: ExecutionContext,
levelZeroFlattenParallelism: Int,
levelZeroMergeParallelism: Int,
multiLevelTaskParallelism: Int,
levelSegmentAssignmentParallelism: Int,
groupedSegmentDefragParallelism: Int,
defragmentedSegmentParallelism: Int,
pushStrategy: PushStrategy) extends CompactionParallelism
pushStrategy: PushStrategy)
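After this commit CompactionConfig carries only the four fields above; the six parallelism settings are gone and only resetCompactionPriorityAtInterval is validated. A hedged sketch of constructing the slimmed-down config (thread-pool sizes and the PushStrategy variant are illustrative assumptions, not SwayDB defaults):

import java.util.concurrent.Executors
import swaydb.config.compaction.{CompactionConfig, PushStrategy}

val compactionConfig: CompactionConfig =
  CompactionConfig.create(
    resetCompactionPriorityAtInterval = 3, // must be > 0, enforced by apply()
    actorExecutionContext = Executors.newSingleThreadExecutor(),
    compactionExecutionContext = Executors.newFixedThreadPool(4),
    pushStrategy = PushStrategy.OnOverflow // assumed variant name
  )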
@@ -22,6 +22,7 @@ import swaydb.core.segment.block.BlockCache
import swaydb.core.segment.cache.sweeper.MemorySweeper
import swaydb.core.{TestBase, TestCaseSweeper}
import swaydb.utils.StorageUnits._
import swaydb.testkit.TestKit._

class BlockCachePerformanceSpec extends TestBase {

@@ -18,7 +18,6 @@ package swaydb.core.segment

import swaydb.Benchmark
import swaydb.config.SegmentRefCacheLife
import swaydb.config.compaction.CompactionConfig.CompactionParallelism
import swaydb.core.TestCaseSweeper._
import swaydb.core.TestData._
import swaydb.core.file.ForceSaveApplier
@@ -51,7 +50,6 @@ class SegmentReadPerformanceSpec extends TestBase {
implicit val keyOrder = KeyOrder.default
implicit val timeOrder: TimeOrder[Slice[Byte]] = TimeOrder.long
implicit val ec = TestExecutionContext.executionContext
implicit val compactionParallelism: CompactionParallelism = CompactionParallelism.availableProcessors()

val keyValuesCount = 1000000

@@ -2,7 +2,7 @@ package swaydb.core.series.map

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import swaydb.effect.Base._
import swaydb.testkit.TestKit._

class ProbeLimitHashMap extends LimitHashMapSpec {
def createMap[K, V >: Null](limit: Integer) = LimitHashMap[K, V](limit, 10)
@@ -23,17 +23,18 @@ import swaydb.config.repairAppendix.AppendixRepairStrategy._
import swaydb.config.repairAppendix.{AppendixRepairStrategy, OverlappingSegmentsException, SegmentInfoUnTyped}
import swaydb.config.{ForceSave, MMAP}
import swaydb.core.file.ForceSaveApplier
import swaydb.core.file.sweeper.bytebuffer.ByteBufferSweeper.ByteBufferSweeperActor
import swaydb.core.file.sweeper.FileSweeper
import swaydb.core.file.sweeper.bytebuffer.ByteBufferSweeper.ByteBufferSweeperActor
import swaydb.core.level.AppendixLogCache
import swaydb.core.log.serialiser.LogEntryWriter
import swaydb.core.log.{Log, LogEntry}
import swaydb.core.segment.Segment
import swaydb.core.segment.cache.sweeper.MemorySweeper
import swaydb.effect.{Effect, Extension}
import swaydb.effect.Effect
import swaydb.slice.Slice
import swaydb.slice.SliceIOImplicits._
import swaydb.slice.order.KeyOrder
import swaydb.utils.Extension
import swaydb.utils.StorageUnits._

import java.nio.file.Path
@@ -19,7 +19,6 @@ package swaydb.core.tool
import swaydb.Glass
import swaydb.IOValues._
import swaydb.config.MMAP
import swaydb.config.compaction.CompactionConfig.CompactionParallelism
import swaydb.config.compaction.LevelThrottle
import swaydb.config.repairAppendix.{AppendixRepairStrategy, OverlappingSegmentsException}
import swaydb.core.CommonAssertions._
@@ -31,20 +30,19 @@ import swaydb.effect.Effect._
import swaydb.slice.Slice
import swaydb.slice.order.{KeyOrder, TimeOrder}
import swaydb.testkit.RunThis._
import swaydb.testkit.TestKit._
import swaydb.utils.OperatingSystem
import swaydb.utils.StorageUnits._

import java.nio.file.NoSuchFileException
import scala.concurrent.duration.{Duration, DurationInt}
import scala.util.Random


class AppendixRepairerSpec extends TestBase {

implicit val keyOrder: KeyOrder[Slice[Byte]] = KeyOrder.default
implicit val timeOrder: TimeOrder[Slice[Byte]] = TimeOrder.long
implicit val ec = TestExecutionContext.executionContext
implicit val compactionParallelism: CompactionParallelism = CompactionParallelism.availableProcessors()

"AppendixRepair" should {
"fail if the input path does not exist" in {
@@ -16,7 +16,6 @@

package swaydb.core.compaction.task.assigner

import swaydb.config.compaction.CompactionConfig.CompactionParallelism
import swaydb.config.compaction.PushStrategy
import swaydb.core.compaction.task.CompactionTask
import swaydb.core.compaction.task.CompactionTask.CompactLogs
@@ -28,8 +27,8 @@ import swaydb.core.segment.assigner.Assignable
import swaydb.core.segment.data.merge.KeyValueMerger
import swaydb.core.segment.data.merge.stats.MergeStats
import swaydb.core.segment.data.{KeyValue, Memory}
import swaydb.slice.{MaxKey, Slice}
import swaydb.slice.order.{KeyOrder, TimeOrder}
import swaydb.slice.{MaxKey, Slice}
import swaydb.utils.{Aggregator, Futures, NonEmptyList}

import java.util
@@ -53,8 +52,7 @@

def run(source: LevelZero,
pushStrategy: PushStrategy,
lowerLevels: NonEmptyList[Level])(implicit ec: ExecutionContext,
parallelism: CompactionParallelism): Future[CompactLogs] = {
lowerLevels: NonEmptyList[Level])(implicit ec: ExecutionContext): Future[CompactLogs] = {
implicit val keyOrder: KeyOrder[Slice[Byte]] = source.keyOrder
implicit val timeOrder: TimeOrder[Slice[Byte]] = source.timeOrder
implicit val functionStore: FunctionStore = source.functionStore
@@ -91,11 +89,10 @@ case object LevelZeroTaskAssigner {
def flatten(input: IterableOnce[LevelZeroLog])(implicit ec: ExecutionContext,
keyOrder: KeyOrder[Slice[Byte]],
timerOrder: TimeOrder[Slice[Byte]],
functionStore: FunctionStore,
parallelism: CompactionParallelism): Future[Iterable[Assignable.Collection]] =
functionStore: FunctionStore): Future[Iterable[Assignable.Collection]] =
Future(createStacks(input)) flatMap {
stacks =>
Futures.traverseBounded(parallelism.levelZeroFlattenParallelism, stacks.values().asScala.map(_.stack))(mergeStack) map {
Future.traverse(stacks.values().asScala.map(_.stack))(mergeStack) map {
collection =>
collection map {
keyValues =>
@@ -319,8 +316,7 @@ case object LevelZeroTaskAssigner {
def mergeStack(stack: Iterable[Either[LevelZeroLog, Iterable[Memory]]])(implicit ec: ExecutionContext,
keyOrder: KeyOrder[Slice[Byte]],
timerOrder: TimeOrder[Slice[Byte]],
functionStore: FunctionStore,
parallelism: CompactionParallelism): Future[Iterable[Memory]] =
functionStore: FunctionStore): Future[Iterable[Memory]] =
if (stack.isEmpty)
Futures.emptyIterable
else if (stack.size == 1)
@@ -332,7 +328,7 @@
Future.successful(keyValues)
}
else
Futures.traverseBounded(parallelism.levelZeroMergeParallelism, stack.grouped(2).toList) {
Future.traverse(stack.grouped(2).toList) {
group =>
if (group.size == 1)
Future.successful(group.head)
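Net effect in LevelZeroTaskAssigner: flatten and mergeStack now submit every stack merge eagerly, so concurrency is bounded only by the ExecutionContext's thread pool rather than by the removed CompactionParallelism settings. A minimal illustration of the new shape (hypothetical names):

import scala.concurrent.{ExecutionContext, Future}

// All stack merges start immediately; the ExecutionContext is now the only
// throttle on how many run at once.
def mergeAllStacks[A](stacks: List[List[A]])(mergeStack: List[A] => Future[List[A]])(
    implicit ec: ExecutionContext): Future[List[List[A]]] =
  Future.traverse(stacks)(mergeStack)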
@@ -18,7 +18,6 @@ package swaydb.core.compaction.throttle

import com.typesafe.scalalogging.LazyLogging
import swaydb.DefActor
import swaydb.config.compaction.CompactionConfig.CompactionParallelism
import swaydb.core.compaction.Compactor
import swaydb.core.compaction.throttle.behaviour._
import swaydb.core.file.sweeper.FileSweeper
@@ -37,8 +36,7 @@ object ThrottleCompactor {
def apply(context: ThrottleCompactorContext)(implicit self: DefActor[ThrottleCompactor],
behaviorWakeUp: BehaviorWakeUp,
fileSweeper: FileSweeper.On,
ec: ExecutionContext,
parallelism: CompactionParallelism): ThrottleCompactor =
ec: ExecutionContext): ThrottleCompactor =
new ThrottleCompactor(
context = context,
currentFuture = Future.unit,
@@ -51,8 +49,7 @@ private[core] class ThrottleCompactor private(@volatile private var context: Thr
@volatile private var currentFutureExecuted: Boolean)(implicit self: DefActor[ThrottleCompactor],
behaviour: BehaviorWakeUp,
fileSweeper: FileSweeper.On,
executionContext: ExecutionContext,
parallelism: CompactionParallelism) extends Compactor with LazyLogging {
executionContext: ExecutionContext) extends Compactor with LazyLogging {
@inline private def tailWakeUpCall(nextFuture: => Future[ThrottleCompactorContext]): Unit =
//tail the Future only if the current future (wakeUp) was executed, else ignore. No point tailing the same message.
if (currentFutureExecuted) {
@@ -58,8 +58,7 @@ private[core] object ThrottleCompactorCreator extends CompactorCreator with Lazy
self = self,
behaviorWakeUp = BehaviorWakeUp,
fileSweeper = fileSweeper,
ec = config.compactionExecutionContext,
parallelism = config
ec = config.compactionExecutionContext
)
).onPreTerminate {
case (impl, _) =>