Skip to content

Commit

Permalink
feat(rears): add map
Browse files Browse the repository at this point in the history
  • Loading branch information
tassiluca committed Feb 28, 2024
1 parent 0f2cc20 commit c850a00
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,26 @@ extension [T](r: ReadableChannel[T])(using Async)
* --------2--------------4------6-------8--------10->
* </pre>
*/
def filter(p: T => Boolean): ReadableChannel[T] = fromNew[T] { c =>
def filter(p: T => Boolean): ReadableChannel[T] = fromNew[T] { emitter =>
val value = r.read().toOption.get
if p(value) then c.send(value)
if p(value) then emitter.send(value)
}

/** @return a new [[ReadableChannel]] whose values are transformed accordingly to the given function [[f]].
*
* Example:
* <pre>
* ----1---2-------3----4---5------6--------7-------->
* | | | | | | |
* ----V---V-------V----V---V------V--------V---------
* map(x => x * x)
* ----|---|-------|----|---|------|--------|---------
* V V V V V V V
* ----1---4-------9----16--25-----36-------49------->
* </pre>
*/
def map[R](f: T => R): ReadableChannel[R] = fromNew[R] { emitter =>
emitter.send(f(r.read().toOption.get))
}

/** @return a new [[ReadableChannel]] whose elements are emitted only after
Expand Down Expand Up @@ -101,7 +118,7 @@ extension [T](r: ReadableChannel[T])(using Async)
* V V V V V V V V
* |---------|-----------|------------T-----
* buffer(n = 3, timespan = 5 seconds)
* |---------|-----------|------------|---->
* |---------|-----------|------------|-----
* V V V
* ------[1, 2, 3]---[4, 5, 6]------[7, 8]->
* </pre>
Expand Down Expand Up @@ -132,7 +149,7 @@ extension [T](r: ReadableChannel[T])(using Async)
* | | | | | | | |
* V V V V V V V V
* ----|--------T--|--------T-------|--------T---
* buffer(timespan = 5 seconds)
* buffer(timespan = 5 seconds)
* -------------|-----------|----------------|---
* V V V
* -------[1, 2, 3, 4]--[5, 6, 7]-----------[8]->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ class PipelineTransformationsTest extends AnyFunSpec with Matchers {
}
}

describe("Mapping a channel") {
it("return a new channel whose values are transformed accordingly to the given function") {
Async.blocking:
val f: Int => Int = x => x * x
val mapped = producer.map(f)
for i <- 1 to 10 do mapped.read() shouldBe Right(f(i))
}
}

describe("Debouncing a channel") {
it("return a new channel whose first item is emitted immediately") {
val span = 1.seconds
Expand Down

0 comments on commit c850a00

Please sign in to comment.