#247 Implement basics of FlowReader #306
base: master
Changes from 57 commits
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
<mxfile host="app.diagrams.net" agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:135.0) Gecko/20100101 Firefox/135.0" version="24.8.6"> | ||
<diagram name="Page-1" id="M1M2r3vxqz2qx0Wid1ZL"> | ||
<mxGraphModel dx="1434" dy="733" grid="1" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="827" pageHeight="1169" math="0" shadow="0"> | ||
<root> | ||
<mxCell id="0" /> | ||
<mxCell id="1" parent="0" /> | ||
<mxCell id="rmWetNVkiCoe2Ea992-S-1" value="PartitioningReader" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1"> | ||
<mxGeometry x="440" y="40" width="120" height="60" as="geometry" /> | ||
</mxCell> | ||
<mxCell id="rmWetNVkiCoe2Ea992-S-2" value="FlowReader" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1"> | ||
<mxGeometry x="60" y="40" width="120" height="60" as="geometry" /> | ||
</mxCell> | ||
<mxCell id="rmWetNVkiCoe2Ea992-S-4" value="<div align="left"><ul><li><b>Identified by the main partitioning</b></li><li>ability to retrieve <i>id_flow</i> based on the main partitioning <br></li><li>Getting the checkpoints belonging to the flow</li><li>Getting checkpoints of a certain name only</li><li>Getting its partitioning (phase 2)</li></ul></div>" style="rounded=0;whiteSpace=wrap;html=1;align=left;" vertex="1" parent="1"> | ||
<mxGeometry x="80" y="110" width="240" height="160" as="geometry" /> | ||
</mxCell> | ||
<mxCell id="rmWetNVkiCoe2Ea992-S-5" value="<div align="left">Data returned<ul><li>Checkpoints including partitioning info in the form of DTOs</li><li>returns just the dto as pagination</li></ul></div>" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1"> | ||
<mxGeometry x="80" y="310" width="250" height="130" as="geometry" /> | ||
</mxCell> | ||
<mxCell id="rmWetNVkiCoe2Ea992-S-7" value="<blockquote><div align="left"><li><b>Identified by the partitioning</b></li></div><div align="left"><li>ability to retrieve <i>id_partitioning</i> based on the partitioning </li><li>Getting the checkpoints</li><li>Getting checkpoint(s) filtered by name</li><li>Getting the additional data</li><li>Getting the checkpoints + additional data in form of _INFO file format (phase 2)</li></div></blockquote>" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1"> | ||
<mxGeometry x="440" y="110" width="240" height="160" as="geometry" /> | ||
</mxCell> | ||
<mxCell id="rmWetNVkiCoe2Ea992-S-8" value="<div align="left">Data returned<ul><li>Checkpoints in the form of DTO in Pagination DTO</li><li>additional data as their DTO</li></ul></div>" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1"> | ||
<mxGeometry x="440" y="320" width="240" height="120" as="geometry" /> | ||
</mxCell> | ||
</root> | ||
</mxGraphModel> | ||
</diagram> | ||
</mxfile> |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,8 +18,15 @@ package za.co.absa.atum.reader | |
|
||
import sttp.client3.SttpBackend | ||
import sttp.monad.MonadError | ||
import sttp.monad.syntax._ | ||
import za.co.absa.atum.model.dto.{CheckpointWithPartitioningDTO, FlowDTO} | ||
import za.co.absa.atum.model.envelopes.SuccessResponse.{PaginatedResponse, SingleSuccessResponse} | ||
import za.co.absa.atum.model.types.basic.AtumPartitions | ||
import za.co.absa.atum.reader.basic.{PartitioningIdProvider, Reader} | ||
import za.co.absa.atum.reader.core.RequestResult.RequestResult | ||
import za.co.absa.atum.model.ApiPaths._ | ||
import za.co.absa.atum.reader.core.{PartitioningIdProvider, Reader} | ||
import za.co.absa.atum.reader.implicits.EitherImplicits.EitherMonadEnhancements | ||
import za.co.absa.atum.reader.requests.QueryParamNames | ||
import za.co.absa.atum.reader.server.ServerConfig | ||
|
||
/** | ||
I would prefer this:
class FlowReader[F[_]: MonadError](val mainFlowPartitioning: AtumPartitions)(implicit
  serverConfig: ServerConfig,
  backend: SttpBackend[F, Any],
) extends Reader[F]
over this, given that the implicit MonadError evidence isn't used explicitly:
class FlowReader[F[_]](val mainFlowPartitioning: AtumPartitions)(implicit
  serverConfig: ServerConfig,
  backend: SttpBackend[F, Any],
  ev: MonadError[F]
) extends Reader[F]

There was a reason why it was done like this, but with the cut code that might have disappeared (I think it was needed for |
||
|
@@ -35,4 +42,40 @@ class FlowReader[F[_]](val mainFlowPartitioning: AtumPartitions) | |
(implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any], ev: MonadError[F]) | ||
extends Reader[F] with PartitioningIdProvider[F]{ | ||
|
||
private def queryFlowId(mainPartitioningId: Long): F[RequestResult[Long]] = { | ||
val endpoint = s"/$Api/$V2/${V2Paths.Partitionings}/$mainPartitioningId/${V2Paths.MainFlow}" | ||
val queryResult = getQuery[SingleSuccessResponse[FlowDTO]](endpoint) | ||
queryResult.map{ result => | ||
result.map(_.data.id) | ||
} | ||
} | ||
|
||
private def queryCheckpoints(flowId: Long, | ||
scalafmt formatting hasn't been applied

Will fix.

I changed it, but the difference was an empty line, one missing and one surplus white space |
||
checkpointName: Option[String], | ||
limit: Int, | ||
offset: Long): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = { | ||
val endpoint = s"/$Api/$V2/${V2Paths.Flows}/$flowId/${V2Paths.Checkpoints}" | ||
val params = Map( | ||
QueryParamNames.limit -> limit.toString, | ||
QueryParamNames.offset -> offset.toString | ||
) ++ checkpointName.map(QueryParamNames.checkpointName -> _) | ||
getQuery(endpoint, params) | ||
} | ||
|
||
def getCheckpointsPage(pageSize: Int = 10, offset: Long = 0): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = { | ||
(For consideration) I would prefer this over the "project". I think it's not even a good name for such a mapping operation that lifts the computation into the F context.
def getCheckpointsPage(
  pageSize: Int = 10,
  offset: Long = 0
): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = {
  partitioningId(mainFlowPartitioning).flatMap {
    case Left(error) => MonadError[F].unit(Left(error))
    case Right(mainPartitioningId) =>
      queryFlowId(mainPartitioningId).flatMap {
        case Left(error) => MonadError[F].unit(Left(error))
        case Right(flowId) =>
          queryCheckpoints(flowId, None, pageSize, offset)
      }
  }
}
Also, the name of the method could be improved. And the "partitioningId" method could be renamed too.

OK, I get that the

I guess it's mainly the name "project" that bothers me; I agree that from a readability standpoint it's better than nested flatMaps or nested for-comprehensions. How about naming the method liftMapF or something like that instead? Btw, you don't need to call implicitly explicitly :D since there is an apply method on MonadError that does exactly that ...
object EitherImplicits {
  implicit class EitherMonadEnhancements[A, B](val either: Either[A, B]) extends AnyVal {
    def liftMapF[C, F[_]: MonadError](f: B => F[Either[A, C]]): F[Either[A, C]] = either match {
      case Right(b) => f(b)
      // case Left(a) => implicitly[MonadError[F]].unit(Left(a))
      case Left(a) => MonadError[F].unit(Left(a))
    }
  }
}
object MonadError {
  def apply[F[_]: MonadError]: MonadError[F] = implicitly[MonadError[F]]
} |
||
for { | ||
mainPartitioningIdOrError <- partitioningId(mainFlowPartitioning) | ||
flowIdOrError <- mainPartitioningIdOrError.project(queryFlowId) | ||
checkpointsOrError <- flowIdOrError.project(queryCheckpoints(_, None, pageSize, offset)) | ||
} yield checkpointsOrError | ||
} | ||
|
||
def getCheckpointsOfNamePage(checkpointName: String, pageSize: Int = 10, offset: Long = 0): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = { | ||
for { | ||
mainPartitioningId <- partitioningId(mainFlowPartitioning) | ||
flowId <- mainPartitioningId.project(queryFlowId) | ||
For the sake of better readability, I'd consider reflecting the Either types in the names - for example here it could be

same below etc - all of these 'wrapper' types are a bit easier to read and maintain truth

Will do |
||
checkpoints <- flowId.project(queryCheckpoints(_, Some(checkpointName), pageSize, offset)) | ||
} yield checkpoints | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,22 +14,23 @@ | |
* limitations under the License. | ||
*/ | ||
|
||
package za.co.absa.atum.reader.basic | ||
package za.co.absa.atum.reader.core | ||
|
||
import sttp.monad.MonadError | ||
import sttp.monad.syntax._ | ||
import za.co.absa.atum.model.ApiPaths._ | ||
import za.co.absa.atum.model.dto.PartitioningWithIdDTO | ||
import za.co.absa.atum.model.envelopes.SuccessResponse.SingleSuccessResponse | ||
import za.co.absa.atum.model.types.basic.AtumPartitions | ||
import za.co.absa.atum.model.types.basic.AtumPartitionsOps | ||
import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax | ||
import za.co.absa.atum.reader.basic.RequestResult.RequestResult | ||
import RequestResult.RequestResult | ||
|
||
trait PartitioningIdProvider[F[_]] {self: Reader[F] => | ||
scalafmt formatting hasn't been applied |
||
def partitioningId(partitioning: AtumPartitions)(implicit monad: MonadError[F]): F[RequestResult[Long]] = { | ||
val encodedPartitioning = partitioning.toPartitioningDTO.asBase64EncodedJsonString | ||
val queryResult = getQuery[SingleSuccessResponse[PartitioningWithIdDTO]]("/api/v2/partitionings", Map("partitioning" -> encodedPartitioning)) | ||
queryResult.map{result => | ||
val queryResult = getQuery[SingleSuccessResponse[PartitioningWithIdDTO]](s"/$Api/$V2/${V2Paths.Partitionings}", Map("partitioning" -> encodedPartitioning)) | ||
queryResult.map{ result => | ||
result.map(_.data.id) | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
/* | ||
* Copyright 2021 ABSA Group Limited | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package za.co.absa.atum.reader.exceptions | ||
|
||
class ReaderException(message: String) extends Exception(message) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
/* | ||
* Copyright 2021 ABSA Group Limited | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package za.co.absa.atum.reader.exceptions | ||
|
||
import sttp.model.{StatusCode, Uri} | ||
import za.co.absa.atum.model.envelopes.ErrorResponse | ||
|
||
sealed abstract class RequestException(message: String) extends ReaderException(message) | ||
|
||
|
||
object RequestException { | ||
scalafmt formatting hasn't been applied |
||
type CirceError = io.circe.Error | ||
|
||
final case class HttpException( | ||
message: String, | ||
statusCode: StatusCode, | ||
errorResponse: ErrorResponse, | ||
request: Uri | ||
) extends RequestException(message) | ||
|
||
final case class ParsingException( | ||
message: String, | ||
body: String | ||
) extends RequestException(message) | ||
object ParsingException { | ||
def fromCirceError(error: CirceError, body: String): ParsingException = { | ||
ParsingException(error.getMessage, body) | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
/* | ||
* Copyright 2021 ABSA Group Limited | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package za.co.absa.atum.reader.implicits | ||
|
||
import sttp.monad.MonadError | ||
|
||
object EitherImplicits { | ||
|
||
implicit class EitherMonadEnhancements[A, B](val either: Either[A, B]) extends AnyVal { | ||
def project[C, F[_]: MonadError](f: B => F[Either[A, C]]): F[Either[A, C]] = either match { | ||
case Right(b) => f(b) | ||
case Left(a) => implicitly[MonadError[F]].unit(Left(a)) | ||
} | ||
} | ||
|
||
} |
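For readers following the `project` discussion, here is a minimal, self-contained sketch of how the helper short-circuits on a `Left`. It is not the PR's code: `SimpleMonad` is a stand-in for `sttp.monad.MonadError` so the snippet compiles without sttp, `Option` is used as the effect type purely for illustration, and `findPartitioningId` / `findFlowId` are hypothetical lookups standing in for `partitioningId` / `queryFlowId`.

```scala
// Stand-in for sttp.monad.MonadError (assumption: only `unit` and `flatMap` are modelled).
trait SimpleMonad[F[_]] {
  def unit[A](a: A): F[A]
  def flatMap[A, B](fa: F[A])(f: A => F[B]): F[B]
}

object SimpleMonad {
  // Summoner, mirroring the `MonadError[F]` shorthand mentioned in the review.
  def apply[F[_]](implicit m: SimpleMonad[F]): SimpleMonad[F] = m

  // Option as the effect type, for illustration only.
  implicit val optionMonad: SimpleMonad[Option] = new SimpleMonad[Option] {
    def unit[A](a: A): Option[A] = Some(a)
    def flatMap[A, B](fa: Option[A])(f: A => Option[B]): Option[B] = fa.flatMap(f)
  }
}

object EitherSketch {
  implicit class EitherOps[A, B](val either: Either[A, B]) extends AnyVal {
    // Same shape as `project` in the diff: run `f` on a Right, lift a Left into F unchanged.
    def project[C, F[_]: SimpleMonad](f: B => F[Either[A, C]]): F[Either[A, C]] = either match {
      case Right(b) => f(b)
      case Left(a)  => SimpleMonad[F].unit(Left(a))
    }
  }
}

object ProjectDemo {
  import EitherSketch._

  type Result[A] = Either[String, A]

  // Hypothetical lookup: succeeds only for the name "known".
  def findPartitioningId(name: String): Option[Result[Long]] =
    if (name == "known") Some(Right(1L)) else Some(Left(s"partitioning '$name' not found"))

  // Hypothetical lookup: derives a flow id from the partitioning id.
  def findFlowId(partitioningId: Long): Option[Result[Long]] =
    Some(Right(partitioningId + 100L))

  // The second step is skipped when the first one returned a Left.
  def flowIdFor(name: String): Option[Result[Long]] =
    for {
      partitioningIdOrError <- findPartitioningId(name)
      flowIdOrError         <- partitioningIdOrError.project(findFlowId)
    } yield flowIdOrError
}
```

With these stand-ins, `ProjectDemo.flowIdFor("known")` runs both steps, while `ProjectDemo.flowIdFor("missing")` carries the first error through without calling `findFlowId`.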
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
/* | ||
* Copyright 2021 ABSA Group Limited | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package za.co.absa.atum.reader.requests | ||
|
||
object QueryParamNames { | ||
val limit = "limit" | ||
val offset = "offset" | ||
val checkpointName = "checkpoint-name" | ||
} |
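As a small illustration of how these names are consumed in `queryCheckpoints`, the sketch below rebuilds the parameter map outside the reader. The object and method names (`QueryParamsSketch`, `checkpointParams`) are hypothetical; only the three parameter names are taken from the diff.

```scala
object QueryParamsSketch {
  // Parameter names copied from QueryParamNames in the diff.
  val limit = "limit"
  val offset = "offset"
  val checkpointName = "checkpoint-name"

  // The optional checkpoint name is appended only when it is defined:
  // `++` accepts the Option as a collection of zero or one key/value pair.
  def checkpointParams(name: Option[String], pageSize: Int, pageOffset: Long): Map[String, String] =
    Map(
      limit  -> pageSize.toString,
      offset -> pageOffset.toString
    ) ++ name.map(checkpointName -> _)
}
```

Calling `checkpointParams(None, 10, 0L)` yields only the paging parameters, so the filter never reaches the query string unless a name is given.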
yeah something inside me says that this is not really really a 'model' but in general I'm okay with this; finding a better name for 'model' is difficult and it's backward incompatible change; alternatively, creating a new model for this seems a bit too much, so I prefer your solution
I consider the model as the library to make communication with server "easy". So IMHO it fits, without too much mind-bending 😉
I am not sure such things should be part of the model package. I think they belong to the server package. And if need be, you can also define such API paths in the reader. Duplication in this particular case isn't a problem IMHO ...
The reason a file with constants exists is to keep the values used in different places from drifting apart. Creating two files with the same constants defeats that purpose.
What's the problem here? That it's in the model module, to which it doesn't belong by the purest standard?
If the problem is the placement in the Model module, I can use sbt's unmanagedSourceDirectories setting and create a shared folder.