Skip to content

Commit

Permalink
#276 - transfer bytes.
Browse files Browse the repository at this point in the history
  • Loading branch information
simerplaha committed Nov 4, 2020
1 parent 2cc8ebd commit 68a81da
Show file tree
Hide file tree
Showing 19 changed files with 397 additions and 179 deletions.
22 changes: 15 additions & 7 deletions core/src/main/scala/swaydb/core/io/file/ChannelFile.scala
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,21 @@ private[file] class ChannelFile(val path: Path,
def append(slice: Iterable[Slice[Byte]]): Unit =
Effect.writeUnclosed(channel, slice)

override def transfer(position: Long, count: Long, transferTo: DBFileType): Long =
Effect.transfer(
position = position,
count = count,
from = channel,
transferTo = transferTo.writeableChannel
)
override def transfer(position: Int, count: Int, transferTo: DBFileType): Int =
transferTo match {
case target: ChannelFile =>
Effect.transfer(
position = position,
count = count,
from = channel,
transferTo = target.writeableChannel
)

case target: MMAPFile =>
val bytes = read(position = position, size = count)
target.append(bytes)
bytes.size
}

def read(position: Int, size: Int): Slice[Byte] = {
val buffer = ByteBuffer.allocate(size)
Expand Down
36 changes: 35 additions & 1 deletion core/src/main/scala/swaydb/core/io/file/DBFile.scala
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,40 @@ object DBFile extends LazyLogging {
file
}

def mmapWriteAndReadApplier(path: Path,
fileOpenIOStrategy: IOStrategy.ThreadSafe,
autoClose: Boolean,
deleteAfterClean: Boolean,
forceSave: ForceSave.MMAPFiles,
blockCacheFileId: Long,
bufferSize: Int,
applier: DBFile => Unit)(implicit fileSweeper: FileSweeper,
blockCache: Option[BlockCache.State],
bufferCleaner: ByteBufferSweeperActor,
forceSaveApplier: ForceSaveApplier): DBFile = {
val file =
mmapInit(
path = path,
fileOpenIOStrategy = fileOpenIOStrategy,
bufferSize = bufferSize,
blockCacheFileId = blockCacheFileId,
autoClose = autoClose,
forceSave = forceSave,
deleteAfterClean = deleteAfterClean
)

try
applier(file)
catch {
case throwable: Throwable =>
logger.error(s"Failed to write MMAP file with applier. Closing file: $path", throwable)
file.close()
throw throwable
}

file
}

def mmapWriteAndRead(path: Path,
fileOpenIOStrategy: IOStrategy.ThreadSafe,
autoClose: Boolean,
Expand Down Expand Up @@ -424,7 +458,7 @@ class DBFile(val path: Path,
state = blockCache
)

def transfer(position: Long, count: Long, transferTo: DBFile): Long =
def transfer(position: Int, count: Int, transferTo: DBFile): Long =
file.transfer(
position = position,
count = count,
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/swaydb/core/io/file/DBFileType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ private[file] trait DBFileType extends FileSweeperItem {

def append(slice: Iterable[Slice[Byte]]): Unit

def transfer(position: Long, count: Long, transferTo: DBFileType): Long
def transfer(position: Int, count: Int, transferTo: DBFileType): Int

def read(position: Int, size: Int): Slice[Byte]

Expand Down
7 changes: 5 additions & 2 deletions core/src/main/scala/swaydb/core/io/file/Effect.scala
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,11 @@ private[core] object Effect extends LazyLogging {
throw swaydb.Exception.FailedToWriteAllBytes(written, bytes.size, bytes.size)
}

def transfer(position: Long, count: Long, from: FileChannel, transferTo: WritableByteChannel): Long =
from.transferTo(position, count, transferTo)
def transfer(position: Int, count: Int, from: FileChannel, transferTo: WritableByteChannel): Int = {
val transferCount = from.transferTo(position, count, transferTo)
assert(transferCount <= Int.MaxValue, s"$transferCount is not <= ${Int.MaxValue}")
transferCount.toInt
}

def copy(copyFrom: Path,
copyTo: Path): Path =
Expand Down
33 changes: 26 additions & 7 deletions core/src/main/scala/swaydb/core/io/file/MMAPFile.scala
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,32 @@ private[file] class MMAPFile(val path: Path,
append(slice)
}

override def transfer(position: Long, count: Long, transferTo: DBFileType): Long =
Effect.transfer(
position = position,
count = count,
from = channel,
transferTo = transferTo.writeableChannel
)
override def transfer(position: Int, count: Int, transferTo: DBFileType): Int =
transferTo match {
case _: ChannelFile =>
//TODO - Is forceSave really required here? Can a buffer contain bytes that FileChannel is unaware of?
this.forceSave()

val transferred =
Effect.transfer(
position = position,
count = count,
from = channel,
transferTo = transferTo.writeableChannel
)

assert(transferred == count, s"$transferred != $count")

transferred

case target: MMAPFile =>
val duplicate = buffer.duplicate()
duplicate.position(position)
duplicate.limit(position + count)

target.buffer.put(duplicate)
count
}

def read(position: Int, size: Int): Slice[Byte] =
watchNullPointer {
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/swaydb/core/io/reader/FileReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ private[core] class FileReader(val file: DBFile)(implicit val byteOps: ByteOps[B

override def getPosition: Int = position

def transfer(position: Int, count: Int, transferTo: DBFile): Unit =
file.transfer(position = position, count = count, transferTo = transferTo)

override def get() = {
val byte = file get position
position += 1
Expand Down
29 changes: 14 additions & 15 deletions core/src/main/scala/swaydb/core/segment/PersistentSegmentMany.scala
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,15 @@ protected case object PersistentSegmentMany {
implicit val blockMemorySweeper: Option[MemorySweeper.Block] = blockCache.map(_.sweeper)

val firstSegmentOffset =
segment.headerSize +
segment.segments.head.segmentSize
segment.fileHeader.size +
segment.listSegment.segmentSizeIgnoreHeader

//drop head ignoring the list block.
segment
.segments
.dropHead()
.foldLeft(firstSegmentOffset) {
case (offset, one) =>
val thisSegmentSize = one.segmentSize
case (offset, singleton) =>
val thisSegmentSize = singleton.segmentSize

val blockRef =
BlockRefReader(
Expand All @@ -102,20 +101,20 @@ protected case object PersistentSegmentMany {
val ref =
SegmentRef(
path = file.path.resolve(s".ref.$offset"),
minKey = one.minKey,
maxKey = one.maxKey,
nearestPutDeadline = one.nearestPutDeadline,
minKey = singleton.minKey,
maxKey = singleton.maxKey,
nearestPutDeadline = singleton.nearestPutDeadline,
blockRef = blockRef,
segmentIO = segmentIO,
valuesReaderCacheable = one.valuesUnblockedReader,
sortedIndexReaderCacheable = one.sortedIndexUnblockedReader,
hashIndexReaderCacheable = one.hashIndexUnblockedReader,
binarySearchIndexReaderCacheable = one.binarySearchUnblockedReader,
bloomFilterReaderCacheable = one.bloomFilterUnblockedReader,
footerCacheable = one.footerUnblocked
valuesReaderCacheable = singleton.valuesUnblockedReader,
sortedIndexReaderCacheable = singleton.sortedIndexUnblockedReader,
hashIndexReaderCacheable = singleton.hashIndexUnblockedReader,
binarySearchIndexReaderCacheable = singleton.binarySearchUnblockedReader,
bloomFilterReaderCacheable = singleton.bloomFilterUnblockedReader,
footerCacheable = singleton.footerUnblocked
)

skipList.put(one.minKey, ref)
skipList.put(singleton.minKey, ref)

offset + thisSegmentSize
}
Expand Down
29 changes: 17 additions & 12 deletions core/src/main/scala/swaydb/core/segment/PersistentSegmentOne.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,25 +59,31 @@ protected object PersistentSegmentOne {

val formatId = 126.toByte
val formatIdSlice: Slice[Byte] = Slice(formatId)
val formatIdSliceSlice: Slice[Slice[Byte]] = Slice(formatIdSlice)

def apply(file: DBFile,
createdInLevel: Int,
segment: TransientSegment.One)(implicit keyOrder: KeyOrder[Slice[Byte]],
timeOrder: TimeOrder[Slice[Byte]],
functionStore: FunctionStore,
keyValueMemorySweeper: Option[MemorySweeper.KeyValue],
blockCache: Option[BlockCache.State],
fileSweeper: FileSweeper,
bufferCleaner: ByteBufferSweeperActor,
forceSaveApplier: ForceSaveApplier,
segmentIO: SegmentIO): PersistentSegmentOne =
segment: TransientSegment.Singleton)(implicit keyOrder: KeyOrder[Slice[Byte]],
timeOrder: TimeOrder[Slice[Byte]],
functionStore: FunctionStore,
keyValueMemorySweeper: Option[MemorySweeper.KeyValue],
blockCache: Option[BlockCache.State],
fileSweeper: FileSweeper,
bufferCleaner: ByteBufferSweeperActor,
forceSaveApplier: ForceSaveApplier,
segmentIO: SegmentIO): PersistentSegmentOne =
PersistentSegmentOne(
file = file,
createdInLevel = createdInLevel,
minKey = segment.minKey,
maxKey = segment.maxKey,
minMaxFunctionId = segment.minMaxFunctionId,
minMaxFunctionId =
segment match {
case _: TransientSegment.Remote =>
None

case one: TransientSegment.One =>
one.minMaxFunctionId
},
segmentSize = segment.segmentSize,
nearestExpiryDeadline = segment.nearestPutDeadline,
valuesReaderCacheable = segment.valuesUnblockedReader,
Expand Down Expand Up @@ -423,5 +429,4 @@ protected case class PersistentSegmentOne(file: DBFile,

def cachedKeyValueSize: Int =
ref.cachedKeyValueSize

}
Loading

0 comments on commit 68a81da

Please sign in to comment.