Skip to content

Commit

Permalink
docs: improvements on channels and move pimping objects to its own pa…
Browse files Browse the repository at this point in the history
…ckage
  • Loading branch information
tassiluca committed Feb 29, 2024
1 parent 1d85ecb commit bc68884
Show file tree
Hide file tree
Showing 15 changed files with 223 additions and 141 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package io.github.tassiLuca.analyzer.commons.client.ui

import io.github.tassiLuca.analyzer.commons.client.AppController
import io.github.tassiLuca.pimping.ScalaSwingFacade.createPanel
import io.github.tassiLuca.pimping.ScalaSwingFacade.given

import java.awt.BorderLayout
import javax.swing.*
import javax.swing.table.DefaultTableModel

class MainFrame(controller: AppController) extends JFrame:
import io.github.tassiLuca.utils.ScalaSwingFacade.{*, given}

setTitle("GitHub Organization Analyzer")
setDefaultCloseOperation(WindowConstants.EXIT_ON_CLOSE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import io.github.tassiLuca.analyzer.commons.lib.{Repository, RepositoryReport}
import io.github.tassiLuca.boundaries.either
import io.github.tassiLuca.boundaries.either.?
import io.github.tassiLuca.boundaries.EitherConversions.given
import io.github.tassiLuca.utils.ChannelsPimping.tryable
import io.github.tassiLuca.pimping.ChannelsPimping.tryable

private class BasicAnalyzer(repositoryService: RepositoryService) extends Analyzer:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.github.tassiLuca.analyzer.lib

import gears.async.{Async, Future, Listener, ReadableChannel, UnboundedChannel}
import io.github.tassiLuca.analyzer.commons.lib.{Contribution, Release, Repository}
import io.github.tassiLuca.utils.ChannelsPimping.{Terminable, Terminated}
import io.github.tassiLuca.pimping.ChannelsPimping.{Terminable, Terminated}

import scala.annotation.tailrec

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import gears.async.{Async, Future}
import io.github.tassiLuca.analyzer.commons.lib.{Repository, RepositoryReport}
import io.github.tassiLuca.boundaries.either
import io.github.tassiLuca.boundaries.either.?
import io.github.tassiLuca.utils.ChannelsPimping.tryable
import io.github.tassiLuca.boundaries.EitherConversions.given
import io.github.tassiLuca.pimping.ChannelsPimping.tryable

private class IncrementalAnalyzer(repositoryService: RepositoryService) extends Analyzer:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.github.tassiLuca.analyzer.lib

import gears.async.{Async, ReadableChannel}
import io.github.tassiLuca.analyzer.commons.lib.{Contribution, Release, Repository}
import io.github.tassiLuca.utils.ChannelsPimping.Terminable
import io.github.tassiLuca.pimping.ChannelsPimping.Terminable

/** A service exposing functions to retrieve data from a central hosting repository service. */
trait RepositoryService:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.tassiLuca.utils
package io.github.tassiLuca.pimping

import gears.async.{Async, Channel, ReadableChannel, SendableChannel}

Expand All @@ -8,15 +8,20 @@ import scala.util.{Failure, Success, Try}

object ChannelsPimping:

/** A token to be sent to a channel to signal that it has been terminated. */
case object Terminated

type Terminated = Terminated.type

type Terminable[T] = T | Terminated

extension [T](c: SendableChannel[Terminable[T]]) def terminate()(using Async): Unit = c.send(Terminated)
extension [T](c: SendableChannel[Terminable[T]])
/** Terminates this channel, i.e. send to it a [[Terminated]] token. */
def terminate()(using Async): Unit = c.send(Terminated)

extension [T: ClassTag](c: ReadableChannel[Terminable[T]])

/** Blocking consume channel items, executing the given function [[f]] for each element. */
@tailrec
def foreach[U](f: T => U)(using Async): Unit = c.read() match
case Left(Channel.Closed) => ()
Expand All @@ -25,6 +30,7 @@ object ChannelsPimping:
case Terminated => ()
case v: T => f(v); foreach(f)

/** @return a [[Seq]] containing channel items, after having them read. This is a blocking operation. */
def toSeq(using Async): Seq[T] =
var results = Seq[T]()
c.foreach(t => results = results :+ t)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.tassiLuca.utils
package io.github.tassiLuca.pimping

import java.awt.{Component, FlowLayout, LayoutManager}
import javax.swing.{JComponent, JFrame, JPanel, WindowConstants}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package io.github.tassiLuca.utils
package io.github.tassiLuca.pimping

import gears.async.default.given
import gears.async.TaskSchedule.Every
import gears.async.{Async, Future, SendableChannel, Task, UnboundedChannel}
import io.github.tassiLuca.utils.ChannelsPimping.{Terminable, Terminated}
import io.github.tassiLuca.pimping.ChannelsPimping.Terminable
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers

Expand Down
49 changes: 33 additions & 16 deletions docs/content/docs/03-channels.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Channels as a communication primitive

The fourth, yet not mentioned, abstraction of both Kotlin Coroutines and Scala Gears is the **channel**.
Channels represent the **primitive communication and coordination means** to exchange `Future` (or coroutines in the case of Kotlink) results. They are, at least conceptually, very similar to a queue where it is possible to send (and receive) data -- basically, exploiting the ***producer-consumer*** pattern.
Channels represent the **primitive communication and coordination means** to exchange `Future` (or coroutines in the case of Kotlin) results. They are, at least conceptually, very similar to a queue where it is possible to send (and receive) data -- basically, exploiting the ***producer-consumer*** pattern.

{{< mermaid >}}
classDiagram
Expand Down Expand Up @@ -36,32 +36,44 @@ classDiagram
`ReadableChannel[+T]` <|-- `Channel[T]`
{{< /mermaid >}}

The channel is defined through three distinct interfaces: `SendableChannel[-T]`, `ReadableChannel[+T]` and `Channel[T]`, where the latter extends from both `SendableChannel` and `ReadableChannel`. Typically, a `Channel` is created and a `SendableChannel` and `ReadableChannel` instances are respectively provided to the producer and the consumer, restricting their access to it. The same, almost identical, design is present also in Kotlin Coroutines where `SendChannel` and `ReceiveChannel` takes respectively over the Gears `SendableChannel` and `ReadableChannel`.
The channel is defined through three distinct interfaces: `SendableChannel[-T]`, `ReadableChannel[+T]` and `Channel[T]`, where the latter extends from both `SendableChannel` and `ReadableChannel`. Typically, a `Channel` is created and a `SendableChannel` and `ReadableChannel` instances are respectively provided to the producer and the consumer, restricting their access to it. The same, almost identical, design is present also in Kotlin Coroutines where `SendChannel` and `ReceiveChannel` take respectively over the Gears `SendableChannel` and `ReadableChannel`.

Moreover, `Channel` inherits from `java.io.Closable`, making them closable objects: once closed, they raise `ChannelClosedException` when attempting to write to them and immediately return a `Left(ChannelClosed)` when attempting to read from them, preventing the consumer from finishing reading all the values sent on the channel before its closing.
Moreover, `Channel` inherits from `java.io.Closable`, making them closable objects: once closed, they raise `ChannelClosedException` when attempting to write to them and immediately return a `Left(Closed)` when attempting to read from them, preventing the consumer from finishing reading all the values sent on the channel before its closing.
This is not the case for Kotlin Coroutines where closing a channel indicates that no more values are coming, but doesn't prevent consuming already sent values. Moreover, in Kotlin is possible to use a regular for loop to receive elements from a channel (blocking the coroutine):
- [example code in kotlin]
- The same behavior can be achieved also in gears pimping the framework with the concept of `Terminable` channel. After all, closing a channel in coroutines is a matter of sending a special token to the channel: the iteration stops as soon as this token is received.
- [code of pimping]


```kotlin
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // we're done sending
}
for (y in channel) println(y) // blocks until channel is closed
println("Done!")
```

A similar behavior can be achieved also in Gears pimping the framework with the concept of `Terminable` channel. After all, closing a channel in coroutines is a matter of sending a special token to it: the iteration stops as soon as this token is received.

`TBD`

Three types of channels exists:

- **Synchronous Channels**: links a `read` request with a `send` within a _rendezvous_
- `send` (`read`) suspend the process until a consumer `read` (`send`) the value;
- in Kotlin, they are called **Rendezvous Channels**.
- in Kotlin they are called **Rendezvous Channels**.
- **Buffered Channels**: a version of a channel with an internal buffer of fixed size
- `send` suspend the producer process if it is full; otherwise, it appends the value to the buffer, returning immediately;
- `read` suspend if the channel is empty, waiting for a new value.
- **Unbounded Channels**: a version of a channel with an unbounded buffer
- if the programs run out of memory you can get an out-of-memory exception!
- in Kotlin, they are called **Unlimited Channel**.

Kotlin offers also a fourh type: the **Comflated Channel**, where every new element sent to it overwirtes the previously sent one, *never blocking*, so that the receiver gets always the latest element.
- in Kotlin they are called **Unlimited Channel**.

Concerning channels behaviour two things are important to note:
Kotlin offers also a fourth type: the **Conflated Channel**, where every new element sent to it overwirtes the previously sent one, *never blocking*, so that the receiver gets always the latest element.

> 1. Multiple producers can send data to the channel, as well as multiple consumers can read them, **but each element is handled only _once_, by _one_ of them**, i.e. consumers **compete** with each other for sent values.
> 2. Once the element is handled, it is immediately removed from the channel.
Concerning channel behavior, it is important to note that:

> 1. Multiple producers can send data to the channel, as well as multiple consumers can read them, **but each element is handled only _once_, by _one_ of them**, i.e. consumers **compete** with each other for sent values;
> 2. Once the element is handled, it is immediately removed from the channel;
> 3. Fairness: `TBD`
## GitHub organization analyzer example

Expand All @@ -71,11 +83,16 @@ To show channels in action an example has been prepared:
**Idea**: we want to realize a little asynchronous library allowing clients to collect the common statistics about repositories (issues, stars, last release) and contributors of a given GitHub organization.
{{< /hint >}}

Final result:
The final result is a GUI application that, given an organization name, starts the analysis of all its repositories,
listing their information along with all their contributors as soon as they are computed. Moreover, the application allows the user to cancel the current computation at any point in time.

![expected result](../../res/img/analyzer-e2e.png)

As usual, the example has been implemented using monadic `Future`s, as well as Scala gears and Kotlin Coroutines.
As usual, the example has been implemented using monadic `Future`s, as well as using Scala Gears and Kotlin Coroutines.

### Monadic version



### Analyzer and App Controller

Expand Down
Loading

0 comments on commit bc68884

Please sign in to comment.