Skip to content

Commit

Permalink
refactor: improve terminable channels
Browse files Browse the repository at this point in the history
  • Loading branch information
tassiluca committed Feb 29, 2024
1 parent fc00e49 commit b89726f
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.github.tassiLuca.analyzer.lib

import gears.async.{Async, Future, Listener, ReadableChannel, UnboundedChannel}
import io.github.tassiLuca.analyzer.commons.lib.{Contribution, Release, Repository}
import io.github.tassiLuca.pimping.ChannelsPimping.{Terminable, Terminated}
import io.github.tassiLuca.pimping.{TerminableChannel, Terminated}

import scala.annotation.tailrec

Expand All @@ -20,7 +20,7 @@ private class GitHubRepositoryService extends RepositoryService:

override def incrementalRepositoriesOf(
organizationName: String,
)(using Async): ReadableChannel[Terminable[Either[String, Repository]]] =
)(using Async): TerminableChannel[Either[String, Repository]] =
incrementalPaginatedRequest(uri"$baseUrl/orgs/$organizationName/repos")

override def contributorsOf(
Expand All @@ -32,7 +32,7 @@ private class GitHubRepositoryService extends RepositoryService:
override def incrementalContributorsOf(
organizationName: String,
repositoryName: String,
)(using Async): ReadableChannel[Terminable[Either[String, Contribution]]] =
)(using Async): TerminableChannel[Either[String, Contribution]] =
incrementalPaginatedRequest(uri"$baseUrl/repos/$organizationName/$repositoryName/contributors")

override def lastReleaseOf(organizationName: String, repositoryName: String)(using Async): Either[String, Release] =
Expand All @@ -54,20 +54,20 @@ private class GitHubRepositoryService extends RepositoryService:

private def incrementalPaginatedRequest[T](
endpoint: Uri,
)(using Reader[T], Async): ReadableChannel[Terminable[Either[String, T]]] =
val channel = UnboundedChannel[Terminable[Either[String, T]]]()
)(using Reader[T], Async): TerminableChannel[Either[String, T]] =
val channel = TerminableChannel.ofUnbounded[Either[String, T]]
@tailrec
def withPagination(next: Option[Uri]): Unit = next match
case None => ()
case None => channel.terminate()
case Some(uri) =>
val response = doRequest(uri)
response.body.map(read[Seq[T]](_)).fold(
errorMessage => channel.send(Left(errorMessage)),
results => results.foreach(r => channel.send(Right(r))),
)
withPagination(nextPage(response))
Future(withPagination(Some(endpoint))).onComplete(Listener((_, _) => channel.send(Terminated)))
channel.asReadable
Future(withPagination(Some(endpoint)))
channel

private def doRequest(endpoint: Uri): Response[Either[String, String]] =
SimpleHttpClient().send(request.get(endpoint))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import io.github.tassiLuca.boundaries.either
import io.github.tassiLuca.boundaries.either.?
import io.github.tassiLuca.boundaries.EitherConversions.given
import io.github.tassiLuca.pimping.ChannelsPimping.toTry
import io.github.tassiLuca.pimping.TerminableChannelOps.foreach

private class IncrementalAnalyzer(repositoryService: RepositoryService) extends Analyzer:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.github.tassiLuca.analyzer.lib

import gears.async.{Async, ReadableChannel}
import io.github.tassiLuca.analyzer.commons.lib.{Contribution, Release, Repository}
import io.github.tassiLuca.pimping.ChannelsPimping.Terminable
import io.github.tassiLuca.pimping.TerminableChannel

/** A service exposing functions to retrieve data from a central hosting repository service. */
trait RepositoryService:
Expand All @@ -17,7 +17,7 @@ trait RepositoryService:
*/
def incrementalRepositoriesOf(
organizationName: String,
)(using Async): ReadableChannel[Terminable[Either[String, Repository]]]
)(using Async): TerminableChannel[Either[String, Repository]]

/** @return [[Right]] with the [[Seq]]uence of [[Contribution]] for the given [[repositoryName]] owned by
* the given [[organizationName]] or a [[Left]] with a explanatory message in case of errors.
Expand All @@ -30,7 +30,7 @@ trait RepositoryService:
def incrementalContributorsOf(
organizationName: String,
repositoryName: String,
)(using Async): ReadableChannel[Terminable[Either[String, Contribution]]]
)(using Async): TerminableChannel[Either[String, Contribution]]

/** @return a [[Right]] with the last [[Release]] of the given [[repositoryName]] owned by [[organizationName]]
* if it exists, or a [[Left]] with a explanatory message in case of errors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package io.github.tassiLuca.analyzer.lib
import gears.async.Async
import gears.async.default.given
import io.github.tassiLuca.analyzer.commons.lib.Repository
import io.github.tassiLuca.analyzer.lib.RepositoryService
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
import io.github.tassiLuca.pimping.TerminableChannelOps.toSeq

class GitHubServiceTest extends AnyFunSpec with Matchers {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,44 +1,10 @@
package io.github.tassiLuca.pimping

import gears.async.{Async, Channel, ChannelClosedException, ReadableChannel, SendableChannel}
import io.github.tassiLuca.boundaries.either.?
import io.github.tassiLuca.boundaries.EitherConversions.given
import gears.async.{Channel, ChannelClosedException}

import scala.annotation.tailrec
import scala.reflect.ClassTag
import scala.util.boundary.Label
import scala.util.{Failure, Success, Try}

object ChannelsPimping:

/** A token to be sent to a channel to signal that it has been terminated. */
case object Terminated

