Skip to content

Commit

Permalink
Split ByteBufferSweeper
Browse files Browse the repository at this point in the history
  • Loading branch information
simerplaha committed Nov 17, 2021
1 parent 31937db commit de0dfb5
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 200 deletions.
6 changes: 3 additions & 3 deletions core/src/main/scala/swaydb/core/file/DBFile.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import swaydb.Error.IO.ExceptionHandler
import swaydb.config.ForceSave
import swaydb.core.cache.Cache
import swaydb.core.file.sweeper.ByteBufferSweeper.ByteBufferSweeperActor
import swaydb.core.file.sweeper.{ByteBufferSweeper, FileSweeper, FileSweeperCommand, FileSweeperItem}
import swaydb.core.file.sweeper.{ByteBufferCommand, FileSweeper, FileSweeperCommand, FileSweeperItem}
import swaydb.effect.{Effect, IOStrategy, Reserve}
import swaydb.slice.{Slice, SliceRO}
import swaydb.{Error, IO}
Expand Down Expand Up @@ -335,15 +335,15 @@ class DBFile(val path: Path,
//If the file is already closed, then delete it from disk.
//memory files are never closed so the first statement will always be executed for memory files.
if (deleteAfterClean)
bufferCleaner.actor send ByteBufferSweeper.Command.DeleteFile(path)
bufferCleaner.actor send ByteBufferCommand.DeleteFile(path)
else
file.delete()
}

case None =>
IO {
if (deleteAfterClean)
bufferCleaner.actor send ByteBufferSweeper.Command.DeleteFile(path)
bufferCleaner.actor send ByteBufferCommand.DeleteFile(path)
else
Effect.deleteIfExists(path)
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/swaydb/core/file/MMAPFile.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package swaydb.core.file

import com.typesafe.scalalogging.LazyLogging
import swaydb.config.ForceSave
import swaydb.core.file.sweeper.ByteBufferSweeper
import swaydb.core.file.sweeper.ByteBufferCommand
import swaydb.core.file.sweeper.ByteBufferSweeper.ByteBufferSweeperActor
import swaydb.effect.{Effect, Reserve}
import swaydb.slice.{Slice, SliceRO, Slices}
Expand Down Expand Up @@ -161,7 +161,7 @@ private[file] class MMAPFile(val path: Path,
buffer = null

cleaner.actor send
ByteBufferSweeper.Command.Clean(
ByteBufferCommand.Clean(
buffer = swapBuffer,
hasReference = hasReference _,
forced = forced,
Expand Down Expand Up @@ -352,7 +352,7 @@ private[file] class MMAPFile(val path: Path,
watchNullPointer {
close()
if (deleteAfterClean)
cleaner.actor send ByteBufferSweeper.Command.DeleteFile(path)
cleaner.actor send ByteBufferCommand.DeleteFile(path)
else
Effect.delete(path)
}
Expand Down
129 changes: 129 additions & 0 deletions core/src/main/scala/swaydb/core/file/sweeper/ByteBufferCommand.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package swaydb.core.file.sweeper

import swaydb.config.ForceSave
import swaydb.core.file.ForceSaveApplier
import swaydb.core.file.sweeper.ByteBufferSweeper.{ByteBufferSweeperActor, State}
import swaydb.{Actor, ActorRef}

import java.nio.MappedByteBuffer
import java.nio.file.Path
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

/**
* Actor commands.
*/
sealed trait ByteBufferCommand {
def name: String
}

object ByteBufferCommand {
sealed trait FileCommand extends ByteBufferCommand {
def filePath: Path
}

object Clean {
private val idGenerator = new AtomicLong(0)

def apply(buffer: MappedByteBuffer,
hasReference: () => Boolean,
forced: AtomicBoolean,
filePath: Path,
forceSave: ForceSave.MMAPFiles)(implicit forceSaveApplier: ForceSaveApplier): Clean =
new Clean(
buffer = buffer,
filePath = filePath,
isRecorded = false,
hasReference = hasReference,
forced = forced,
forceSave = forceSave,
forceSaveApplier = forceSaveApplier,
//this id is being used instead of HashCode because nio.FileChannel returns
//the same MappedByteBuffer even after the FileChannel is closed.
id = idGenerator.incrementAndGet()
)
}

/**
* Cleans memory-mapped byte buffer.
*
* @param buffer The memory-mapped ByteBuffer to clean.
* @param filePath ByteBuffer's file path
* @param isRecorded Indicates if the [[State]] has recorded this request.
* @param hasReference Indicates if the ByteBuffer is currently being read by another thread.
* @param id Unique ID of this command.
*/
case class Clean private(buffer: MappedByteBuffer,
filePath: Path,
isRecorded: Boolean,
hasReference: () => Boolean,
forced: AtomicBoolean,
forceSave: ForceSave.MMAPFiles,
forceSaveApplier: ForceSaveApplier,
id: Long) extends FileCommand {
override def name: String = s"Clean: $filePath"
}

sealed trait DeleteCommand extends FileCommand {
def deleteTries: Int

def copyWithDeleteTries(deleteTries: Int): ByteBufferCommand
}

/**
* Deletes a file.
*/
case class DeleteFile(filePath: Path,
deleteTries: Int = 0) extends DeleteCommand {
override def name: String = s"DeleteFile: $filePath"

override def copyWithDeleteTries(deleteTries: Int): ByteBufferCommand =
copy(deleteTries = deleteTries)
}

/**
* Deletes the folder ensuring that file is cleaned first.
*/
case class DeleteFolder(folderPath: Path, filePath: Path, deleteTries: Int = 0) extends DeleteCommand {
override def name: String = s"DeleteFolder. Folder: $folderPath. File: $filePath"

override def copyWithDeleteTries(deleteTries: Int): ByteBufferCommand =
copy(deleteTries = deleteTries)
}

/**
* Checks if the file is cleaned.
*
* [[ByteBufferSweeperActor]] is a timer actor so [[IsClean]] will also get
* executed based on the [[Actor.interval]]. But terminating the Actor and then
* requesting this will return immediate response.
*/
case class IsClean[T](filePath: Path)(val replyTo: ActorRef[Boolean, T]) extends ByteBufferCommand {
override def name: String = s"IsClean: $filePath"
}

/**
* Checks if all files are cleaned.
*
* [[ByteBufferSweeperActor]] is a timer actor so [[IsAllClean]] will also get
* executed based on the [[Actor.interval]]. But terminating the Actor and then
* requesting this will return immediate response.
*/
case class IsAllClean[T](replyTo: ActorRef[Boolean, T]) extends ByteBufferCommand {
override def name: String = s"IsAllClean"
}


object IsTerminated {
def apply[T](replyTo: ActorRef[Boolean, T]): IsTerminated[T] = new IsTerminated(resubmitted = false)(replyTo)
}

/**
* Checks if the actor is terminated and has executed all [[ByteBufferCommand.Clean]] requests.
*
* @note The Actor should be terminated and [[Actor.receiveAllForce()]] should be
* invoked for this to return true otherwise the response is always false.
*/
case class IsTerminated[T] private(resubmitted: Boolean)(val replyTo: ActorRef[Boolean, T]) extends ByteBufferCommand {
override def name: String = s"IsTerminated. resubmitted = $resubmitted"
}
}
Loading

0 comments on commit de0dfb5

Please sign in to comment.