Skip to content

Commit

Permalink
Various changes while working on async reliability
Browse files Browse the repository at this point in the history
  • Loading branch information
propensive committed Apr 7, 2024
1 parent a0f259b commit cb53a1e
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 5 deletions.
6 changes: 6 additions & 0 deletions src/core/funnel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,9 @@ class Funnel[ItemType]():

class Gun() extends Funnel[Unit]():
def fire(): Unit = put(())

def funnel[ItemType](using DummyImplicit)[ResultType](lambda: Funnel[ItemType] => ResultType)
: ResultType =

val funnel: Funnel[ItemType] = Funnel()
try lambda(funnel) finally funnel.stop()
3 changes: 2 additions & 1 deletion src/core/stdio.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ object Out:

object In:
def read(bytes: Array[Byte])(using stdio: Stdio): Int = stdio.read(bytes)
def close()(using stdio: Stdio): Unit = stdio.in.close()

object Stdio:
def apply
Expand Down Expand Up @@ -122,7 +123,7 @@ trait Stdio extends Io:
val err: ji.PrintStream
val in: ji.InputStream

protected[turbulence] lazy val reader: ji.Reader = ji.InputStreamReader(in, "UTF-8")
protected[turbulence] lazy val reader: ji.Reader = ji.InputStreamReader(in)

def write(bytes: Bytes): Unit = out.write(bytes.mutable(using Unsafe), 0, bytes.length)
def print(text: Text): Unit = out.print(text.s)
Expand Down
8 changes: 4 additions & 4 deletions src/core/streaming.scala
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ object Readable:

given reliableInputStream: Readable[ji.InputStream, Bytes] = in =>
val channel: jn.channels.ReadableByteChannel = jn.channels.Channels.newChannel(in).nn
val buf: jn.ByteBuffer = jn.ByteBuffer.wrap(new Array[Byte](4096)).nn
val buf: jn.ByteBuffer = jn.ByteBuffer.wrap(new Array[Byte](1024)).nn

def recur(): LazyList[Bytes] =
try channel.read(buf) match
Expand All @@ -182,7 +182,7 @@ object Readable:

case count =>
buf.flip()
val size: Int = count.min(4096)
val size: Int = count.min(1024)
val array: Array[Byte] = new Array[Byte](size)
buf.get(array)
buf.clear()
Expand All @@ -195,7 +195,7 @@ object Readable:

given inputStream(using streamCut: Raises[StreamError]): Readable[ji.InputStream, Bytes] = in =>
val channel: jn.channels.ReadableByteChannel = jn.channels.Channels.newChannel(in).nn
val buf: jn.ByteBuffer = jn.ByteBuffer.wrap(new Array[Byte](4096)).nn
val buf: jn.ByteBuffer = jn.ByteBuffer.wrap(new Array[Byte](1024)).nn

def recur(total: Long): LazyList[Bytes] =
try channel.read(buf) match
Expand All @@ -204,7 +204,7 @@ object Readable:

case count =>
buf.flip()
val size: Int = count.min(4096)
val size: Int = count.min(1024)
val array: Array[Byte] = new Array[Byte](size)
buf.get(array)
buf.clear()
Expand Down

0 comments on commit cb53a1e

Please sign in to comment.