Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#247 Implement basics of FlowReader #306

Open
wants to merge 58 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 57 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
b97b603
#244: Create the Info module
benedeki Aug 19, 2024
e623974
* fixed License headers
benedeki Aug 19, 2024
5e4eadb
* renamed to _Reader_
benedeki Aug 24, 2024
2e1e2ea
* README.md update
benedeki Aug 25, 2024
738c904
* fix
benedeki Aug 25, 2024
df8c9bd
* JaCoCO action update
benedeki Aug 26, 2024
5affd82
* added dummy code for testing coverage
benedeki Aug 26, 2024
0f1e121
* erroneous class renamed
benedeki Aug 26, 2024
d773a93
* Deleted wrong files
benedeki Aug 26, 2024
0776f9c
#245 Add the ability to query REST endpoints from Reader module
benedeki Sep 10, 2024
38fde1c
* Work still in progress
benedeki Sep 23, 2024
1ac2233
* the first working commit
benedeki Nov 1, 2024
e6dcb52
* Removed temporary notes
benedeki Nov 1, 2024
6968b02
* introduced `MonadError` into the `GenericServerConnection`
benedeki Nov 1, 2024
b9bacef
* Fixed UTs
benedeki Nov 1, 2024
bbb1e7f
* trying to get rid of Java 11 dependency
benedeki Nov 4, 2024
33e6628
* Downgraded sttpClient
benedeki Nov 4, 2024
f7ced56
* further downgrade
benedeki Nov 5, 2024
ca2116b
* Removed exceptions
benedeki Nov 6, 2024
e5e6f63
* commented out parts of README.md which are not yet part of the code
benedeki Nov 6, 2024
fe07272
- major rework
benedeki Nov 17, 2024
7656f6f
* doc fix
benedeki Nov 17, 2024
eb9a678
Merge branch 'master' into feature/245-add-the-ability-to-query-rest-…
benedeki Nov 17, 2024
7641c07
* disabled failing test
benedeki Nov 17, 2024
bc82a5b
* adjustments
benedeki Nov 18, 2024
0e7675e
- further cleaning
benedeki Nov 18, 2024
432716a
* tests progress
benedeki Nov 21, 2024
11b0a16
* several UTs added
benedeki Nov 22, 2024
2c3f145
Merge branch 'master' into feature/245-add-the-ability-to-query-rest-…
benedeki Nov 22, 2024
e07dffb
* last improvements before PR ready
benedeki Nov 24, 2024
3955a50
* description to class `ServerConfig`
benedeki Nov 25, 2024
b287a66
* removed empty line
benedeki Nov 25, 2024
d04d23b
* addressed PR comments
benedeki Nov 27, 2024
c344249
* just better implementation
benedeki Nov 30, 2024
c0b0988
#247: Implement basics of FlowReader
benedeki Dec 4, 2024
b53ba99
Merge remote-tracking branch 'remotes/origin/master' into feature/247…
benedeki Dec 4, 2024
55d60e1
* major progress
benedeki Dec 9, 2024
5dfe5c5
* Further progress
benedeki Dec 9, 2024
67ffe07
* Flow reader methods to read checkpoints
benedeki Dec 11, 2024
09e2ed8
* small fixes
benedeki Dec 11, 2024
e7ff732
* License year
benedeki Dec 11, 2024
e63a2e4
* Finished implementation
benedeki Feb 2, 2025
1488d1f
#247: Paging support in Reader module
benedeki Feb 4, 2025
96ffa33
* some omitted files
benedeki Feb 4, 2025
8b69e3e
Merge branch 'master' into feature/247-paging-in-reader
benedeki Feb 4, 2025
04b56bd
* filenames check exclusions fix
benedeki Feb 4, 2025
698b765
Merge branch 'feature/247-paging-in-reader' of https://github.com/Abs…
benedeki Feb 4, 2025
855a333
* further fix
benedeki Feb 4, 2025
be90711
* fix trial 3
benedeki Feb 4, 2025
afd64a6
* Licenses fix
benedeki Feb 4, 2025
f70143e
Merge branch 'feature/247-paging-in-reader' into feature/247-implemen…
benedeki Feb 5, 2025
f803ea3
* uncommented endpoint for Swagger documentation creation
benedeki Feb 5, 2025
1a56f3d
* complete refactoring
benedeki Feb 21, 2025
bbed1f0
* added adr
benedeki Feb 21, 2025
15dd12a
* removing unnecessary changes
benedeki Feb 21, 2025
c630fa0
* removed unnecessary changes II
benedeki Feb 21, 2025
f4ce4d3
* Typo fixes
benedeki Feb 21, 2025
56cfa09
* addressed more PR comments
benedeki Feb 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/test_filenames_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ jobs:
server/src/test/scala/za/co/absa/atum/server/api/TestData.scala,
server/src/test/scala/za/co/absa/atum/server/api/TestTransactorProvider.scala,
server/src/test/scala/za/co/absa/atum/server/ConfigProviderTest.scala,
model/src/test/scala/za/co/absa/atum/testing/*
model/src/test/scala/za/co/absa/atum/testing/*,
reader/src/test/scala/za/co/absa/atum/testing/*
verbose-logging: 'false'
fail-on-violation: 'true'
28 changes: 28 additions & 0 deletions adrs/01_Basics-of-FlowReader-and-PartitioningReader.drawio
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<mxfile host="app.diagrams.net" agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:135.0) Gecko/20100101 Firefox/135.0" version="24.8.6">
<diagram name="Page-1" id="M1M2r3vxqz2qx0Wid1ZL">
<mxGraphModel dx="1434" dy="733" grid="1" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="827" pageHeight="1169" math="0" shadow="0">
<root>
<mxCell id="0" />
<mxCell id="1" parent="0" />
<mxCell id="rmWetNVkiCoe2Ea992-S-1" value="PartitioningReader" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="440" y="40" width="120" height="60" as="geometry" />
</mxCell>
<mxCell id="rmWetNVkiCoe2Ea992-S-2" value="FlowReader" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="60" y="40" width="120" height="60" as="geometry" />
</mxCell>
<mxCell id="rmWetNVkiCoe2Ea992-S-4" value="&lt;div align=&quot;left&quot;&gt;&lt;ul&gt;&lt;li&gt;&lt;b&gt;Identified by the main paritioining&lt;/b&gt;&lt;/li&gt;&lt;li&gt;ability to retrieve &lt;i&gt;id_flow&lt;/i&gt; based on the main partitioing &lt;br&gt;&lt;/li&gt;&lt;li&gt;Getting the checkpoints belonging to the flow&lt;/li&gt;&lt;li&gt;Getting checkpoints of certain name only&lt;/li&gt;&lt;li&gt;Getting it&#39;s partitioning (phase 2)&lt;/li&gt;&lt;/ul&gt;&lt;/div&gt;" style="rounded=0;whiteSpace=wrap;html=1;align=left;" vertex="1" parent="1">
<mxGeometry x="80" y="110" width="240" height="160" as="geometry" />
</mxCell>
<mxCell id="rmWetNVkiCoe2Ea992-S-5" value="&lt;div align=&quot;left&quot;&gt;Data returned&lt;ul&gt;&lt;li&gt;Checkpoints including partitioiing info in form of DTOs&lt;/li&gt;&lt;li&gt;returns just the dto as pagination&lt;/li&gt;&lt;/ul&gt;&lt;/div&gt;" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="80" y="310" width="250" height="130" as="geometry" />
</mxCell>
<mxCell id="rmWetNVkiCoe2Ea992-S-7" value="&lt;blockquote&gt;&lt;div align=&quot;left&quot;&gt;&lt;li&gt;&lt;b&gt;Identified by the paritioining&lt;/b&gt;&lt;/li&gt;&lt;/div&gt;&lt;div align=&quot;left&quot;&gt;&lt;li&gt;ability to retrieve &lt;i&gt;id_partitioing&lt;/i&gt; based on the partitioing &lt;/li&gt;&lt;li&gt;Getting the checkpoints&lt;/li&gt;&lt;li&gt;Getting checkpoint(s) filtered by name&lt;/li&gt;&lt;li&gt;Getting the additional data&lt;/li&gt;&lt;li&gt;Getting the checkpoints + additional data in form of _INFO file format (phase 2)&lt;/li&gt;&lt;/div&gt;&lt;/blockquote&gt;" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="440" y="110" width="240" height="160" as="geometry" />
</mxCell>
<mxCell id="rmWetNVkiCoe2Ea992-S-8" value="&lt;div align=&quot;left&quot;&gt;Data returned&lt;ul&gt;&lt;li&gt;Checkpoints in the form of DTO in Pagination DTO&lt;/li&gt;&lt;li&gt;additional data as their DTO&lt;/li&gt;&lt;/ul&gt;&lt;/div&gt;" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="440" y="320" width="240" height="120" as="geometry" />
</mxCell>
</root>
</mxGraphModel>
</diagram>
</mxfile>
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package za.co.absa.atum.server.api.http
package za.co.absa.atum.model
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah something inside me says that this is not really really a 'model' but in general I'm okay with this; finding a better name for 'model' is difficult and it's backward incompatible change; alternatively, creating a new model for this seems a bit too much, so I prefer your solution

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I consider the model as the library to make communication with server "easy". So IMHO it fits, without too much mind-bending 😉

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure such things should be part of model package. I think it belongs to the server package. And if need be you can define such api paths also in the reader. Duplicity in this particular case isn't a problem IMHO ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for existence of a file with constants is to avoid deviation of the values used on different places. Creating two files with the same constants contradicts the main purpose.
What's the problem here? That it's in the model module, to which it doesn't not belong by purest standard?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the problem is the placement in Model module, I can use Sbt's unmanagedSourceDirectories setting and create a shared folder.


object ApiPaths {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 ABSA Group Limited
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 ABSA Group Limited
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 ABSA Group Limited
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
45 changes: 44 additions & 1 deletion reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,15 @@ package za.co.absa.atum.reader

import sttp.client3.SttpBackend
import sttp.monad.MonadError
import sttp.monad.syntax._
import za.co.absa.atum.model.dto.{CheckpointWithPartitioningDTO, FlowDTO}
import za.co.absa.atum.model.envelopes.SuccessResponse.{PaginatedResponse, SingleSuccessResponse}
import za.co.absa.atum.model.types.basic.AtumPartitions
import za.co.absa.atum.reader.basic.{PartitioningIdProvider, Reader}
import za.co.absa.atum.reader.core.RequestResult.RequestResult
import za.co.absa.atum.model.ApiPaths._
import za.co.absa.atum.reader.core.{PartitioningIdProvider, Reader}
import za.co.absa.atum.reader.implicits.EitherImplicits.EitherMonadEnhancements
import za.co.absa.atum.reader.requests.QueryParamNames
import za.co.absa.atum.reader.server.ServerConfig

/**
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer this

class FlowReader[F[_]: MonadError](val mainFlowPartitioning: AtumPartitions)(implicit
  serverConfig: ServerConfig,
  backend: SttpBackend[F, Any],
) extends Reader[F]

over this given the implicit monaderror ev isn't used explicitly

class FlowReader[F[_]](val mainFlowPartitioning: AtumPartitions)(implicit
  serverConfig: ServerConfig,
  backend: SttpBackend[F, Any],
  ev: MonadError[F]
) extends Reader[F]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a reason why it was done like this. but maybe with the cut code, that might have dissappeared (I think it was needed for Page creation). I will doublecheck.

Expand All @@ -35,4 +42,40 @@ class FlowReader[F[_]](val mainFlowPartitioning: AtumPartitions)
(implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any], ev: MonadError[F])
extends Reader[F] with PartitioningIdProvider[F]{

private def queryFlowId(mainPartitioningId: Long): F[RequestResult[Long]] = {
val endpoint = s"/$Api/$V2/${V2Paths.Partitionings}/$mainPartitioningId/${V2Paths.MainFlow}"
val queryResult = getQuery[SingleSuccessResponse[FlowDTO]](endpoint)
queryResult.map{ result =>
result.map(_.data.id)
}
}

private def queryCheckpoints(flowId: Long,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scalafmt formatting hasn't been applied

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it, but the difference was an empty line, one missing and one surplus white space
I though, the formatting was off making it harder to read. This is hardly the case.
Adhering strictly to scalafmt prescribed format is not the adopted team policy.
(same in the other cases bellow)

checkpointName: Option[String],
limit: Int,
offset: Long): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = {
val endpoint = s"/$Api/$V2/${V2Paths.Flows}/$flowId/${V2Paths.Checkpoints}"
val params = Map(
QueryParamNames.limit -> limit.toString,
QueryParamNames.offset -> offset.toString
) ++ checkpointName.map(QueryParamNames.checkpointName -> _)
getQuery(endpoint, params)
}

def getCheckpointsPage(pageSize: Int = 10, offset: Long = 0): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = {
Copy link
Collaborator

@salamonpavel salamonpavel Feb 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(For consideration) I would prefer this over the "project". I think it's not even a good name for such mapping operation that lifts the computation into F context.

def getCheckpointsPage(
    pageSize: Int = 10,
    offset: Long = 0
  ): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = {
    partitioningId(mainFlowPartitioning).flatMap {
      case Left(error) => MonadError[F].unit(Left(error))
      case Right(mainPartitioningId) =>
        queryFlowId(mainPartitioningId).flatMap {
          case Left(error) => MonadError[F].unit(Left(error))
          case Right(flowId) =>
            queryCheckpoints(flowId, None, pageSize, offset)
        }
    }
  }

Also, the name of the method could be improved. And the "partitioningId" method could be renamed too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I get that the project might not be a best name.
But honestly the suggested code seems to much less understandable than the original for-comprehension. What do you see as the benefit in this form?

Copy link
Collaborator

@salamonpavel salamonpavel Feb 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess mainly the project name bothers me, I agree that from readability stand point it's better than nested flatMaps or nested for-comprehensions. How about naming the method liftMapF or sth like that instead? Btw you don't need to explicitly use implicitly :D as there is apply method on the MonadError that does exactly that ...

object EitherImplicits {

  implicit class EitherMonadEnhancements[A, B](val either: Either[A, B]) extends AnyVal {
    def liftMapF[C, F[_]: MonadError](f: B => F[Either[A, C]]): F[Either[A, C]] = either match {
      case Right(b) => f(b)
      // case Left(a) => implicitly[MonadError[F]].unit(Left(a))
      case Left(a) => MonadError[F].unit(Left(a))
    }
  }

}


object MonadError {
  def apply[F[_]: MonadError]: MonadError[F] = implicitly[MonadError[F]]
}

for {
mainPartitioningIdOrErrror <- partitioningId(mainFlowPartitioning)
flowIdOrError <- mainPartitioningIdOrErrror.project(queryFlowId)
checkpointsOrError <- flowIdOrError.project(queryCheckpoints(_, None, pageSize, offset))
} yield checkpointsOrError
}

def getCheckpointsOfNamePage(checkpointName: String, pageSize: Int = 10, offset: Long = 0): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = {
for {
mainPartitioningId <- partitioningId(mainFlowPartitioning)
flowId <- mainPartitioningId.project(queryFlowId)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the sake of better readability, I'd consider to reflect either types in the name - for example here it could be flowIdOrError or so

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same below etc - all of these 'wrapper' types are a bit easier to read and maintain truth

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do

checkpoints <- flowId.project(queryCheckpoints(_, Some(checkpointName), pageSize, offset))
} yield checkpoints
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package za.co.absa.atum.reader
import sttp.client3.SttpBackend
import sttp.monad.MonadError
import za.co.absa.atum.model.types.basic.AtumPartitions
import za.co.absa.atum.reader.basic.{PartitioningIdProvider, Reader}
import za.co.absa.atum.reader.core.{PartitioningIdProvider, Reader}
import za.co.absa.atum.reader.server.ServerConfig

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,23 @@
* limitations under the License.
*/

