Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#247 Implement basics of FlowReader #306

Open
wants to merge 58 commits into
base: master
Choose a base branch
from

Conversation

benedeki
Copy link
Contributor

@benedeki benedeki commented Dec 11, 2024

  • FlowReadeis now a simple class to just retrieve checkpoints of the identified flow.
  • Checkpoints are in the form of DTO without any extra functionality
  • Checkpoints are requested and returned in the form of pages with page offset and its size as parameters
  • ApiPaths moved from_Server_ to Model
  • Some minor renames in package names basic -> core
  • Fixes in License header years

Closes #247

Release notes:

  • FlowReader implemented to return the flow's checkpoints (requested and returned in a paginated form)

benedeki and others added 30 commits October 31, 2024 01:23
* created new module Info
* the new modul added to JaCoco and CI routines
* JaCoCo exclusion for model
* created Provider to query the data from server
* support for Future, IO, and ZIO based providers
* work in progress
@benedeki benedeki added the dependent The item depends on some other open item (Issue or PR) label Feb 5, 2025
val params = Map(
"limit" -> pageSize.toString,
"offset" -> offset.toString
) ++ checkpointName.map("checkpoint-name" -> _)
Copy link
Collaborator

@lsulak lsulak Feb 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would consider to extract this into some sort of a constant, but I don't insist - not sure if this is gonna be used elsewhere in the future

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do.


for {
mainPartitioningId <- partitioningId(mainFlowPartitioning)
flowId <- mainPartitioningId.project(queryFlowId)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the sake of better readability, I'd consider to reflect either types in the name - for example here it could be flowIdOrError or so

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same below etc - all of these 'wrapper' types are a bit easier to read and maintain truth

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do

for {
mainPartitioningId <- partitioningId(mainFlowPartitioning)
flowId <- mainPartitioningId.project(queryFlowId)
checkpoints <- flowId.project(queryCheckpoints(_, checkpointName, pageSize, offset))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wow, I find this insanely complicated / advanced :D

IdMonad.unit(RequestFail(new RequestException("Not used"){}))
}

val source = PaginatedResponse(Seq(1, 2, 3),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the formatting is really wild here :D

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seriously, sometimes it worsens code readability as it's a bit unpredictable / misleading

assert(result.items == Vector(1, 2, 3))
assert(!result.hasNext)
assert(result.limit == 3)
assert(result.pageStart == 1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider adding test on pageEnd as well

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to increase test coverage more, computation seems correct


private val partitioningEncoded = "W3sia2V5IjoiYSIsInZhbHVlIjoiYiJ9LHsia2V5IjoiYyIsInZhbHVlIjoiZCJ9XQ=="

private val partitioningResponse =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider to extract the test data to another file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reuse, and not that many. Imho OK to be here.

processEndTime = Some(ZonedDateTime.parse("2024-12-30T16:01:36.5052109+01:00[Europe/Budapest]")),
measurements = Set(
LongMeasurement(

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary empty line

@@ -0,0 +1,47 @@
/*
* Copyright 2024 ABSA Group Limited
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2021 - there might be more occurrences like this, but I didn't check

@benedeki benedeki changed the base branch from feature/247-paging-in-reader to master February 21, 2025 11:39
@benedeki benedeki removed work in progress Work on this item is not yet finished (mainly intended for PRs) dependent The item depends on some other open item (Issue or PR) labels Feb 21, 2025
}
}

private def queryCheckpoints(flowId: Long,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scalafmt formatting hasn't been applied

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it, but the difference was an empty line, one missing and one surplus white space
I though, the formatting was off making it harder to read. This is hardly the case.
Adhering strictly to scalafmt prescribed format is not the adopted team policy.
(same in the other cases bellow)

import za.co.absa.atum.model.ApiPaths._
import za.co.absa.atum.reader.core.{PartitioningIdProvider, Reader}
import za.co.absa.atum.reader.implicits.EitherImplicits.EitherMonadEnhancements
import za.co.absa.atum.reader.requests.QueryParamNames
import za.co.absa.atum.reader.server.ServerConfig

/**
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer this

class FlowReader[F[_]: MonadError](val mainFlowPartitioning: AtumPartitions)(implicit
  serverConfig: ServerConfig,
  backend: SttpBackend[F, Any],
) extends Reader[F]

over this given the implicit monaderror ev isn't used explicitly

class FlowReader[F[_]](val mainFlowPartitioning: AtumPartitions)(implicit
  serverConfig: ServerConfig,
  backend: SttpBackend[F, Any],
  ev: MonadError[F]
) extends Reader[F]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a reason why it was done like this. but maybe with the cut code, that might have dissappeared (I think it was needed for Page creation). I will doublecheck.

getQuery(endpoint, params)
}

def getCheckpointsPage(pageSize: Int = 10, offset: Long = 0): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = {
Copy link
Collaborator

@salamonpavel salamonpavel Feb 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(For consideration) I would prefer this over the "project". I think it's not even a good name for such mapping operation that lifts the computation into F context.

def getCheckpointsPage(
    pageSize: Int = 10,
    offset: Long = 0
  ): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = {
    partitioningId(mainFlowPartitioning).flatMap {
      case Left(error) => MonadError[F].unit(Left(error))
      case Right(mainPartitioningId) =>
        queryFlowId(mainPartitioningId).flatMap {
          case Left(error) => MonadError[F].unit(Left(error))
          case Right(flowId) =>
            queryCheckpoints(flowId, None, pageSize, offset)
        }
    }
  }

Also, the name of the method could be improved. And the "partitioningId" method could be renamed too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I get that the project might not be a best name.
But honestly the suggested code seems to much less understandable than the original for-comprehension. What do you see as the benefit in this form?

Copy link
Collaborator

@salamonpavel salamonpavel Feb 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess mainly the project name bothers me, I agree that from readability stand point it's better than nested flatMaps or nested for-comprehensions. How about naming the method liftMapF or sth like that instead? Btw you don't need to explicitly use implicitly :D as there is apply method on the MonadError that does exactly that ...

object EitherImplicits {

  implicit class EitherMonadEnhancements[A, B](val either: Either[A, B]) extends AnyVal {
    def liftMapF[C, F[_]: MonadError](f: B => F[Either[A, C]]): F[Either[A, C]] = either match {
      case Right(b) => f(b)
      // case Left(a) => implicitly[MonadError[F]].unit(Left(a))
      case Left(a) => MonadError[F].unit(Left(a))
    }
  }

}


object MonadError {
  def apply[F[_]: MonadError]: MonadError[F] = implicitly[MonadError[F]]
}

import za.co.absa.atum.model.dto.PartitioningWithIdDTO
import za.co.absa.atum.model.envelopes.SuccessResponse.SingleSuccessResponse
import za.co.absa.atum.model.types.basic.AtumPartitions
import za.co.absa.atum.model.types.basic.AtumPartitionsOps
import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax
import za.co.absa.atum.reader.basic.RequestResult.RequestResult
import RequestResult.RequestResult

trait PartitioningIdProvider[F[_]] {self: Reader[F] =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scalafmt formatting hasn't been applied

sealed abstract class RequestException(message: String) extends ReaderException(message)


object RequestException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scalafmt formatting hasn't been applied

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement basics of FlowReader
3 participants