Skip to content

Commit

Permalink
Renamed LazyList to Stream
Browse files Browse the repository at this point in the history
  • Loading branch information
propensive committed Jan 24, 2025
1 parent 7ac6dea commit 5504599
Show file tree
Hide file tree
Showing 14 changed files with 157 additions and 153 deletions.
2 changes: 1 addition & 1 deletion src/core/soundness+turbulence-core.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export turbulence.{Aggregable, Compression, CompressionAlgorithm, Eof, Err, Spoo
SimpleWritable, Stdio, StreamError, Tap, Writable, Zlib, stream, read, writeTo,
deduplicate, rate, multiplexWith, regulate, cluster, parallelMap, multiplex, multiplexer, defer,
pulsar, gzip, gunzip, discard, compress, decompress, shred, chunked, take, spool, strict,
Conduit, inputStream, LazyListOutputStream}
Conduit, inputStream, StreamOutputStream}

package stdioSources:
export turbulence.stdioSources.mute
Expand Down
110 changes: 55 additions & 55 deletions src/core/turbulence-core.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import symbolism.*
import vacuous.*

extension [ValueType](value: ValueType)
def stream[ElementType](using readable: ValueType is Readable by ElementType): LazyList[ElementType] =
def stream[ElementType](using readable: ValueType is Readable by ElementType): Stream[ElementType] =
readable.stream(value)

inline def read[ResultType]: ResultType =
Expand Down Expand Up @@ -76,56 +76,56 @@ package stdioSources:

Stdio(stdout, stderr, stdin, termcapDefinitions.xterm256)

extension [ElementType](stream: LazyList[ElementType])
def deduplicate: LazyList[ElementType] =
def recur(last: ElementType, stream: LazyList[ElementType]): LazyList[ElementType] =
stream.flow(LazyList()):
extension [ElementType](stream: Stream[ElementType])
def deduplicate: Stream[ElementType] =
def recur(last: ElementType, stream: Stream[ElementType]): Stream[ElementType] =
stream.flow(Stream()):
if last == head then recur(last, tail) else head #:: recur(head, tail)

