diff --git a/src/core/soundness+turbulence-core.scala b/src/core/soundness+turbulence-core.scala index 4eb6adf..f62d02e 100644 --- a/src/core/soundness+turbulence-core.scala +++ b/src/core/soundness+turbulence-core.scala @@ -21,7 +21,7 @@ export turbulence.{Aggregable, Compression, CompressionAlgorithm, Eof, Err, Spoo SimpleWritable, Stdio, StreamError, Tap, Writable, Zlib, stream, read, writeTo, deduplicate, rate, multiplexWith, regulate, cluster, parallelMap, multiplex, multiplexer, defer, pulsar, gzip, gunzip, discard, compress, decompress, shred, chunked, take, spool, strict, - Conduit, inputStream, LazyListOutputStream} + Conduit, inputStream, StreamOutputStream} package stdioSources: export turbulence.stdioSources.mute diff --git a/src/core/turbulence-core.scala b/src/core/turbulence-core.scala index e4d4336..7ce7aff 100644 --- a/src/core/turbulence-core.scala +++ b/src/core/turbulence-core.scala @@ -32,7 +32,7 @@ import symbolism.* import vacuous.* extension [ValueType](value: ValueType) - def stream[ElementType](using readable: ValueType is Readable by ElementType): LazyList[ElementType] = + def stream[ElementType](using readable: ValueType is Readable by ElementType): Stream[ElementType] = readable.stream(value) inline def read[ResultType]: ResultType = @@ -76,56 +76,56 @@ package stdioSources: Stdio(stdout, stderr, stdin, termcapDefinitions.xterm256) -extension [ElementType](stream: LazyList[ElementType]) - def deduplicate: LazyList[ElementType] = - def recur(last: ElementType, stream: LazyList[ElementType]): LazyList[ElementType] = - stream.flow(LazyList()): +extension [ElementType](stream: Stream[ElementType]) + def deduplicate: Stream[ElementType] = + def recur(last: ElementType, stream: Stream[ElementType]): Stream[ElementType] = + stream.flow(Stream()): if last == head then recur(last, tail) else head #:: recur(head, tail) - stream.flow(LazyList())(head #:: recur(head, tail)) + stream.flow(Stream())(head #:: recur(head, tail)) inline def flow[ResultType](inline termination: => ResultType) - (inline proceed: (head: ElementType, tail: LazyList[ElementType]) ?=> ResultType) + (inline proceed: (head: ElementType, tail: Stream[ElementType]) ?=> ResultType) : ResultType = stream match case head #:: tail => proceed(using head, tail) case _ => termination - def strict: LazyList[ElementType] = stream.length yet stream + def strict: Stream[ElementType] = stream.length yet stream def rate[DurationType: GenericDuration: SpecificDuration](duration: DurationType) (using Monitor, Tactic[AsyncError]) - : LazyList[ElementType] = + : Stream[ElementType] = - def recur(stream: LazyList[ElementType], last: Long): LazyList[ElementType] = - stream.flow(LazyList()): + def recur(stream: Stream[ElementType], last: Long): Stream[ElementType] = + stream.flow(Stream()): val duration2 = SpecificDuration(duration.milliseconds - (System.currentTimeMillis - last)) if duration2.milliseconds > 0 then snooze(duration2) stream async(recur(stream, System.currentTimeMillis)).await() - def multiplexWith(that: LazyList[ElementType])(using Monitor): LazyList[ElementType] = - unsafely(LazyList.multiplex(stream, that)) + def multiplexWith(that: Stream[ElementType])(using Monitor): Stream[ElementType] = + unsafely(Stream.multiplex(stream, that)) - def regulate(tap: Tap)(using Monitor): LazyList[ElementType] = + def regulate(tap: Tap)(using Monitor): Stream[ElementType] = def defer (active: Boolean, - stream: LazyList[Some[ElementType] | Tap.Regulation], + stream: Stream[Some[ElementType] | Tap.Regulation], buffer: List[ElementType]) - : LazyList[ElementType] = + : Stream[ElementType] = recur(active, stream, buffer) @tailrec def recur (active: Boolean, - stream: LazyList[Some[ElementType] | Tap.Regulation], + stream: Stream[Some[ElementType] | Tap.Regulation], buffer: List[ElementType]) - : LazyList[ElementType] = + : Stream[ElementType] = if active && buffer.nonEmpty then buffer.head #:: defer(true, stream, buffer.tail) - else if stream.isEmpty then LazyList() + else if stream.isEmpty then Stream() else stream.head match case Tap.Regulation.Start => recur(true, stream.tail, buffer) @@ -137,22 +137,22 @@ extension [ElementType](stream: LazyList[ElementType]) if active then other.nn #:: defer(true, stream.tail, Nil) else recur(false, stream.tail, other.nn :: buffer) - LazyList.defer(recur(true, stream.map(Some(_)).multiplexWith(tap.stream), Nil)) + Stream.defer(recur(true, stream.map(Some(_)).multiplexWith(tap.stream), Nil)) def cluster[DurationType: GenericDuration](duration: DurationType, maxSize: Optional[Int] = Unset) (using Monitor) - : LazyList[List[ElementType]] = + : Stream[List[ElementType]] = val Limit = maxSize.or(Int.MaxValue) - def recur(stream: LazyList[ElementType], list: List[ElementType], count: Int) - : LazyList[List[ElementType]] = + def recur(stream: Stream[ElementType], list: List[ElementType], count: Int) + : Stream[List[ElementType]] = count match case 0 => safely(async(stream.isEmpty).await()) match case Unset => recur(stream, Nil, 0) case false => recur(stream.tail, stream.head :: list, count + 1) - case true => LazyList() + case true => Stream() case Limit => list.reverse #:: recur(stream, Nil, 0) @@ -160,12 +160,12 @@ extension [ElementType](stream: LazyList[ElementType]) case _ => safely(async(stream.isEmpty).await(duration)) match case Unset => list.reverse #:: recur(stream, Nil, 0) case false => recur(stream.tail, stream.head :: list, count + 1) - case true => LazyList(list.reverse) + case true => Stream(list.reverse) - LazyList.defer(recur(stream, Nil, 0)) + Stream.defer(recur(stream, Nil, 0)) def parallelMap[ElementType2](lambda: ElementType => ElementType2)(using Monitor) - : LazyList[ElementType2] = + : Stream[ElementType2] = val out: Spool[ElementType2] = Spool() @@ -192,30 +192,30 @@ package lineSeparation: case "\n" => linefeed case _: String => adaptiveLinefeed -extension (obj: LazyList.type) - def multiplex[ElemType](streams: LazyList[ElemType]*)(using Monitor) - : LazyList[ElemType] = +extension (obj: Stream.type) + def multiplex[ElemType](streams: Stream[ElemType]*)(using Monitor) + : Stream[ElemType] = multiplexer(streams*).stream - def multiplexer[ElemType](streams: LazyList[ElemType]*)(using Monitor) + def multiplexer[ElemType](streams: Stream[ElemType]*)(using Monitor) : Multiplexer[Any, ElemType] = val multiplexer = Multiplexer[Any, ElemType]() streams.zipWithIndex.map(_.swap).each(multiplexer.add) multiplexer - def defer[ElemType](stream: => LazyList[ElemType]): LazyList[ElemType] = + def defer[ElemType](stream: => Stream[ElemType]): Stream[ElemType] = (null.asInstanceOf[ElemType] #:: stream).tail - def pulsar[DurationType: GenericDuration](duration: DurationType)(using Monitor): LazyList[Unit] = + def pulsar[DurationType: GenericDuration](duration: DurationType)(using Monitor): Stream[Unit] = val startTime: Long = System.currentTimeMillis - def recur(iteration: Int): LazyList[Unit] = + def recur(iteration: Int): Stream[Unit] = try snooze(startTime + duration.milliseconds*iteration) () #:: pulsar(duration) - catch case err: AsyncError => LazyList() + catch case err: AsyncError => Stream() recur(0) @@ -243,28 +243,28 @@ extension (bytes: Bytes) out.toByteArray.nn.immutable(using Unsafe) -extension (stream: LazyList[Bytes]) - def discard(memory: Memory): LazyList[Bytes] = - def recur(stream: LazyList[Bytes], count: Memory): LazyList[Bytes] = stream.flow(LazyList()): +extension (stream: Stream[Bytes]) + def discard(memory: Memory): Stream[Bytes] = + def recur(stream: Stream[Bytes], count: Memory): Stream[Bytes] = stream.flow(Stream()): if head.memory < count then recur(tail, count - head.memory) else head.drop(count.long.toInt) #:: tail recur(stream, memory) - def compress[CompressionType <: CompressionAlgorithm: Compression]: LazyList[Bytes] = + def compress[CompressionType <: CompressionAlgorithm: Compression]: Stream[Bytes] = summon[Compression].compress(stream) - def decompress[CompressionType <: CompressionAlgorithm: Compression]: LazyList[Bytes] = + def decompress[CompressionType <: CompressionAlgorithm: Compression]: Stream[Bytes] = summon[Compression].decompress(stream) - def shred(mean: Double, variance: Double)(using Randomization): LazyList[Bytes] = + def shred(mean: Double, variance: Double)(using Randomization): Stream[Bytes] = stochastic: given Distribution = Gamma.approximate(mean, variance) def newArray(): Array[Byte] = new Array[Byte](arbitrary[Double]().toInt.max(1)) - def recur(stream: LazyList[Bytes], sourcePos: Int, dest: Array[Byte], destPos: Int) - : LazyList[Bytes] = + def recur(stream: Stream[Bytes], sourcePos: Int, dest: Array[Byte], destPos: Int) + : Stream[Bytes] = stream match case source #:: more => @@ -282,16 +282,16 @@ extension (stream: LazyList[Bytes]) dest.immutable(using Unsafe) #:: recur(more, 0, newArray(), 0) case _ => - if destPos == 0 then LazyList() - else LazyList(dest.slice(0, destPos).immutable(using Unsafe)) + if destPos == 0 then Stream() + else Stream(dest.slice(0, destPos).immutable(using Unsafe)) recur(stream, 0, newArray(), 0) - def chunked(size: Int, zeroPadding: Boolean = false): LazyList[Bytes] = + def chunked(size: Int, zeroPadding: Boolean = false): Stream[Bytes] = def newArray(): Array[Byte] = new Array[Byte](size) - def recur(stream: LazyList[Bytes], sourcePos: Int, dest: Array[Byte], destPos: Int) - : LazyList[Bytes] = + def recur(stream: Stream[Bytes], sourcePos: Int, dest: Array[Byte], destPos: Int) + : Stream[Bytes] = stream match case source #:: more => @@ -309,22 +309,22 @@ extension (stream: LazyList[Bytes]) dest.immutable(using Unsafe) #:: recur(more, 0, newArray(), 0) case _ => - if destPos == 0 then LazyList() - else LazyList: + if destPos == 0 then Stream() + else Stream: (if zeroPadding then dest else dest.slice(0, destPos)).immutable(using Unsafe) recur(stream, 0, newArray(), 0) - def take(memory: Memory): LazyList[Bytes] = - def recur(stream: LazyList[Bytes], count: Memory): LazyList[Bytes] = - stream.flow(LazyList()): + def take(memory: Memory): Stream[Bytes] = + def recur(stream: Stream[Bytes], count: Memory): Stream[Bytes] = + stream.flow(Stream()): if head.memory < count then head #:: recur(tail, count - head.memory) - else LazyList(head.take(count.long.toInt)) + else Stream(head.take(count.long.toInt)) recur(stream, memory) def inputStream: ji.InputStream = new ji.InputStream: - private var current: LazyList[Bytes] = stream + private var current: Stream[Bytes] = stream private var offset: Int = 0 private var focus: Bytes = IArray.empty[Byte] diff --git a/src/core/turbulence.Aggregable.scala b/src/core/turbulence.Aggregable.scala index a46fe74..c511321 100644 --- a/src/core/turbulence.Aggregable.scala +++ b/src/core/turbulence.Aggregable.scala @@ -26,7 +26,7 @@ import vacuous.* object Aggregable: given bytesBytes: Bytes is Aggregable by Bytes = source => - def recur(buf: ji.ByteArrayOutputStream, source: LazyList[Bytes]): Bytes = + def recur(buf: ji.ByteArrayOutputStream, source: Stream[Bytes]): Bytes = source.flow(buf.toByteArray().nn.immutable(using Unsafe)): buf.write(head.mutable(using Unsafe)); recur(buf, tail) @@ -42,14 +42,14 @@ object Aggregable: given stream: [ElementType, ElementType2] => (aggregable: ElementType2 is Aggregable by ElementType) - => LazyList[ElementType2] is Aggregable by ElementType = - element => LazyList(aggregable.aggregate(element)) + => Stream[ElementType2] is Aggregable by ElementType = + element => Stream(aggregable.aggregate(element)) trait Aggregable: aggregable => type Self type Operand - def aggregate(source: LazyList[Operand]): Self + def aggregate(source: Stream[Operand]): Self def map[SelfType2](lambda: Self => SelfType2): SelfType2 is Aggregable by Operand = source => lambda(aggregable.aggregate(source)) diff --git a/src/core/turbulence.Compression.scala b/src/core/turbulence.Compression.scala index 06faf9c..f0a0a83 100644 --- a/src/core/turbulence.Compression.scala +++ b/src/core/turbulence.Compression.scala @@ -28,16 +28,16 @@ import vacuous.* trait Compression: type Self <: CompressionAlgorithm - def compress(stream: LazyList[Bytes]): LazyList[Bytes] - def decompress(stream: LazyList[Bytes]): LazyList[Bytes] + def compress(stream: Stream[Bytes]): Stream[Bytes] + def decompress(stream: Stream[Bytes]): Stream[Bytes] object Compression: given Gzip is Compression: - def compress(stream: LazyList[Bytes]): LazyList[Bytes] = + def compress(stream: Stream[Bytes]): Stream[Bytes] = val out = ji.ByteArrayOutputStream() val out2 = juz.GZIPOutputStream(out) - def recur(stream: LazyList[Bytes]): LazyList[Bytes] = stream match + def recur(stream: Stream[Bytes]): Stream[Bytes] = stream match case head #:: tail => out2.write(head.mutable(using Unsafe)) if out.size == 0 then recur(tail) else @@ -47,9 +47,9 @@ object Compression: case _ => out2.close() - if out.size == 0 then LazyList() else LazyList(out.toByteArray().nn.immutable(using Unsafe)) + if out.size == 0 then Stream() else Stream(out.toByteArray().nn.immutable(using Unsafe)) recur(stream) - def decompress(stream: LazyList[Bytes]): LazyList[Bytes] = + def decompress(stream: Stream[Bytes]): Stream[Bytes] = unsafely(juz.GZIPInputStream(stream.inputStream).stream[Bytes]) diff --git a/src/core/turbulence.Conduit.scala b/src/core/turbulence.Conduit.scala index 6ea0158..df71595 100644 --- a/src/core/turbulence.Conduit.scala +++ b/src/core/turbulence.Conduit.scala @@ -28,14 +28,14 @@ object Conduit: enum State: case Data, Clutch, End -class Conduit(input: LazyList[Bytes]): +class Conduit(input: Stream[Bytes]): private var current: Bytes = if input.isEmpty then Bytes() else input.head - private var stream: LazyList[Bytes] = if input.isEmpty then LazyList() else input.tail + private var stream: Stream[Bytes] = if input.isEmpty then Stream() else input.tail private var index: Ordinal = Prim private var done: Int = 0 private var clutch: Boolean = false - private var stream0: LazyList[Bytes] = stream + private var stream0: Stream[Bytes] = stream private var current0: Bytes = current private var index0: Ordinal = index private var done0: Int = done diff --git a/src/core/turbulence.LazyListOutputStream.scala b/src/core/turbulence.LazyListOutputStream.scala index 00870fa..6bb478a 100644 --- a/src/core/turbulence.LazyListOutputStream.scala +++ b/src/core/turbulence.LazyListOutputStream.scala @@ -26,11 +26,11 @@ import anticipation.* import rudiments.* import vacuous.* -class LazyListOutputStream() extends ji.OutputStream: +class StreamOutputStream() extends ji.OutputStream: private val buffer: scm.ArrayBuffer[Byte] = scm.ArrayBuffer() private val chunks: Spool[Bytes] = Spool() - def stream: LazyList[Bytes] = chunks.stream + def stream: Stream[Bytes] = chunks.stream def write(int: Int): Unit = buffer.append(int.toByte) override def close(): Unit = flush().also(chunks.stop()) diff --git a/src/core/turbulence.Multiplexer.scala b/src/core/turbulence.Multiplexer.scala index 5544e9f..7386c5e 100644 --- a/src/core/turbulence.Multiplexer.scala +++ b/src/core/turbulence.Multiplexer.scala @@ -36,18 +36,18 @@ case class Multiplexer[KeyType, ElementType]()(using Monitor): def close(): Unit = tasks.keys.each(remove(_)) @tailrec - private def pump(key: KeyType, stream: LazyList[ElementType])(using Worker): Unit = + private def pump(key: KeyType, stream: Stream[ElementType])(using Worker): Unit = if stream.isEmpty then remove(key) else relent() queue.put(stream.head) pump(key, stream.tail) - def add(key: KeyType, stream: LazyList[ElementType]): Unit = tasks(key) = async(pump(key, stream)) + def add(key: KeyType, stream: Stream[ElementType]): Unit = tasks(key) = async(pump(key, stream)) private def remove(key: KeyType): Unit = synchronized: tasks -= key if tasks.isEmpty then queue.put(Multiplexer.Termination) - def stream: LazyList[ElementType] = - LazyList.continually(queue.take().nn).takeWhile(_ != Multiplexer.Termination) - . asInstanceOf[LazyList[ElementType]] + def stream: Stream[ElementType] = + Stream.continually(queue.take().nn).takeWhile(_ != Multiplexer.Termination) + . asInstanceOf[Stream[ElementType]] diff --git a/src/core/turbulence.Pulsar.scala b/src/core/turbulence.Pulsar.scala index 16239bd..e830b5f 100644 --- a/src/core/turbulence.Pulsar.scala +++ b/src/core/turbulence.Pulsar.scala @@ -20,13 +20,14 @@ import language.experimental.captureChecking import anticipation.* import parasite.* +import rudiments.* class Pulsar[DurationType: GenericDuration](duration: DurationType): private var continue: Boolean = true def stop(): Unit = continue = false - def stream(using Monitor): LazyList[Unit] = - if !continue then LazyList() else try + def stream(using Monitor): Stream[Unit] = + if !continue then Stream() else try snooze(duration) () #:: stream - catch case err: AsyncError => LazyList() + catch case err: AsyncError => Stream() diff --git a/src/core/turbulence.Readable.scala b/src/core/turbulence.Readable.scala index 1534c3a..2870b05 100644 --- a/src/core/turbulence.Readable.scala +++ b/src/core/turbulence.Readable.scala @@ -28,8 +28,8 @@ import symbolism.* import vacuous.* object Readable: - given bytes: Bytes is Readable by Bytes = LazyList(_) - given text: [TextType <: Text] => TextType is Readable by Text = LazyList(_) + given bytes: Bytes is Readable by Bytes = Stream(_) + given text: [TextType <: Text] => TextType is Readable by Text = Stream(_) given encodingAdapter: [SourceType: Readable by Text] => (encoder: CharEncoder) => SourceType is Readable by Bytes = @@ -40,48 +40,48 @@ object Readable: source => decoder.decode(SourceType.stream(source)) - given stream: [ElementType] => LazyList[ElementType] is Readable by ElementType = identity(_) + given stream: [ElementType] => Stream[ElementType] is Readable by ElementType = identity(_) given inCharReader: (stdio: Stdio) => In.type is Readable by Char = in => - def recur(count: Memory): LazyList[Char] = + def recur(count: Memory): Stream[Char] = stdio.reader.read() match - case -1 => LazyList() + case -1 => Stream() case int => int.toChar #:: recur(count + 1.b) - LazyList.defer(recur(0L.b)) + Stream.defer(recur(0L.b)) given inByteReader: (stdio: Stdio) => In.type is Readable by Byte = in => - def recur(count: Memory): LazyList[Byte] = + def recur(count: Memory): Stream[Byte] = stdio.in.read() match - case -1 => LazyList() + case -1 => Stream() case int => int.toByte #:: recur(count + 1.b) - LazyList.defer(recur(0L.b)) + Stream.defer(recur(0L.b)) given reader: [InType <: ji.Reader] => Tactic[StreamError] => InType is Readable by Char = reader => - def recur(count: Memory): LazyList[Char] = + def recur(count: Memory): Stream[Char] = try reader.read() match - case -1 => LazyList() + case -1 => Stream() case int => int.toChar #:: recur(count + 1.b) catch case err: ji.IOException => reader.close() - raise(StreamError(count), LazyList()) + raise(StreamError(count), Stream()) - LazyList.defer(recur(0L.b)) + Stream.defer(recur(0L.b)) given bufferedReader: [InType <: ji.BufferedReader] => Tactic[StreamError] => InType is Readable by Line = reader => - def recur(count: Memory): LazyList[Line] = + def recur(count: Memory): Stream[Line] = try reader.readLine() match - case null => LazyList() + case null => Stream() case line: String => Line(Text(line)) #:: recur(count + line.length.b + 1.b) catch case err: ji.IOException => reader.close() - raise(StreamError(count), LazyList()) + raise(StreamError(count), Stream()) - LazyList.defer(recur(0L.b)) + Stream.defer(recur(0L.b)) given inputStream: [InType <: ji.InputStream] => Tactic[StreamError] @@ -91,9 +91,9 @@ object Readable: given channel: Tactic[StreamError] => jn.channels.ReadableByteChannel is Readable by Bytes = channel => val buf: jn.ByteBuffer = jn.ByteBuffer.wrap(new Array[Byte](1024)).nn - def recur(total: Long): LazyList[Bytes] = + def recur(total: Long): Stream[Bytes] = try channel.read(buf) match - case -1 => LazyList().also(try channel.close() catch case err: Exception => ()) + case -1 => Stream().also(try channel.close() catch case err: Exception => ()) case 0 => recur(total) case count => @@ -105,14 +105,14 @@ object Readable: array.immutable(using Unsafe) #:: recur(total + count) - catch case e: Exception => LazyList(raise(StreamError(total.b), Bytes())) + catch case e: Exception => Stream(raise(StreamError(total.b), Bytes())) - LazyList.defer(recur(0)) + Stream.defer(recur(0)) trait Readable: type Self type Operand - def stream(value: Self): LazyList[Operand] + def stream(value: Self): Stream[Operand] def contramap[SelfType2](lambda: SelfType2 => Self): SelfType2 is Readable by Operand = source => stream(lambda(source)) diff --git a/src/core/turbulence.SimpleWritable.scala b/src/core/turbulence.SimpleWritable.scala index f6470d3..05e178b 100644 --- a/src/core/turbulence.SimpleWritable.scala +++ b/src/core/turbulence.SimpleWritable.scala @@ -22,7 +22,7 @@ trait SimpleWritable[TargetType, ElementType] extends Writable: type Operand = ElementType type Self = TargetType - def write(target: Self, stream: LazyList[ElementType]): Unit = + def write(target: Self, stream: Stream[ElementType]): Unit = stream.flow(())(writeElement(target, head) yet write(target, tail)) def writeElement(target: Self, element: ElementType): Unit diff --git a/src/core/turbulence.Spool.scala b/src/core/turbulence.Spool.scala index 4d53b31..0832d3a 100644 --- a/src/core/turbulence.Spool.scala +++ b/src/core/turbulence.Spool.scala @@ -18,6 +18,8 @@ package turbulence import java.util.concurrent as juc +import rudiments.* + object Spool: private object Termination @@ -28,6 +30,6 @@ class Spool[ItemType](): def put(item: ItemType): Unit = queue.put(item) def stop(): Unit = queue.put(Spool.Termination) - def stream: LazyList[ItemType] = - LazyList.continually(queue.take().nn).takeWhile(_ != Spool.Termination) - . asInstanceOf[LazyList[ItemType]] + def stream: Stream[ItemType] = + Stream.continually(queue.take().nn).takeWhile(_ != Spool.Termination) + . asInstanceOf[Stream[ItemType]] diff --git a/src/core/turbulence.Tap.scala b/src/core/turbulence.Tap.scala index b533daf..9cfd45b 100644 --- a/src/core/turbulence.Tap.scala +++ b/src/core/turbulence.Tap.scala @@ -19,9 +19,10 @@ package turbulence import language.experimental.captureChecking import java.util.concurrent.atomic as juca - import java.util.concurrent as juc +import rudiments.* + object Tap: enum Regulation: case Start, Stop @@ -34,4 +35,4 @@ class Tap(initial: Boolean = true): def pause(): Unit = if flowing.getAndSet(false) then spool.put(Tap.Regulation.Stop) def stop(): Unit = spool.stop() def state(): Boolean = flowing.get - def stream: LazyList[Tap.Regulation] = spool.stream + def stream: Stream[Tap.Regulation] = spool.stream diff --git a/src/core/turbulence.Writable.scala b/src/core/turbulence.Writable.scala index 81d83a1..d250bde 100644 --- a/src/core/turbulence.Writable.scala +++ b/src/core/turbulence.Writable.scala @@ -57,7 +57,7 @@ object Writable: given channel: Tactic[StreamError] => jn.channels.WritableByteChannel is Writable by Bytes = (channel, stream) => @tailrec - def recur(total: Memory, todo: LazyList[jn.ByteBuffer]): Unit = + def recur(total: Memory, todo: Stream[jn.ByteBuffer]): Unit = todo.flow(()): val count = try channel.write(head) catch case e: Exception => -1 @@ -69,7 +69,7 @@ object Writable: trait Writable: type Self type Operand - def write(target: Self, stream: LazyList[Operand]): Unit + def write(target: Self, stream: Stream[Operand]): Unit def contramap[SelfType2](lambda: SelfType2 => Self): SelfType2 is Writable by Operand = (target, stream) => write(lambda(target), stream) diff --git a/src/test/turbulence.Tests.scala b/src/test/turbulence.Tests.scala index 336a273..286d209 100644 --- a/src/test/turbulence.Tests.scala +++ b/src/test/turbulence.Tests.scala @@ -47,13 +47,13 @@ object Tests extends Suite(t"Turbulence tests"): bs <- 1 to 8 do test(t"length tests"): - val stream = string.bytes.grouped(bs).to(LazyList) + val stream = string.bytes.grouped(bs).to(Stream) val result = stream.read[Text] result.bytes.length .assert(_ == string.bytes.length) test(t"roundtrip tests"): - val stream = string.bytes.grouped(bs).to(LazyList) + val stream = string.bytes.grouped(bs).to(Stream) val result = stream.read[Text] result @@ -63,18 +63,18 @@ object Tests extends Suite(t"Turbulence tests"): val qbfBytes = qbf.bytes object Ref: - given Readable[Ref, Text] = ref => LazyList(t"abc", t"def") - given Readable[Ref, Bytes] = ref => LazyList(t"abc".bytes, t"def".bytes) + given Readable[Ref, Text] = ref => Stream(t"abc", t"def") + given Readable[Ref, Bytes] = ref => Stream(t"abc".bytes, t"def".bytes) case class Ref() object Ref2: - given Readable[Ref2, Text] = ref => LazyList(t"abc", t"def") + given Readable[Ref2, Text] = ref => Stream(t"abc", t"def") case class Ref2() object Ref3: - given Readable[Ref3, Bytes] = ref => LazyList(t"abc".bytes, t"def".bytes) + given Readable[Ref3, Bytes] = ref => Stream(t"abc".bytes, t"def".bytes) case class Ref3() @@ -115,41 +115,41 @@ object Tests extends Suite(t"Turbulence tests"): Ref3().read[Bytes].to(List) .assert(_ == t"abcdef".bytes.to(List)) - test(t"Read Text as LazyList[Text]"): - qbf.read[LazyList[Text]].join + test(t"Read Text as Stream[Text]"): + qbf.read[Stream[Text]].join .assert(_ == qbf) test(t"Read Text as Bytes"): qbf.read[Bytes] .assert(_.to(List) == qbfBytes.to(List)) - test(t"Read Text as LazyList[Bytes]"): - qbf.read[LazyList[Bytes]] + test(t"Read Text as Stream[Bytes]"): + qbf.read[Stream[Bytes]] .assert(_.reduce(_ ++ _).to(List) == qbfBytes.to(List)) test(t"Read Bytes as Text"): qbfBytes.read[Text] .assert(_ == qbf) - test(t"Read Bytes as LazyList[Text]"): - qbfBytes.read[LazyList[Text]].join + test(t"Read Bytes as Stream[Text]"): + qbfBytes.read[Stream[Text]].join .assert(_ == qbf) test(t"Read Bytes as Bytes"): qbfBytes.read[Bytes] .assert(_.to(List) == qbfBytes.to(List)) - test(t"Read Bytes as LazyList[Bytes]"): - qbfBytes.read[LazyList[Bytes]] + test(t"Read Bytes as Stream[Bytes]"): + qbfBytes.read[Stream[Bytes]] .assert(_.reduce(_ ++ _).to(List) == qbfBytes.to(List)) // test(t"Read Text as Lines"): - // qbf.read[LazyList[Line]] - // .assert(_ == LazyList(Line(t"The quick brown fox"), Line(t"jumps over the lazy dog"))) + // qbf.read[Stream[Line]] + // .assert(_ == Stream(Line(t"The quick brown fox"), Line(t"jumps over the lazy dog"))) // test(t"Read Bytes as Lines"): - // qbfBytes.read[LazyList[Line]] - // .assert(_ == LazyList(Line(t"The quick brown fox"), Line(t"jumps over the lazy dog"))) + // qbfBytes.read[Stream[Line]] + // .assert(_ == Stream(Line(t"The quick brown fox"), Line(t"jumps over the lazy dog"))) suite(t"Writing tests"): @@ -195,15 +195,15 @@ object Tests extends Suite(t"Turbulence tests"): store() .assert(_ == qbf) - test(t"Write LazyList[Text] to some reference with Text and Bytes instances"): + test(t"Write Stream[Text] to some reference with Text and Bytes instances"): val store = GeneralStore() - LazyList(qbf).writeTo(store) + Stream(qbf).writeTo(store) store() .assert(_ == qbf) - test(t"Write LazyList[Bytes] to some reference with Text and Bytes instances"): + test(t"Write Stream[Bytes] to some reference with Text and Bytes instances"): val store = GeneralStore() - LazyList(qbfBytes).writeTo(store) + Stream(qbfBytes).writeTo(store) store() .assert(_ == qbf) @@ -219,15 +219,15 @@ object Tests extends Suite(t"Turbulence tests"): store() .assert(_ == qbf) - test(t"Write LazyList[Text] to some reference with only a Bytes instance"): + test(t"Write Stream[Text] to some reference with only a Bytes instance"): val store = ByteStore() - LazyList(qbf).writeTo(store) + Stream(qbf).writeTo(store) store() .assert(_ == qbf) - test(t"Write LazyList[Bytes] to some reference with only a Bytes instance"): + test(t"Write Stream[Bytes] to some reference with only a Bytes instance"): val store = ByteStore() - LazyList(qbfBytes).writeTo(store) + Stream(qbfBytes).writeTo(store) store() .assert(_ == qbf) @@ -243,15 +243,15 @@ object Tests extends Suite(t"Turbulence tests"): store() .assert(_ == qbf) - test(t"Write LazyList[Text] to some reference with only a Text instance"): + test(t"Write Stream[Text] to some reference with only a Text instance"): val store = TextStore() - LazyList(qbf).writeTo(store) + Stream(qbf).writeTo(store) store() .assert(_ == qbf) - test(t"Write LazyList[Bytes] to some reference with only a Text instance"): + test(t"Write Stream[Bytes] to some reference with only a Text instance"): val store = TextStore() - LazyList(qbfBytes).writeTo(store) + Stream(qbfBytes).writeTo(store) store() .assert(_ == qbf) @@ -299,15 +299,15 @@ object Tests extends Suite(t"Turbulence tests"): // store() // .assert(_ == qbf) - // test(t"Append LazyList[Text] to some reference with Text and Bytes instances"): + // test(t"Append Stream[Text] to some reference with Text and Bytes instances"): // val store = GeneralStore() - // LazyList(qbf).appendTo(store) + // Stream(qbf).appendTo(store) // store() // .assert(_ == qbf) - // test(t"Append LazyList[Bytes] to some reference with Text and Bytes instances"): + // test(t"Append Stream[Bytes] to some reference with Text and Bytes instances"): // val store = GeneralStore() - // LazyList(qbfBytes).appendTo(store) + // Stream(qbfBytes).appendTo(store) // store() // .assert(_ == qbf) @@ -323,15 +323,15 @@ object Tests extends Suite(t"Turbulence tests"): // store() // .assert(_ == qbf) - // test(t"Append LazyList[Text] to some reference with only a Bytes instance"): + // test(t"Append Stream[Text] to some reference with only a Bytes instance"): // val store = ByteStore() - // LazyList(qbf).appendTo(store) + // Stream(qbf).appendTo(store) // store() // .assert(_ == qbf) - // test(t"Append LazyList[Bytes] to some reference with only a Bytes instance"): + // test(t"Append Stream[Bytes] to some reference with only a Bytes instance"): // val store = ByteStore() - // LazyList(qbfBytes).appendTo(store) + // Stream(qbfBytes).appendTo(store) // store() // .assert(_ == qbf) @@ -347,21 +347,21 @@ object Tests extends Suite(t"Turbulence tests"): // store() // .assert(_ == qbf) - // test(t"Append LazyList[Text] to some reference with only a Text instance"): + // test(t"Append Stream[Text] to some reference with only a Text instance"): // val store = TextStore() - // LazyList(qbf).appendTo(store) + // Stream(qbf).appendTo(store) // store() // .assert(_ == qbf) - // test(t"Append LazyList[Bytes] to some reference with only a Text instance"): + // test(t"Append Stream[Bytes] to some reference with only a Text instance"): // val store = TextStore() - // LazyList(qbfBytes).appendTo(store) + // Stream(qbfBytes).appendTo(store) // store() // .assert(_ == qbf) suite(t"Multiplexer tests"): - val l1 = LazyList(2, 4, 6, 8, 10) - val l2 = LazyList(1, 3, 5, 7, 9) + val l1 = Stream(2, 4, 6, 8, 10) + val l2 = Stream(1, 3, 5, 7, 9) test(t"Check that two multiplexed streams contain all elements"): supervise(l1.multiplexWith(l2).to(Set))