-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#247 Implement basics of FlowReader #306
base: master
Are you sure you want to change the base?
Conversation
* created new module Info * the new modul added to JaCoco and CI routines
* JaCoCo exclusion for model
* created Provider to query the data from server * support for Future, IO, and ZIO based providers * work in progress
* fixed license headers
…endpoints-from-info-module
…endpoints-from-info-module
val params = Map( | ||
"limit" -> pageSize.toString, | ||
"offset" -> offset.toString | ||
) ++ checkpointName.map("checkpoint-name" -> _) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would consider to extract this into some sort of a constant, but I don't insist - not sure if this is gonna be used elsewhere in the future
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do.
|
||
for { | ||
mainPartitioningId <- partitioningId(mainFlowPartitioning) | ||
flowId <- mainPartitioningId.project(queryFlowId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for the sake of better readability, I'd consider to reflect either types in the name - for example here it could be flowIdOrError
or so
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same below etc - all of these 'wrapper' types are a bit easier to read and maintain truth
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do
for { | ||
mainPartitioningId <- partitioningId(mainFlowPartitioning) | ||
flowId <- mainPartitioningId.project(queryFlowId) | ||
checkpoints <- flowId.project(queryCheckpoints(_, checkpointName, pageSize, offset)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wow, I find this insanely complicated / advanced :D
IdMonad.unit(RequestFail(new RequestException("Not used"){})) | ||
} | ||
|
||
val source = PaginatedResponse(Seq(1, 2, 3), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the formatting is really wild here :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seriously, sometimes it worsens code readability as it's a bit unpredictable / misleading
assert(result.items == Vector(1, 2, 3)) | ||
assert(!result.hasNext) | ||
assert(result.limit == 3) | ||
assert(result.pageStart == 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider adding test on pageEnd
as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just to increase test coverage more, computation seems correct
|
||
private val partitioningEncoded = "W3sia2V5IjoiYSIsInZhbHVlIjoiYiJ9LHsia2V5IjoiYyIsInZhbHVlIjoiZCJ9XQ==" | ||
|
||
private val partitioningResponse = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider to extract the test data to another file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No reuse, and not that many. Imho OK to be here.
processEndTime = Some(ZonedDateTime.parse("2024-12-30T16:01:36.5052109+01:00[Europe/Budapest]")), | ||
measurements = Set( | ||
LongMeasurement( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unnecessary empty line
@@ -0,0 +1,47 @@ | |||
/* | |||
* Copyright 2024 ABSA Group Limited |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2021 - there might be more occurrences like this, but I didn't check
* addressed PR comments
} | ||
} | ||
|
||
private def queryCheckpoints(flowId: Long, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
scalafmt formatting hasn't been applied
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will fix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed it, but the difference was an empty line, one missing and one surplus white space
I though, the formatting was off making it harder to read. This is hardly the case.
Adhering strictly to scalafmt prescribed format is not the adopted team policy.
(same in the other cases bellow)
import za.co.absa.atum.model.ApiPaths._ | ||
import za.co.absa.atum.reader.core.{PartitioningIdProvider, Reader} | ||
import za.co.absa.atum.reader.implicits.EitherImplicits.EitherMonadEnhancements | ||
import za.co.absa.atum.reader.requests.QueryParamNames | ||
import za.co.absa.atum.reader.server.ServerConfig | ||
|
||
/** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer this
class FlowReader[F[_]: MonadError](val mainFlowPartitioning: AtumPartitions)(implicit
serverConfig: ServerConfig,
backend: SttpBackend[F, Any],
) extends Reader[F]
over this given the implicit monaderror ev isn't used explicitly
class FlowReader[F[_]](val mainFlowPartitioning: AtumPartitions)(implicit
serverConfig: ServerConfig,
backend: SttpBackend[F, Any],
ev: MonadError[F]
) extends Reader[F]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a reason why it was done like this. but maybe with the cut code, that might have dissappeared (I think it was needed for Page
creation). I will doublecheck.
getQuery(endpoint, params) | ||
} | ||
|
||
def getCheckpointsPage(pageSize: Int = 10, offset: Long = 0): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(For consideration) I would prefer this over the "project". I think it's not even a good name for such mapping operation that lifts the computation into F context.
def getCheckpointsPage(
pageSize: Int = 10,
offset: Long = 0
): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = {
partitioningId(mainFlowPartitioning).flatMap {
case Left(error) => MonadError[F].unit(Left(error))
case Right(mainPartitioningId) =>
queryFlowId(mainPartitioningId).flatMap {
case Left(error) => MonadError[F].unit(Left(error))
case Right(flowId) =>
queryCheckpoints(flowId, None, pageSize, offset)
}
}
}
Also, the name of the method could be improved. And the "partitioningId" method could be renamed too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I get that the project
might not be a best name.
But honestly the suggested code seems to much less understandable than the original for
-comprehension. What do you see as the benefit in this form?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess mainly the project name bothers me, I agree that from readability stand point it's better than nested flatMaps or nested for-comprehensions. How about naming the method liftMapF or sth like that instead? Btw you don't need to explicitly use implicitly :D as there is apply method on the MonadError that does exactly that ...
object EitherImplicits {
implicit class EitherMonadEnhancements[A, B](val either: Either[A, B]) extends AnyVal {
def liftMapF[C, F[_]: MonadError](f: B => F[Either[A, C]]): F[Either[A, C]] = either match {
case Right(b) => f(b)
// case Left(a) => implicitly[MonadError[F]].unit(Left(a))
case Left(a) => MonadError[F].unit(Left(a))
}
}
}
object MonadError {
def apply[F[_]: MonadError]: MonadError[F] = implicitly[MonadError[F]]
}
import za.co.absa.atum.model.dto.PartitioningWithIdDTO | ||
import za.co.absa.atum.model.envelopes.SuccessResponse.SingleSuccessResponse | ||
import za.co.absa.atum.model.types.basic.AtumPartitions | ||
import za.co.absa.atum.model.types.basic.AtumPartitionsOps | ||
import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax | ||
import za.co.absa.atum.reader.basic.RequestResult.RequestResult | ||
import RequestResult.RequestResult | ||
|
||
trait PartitioningIdProvider[F[_]] {self: Reader[F] => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
scalafmt formatting hasn't been applied
sealed abstract class RequestException(message: String) extends ReaderException(message) | ||
|
||
|
||
object RequestException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
scalafmt formatting hasn't been applied
FlowReade
is now a simple class to just retrieve checkpoints of the identified flow.ApiPaths
moved from_Server_ to Modelbasic
->core
Closes #247
Release notes:
FlowReader
implemented to return the flow's checkpoints (requested and returned in a paginated form)