stream.flow(LazyList())(head #:: recur(head, tail))
stream.flow(Stream())(head #:: recur(head, tail))

inline def flow[ResultType](inline termination: => ResultType)
(inline proceed: (head: ElementType, tail: LazyList[ElementType]) ?=> ResultType)
(inline proceed: (head: ElementType, tail: Stream[ElementType]) ?=> ResultType)
: ResultType =
stream match
case head #:: tail => proceed(using head, tail)
case _ => termination

def strict: LazyList[ElementType] = stream.length yet stream
def strict: Stream[ElementType] = stream.length yet stream

def rate[DurationType: GenericDuration: SpecificDuration](duration: DurationType)
(using Monitor, Tactic[AsyncError])
: LazyList[ElementType] =
: Stream[ElementType] =

def recur(stream: LazyList[ElementType], last: Long): LazyList[ElementType] =
stream.flow(LazyList()):
def recur(stream: Stream[ElementType], last: Long): Stream[ElementType] =
stream.flow(Stream()):
val duration2 = SpecificDuration(duration.milliseconds - (System.currentTimeMillis - last))
if duration2.milliseconds > 0 then snooze(duration2)
stream

async(recur(stream, System.currentTimeMillis)).await()

def multiplexWith(that: LazyList[ElementType])(using Monitor): LazyList[ElementType] =
unsafely(LazyList.multiplex(stream, that))
def multiplexWith(that: Stream[ElementType])(using Monitor): Stream[ElementType] =
unsafely(Stream.multiplex(stream, that))

def regulate(tap: Tap)(using Monitor): LazyList[ElementType] =
def regulate(tap: Tap)(using Monitor): Stream[ElementType] =
def defer
(active: Boolean,
stream: LazyList[Some[ElementType] | Tap.Regulation],
stream: Stream[Some[ElementType] | Tap.Regulation],
buffer: List[ElementType])
: LazyList[ElementType] =
: Stream[ElementType] =

recur(active, stream, buffer)

@tailrec
def recur
(active: Boolean,
stream: LazyList[Some[ElementType] | Tap.Regulation],
stream: Stream[Some[ElementType] | Tap.Regulation],
buffer: List[ElementType])
: LazyList[ElementType] =
: Stream[ElementType] =

if active && buffer.nonEmpty then buffer.head #:: defer(true, stream, buffer.tail)
else if stream.isEmpty then LazyList()
else if stream.isEmpty then Stream()
else stream.head match
case Tap.Regulation.Start =>
recur(true, stream.tail, buffer)
Expand All @@ -137,35 +137,35 @@ extension [ElementType](stream: LazyList[ElementType])
if active then other.nn #:: defer(true, stream.tail, Nil)
else recur(false, stream.tail, other.nn :: buffer)

LazyList.defer(recur(true, stream.map(Some(_)).multiplexWith(tap.stream), Nil))
Stream.defer(recur(true, stream.map(Some(_)).multiplexWith(tap.stream), Nil))

def cluster[DurationType: GenericDuration](duration: DurationType, maxSize: Optional[Int] = Unset)
(using Monitor)
: LazyList[List[ElementType]] =
: Stream[List[ElementType]] =

val Limit = maxSize.or(Int.MaxValue)

def recur(stream: LazyList[ElementType], list: List[ElementType], count: Int)
: LazyList[List[ElementType]] =
def recur(stream: Stream[ElementType], list: List[ElementType], count: Int)
: Stream[List[ElementType]] =

count match
case 0 => safely(async(stream.isEmpty).await()) match
case Unset => recur(stream, Nil, 0)
case false => recur(stream.tail, stream.head :: list, count + 1)
case true => LazyList()
case true => Stream()

case Limit =>
list.reverse #:: recur(stream, Nil, 0)

case _ => safely(async(stream.isEmpty).await(duration)) match
case Unset => list.reverse #:: recur(stream, Nil, 0)
case false => recur(stream.tail, stream.head :: list, count + 1)
case true => LazyList(list.reverse)
case true => Stream(list.reverse)

LazyList.defer(recur(stream, Nil, 0))
Stream.defer(recur(stream, Nil, 0))

def parallelMap[ElementType2](lambda: ElementType => ElementType2)(using Monitor)
: LazyList[ElementType2] =
: Stream[ElementType2] =

val out: Spool[ElementType2] = Spool()

Expand All @@ -192,30 +192,30 @@ package lineSeparation:
case "\n" => linefeed
case _: String => adaptiveLinefeed

extension (obj: LazyList.type)
def multiplex[ElemType](streams: LazyList[ElemType]*)(using Monitor)
: LazyList[ElemType] =
extension (obj: Stream.type)
def multiplex[ElemType](streams: Stream[ElemType]*)(using Monitor)
: Stream[ElemType] =

multiplexer(streams*).stream

def multiplexer[ElemType](streams: LazyList[ElemType]*)(using Monitor)
def multiplexer[ElemType](streams: Stream[ElemType]*)(using Monitor)
: Multiplexer[Any, ElemType] =

val multiplexer = Multiplexer[Any, ElemType]()
streams.zipWithIndex.map(_.swap).each(multiplexer.add)
multiplexer

def defer[ElemType](stream: => LazyList[ElemType]): LazyList[ElemType] =
def defer[ElemType](stream: => Stream[ElemType]): Stream[ElemType] =
(null.asInstanceOf[ElemType] #:: stream).tail

def pulsar[DurationType: GenericDuration](duration: DurationType)(using Monitor): LazyList[Unit] =
def pulsar[DurationType: GenericDuration](duration: DurationType)(using Monitor): Stream[Unit] =
val startTime: Long = System.currentTimeMillis

def recur(iteration: Int): LazyList[Unit] =
def recur(iteration: Int): Stream[Unit] =
try
snooze(startTime + duration.milliseconds*iteration)
() #:: pulsar(duration)
catch case err: AsyncError => LazyList()
catch case err: AsyncError => Stream()

recur(0)

Expand Down Expand Up @@ -243,28 +243,28 @@ extension (bytes: Bytes)

out.toByteArray.nn.immutable(using Unsafe)

extension (stream: LazyList[Bytes])
def discard(memory: Memory): LazyList[Bytes] =
def recur(stream: LazyList[Bytes], count: Memory): LazyList[Bytes] = stream.flow(LazyList()):
extension (stream: Stream[Bytes])
def discard(memory: Memory): Stream[Bytes] =
def recur(stream: Stream[Bytes], count: Memory): Stream[Bytes] = stream.flow(Stream()):
if head.memory < count
then recur(tail, count - head.memory) else head.drop(count.long.toInt) #:: tail

recur(stream, memory)

def compress[CompressionType <: CompressionAlgorithm: Compression]: LazyList[Bytes] =
def compress[CompressionType <: CompressionAlgorithm: Compression]: Stream[Bytes] =
summon[Compression].compress(stream)

def decompress[CompressionType <: CompressionAlgorithm: Compression]: LazyList[Bytes] =
def decompress[CompressionType <: CompressionAlgorithm: Compression]: Stream[Bytes] =
summon[Compression].decompress(stream)

def shred(mean: Double, variance: Double)(using Randomization): LazyList[Bytes] =
def shred(mean: Double, variance: Double)(using Randomization): Stream[Bytes] =
stochastic:
given Distribution = Gamma.approximate(mean, variance)

def newArray(): Array[Byte] = new Array[Byte](arbitrary[Double]().toInt.max(1))

def recur(stream: LazyList[Bytes], sourcePos: Int, dest: Array[Byte], destPos: Int)
: LazyList[Bytes] =
def recur(stream: Stream[Bytes], sourcePos: Int, dest: Array[Byte], destPos: Int)
: Stream[Bytes] =

stream match
case source #:: more =>
Expand All @@ -282,16 +282,16 @@ extension (stream: LazyList[Bytes])
dest.immutable(using Unsafe) #:: recur(more, 0, newArray(), 0)

case _ =>
if destPos == 0 then LazyList()
else LazyList(dest.slice(0, destPos).immutable(using Unsafe))
if destPos == 0 then Stream()
else Stream(dest.slice(0, destPos).immutable(using Unsafe))

recur(stream, 0, newArray(), 0)

def chunked(size: Int, zeroPadding: Boolean = false): LazyList[Bytes] =
def chunked(size: Int, zeroPadding: Boolean = false): Stream[Bytes] =
def newArray(): Array[Byte] = new Array[Byte](size)

def recur(stream: LazyList[Bytes], sourcePos: Int, dest: Array[Byte], destPos: Int)
: LazyList[Bytes] =
def recur(stream: Stream[Bytes], sourcePos: Int, dest: Array[Byte], destPos: Int)
: Stream[Bytes] =

stream match
case source #:: more =>
Expand All @@ -309,22 +309,22 @@ extension (stream: LazyList[Bytes])
dest.immutable(using Unsafe) #:: recur(more, 0, newArray(), 0)

case _ =>
if destPos == 0 then LazyList()
else LazyList:
if destPos == 0 then Stream()
else Stream:
(if zeroPadding then dest else dest.slice(0, destPos)).immutable(using Unsafe)

recur(stream, 0, newArray(), 0)

def take(memory: Memory): LazyList[Bytes] =
def recur(stream: LazyList[Bytes], count: Memory): LazyList[Bytes] =
stream.flow(LazyList()):
def take(memory: Memory): Stream[Bytes] =
def recur(stream: Stream[Bytes], count: Memory): Stream[Bytes] =
stream.flow(Stream()):
if head.memory < count then head #:: recur(tail, count - head.memory)
else LazyList(head.take(count.long.toInt))
else Stream(head.take(count.long.toInt))

recur(stream, memory)

def inputStream: ji.InputStream = new ji.InputStream:
private var current: LazyList[Bytes] = stream
private var current: Stream[Bytes] = stream
private var offset: Int = 0
private var focus: Bytes = IArray.empty[Byte]

Expand Down
8 changes: 4 additions & 4 deletions src/core/turbulence.Aggregable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import vacuous.*

object Aggregable:
given bytesBytes: Bytes is Aggregable by Bytes = source =>
def recur(buf: ji.ByteArrayOutputStream, source: LazyList[Bytes]): Bytes =
def recur(buf: ji.ByteArrayOutputStream, source: Stream[Bytes]): Bytes =
source.flow(buf.toByteArray().nn.immutable(using Unsafe)):
buf.write(head.mutable(using Unsafe)); recur(buf, tail)

Expand All @@ -42,14 +42,14 @@ object Aggregable:

given stream: [ElementType, ElementType2]
=> (aggregable: ElementType2 is Aggregable by ElementType)
=> LazyList[ElementType2] is Aggregable by ElementType =
element => LazyList(aggregable.aggregate(element))
=> Stream[ElementType2] is Aggregable by ElementType =
element => Stream(aggregable.aggregate(element))

trait Aggregable:
aggregable =>
type Self
type Operand
def aggregate(source: LazyList[Operand]): Self
def aggregate(source: Stream[Operand]): Self

def map[SelfType2](lambda: Self => SelfType2): SelfType2 is Aggregable by Operand = source =>
lambda(aggregable.aggregate(source))
12 changes: 6 additions & 6 deletions src/core/turbulence.Compression.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ import vacuous.*

trait Compression:
type Self <: CompressionAlgorithm
def compress(stream: LazyList[Bytes]): LazyList[Bytes]
def decompress(stream: LazyList[Bytes]): LazyList[Bytes]
def compress(stream: Stream[Bytes]): Stream[Bytes]
def decompress(stream: Stream[Bytes]): Stream[Bytes]

object Compression:
given Gzip is Compression:
def compress(stream: LazyList[Bytes]): LazyList[Bytes] =
def compress(stream: Stream[Bytes]): Stream[Bytes] =
val out = ji.ByteArrayOutputStream()
val out2 = juz.GZIPOutputStream(out)

def recur(stream: LazyList[Bytes]): LazyList[Bytes] = stream match
def recur(stream: Stream[Bytes]): Stream[Bytes] = stream match
case head #:: tail =>
out2.write(head.mutable(using Unsafe))
if out.size == 0 then recur(tail) else
Expand All @@ -47,9 +47,9 @@ object Compression:

case _ =>
out2.close()
if out.size == 0 then LazyList() else LazyList(out.toByteArray().nn.immutable(using Unsafe))
if out.size == 0 then Stream() else Stream(out.toByteArray().nn.immutable(using Unsafe))

recur(stream)

def decompress(stream: LazyList[Bytes]): LazyList[Bytes] =
def decompress(stream: Stream[Bytes]): Stream[Bytes] =
unsafely(juz.GZIPInputStream(stream.inputStream).stream[Bytes])
6 changes: 3 additions & 3 deletions src/core/turbulence.Conduit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ object Conduit:
enum State:
case Data, Clutch, End

class Conduit(input: LazyList[Bytes]):
class Conduit(input: Stream[Bytes]):
private var current: Bytes = if input.isEmpty then Bytes() else input.head
private var stream: LazyList[Bytes] = if input.isEmpty then LazyList() else input.tail
private var stream: Stream[Bytes] = if input.isEmpty then Stream() else input.tail
private var index: Ordinal = Prim
private var done: Int = 0
private var clutch: Boolean = false

private var stream0: LazyList[Bytes] = stream
private var stream0: Stream[Bytes] = stream
private var current0: Bytes = current
private var index0: Ordinal = index
private var done0: Int = done
Expand Down
4 changes: 2 additions & 2 deletions src/core/turbulence.LazyListOutputStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import anticipation.*
import rudiments.*
import vacuous.*

class LazyListOutputStream() extends ji.OutputStream:
class StreamOutputStream() extends ji.OutputStream:
private val buffer: scm.ArrayBuffer[Byte] = scm.ArrayBuffer()
private val chunks: Spool[Bytes] = Spool()

def stream: LazyList[Bytes] = chunks.stream
def stream: Stream[Bytes] = chunks.stream
def write(int: Int): Unit = buffer.append(int.toByte)

override def close(): Unit = flush().also(chunks.stop())
Expand Down
10 changes: 5 additions & 5 deletions src/core/turbulence.Multiplexer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,18 @@ case class Multiplexer[KeyType, ElementType]()(using Monitor):
def close(): Unit = tasks.keys.each(remove(_))

@tailrec
private def pump(key: KeyType, stream: LazyList[ElementType])(using Worker): Unit =
private def pump(key: KeyType, stream: Stream[ElementType])(using Worker): Unit =
if stream.isEmpty then remove(key) else
relent()
queue.put(stream.head)
pump(key, stream.tail)

def add(key: KeyType, stream: LazyList[ElementType]): Unit = tasks(key) = async(pump(key, stream))
def add(key: KeyType, stream: Stream[ElementType]): Unit = tasks(key) = async(pump(key, stream))

private def remove(key: KeyType): Unit = synchronized:
tasks -= key
if tasks.isEmpty then queue.put(Multiplexer.Termination)

def stream: LazyList[ElementType] =
LazyList.continually(queue.take().nn).takeWhile(_ != Multiplexer.Termination)
. asInstanceOf[LazyList[ElementType]]
def stream: Stream[ElementType] =
Stream.continually(queue.take().nn).takeWhile(_ != Multiplexer.Termination)
. asInstanceOf[Stream[ElementType]]
7 changes: 4 additions & 3 deletions src/core/turbulence.Pulsar.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import language.experimental.captureChecking

import anticipation.*
import parasite.*
import rudiments.*

class Pulsar[DurationType: GenericDuration](duration: DurationType):
private var continue: Boolean = true
def stop(): Unit = continue = false

def stream(using Monitor): LazyList[Unit] =
if !continue then LazyList() else try
def stream(using Monitor): Stream[Unit] =
if !continue then Stream() else try
snooze(duration)
() #:: stream
catch case err: AsyncError => LazyList()
catch case err: AsyncError => Stream()
Loading

0 comments on commit 5504599

Please sign in to comment.