package za.co.absa.atum.reader.basic
package za.co.absa.atum.reader.core

import sttp.monad.MonadError
import sttp.monad.syntax._
import za.co.absa.atum.model.ApiPaths._
import za.co.absa.atum.model.dto.PartitioningWithIdDTO
import za.co.absa.atum.model.envelopes.SuccessResponse.SingleSuccessResponse
import za.co.absa.atum.model.types.basic.AtumPartitions
import za.co.absa.atum.model.types.basic.AtumPartitionsOps
import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax
import za.co.absa.atum.reader.basic.RequestResult.RequestResult
import RequestResult.RequestResult

trait PartitioningIdProvider[F[_]] {self: Reader[F] =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scalafmt formatting hasn't been applied

def partitioningId(partitioning: AtumPartitions)(implicit monad: MonadError[F]): F[RequestResult[Long]] = {
val encodedPartitioning = partitioning.toPartitioningDTO.asBase64EncodedJsonString
val queryResult = getQuery[SingleSuccessResponse[PartitioningWithIdDTO]]("/api/v2/partitionings", Map("partitioning" -> encodedPartitioning))
queryResult.map{result =>
val queryResult = getQuery[SingleSuccessResponse[PartitioningWithIdDTO]](s"/$Api/$V2/${V2Paths.Partitionings}", Map("partitioning" -> encodedPartitioning))
queryResult.map{ result =>
result.map(_.data.id)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@
* limitations under the License.
*/

package za.co.absa.atum.reader.basic
package za.co.absa.atum.reader.core

import io.circe.Decoder
import sttp.client3.{Identity, RequestT, ResponseException, SttpBackend, basicRequest}
import sttp.client3.circe.asJson
import sttp.model.Uri
import sttp.monad.MonadError
import sttp.monad.syntax._
import za.co.absa.atum.reader.core.RequestResult._
import za.co.absa.atum.reader.server.ServerConfig
import za.co.absa.atum.reader.basic.RequestResult._
import za.co.absa.atum.reader.exceptions.RequestException.CirceError

/**
* Reader is a base class for reading data from a remote server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,31 @@
* limitations under the License.
*/

package za.co.absa.atum.reader.basic
package za.co.absa.atum.reader.core

import sttp.client3.{DeserializationException, HttpError, Response, ResponseException}
import sttp.monad.MonadError
import za.co.absa.atum.model.envelopes.ErrorResponse
import za.co.absa.atum.reader.exceptions.RequestException.{CirceError, HttpException, ParsingException}
import za.co.absa.atum.reader.exceptions.RequestException

object RequestResult {
type CirceError = io.circe.Error
type RequestResult[R] = Either[ResponseException[ErrorResponse, CirceError], R]
type RequestResult[R] = Either[RequestException, R]

def RequestOK[T](value: T): RequestResult[T] = Right(value)
def RequestFail[T](error: RequestException): RequestResult[T] = Left(error)

implicit class ResponseOps[R](val response: Response[Either[ResponseException[String, CirceError], R]]) extends AnyVal {
def toRequestResult: RequestResult[R] = {
response.body.left.map {
case he: HttpError[String] =>
ErrorResponse.basedOnStatusCode(he.statusCode.code, he.body) match {
case Right(er) => HttpError(er, he.statusCode)
case Left(ce) => DeserializationException(he.body, ce)
case Right(er) => HttpException(he.getMessage, he.statusCode, er, response.request.uri)
case Left(ce) => ParsingException.fromCirceError(ce, he.body)
}
case de: DeserializationException[CirceError] => de
case de: DeserializationException[CirceError] => ParsingException.fromCirceError(de.error, de.body)
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.exceptions

class ReaderException(message: String) extends Exception(message)
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.exceptions

import sttp.model.{StatusCode, Uri}
import za.co.absa.atum.model.envelopes.ErrorResponse

sealed abstract class RequestException(message: String) extends ReaderException(message)


object RequestException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scalafmt formatting hasn't been applied

type CirceError = io.circe.Error

final case class HttpException(
message: String,
statusCode: StatusCode,
errorResponse: ErrorResponse,
request: Uri
) extends RequestException(message)

final case class ParsingException(
message: String,
body: String
) extends RequestException(message)
object ParsingException {
def fromCirceError(error: CirceError, body: String): ParsingException = {
ParsingException(error.getMessage, body)
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.implicits

import sttp.monad.MonadError

object EitherImplicits {

implicit class EitherMonadEnhancements[A, B](val either: Either[A, B]) extends AnyVal {
def project[C, F[_]: MonadError](f: B => F[Either[A, C]]): F[Either[A, C]] = either match {
case Right(b) => f(b)
case Left(a) => implicitly[MonadError[F]].unit(Left(a))
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.requests

object QueryParamNames {
val limit = "limit"
val offset = "offset"
val checkpointName = "checkpoint-name"
}
Loading
Loading