type Terminated = Terminated.type

type Terminable[T] = T | Terminated

extension [T](c: SendableChannel[Terminable[T]])
/** Terminates this channel, i.e. send to it a [[Terminated]] token. */
def terminate()(using Async): Unit = c.send(Terminated)

extension [T: ClassTag](c: ReadableChannel[Terminable[T]])

/** Blocking consume channel items, executing the given function [[f]] for each element. */
@tailrec
def foreach[U](f: T => U)(using Async): Unit = c.read() match
case Left(Channel.Closed) => ()
case Right(value) =>
value match
case Terminated => ()
case v: T => f(v); foreach(f)

/** @return a [[Seq]] containing channel items, after having them read. This is a blocking operation. */
def toSeq(using Async): Seq[T] =
var results = Seq[T]()
c.foreach(t => results = results :+ t)
results

extension [T](e: Either[Channel.Closed, T])
def toTry(): Try[T] = e match
case Left(Channel.Closed) => Failure(ChannelClosedException())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package io.github.tassiLuca.pimping

import gears.async.{Async, BufferedChannel, Channel, SyncChannel, UnboundedChannel}

import scala.annotation.tailrec
import scala.reflect.ClassTag

/** A token to be sent to a channel to signal that it has been terminated. */
case object Terminated

type Terminated = Terminated.type

type Terminable[T] = T | Terminated

trait TerminableChannel[T] extends Channel[Terminable[T]]:
def terminate()(using Async): Unit

object TerminableChannel:

def ofSync[T: ClassTag]: TerminableChannel[T] =
TerminableChannelImpl(SyncChannel())

def ofBuffered[T: ClassTag]: TerminableChannel[T] =
TerminableChannelImpl(BufferedChannel())

def ofUnbounded[T: ClassTag]: TerminableChannel[T] =
TerminableChannelImpl(UnboundedChannel())

private class TerminableChannelImpl[T: ClassTag](c: Channel[Terminable[T]]) extends TerminableChannel[T]:
opaque type Res[R] = Either[Channel.Closed, R]

private var _terminated: Boolean = false

override val readSource: Async.Source[Res[Terminable[T]]] =
c.readSource.transformValuesWith {
case v @ Right(Terminated) =>
c.close()
v
case v @ _ => v
}

override def sendSource(x: Terminable[T]): Async.Source[Res[Unit]] = x match
case Terminated =>
if synchronized(_terminated) then throw IllegalStateException("Channel already terminated!")
else synchronized { _terminated = true }
c.sendSource(x)
case t => c.sendSource(t)

override def close(): Unit = c.close()

override def terminate()(using Async): Unit = c.send(Terminated)

object TerminableChannelOps:

extension [T: ClassTag](c: TerminableChannel[T])

/** Blocking consume channel items, executing the given function [[f]] for each element. */
@tailrec
def foreach[U](f: T => U)(using Async): Unit = c.read() match
case Left(Channel.Closed) => ()
case Right(value) =>
value match
case Terminated => ()
case v: T => f(v); foreach(f)

/** @return a [[Seq]] containing channel items, after having them read. This is a blocking operation. */
def toSeq(using Async): Seq[T] =
var results = Seq[T]()
c.foreach(t => results = results :+ t)
results
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,46 @@ package io.github.tassiLuca.pimping

import gears.async.default.given
import gears.async.TaskSchedule.Every
import gears.async.{Async, Future, SendableChannel, Task, UnboundedChannel}
import io.github.tassiLuca.pimping.ChannelsPimping.Terminable
import gears.async.{Async, AsyncOperations, Channel, Listener, SendableChannel, Task}
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
import io.github.tassiLuca.pimping.TerminableChannelOps.foreach

class ChannelsPimpingTest extends AnyFunSpec with Matchers {

type Item = Int
val itemsProduced = 10

describe("`Terminable` channels") {
it("allows iterating over them") {
var collectedResult = Seq[Item]()
val channel = UnboundedChannel[Terminable[Item]]()
describe("Terminable channels") {
it("if terminated should close the underlying channel") {
Async.blocking:
produceOn(channel.asSendable).run.await
val channel = TerminableChannel.ofUnbounded[Item]
channel.terminate()
channel.foreach { item => collectedResult = collectedResult :+ item }
collectedResult shouldBe Seq.range(0, itemsProduced)
channel.read() shouldBe Right(Terminated)
channel.read() shouldBe Left(Channel.Closed)
}

it("once closed, should be traversable") {
Async.blocking:
var collectedItems = Seq[Item]()
val channel = TerminableChannel.ofUnbounded[Item]
produceOn(channel).run.onComplete(Listener { (_, _) => channel.send(Terminated) })
channel.foreach(res => collectedItems = collectedItems :+ res)
collectedItems shouldBe Seq.range(0, itemsProduced)
}

it("Should again throw") {
Async.blocking:
val channel = TerminableChannel.ofUnbounded[Item]
produceOn(channel).run.onComplete(Listener { (_, _) => channel.send(Terminated) })
channel.foreach(res =>
println(s"test3 : $res")
println(res),
)
}
}

def produceOn(channel: SendableChannel[Terminable[Item]]): Task[Unit] =
def produceOn(channel: TerminableChannel[Item]): Task[Unit] =
var i = 0
Task {
channel.send(i)
Expand Down

0 comments on commit b89726f

Please sign in to comment.