From fc00e499f04523e4b9168eaec03792225edfe010 Mon Sep 17 00:00:00 2001 From: Luca Tassinari Date: Thu, 29 Feb 2024 17:32:36 +0100 Subject: [PATCH] docs: add more on analyzer --- docs/content/docs/01-boundaries.md | 4 + docs/content/docs/02-basics.md | 4 + docs/content/docs/03-channels.md | 175 ++++++++++++++++++++++------ docs/content/docs/04-rears.md | 4 + docs/content/docs/05-conclusions.md | 4 + 5 files changed, 156 insertions(+), 35 deletions(-) diff --git a/docs/content/docs/01-boundaries.md b/docs/content/docs/01-boundaries.md index 59dd2f16..dd932c56 100644 --- a/docs/content/docs/01-boundaries.md +++ b/docs/content/docs/01-boundaries.md @@ -1,3 +1,7 @@ +--- +bookToc: false +--- + # `boundary` & `break` `boundary` & `break` mechanism provides a cleaner alternative to non-local returns: diff --git a/docs/content/docs/02-basics.md b/docs/content/docs/02-basics.md index 46df6596..1fe66770 100644 --- a/docs/content/docs/02-basics.md +++ b/docs/content/docs/02-basics.md @@ -1,3 +1,7 @@ +--- +bookToc: false +--- + # Basic asynchronous constructs ## The need for a new `Future` construct diff --git a/docs/content/docs/03-channels.md b/docs/content/docs/03-channels.md index 0078e943..8e715abf 100644 --- a/docs/content/docs/03-channels.md +++ b/docs/content/docs/03-channels.md @@ -1,3 +1,7 @@ +--- +bookToc: false +--- + # Channels as a communication primitive The fourth, yet not mentioned, abstraction of both Kotlin Coroutines and Scala Gears is the **channel**. @@ -75,7 +79,7 @@ Concerning channel behavior, it is important to note that: > 2. Once the element is handled, it is immediately removed from the channel; > 3. Fairness: `TBD` -## GitHub organization analyzer example +## Analyzer example To show channels in action an example has been prepared: @@ -88,15 +92,84 @@ listing their information along with all their contributors as soon as they are ![expected result](../../res/img/analyzer-e2e.png) -As usual, the example has been implemented using monadic `Future`s, as well as using Scala Gears and Kotlin Coroutines. +The example is structured in two different packages: `lib` and `client`. The former contains the logic of the library, while the latter contains the application (client code). +As usual, it has been implemented using monadic `Future`s, as well as using Scala Gears and Kotlin Coroutines. -### Monadic version +### Future monadic version +The entry point of the library is the `Analyzer` interface which takes in input the organization name and a function through which is possible to react to results while they are computed. +Since we want to achieve cancellation, the monadic version leverages Monix `Task`s which is returned by the `analyze` method wrapped in an `EitherT` monad transformer to allow handling errors functionally. -### Analyzer and App Controller +```scala +trait Analyzer: + def analyze(organizationName: String)( + updateResult: RepositoryReport => Unit, + ): EitherT[Task, String, Seq[RepositoryReport]] +``` -The direct version in Scala gears exposes the following interface, taking in input an organization name and a function through which is possible to react to results while they are computed. +To retrieve data from GitHub, a `RepositoryService` interface has been created, following the same pattern: + +```scala +trait RepositoryService: + def repositoriesOf(organizationName: String): EitherT[Task, String, Seq[Repository]] + def contributorsOf(organizationName: String, repositoryName: String): EitherT[Task, String, Seq[Contribution]] + def lastReleaseOf(organizationName: String, repositoryName: String): EitherT[Task, String, Release] +``` + +The implementation of the `Analyzer` is shown in the following code snippet and performs the following steps: + +1. first, the list of repositories is retrieved; +2. if no error occurred, the analysis of each repository is performed concurrently, thanks to the `Traverse` functor offered by Cats; +3. the analysis of each repository consists of retrieving the contributors and the last release of the repository and then updating the result through the `updateResult` function. Since both the contributors and last release retrieval are independent of each other, they are performed concurrently, thanks to `Task.parZip2`. + +```scala +override def analyze(organizationName: String)( + updateResult: RepositoryReport => Unit, +): EitherT[Task, String, Seq[RepositoryReport]] = + for + repositories <- gitHubService.repositoriesOf(organizationName) // 1 + reports <- repositories.traverse(r => EitherT.right(r.performAnalysis(updateResult))) // 2 + yield reports + +extension (r: Repository) + private def performAnalysis(updateResult: RepositoryReport => Unit): Task[RepositoryReport] = + val contributorsTask = gitHubService.contributorsOf(r.organization, r.name).value + val releaseTask = gitHubService.lastReleaseOf(r.organization, r.name).value + for + result <- Task.parZip2(contributorsTask, releaseTask) + report = RepositoryReport(r.name, r.issues, r.stars, result._1.getOrElse(Seq.empty), result._2.toOption) + _ <- Task(updateResult(report)) + yield report +``` + +Client-side, when a new session is requested, the `Analyzer` is used to start the computation, during which the single reports are aggregated and the UI is updated. +Whenever desired, the current computation can be stopped by canceling the Monix `CancelableFuture` returned by the `runToFuture` method, through which the returned Task from the `Analyzer` is started. + +```scala +class MonadicAppController extends AppController: + + import monix.execution.Scheduler.Implicits.global + private val view = AnalyzerView.gui(this) + private val analyzer = Analyzer.ofGitHub() + private var currentComputation: Option[CancelableFuture[Unit]] = None + + view.run() + + override def runSession(organizationName: String): Unit = + var organizationReport: OrganizationReport = (Map(), Set()) + val f = analyzer.analyze(organizationName) { report => + organizationReport = organizationReport.mergeWith(report) + view.update(organizationReport) + }.value.runToFuture.map { case Left(value) => view.error(value); case _ => view.endComputation() } + currentComputation = Some(f) + + override def stopSession(): Unit = currentComputation foreach (_.cancel()) +``` + +### Scala Gears version + +The interfaces of the Direct Style with Gears differ from the monadic one by their return type, which is a simpler `Either` data type, and by the fact they are **suspendable** functions, hence they require an Async context to be executed. ```scala trait Analyzer: @@ -106,39 +179,73 @@ trait Analyzer: ``` ```scala -object Analyzer: - def ofGitHub: Analyzer = GitHubAnalyzer() - - private class GitHubAnalyzer extends Analyzer: - private val gitHubService = GitHubService() - - override def analyze(organizationName: String)( - updateResults: RepositoryReport => Unit, - )(using Async): Either[String, Seq[RepositoryReport]] = either: - val reposInfo = gitHubService - .repositoriesOf(organizationName).? // 1 - .map(_.performAnalysis) // 2 - val collector = Collector[RepositoryReport](reposInfo.toList*) // 3 - for _ <- reposInfo.indices do - updateResults(collector.results.read().tryable.?.awaitResult.?) // 4 - reposInfo.map(_.await) - - extension (r: Repository) - private def performAnalysis(using Async): Future[RepositoryReport] = ??? +trait RepositoryService: + def repositoriesOf(organizationName: String)(using Async): Either[String, Seq[Repository]] + def contributorsOf( + organizationName: String, + repositoryName: String + )(using Async): Either[String, Seq[Contribution]] + def lastReleaseOf(organizationName: String, repositoryName: String)(using Async): Either[String, Release] +``` + +The implementation of the `Analyzer` leverages Channels to perform the concurrent analysis of the repositories: + +```scala +override def analyze(organizationName: String)( + updateResults: RepositoryReport => Unit, + )(using Async): Either[String, Seq[RepositoryReport]] = either: + val reposInfo = repositoryService.repositoriesOf(organizationName).? // 1 + .map(_.performAnalysis) // 2 + val collector = Collector[RepositoryReport](reposInfo.toList*) // 3 + for _ <- reposInfo.indices do + updateResults(collector.results.read().toTry().?.awaitResult.?) // 4 + reposInfo.map(_.await) + + extension (r: Repository) + private def performAnalysis(using Async): Future[RepositoryReport] = Future: + val contributions = Future { repositoryService.contributorsOf(r.organization, r.name) } // concurrent + val release = repositoryService.lastReleaseOf(r.organization, r.name) + RepositoryReport(r.name, r.issues, r.stars, contributions.await.getOrElse(Seq()), release.toOption) ``` 1. first, we get all the repositories of the requested organization 2. for each of them, the contributors and the last release are retrieved concurrently, starting a `Future` -3. `Future` results are gathered inside a `Collector` that collects a list of futures into a channel of futures, arriving as they finish. +3. `Future` results are gathered inside a **`Collector`** allowing to collect a list of futures into a channel of futures, arriving as they finish. + * the retrieval of the contributors and the last release are performed in parallel 4. read results from the channel as they come, calling the `updateResult` reaction function. -The application controller just takes care of running the application view and, whenever a new analysis starts, creates a new future, during which the single reports are aggregated and the UI is updated. -Thanks to the fact `Future`s are cancellable objects, whenever desired is possible to cancel the current computation. +Although it works, the proposed solution suffers from a performance issue when the organization we want to analyze has a large number of repositories. +Indeed, the GitHub API, like many ReSTful APIs, implements _pagination_: if the response includes many results, they are paginated, returning a subset of them; it is the responsibility of the client to request more data (pages). +Until now, the `RepositoryService` has been implemented to return the whole results in one shot, leading to suspension until all pages are retrieved. +It would be desirable, instead, to start performing the analysis as soon as one page is obtained from the API. + +To do so, the interface of the `RepositoryService` has been extended with new methods, `incremental***`, returning a terminable channel of results: ```scala -private class DirectAppController(using Async) extends AppController: +trait RepositoryService: + def incrementalRepositoriesOf( + organizationName: String, + )(using Async): ReadableChannel[Terminable[Either[String, Repository]]] + + def incrementalContributorsOf( + organizationName: String, + repositoryName: String, + )(using Async): ReadableChannel[Terminable[Either[String, Contribution]]] + + // ... + +``` + + + +--- + + ## Conclusions diff --git a/docs/content/docs/04-rears.md b/docs/content/docs/04-rears.md index 40766b33..fef4bc11 100644 --- a/docs/content/docs/04-rears.md +++ b/docs/content/docs/04-rears.md @@ -1,3 +1,7 @@ +--- +bookToc: false +--- + # An attempt to bring reactivity principles into gears So far, we've explored the basics of asynchronous abstraction mechanisms provided by the direct style of the Scala Gears and Kotlin Coroutines frameworks. diff --git a/docs/content/docs/05-conclusions.md b/docs/content/docs/05-conclusions.md index dd9531c1..d89ad5a9 100644 --- a/docs/content/docs/05-conclusions.md +++ b/docs/content/docs/05-conclusions.md @@ -1,3 +1,7 @@ +--- +bookToc: false +--- + # Going further and conclusions ![expected result](/static/analyzer-e2e.png)