diff --git a/.github/workflows/test_filenames_check.yml b/.github/workflows/test_filenames_check.yml index b0cf8b7bf..b5c805c55 100644 --- a/.github/workflows/test_filenames_check.yml +++ b/.github/workflows/test_filenames_check.yml @@ -42,6 +42,7 @@ jobs: server/src/test/scala/za/co/absa/atum/server/api/TestData.scala, server/src/test/scala/za/co/absa/atum/server/api/TestTransactorProvider.scala, server/src/test/scala/za/co/absa/atum/server/ConfigProviderTest.scala, - model/src/test/scala/za/co/absa/atum/testing/* + model/src/test/scala/za/co/absa/atum/testing/*, + reader/src/test/scala/za/co/absa/atum/testing/* verbose-logging: 'false' fail-on-violation: 'true' diff --git a/adrs/01_Basics-of-FlowReader-and-PartitioningReader.drawio b/adrs/01_Basics-of-FlowReader-and-PartitioningReader.drawio new file mode 100644 index 000000000..6dbcf05db --- /dev/null +++ b/adrs/01_Basics-of-FlowReader-and-PartitioningReader.drawio @@ -0,0 +1,28 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/ApiPaths.scala b/model/src/main/scala/za/co/absa/atum/model/ApiPaths.scala similarity index 96% rename from server/src/main/scala/za/co/absa/atum/server/api/http/ApiPaths.scala rename to model/src/main/scala/za/co/absa/atum/model/ApiPaths.scala index 3e5903082..b7f99788f 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/ApiPaths.scala +++ b/model/src/main/scala/za/co/absa/atum/model/ApiPaths.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package za.co.absa.atum.server.api.http +package za.co.absa.atum.model object ApiPaths { diff --git a/model/src/test/scala/za/co/absa/atum/model/types/AtumPartitionsUnitTests.scala b/model/src/test/scala/za/co/absa/atum/model/types/AtumPartitionsUnitTests.scala index 11a529d02..cbd5e5fc1 100644 --- a/model/src/test/scala/za/co/absa/atum/model/types/AtumPartitionsUnitTests.scala +++ b/model/src/test/scala/za/co/absa/atum/model/types/AtumPartitionsUnitTests.scala @@ -1,5 +1,5 @@ /* - * Copyright 2024 ABSA Group Limited + * Copyright 2021 ABSA Group Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/model/src/test/scala/za/co/absa/atum/model/utils/JsonDeserializationSyntaxUnitTests.scala b/model/src/test/scala/za/co/absa/atum/model/utils/JsonDeserializationSyntaxUnitTests.scala index 4ca1ac9df..f07b8727e 100644 --- a/model/src/test/scala/za/co/absa/atum/model/utils/JsonDeserializationSyntaxUnitTests.scala +++ b/model/src/test/scala/za/co/absa/atum/model/utils/JsonDeserializationSyntaxUnitTests.scala @@ -1,5 +1,5 @@ /* - * Copyright 2024 ABSA Group Limited + * Copyright 2021 ABSA Group Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/model/src/test/scala/za/co/absa/atum/model/utils/JsonSerializationSyntaxUnitTests.scala b/model/src/test/scala/za/co/absa/atum/model/utils/JsonSerializationSyntaxUnitTests.scala index 830f4e9b7..25a815891 100644 --- a/model/src/test/scala/za/co/absa/atum/model/utils/JsonSerializationSyntaxUnitTests.scala +++ b/model/src/test/scala/za/co/absa/atum/model/utils/JsonSerializationSyntaxUnitTests.scala @@ -1,5 +1,5 @@ /* - * Copyright 2024 ABSA Group Limited + * Copyright 2021 ABSA Group Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala b/reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala index 6a99bbe40..031953a0d 100644 --- a/reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala +++ b/reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala @@ -18,21 +18,65 @@ package za.co.absa.atum.reader import sttp.client3.SttpBackend import sttp.monad.MonadError +import sttp.monad.syntax._ +import za.co.absa.atum.model.dto.{CheckpointWithPartitioningDTO, FlowDTO} +import za.co.absa.atum.model.envelopes.SuccessResponse.{PaginatedResponse, SingleSuccessResponse} import za.co.absa.atum.model.types.basic.AtumPartitions -import za.co.absa.atum.reader.basic.{PartitioningIdProvider, Reader} +import za.co.absa.atum.reader.core.RequestResult.RequestResult +import za.co.absa.atum.model.ApiPaths._ +import za.co.absa.atum.reader.core.{PartitioningIdProvider, Reader} +import za.co.absa.atum.reader.implicits.EitherImplicits.EitherMonadEnhancements +import za.co.absa.atum.reader.requests.QueryParamNames import za.co.absa.atum.reader.server.ServerConfig /** * This class is a reader that reads data tight to a flow. - * @param mainFlowPartitioning - the partitioning of the main flow; renamed from ancestor's 'flowPartitioning' - * @param serverConfig - the Atum server configuration - * @param backend - sttp backend, that will be executing the requests - * @param ev - using evidence based approach to ensure that the type F is a MonadError instead of using context - * bounds, as it make the imports easier to follow - * @tparam F - the effect type (e.g. Future, IO, Task, etc.) + * + * @param mainFlowPartitioning - the partitioning of the main flow; renamed from ancestor's 'flowPartitioning' + * @param serverConfig - the Atum server configuration + * @param backend - sttp backend, that will be executing the requests + * @param ev - using evidence based approach to ensure that the type F is a MonadError instead of using context + * bounds, as it make the imports easier to follow + * @tparam F - the effect type (e.g. Future, IO, Task, etc.) */ -class FlowReader[F[_]](val mainFlowPartitioning: AtumPartitions) - (implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any], ev: MonadError[F]) - extends Reader[F] with PartitioningIdProvider[F]{ +class FlowReader[F[_]: MonadError](val mainFlowPartitioning: AtumPartitions) + (implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any]) + extends Reader[F] with PartitioningIdProvider[F] { + + private def queryFlowId(mainPartitioningId: Long): F[RequestResult[Long]] = { + val endpoint = s"/$Api/$V2/${V2Paths.Partitionings}/$mainPartitioningId/${V2Paths.MainFlow}" + val queryResult = getQuery[SingleSuccessResponse[FlowDTO]](endpoint) + queryResult.map { result => + result.map(_.data.id) + } + } + + private def queryCheckpoints(flowId: Long, + checkpointName: Option[String], + limit: Int, + offset: Long): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = { + val endpoint = s"/$Api/$V2/${V2Paths.Flows}/$flowId/${V2Paths.Checkpoints}" + val params = Map( + QueryParamNames.limit -> limit.toString, + QueryParamNames.offset -> offset.toString + ) ++ checkpointName.map(QueryParamNames.checkpointName -> _) + getQuery(endpoint, params) + } + + def getCheckpointsPage(pageSize: Int = 10, offset: Long = 0): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = { + for { + mainPartitioningIdOrErrror <- partitioningId(mainFlowPartitioning) + flowIdOrError <- mainPartitioningIdOrErrror.project(queryFlowId) + checkpointsOrError <- flowIdOrError.project(queryCheckpoints(_, None, pageSize, offset)) + } yield checkpointsOrError + } + + def getCheckpointsOfNamePage(checkpointName: String, pageSize: Int = 10, offset: Long = 0): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = { + for { + mainPartitioningId <- partitioningId(mainFlowPartitioning) + flowId <- mainPartitioningId.project(queryFlowId) + checkpoints <- flowId.project(queryCheckpoints(_, Some(checkpointName), pageSize, offset)) + } yield checkpoints + } } diff --git a/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala b/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala index f103605b6..334643f44 100644 --- a/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala +++ b/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala @@ -19,7 +19,7 @@ package za.co.absa.atum.reader import sttp.client3.SttpBackend import sttp.monad.MonadError import za.co.absa.atum.model.types.basic.AtumPartitions -import za.co.absa.atum.reader.basic.{PartitioningIdProvider, Reader} +import za.co.absa.atum.reader.core.{PartitioningIdProvider, Reader} import za.co.absa.atum.reader.server.ServerConfig /** diff --git a/reader/src/main/scala/za/co/absa/atum/reader/basic/PartitioningIdProvider.scala b/reader/src/main/scala/za/co/absa/atum/reader/core/PartitioningIdProvider.scala similarity index 78% rename from reader/src/main/scala/za/co/absa/atum/reader/basic/PartitioningIdProvider.scala rename to reader/src/main/scala/za/co/absa/atum/reader/core/PartitioningIdProvider.scala index 8a802ff02..3398c7401 100644 --- a/reader/src/main/scala/za/co/absa/atum/reader/basic/PartitioningIdProvider.scala +++ b/reader/src/main/scala/za/co/absa/atum/reader/core/PartitioningIdProvider.scala @@ -14,22 +14,27 @@ * limitations under the License. */ -package za.co.absa.atum.reader.basic +package za.co.absa.atum.reader.core import sttp.monad.MonadError import sttp.monad.syntax._ +import za.co.absa.atum.model.ApiPaths._ import za.co.absa.atum.model.dto.PartitioningWithIdDTO import za.co.absa.atum.model.envelopes.SuccessResponse.SingleSuccessResponse import za.co.absa.atum.model.types.basic.AtumPartitions import za.co.absa.atum.model.types.basic.AtumPartitionsOps import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax -import za.co.absa.atum.reader.basic.RequestResult.RequestResult +import za.co.absa.atum.reader.core.RequestResult.RequestResult -trait PartitioningIdProvider[F[_]] {self: Reader[F] => +trait PartitioningIdProvider[F[_]] { + self: Reader[F] => def partitioningId(partitioning: AtumPartitions)(implicit monad: MonadError[F]): F[RequestResult[Long]] = { val encodedPartitioning = partitioning.toPartitioningDTO.asBase64EncodedJsonString - val queryResult = getQuery[SingleSuccessResponse[PartitioningWithIdDTO]]("/api/v2/partitionings", Map("partitioning" -> encodedPartitioning)) - queryResult.map{result => + val queryResult = getQuery[SingleSuccessResponse[PartitioningWithIdDTO]]( + s"/$Api/$V2/${V2Paths.Partitionings}", + Map("partitioning" -> encodedPartitioning) + ) + queryResult.map { result => result.map(_.data.id) } } diff --git a/reader/src/main/scala/za/co/absa/atum/reader/basic/Reader.scala b/reader/src/main/scala/za/co/absa/atum/reader/core/Reader.scala similarity index 92% rename from reader/src/main/scala/za/co/absa/atum/reader/basic/Reader.scala rename to reader/src/main/scala/za/co/absa/atum/reader/core/Reader.scala index 325f8c6fe..3d1e6e4d8 100644 --- a/reader/src/main/scala/za/co/absa/atum/reader/basic/Reader.scala +++ b/reader/src/main/scala/za/co/absa/atum/reader/core/Reader.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package za.co.absa.atum.reader.basic +package za.co.absa.atum.reader.core import io.circe.Decoder import sttp.client3.{Identity, RequestT, ResponseException, SttpBackend, basicRequest} @@ -22,8 +22,9 @@ import sttp.client3.circe.asJson import sttp.model.Uri import sttp.monad.MonadError import sttp.monad.syntax._ +import za.co.absa.atum.reader.core.RequestResult._ import za.co.absa.atum.reader.server.ServerConfig -import za.co.absa.atum.reader.basic.RequestResult._ +import za.co.absa.atum.reader.exceptions.RequestException.CirceError /** * Reader is a base class for reading data from a remote server. diff --git a/reader/src/main/scala/za/co/absa/atum/reader/basic/RequestResult.scala b/reader/src/main/scala/za/co/absa/atum/reader/core/RequestResult.scala similarity index 61% rename from reader/src/main/scala/za/co/absa/atum/reader/basic/RequestResult.scala rename to reader/src/main/scala/za/co/absa/atum/reader/core/RequestResult.scala index 76e8cbfa9..c54995597 100644 --- a/reader/src/main/scala/za/co/absa/atum/reader/basic/RequestResult.scala +++ b/reader/src/main/scala/za/co/absa/atum/reader/core/RequestResult.scala @@ -14,25 +14,31 @@ * limitations under the License. */ -package za.co.absa.atum.reader.basic +package za.co.absa.atum.reader.core import sttp.client3.{DeserializationException, HttpError, Response, ResponseException} +import sttp.monad.MonadError import za.co.absa.atum.model.envelopes.ErrorResponse +import za.co.absa.atum.reader.exceptions.RequestException.{CirceError, HttpException, ParsingException} +import za.co.absa.atum.reader.exceptions.RequestException object RequestResult { - type CirceError = io.circe.Error - type RequestResult[R] = Either[ResponseException[ErrorResponse, CirceError], R] + type RequestResult[R] = Either[RequestException, R] + + def RequestOK[T](value: T): RequestResult[T] = Right(value) + def RequestFail[T](error: RequestException): RequestResult[T] = Left(error) implicit class ResponseOps[R](val response: Response[Either[ResponseException[String, CirceError], R]]) extends AnyVal { def toRequestResult: RequestResult[R] = { response.body.left.map { case he: HttpError[String] => ErrorResponse.basedOnStatusCode(he.statusCode.code, he.body) match { - case Right(er) => HttpError(er, he.statusCode) - case Left(ce) => DeserializationException(he.body, ce) + case Right(er) => HttpException(he.getMessage, he.statusCode, er, response.request.uri) + case Left(ce) => ParsingException.fromCirceError(ce, he.body) } - case de: DeserializationException[CirceError] => de + case de: DeserializationException[CirceError] => ParsingException.fromCirceError(de.error, de.body) } } } + } diff --git a/reader/src/main/scala/za/co/absa/atum/reader/exceptions/ReaderException.scala b/reader/src/main/scala/za/co/absa/atum/reader/exceptions/ReaderException.scala new file mode 100644 index 000000000..61ae91e06 --- /dev/null +++ b/reader/src/main/scala/za/co/absa/atum/reader/exceptions/ReaderException.scala @@ -0,0 +1,19 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.reader.exceptions + +class ReaderException(message: String) extends Exception(message) diff --git a/reader/src/main/scala/za/co/absa/atum/reader/exceptions/RequestException.scala b/reader/src/main/scala/za/co/absa/atum/reader/exceptions/RequestException.scala new file mode 100644 index 000000000..6aba5f3d1 --- /dev/null +++ b/reader/src/main/scala/za/co/absa/atum/reader/exceptions/RequestException.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.reader.exceptions + +import sttp.model.{StatusCode, Uri} +import za.co.absa.atum.model.envelopes.ErrorResponse + +sealed abstract class RequestException(message: String) extends ReaderException(message) + + +object RequestException { + type CirceError = io.circe.Error + + final case class HttpException( + message: String, + statusCode: StatusCode, + errorResponse: ErrorResponse, + request: Uri + ) extends RequestException(message) + + final case class ParsingException( + message: String, + body: String + ) extends RequestException(message) + + object ParsingException { + def fromCirceError(error: CirceError, body: String): ParsingException = { + ParsingException(error.getMessage, body) + } + } + +} diff --git a/reader/src/main/scala/za/co/absa/atum/reader/implicits/EitherImplicits.scala b/reader/src/main/scala/za/co/absa/atum/reader/implicits/EitherImplicits.scala new file mode 100644 index 000000000..9410eb22d --- /dev/null +++ b/reader/src/main/scala/za/co/absa/atum/reader/implicits/EitherImplicits.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.reader.implicits + +import sttp.monad.MonadError + +object EitherImplicits { + + implicit class EitherMonadEnhancements[A, B](val either: Either[A, B]) extends AnyVal { + def project[C, F[_]: MonadError](f: B => F[Either[A, C]]): F[Either[A, C]] = either match { + case Right(b) => f(b) + case Left(a) => implicitly[MonadError[F]].unit(Left(a)) + } + } + +} diff --git a/reader/src/main/scala/za/co/absa/atum/reader/requests/QueryParamNames.scala b/reader/src/main/scala/za/co/absa/atum/reader/requests/QueryParamNames.scala new file mode 100644 index 000000000..42bc2d8b5 --- /dev/null +++ b/reader/src/main/scala/za/co/absa/atum/reader/requests/QueryParamNames.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.reader.requests + +object QueryParamNames { + val limit = "limit" + val offset = "offset" + val checkpointName = "checkpoint-name" +} diff --git a/reader/src/test/scala/za/co/absa/atum/reader/FlowReaderUnitTests.scala b/reader/src/test/scala/za/co/absa/atum/reader/FlowReaderUnitTests.scala index 3a85f6124..8ae886996 100644 --- a/reader/src/test/scala/za/co/absa/atum/reader/FlowReaderUnitTests.scala +++ b/reader/src/test/scala/za/co/absa/atum/reader/FlowReaderUnitTests.scala @@ -17,25 +17,303 @@ package za.co.absa.atum.reader import org.scalatest.funsuite.AnyFunSuiteLike -import sttp.client3.SttpBackend +import sttp.capabilities +import sttp.client3.monad.IdMonad +import sttp.client3.{Identity, Response, SttpBackend} import sttp.client3.testing.SttpBackendStub -import za.co.absa.atum.model.types.basic.AtumPartitions +import sttp.model.Uri.QuerySegment.KeyValue +import sttp.monad.MonadError +import za.co.absa.atum.model.ResultValueType +import za.co.absa.atum.model.dto.MeasureResultDTO.TypedValue +import za.co.absa.atum.model.dto.{CheckpointWithPartitioningDTO, MeasureDTO, MeasureResultDTO, MeasurementDTO, PartitioningWithIdDTO} +import za.co.absa.atum.model.envelopes.Pagination +import za.co.absa.atum.model.envelopes.SuccessResponse.PaginatedResponse +import za.co.absa.atum.model.types.basic.{AtumPartitions, AtumPartitionsOps} +import za.co.absa.atum.reader.FlowReaderUnitTests._ import za.co.absa.atum.reader.server.ServerConfig -import za.co.absa.atum.reader.implicits.future.futureMonadError +import java.time.ZonedDateTime +import java.util.UUID import scala.concurrent.Future class FlowReaderUnitTests extends AnyFunSuiteLike { private implicit val serverConfig: ServerConfig = ServerConfig.fromConfig() + private implicit val monad: MonadError[Identity] = IdMonad test("mainFlowPartitioning is the same as partitioning") { val atumPartitions: AtumPartitions = AtumPartitions(List( "a" -> "b", "c" -> "d" )) - implicit val server: SttpBackend[Future, Any] = SttpBackendStub.asynchronousFuture + implicit val server: SttpBackend[Identity, Any] = SttpBackendStub.synchronous val result = new FlowReader(atumPartitions).mainFlowPartitioning assert(result == atumPartitions) } + + test("The flow checkpoints are properly queried and delivered as DTO") { + implicit val server: SttpBackendStub[Identity, capabilities.WebSockets] = SttpBackendStub.synchronous + .whenRequestMatchesPartial { + case r if r.uri.path.endsWith(List("partitionings")) => + assert(r.uri.querySegments.contains(KeyValue("partitioning", partitioningEncoded))) + Response.ok(partitioningResponse) + case r if r.uri.path.endsWith(List("partitionings", "7", "main-flow")) => + Response.ok(flowResponse) + case r if r.uri.path.endsWith(List("checkpoints")) => + assert(r.uri.querySegments.contains(KeyValue("offset", "0"))) + assert(r.uri.querySegments.contains(KeyValue("limit", "10"))) + Response.ok(checkpointsResponse) + } + + val atumPartitions: AtumPartitions = AtumPartitions(List( + "a" -> "b", + "c" -> "d" + )) + val expectedData: PaginatedResponse[CheckpointWithPartitioningDTO] = PaginatedResponse( + data = Seq( + CheckpointWithPartitioningDTO( + id = UUID.fromString("51ee4257-0842-4d28-8779-8ecb19ae7bf0"), + name = "Test checkpoints 1", + author = "Jason Bourne", + measuredByAtumAgent = true, + processStartTime = ZonedDateTime.parse("2024-12-30T16:01:36.5042011+01:00[Europe/Budapest]"), + processEndTime = Some(ZonedDateTime.parse("2024-12-30T16:01:36.5052109+01:00[Europe/Budapest]")), + measurements = Set( + MeasurementDTO( + measure = MeasureDTO( + measureName = "Fictional", + measuredColumns = Seq("x", "y", "z") + ), + result = MeasureResultDTO( + mainValue = TypedValue("1", ResultValueType.LongValue), + ) + ) + ), + partitioning = PartitioningWithIdDTO( + id = 7, + atumPartitions.toPartitioningDTO, + author = "James Bond" + ) + ), + CheckpointWithPartitioningDTO( + id = UUID.fromString("8b7f603e-3fc3-474f-aced-a7af054589a2"), + name = "Test checkpoints 2", + author = "John McClane", + measuredByAtumAgent = true, + processStartTime = ZonedDateTime.parse("2024-12-30T16:02:36.5042011+01:00[Europe/Budapest]"), + processEndTime = None, + measurements = Set(), + partitioning = PartitioningWithIdDTO( + id = 7, + atumPartitions.toPartitioningDTO, + author = "James Bond" + ) + ) + ), + pagination = Pagination( + limit = 10, + offset = 0, + hasMore = false + ), + requestId = UUID.fromString("29ce91a7-b668-41d2-a160-26402551fb0b") + ) + + val reader = new FlowReader(atumPartitions) + val result = reader.getCheckpointsPage() + assert(result == Right(expectedData)) + } + + test("The flow checkpoints are properly queried with name and delivered as DTO") { + implicit val server: SttpBackendStub[Identity, capabilities.WebSockets] = SttpBackendStub.synchronous + .whenRequestMatchesPartial { + case r if r.uri.path.endsWith(List("partitionings")) => + assert(r.uri.querySegments.contains(KeyValue("partitioning", partitioningEncoded))) + Response.ok(partitioningResponse) + case r if r.uri.path.endsWith(List("partitionings", "7", "main-flow")) => + Response.ok(flowResponse) + case r if r.uri.path.endsWith(List("checkpoints")) => + assert(r.uri.querySegments.contains(KeyValue("offset", "0"))) + assert(r.uri.querySegments.contains(KeyValue("limit", "10"))) + assert(r.uri.querySegments.contains(KeyValue("checkpoint-name", "Test checkpoints 1"))) + Response.ok(checkpointsResponse) + } + + val atumPartitions: AtumPartitions = AtumPartitions(List( + "a" -> "b", + "c" -> "d" + )) + val expectedData: PaginatedResponse[CheckpointWithPartitioningDTO] = PaginatedResponse( + data = Seq( + CheckpointWithPartitioningDTO( + id = UUID.fromString("51ee4257-0842-4d28-8779-8ecb19ae7bf0"), + name = "Test checkpoints 1", + author = "Jason Bourne", + measuredByAtumAgent = true, + processStartTime = ZonedDateTime.parse("2024-12-30T16:01:36.5042011+01:00[Europe/Budapest]"), + processEndTime = Some(ZonedDateTime.parse("2024-12-30T16:01:36.5052109+01:00[Europe/Budapest]")), + measurements = Set( + MeasurementDTO( + measure = MeasureDTO( + measureName = "Fictional", + measuredColumns = Seq("x", "y", "z") + ), + result = MeasureResultDTO( + mainValue = TypedValue("1", ResultValueType.LongValue), + ) + ) + ), + partitioning = PartitioningWithIdDTO( + id = 7, + atumPartitions.toPartitioningDTO, + author = "James Bond" + ) + ), + CheckpointWithPartitioningDTO( + id = UUID.fromString("8b7f603e-3fc3-474f-aced-a7af054589a2"), + name = "Test checkpoints 2", + author = "John McClane", + measuredByAtumAgent = true, + processStartTime = ZonedDateTime.parse("2024-12-30T16:02:36.5042011+01:00[Europe/Budapest]"), + processEndTime = None, + measurements = Set(), + partitioning = PartitioningWithIdDTO( + id = 7, + atumPartitions.toPartitioningDTO, + author = "James Bond" + ) + ) + ), + pagination = Pagination( + limit = 10, + offset = 0, + hasMore = false + ), + requestId = UUID.fromString("29ce91a7-b668-41d2-a160-26402551fb0b") + ) + + val reader = new FlowReader(atumPartitions) + val result = reader.getCheckpointsOfNamePage("Test checkpoints 1") + assert(result == Right(expectedData)) + } + +} + +object FlowReaderUnitTests { + + private val partitioningEncoded = "W3sia2V5IjoiYSIsInZhbHVlIjoiYiJ9LHsia2V5IjoiYyIsInZhbHVlIjoiZCJ9XQ==" + + private val partitioningResponse = + """ + |{ + | "data" : { + | "id" : 7, + | "partitioning" : [ + | { + | "key" : "a", + | "value" : "b" + | }, + | { + | "key" : "c", + | "value" : "d" + | } + | ], + | "author" : "James Bond" + | }, + | "requestId" : "a8463570-b61f-4c35-9362-4d550848767e" + |} + |""".stripMargin + + + private val flowResponse = + """ + |{ + | "data" : { + | "id" : 42, + | "name" : "Test flow", + | "description" : "This is a test flow", + | "fromPattern" : false + | }, + | "requestId" : "c1343c53-463e-4ac0-80f8-c597c2f1f895" + |} + |""".stripMargin + + private val checkpointsResponse = + """ + |{ + | "data" : [ + | { + | "id" : "51ee4257-0842-4d28-8779-8ecb19ae7bf0", + | "name" : "Test checkpoints 1", + | "author" : "Jason Bourne", + | "measuredByAtumAgent" : true, + | "processStartTime" : "2024-12-30T16:01:36.5042011+01:00[Europe/Budapest]", + | "processEndTime" : "2024-12-30T16:01:36.5052109+01:00[Europe/Budapest]", + | "measurements" : [ + | { + | "measure" : { + | "measureName" : "Fictional", + | "measuredColumns" : [ + | "x", + | "y", + | "z" + | ] + | }, + | "result" : { + | "mainValue" : { + | "value" : "1", + | "valueType" : "Long" + | }, + | "supportValues" : { + | + | } + | } + | } + | ], + | "partitioning" : { + | "id" : 7, + | "partitioning" : [ + | { + | "key" : "a", + | "value" : "b" + | }, + | { + | "key" : "c", + | "value" : "d" + | } + | ], + | "author" : "James Bond" + | } + | }, + | { + | "id" : "8b7f603e-3fc3-474f-aced-a7af054589a2", + | "name" : "Test checkpoints 2", + | "author" : "John McClane", + | "measuredByAtumAgent" : true, + | "processStartTime" : "2024-12-30T16:02:36.5042011+01:00[Europe/Budapest]", + | "measurements" : [ + | ], + | "partitioning" : { + | "id" : 7, + | "partitioning" : [ + | { + | "key" : "a", + | "value" : "b" + | }, + | { + | "key" : "c", + | "value" : "d" + | } + | ], + | "author" : "James Bond" + | } + | } + | ], + | "pagination" : { + | "limit" : 10, + | "offset" : 0, + | "hasMore" : false + | }, + | "requestId" : "29ce91a7-b668-41d2-a160-26402551fb0b" + |} + |""".stripMargin } diff --git a/reader/src/test/scala/za/co/absa/atum/reader/basic/PartitioningIdProviderUnitTests.scala b/reader/src/test/scala/za/co/absa/atum/reader/core/PartitioningIdProviderUnitTests.scala similarity index 89% rename from reader/src/test/scala/za/co/absa/atum/reader/basic/PartitioningIdProviderUnitTests.scala rename to reader/src/test/scala/za/co/absa/atum/reader/core/PartitioningIdProviderUnitTests.scala index 20cac1a02..22672eabf 100644 --- a/reader/src/test/scala/za/co/absa/atum/reader/basic/PartitioningIdProviderUnitTests.scala +++ b/reader/src/test/scala/za/co/absa/atum/reader/core/PartitioningIdProviderUnitTests.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package za.co.absa.atum.reader.basic +package za.co.absa.atum.reader.core import org.scalatest.funsuite.AnyFunSuiteLike import sttp.capabilities @@ -28,7 +28,8 @@ import za.co.absa.atum.model.envelopes.NotFoundErrorResponse import za.co.absa.atum.model.envelopes.SuccessResponse.SingleSuccessResponse import za.co.absa.atum.model.types.basic.{AtumPartitions, AtumPartitionsOps} import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax -import za.co.absa.atum.reader.basic.RequestResult._ +import za.co.absa.atum.reader.core.RequestResult._ +import za.co.absa.atum.reader.exceptions.RequestException.{HttpException, ParsingException} import za.co.absa.atum.reader.server.ServerConfig class PartitioningIdProviderUnitTests extends AnyFunSuiteLike { @@ -38,7 +39,7 @@ class PartitioningIdProviderUnitTests extends AnyFunSuiteLike { private val atumPartitionsToNotFound = AtumPartitions(List.empty) private implicit val serverConfig: ServerConfig = ServerConfig(serverUrl) - private implicit val monad: IdMonad.type = IdMonad + private implicit val monad: MonadError[Identity] = IdMonad private implicit val server: SttpBackendStub[Identity, capabilities.WebSockets] = SttpBackendStub.synchronous .whenRequestMatches(request => isUriOfAtumPartitions(request.uri, atumPartitionsToReply)) .thenRespond(SingleSuccessResponse(PartitioningWithIdDTO(1, atumPartitionsToReply.toPartitioningDTO, "Gimli")).asJsonString) @@ -76,9 +77,8 @@ class PartitioningIdProviderUnitTests extends AnyFunSuiteLike { val result = reader.partitioningId(atumPartitionsToNotFound) result match { case Right(_) => fail("Expected a failure, but OK response received") - case Left(_: DeserializationException[CirceError]) => fail("Expected a not found response, but deserialization error received") - case Left(x: HttpError[_]) => - assert(x.body.isInstanceOf[NotFoundErrorResponse]) + case Left(x: HttpException) => + assert(x.errorResponse.isInstanceOf[NotFoundErrorResponse]) assert(x.statusCode == StatusCode.NotFound) case _ => fail("Unexpected response") } @@ -88,6 +88,6 @@ class PartitioningIdProviderUnitTests extends AnyFunSuiteLike { val reader = ReaderWithPartitioningIdForTest(atumPartitionsToFailedDecode) val result = reader.partitioningId(atumPartitionsToFailedDecode) assert(result.isLeft) - result.swap.map(e => assert(e.isInstanceOf[DeserializationException[CirceError]])) + result.swap.map(e => assert(e.isInstanceOf[ParsingException])) } } diff --git a/reader/src/test/scala/za/co/absa/atum/reader/basic/Reader_CatsIOUnitTests.scala b/reader/src/test/scala/za/co/absa/atum/reader/core/Reader_CatsIOUnitTests.scala similarity index 95% rename from reader/src/test/scala/za/co/absa/atum/reader/basic/Reader_CatsIOUnitTests.scala rename to reader/src/test/scala/za/co/absa/atum/reader/core/Reader_CatsIOUnitTests.scala index 1aaad0901..b02cbcd21 100644 --- a/reader/src/test/scala/za/co/absa/atum/reader/basic/Reader_CatsIOUnitTests.scala +++ b/reader/src/test/scala/za/co/absa/atum/reader/core/Reader_CatsIOUnitTests.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package za.co.absa.atum.reader.basic +package za.co.absa.atum.reader.core import cats.effect.unsafe.implicits.global import io.circe.Decoder @@ -24,7 +24,7 @@ import sttp.client3.testing.SttpBackendStub import sttp.monad.{MonadAsyncError, MonadError} import za.co.absa.atum.model.dto.PartitionDTO import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax -import za.co.absa.atum.reader.basic.RequestResult.RequestResult +import za.co.absa.atum.reader.core.RequestResult.RequestResult import za.co.absa.atum.reader.server.ServerConfig class Reader_CatsIOUnitTests extends AnyFunSuiteLike { diff --git a/reader/src/test/scala/za/co/absa/atum/reader/basic/Reader_FutureUnitTests.scala b/reader/src/test/scala/za/co/absa/atum/reader/core/Reader_FutureUnitTests.scala similarity index 95% rename from reader/src/test/scala/za/co/absa/atum/reader/basic/Reader_FutureUnitTests.scala rename to reader/src/test/scala/za/co/absa/atum/reader/core/Reader_FutureUnitTests.scala index c19c6411d..cf200137a 100644 --- a/reader/src/test/scala/za/co/absa/atum/reader/basic/Reader_FutureUnitTests.scala +++ b/reader/src/test/scala/za/co/absa/atum/reader/core/Reader_FutureUnitTests.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package za.co.absa.atum.reader.basic +package za.co.absa.atum.reader.core import io.circe.Decoder import org.scalatest.funsuite.AnyFunSuiteLike @@ -23,7 +23,7 @@ import sttp.client3.testing.SttpBackendStub import sttp.monad.MonadError import za.co.absa.atum.model.dto.PartitionDTO import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax -import za.co.absa.atum.reader.basic.RequestResult.RequestResult +import za.co.absa.atum.reader.core.RequestResult.RequestResult import za.co.absa.atum.reader.server.ServerConfig import scala.concurrent.duration.Duration diff --git a/reader/src/test/scala/za/co/absa/atum/reader/basic/RequestResultUnitTests.scala b/reader/src/test/scala/za/co/absa/atum/reader/core/RequestResultUnitTests.scala similarity index 71% rename from reader/src/test/scala/za/co/absa/atum/reader/basic/RequestResultUnitTests.scala rename to reader/src/test/scala/za/co/absa/atum/reader/core/RequestResultUnitTests.scala index d181154df..34f8c7933 100644 --- a/reader/src/test/scala/za/co/absa/atum/reader/basic/RequestResultUnitTests.scala +++ b/reader/src/test/scala/za/co/absa/atum/reader/core/RequestResultUnitTests.scala @@ -14,16 +14,17 @@ * limitations under the License. */ -package za.co.absa.atum.reader.basic +package za.co.absa.atum.reader.core import io.circe.ParsingFailure import org.scalatest.funsuite.AnyFunSuiteLike import sttp.client3.{DeserializationException, HttpError, Response, ResponseException} -import sttp.model.StatusCode +import sttp.model.{StatusCode, Uri} import za.co.absa.atum.model.dto.PartitionDTO import za.co.absa.atum.model.envelopes.NotFoundErrorResponse import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax -import za.co.absa.atum.reader.basic.RequestResult._ +import za.co.absa.atum.reader.core.RequestResult._ +import za.co.absa.atum.reader.exceptions.RequestException.{CirceError, HttpException, ParsingException} class RequestResultUnitTests extends AnyFunSuiteLike { test("Response.toRequestResult keeps the right value") { @@ -37,7 +38,8 @@ class RequestResultUnitTests extends AnyFunSuiteLike { assert(result == body) } - test("Response.toRequestResult keeps the left value if it's a CirceError") { + test("Response.toRequestResult keeps the left value if it's a CirceError with its message") { + val circeError: CirceError = ParsingFailure("Just a test error", new Exception) val deserializationException = DeserializationException("This is not a json", circeError) val body = Left(deserializationException) @@ -46,20 +48,30 @@ class RequestResultUnitTests extends AnyFunSuiteLike { StatusCode.Ok ) val result = source.toRequestResult - assert(result == body) + result match { + case Left(ParsingException(message, body)) => + assert(message == "Just a test error") + assert(body == "This is not a json") + case _ => fail("Unexpected result") + } } test("Response.toRequestResult decodes NotFound error") { - val error = NotFoundErrorResponse("This is a test") - val errorResponse = error.asJsonString - val httpError = HttpError(errorResponse, StatusCode.NotFound) + val sourceError = NotFoundErrorResponse("This is a test") + val sourceErrorResponse = sourceError.asJsonString + val httpError = HttpError(sourceErrorResponse, StatusCode.NotFound) val source: Response[Either[ResponseException[String, CirceError], PartitionDTO]] = Response( Left(httpError), StatusCode.Ok ) val result = source.toRequestResult - val expected: RequestResult[PartitionDTO] = Left(HttpError(error, httpError.statusCode)) - assert(result == expected) + result match { + case Left(HttpException(_, statusCode, errorResponse, request)) => + assert(statusCode == StatusCode.NotFound) + assert(errorResponse == sourceError) + assert(request == Uri("example.com")) + case _ => fail("Unexpected result") + } } test("Response.toRequestResult fails to decode InternalServerErrorResponse error") { @@ -74,8 +86,8 @@ class RequestResultUnitTests extends AnyFunSuiteLike { assert(result.isLeft) result.swap.foreach { e => // investigate the error - assert(e.isInstanceOf[DeserializationException[_]]) - val ce = e.asInstanceOf[DeserializationException[ParsingFailure]] + assert(e.isInstanceOf[ParsingException]) + val ce = e.asInstanceOf[ParsingException] assert(ce.body == responseBody) } } diff --git a/reader/src/test/scala/za/co/absa/atum/reader/implicits/EitherImplicitsUnitTests.scala b/reader/src/test/scala/za/co/absa/atum/reader/implicits/EitherImplicitsUnitTests.scala new file mode 100644 index 000000000..f27d99217 --- /dev/null +++ b/reader/src/test/scala/za/co/absa/atum/reader/implicits/EitherImplicitsUnitTests.scala @@ -0,0 +1,41 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.reader.implicits + +import org.scalatest.funsuite.AnyFunSuiteLike +import sttp.client3.Identity +import sttp.client3.monad.IdMonad +import sttp.monad.MonadError +import za.co.absa.atum.reader.implicits.EitherImplicits.EitherMonadEnhancements + +class EitherImplicitsUnitTests extends AnyFunSuiteLike { + private implicit val monad: MonadError[Identity] = IdMonad + + test("EitherMonadEnhancements should project Right") { + def fnc(b: Int): Identity[Either[String, String]] = Right(b.toString) + val either = Right(1) + val result = either.project(fnc) + assert(result == Right("1")) + } + + test("EitherMonadEnhancements should not project Left") { + def fnc(b: Int): Identity[Either[Exception, String]] = Right(b.toString) + val either = Left(new Exception("error")) + val result = either.project(fnc) + assert(result == either) + } +} diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala index 066c28f3a..1d6da5b89 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala @@ -16,10 +16,10 @@ package za.co.absa.atum.server.api.controller +import za.co.absa.atum.model.ApiPaths import za.co.absa.atum.model.envelopes.{ConflictErrorResponse, ErrorInDataErrorResponse, ErrorResponse, InternalServerErrorResponse, NotFoundErrorResponse, Pagination} import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.exception.ServiceError._ -import za.co.absa.atum.server.api.http.ApiPaths import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} import za.co.absa.atum.model.envelopes.SuccessResponse._ import za.co.absa.atum.server.model._ diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala index 0966805bf..fefab4cfd 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala @@ -18,7 +18,7 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.model.envelopes.ErrorResponse -import za.co.absa.atum.server.api.http.ApiPaths.V2Paths +import za.co.absa.atum.model.ApiPaths.V2Paths import za.co.absa.atum.server.api.service.CheckpointService import za.co.absa.atum.model.envelopes.SuccessResponse.{PaginatedResponse, SingleSuccessResponse} import za.co.absa.atum.server.model.PaginatedResult diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala index 20071545e..604206028 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala @@ -19,7 +19,7 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.model.dto._ import za.co.absa.atum.model.envelopes.{ErrorResponse, GeneralErrorResponse, InternalServerErrorResponse} import za.co.absa.atum.server.api.exception.ServiceError -import za.co.absa.atum.server.api.http.ApiPaths.V2Paths +import za.co.absa.atum.model.ApiPaths.V2Paths import za.co.absa.atum.server.api.service.PartitioningService import za.co.absa.atum.model.envelopes.SuccessResponse._ import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonDeserializationSyntax diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala index f6c928264..b68c0ec31 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala @@ -24,7 +24,7 @@ import sttp.tapir.typelevel.MatchType import sttp.tapir.ztapir._ import sttp.tapir.{EndpointOutput, PublicEndpoint} import za.co.absa.atum.model.envelopes.{BadRequestResponse, ConflictErrorResponse, ErrorInDataErrorResponse, ErrorResponse, GeneralErrorResponse, InternalServerErrorResponse, NotFoundErrorResponse} -import za.co.absa.atum.server.api.http.ApiPaths._ +import za.co.absa.atum.model.ApiPaths._ import java.util.UUID diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index 8a138d4b2..007f2ca4e 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -24,7 +24,7 @@ import za.co.absa.atum.model.dto._ import za.co.absa.atum.model.envelopes.SuccessResponse._ import sttp.tapir.{PublicEndpoint, Validator, endpoint} import za.co.absa.atum.model.envelopes.{ErrorResponse, StatusResponse} -import za.co.absa.atum.server.api.http.ApiPaths._ +import za.co.absa.atum.model.ApiPaths._ import java.util.UUID diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index 00ade985b..978633997 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ -132,8 +132,8 @@ trait Routes extends Endpoints with ServerOptions { getPartitioningEndpointV2, // getPartitioningMeasuresEndpointV2, // getFlowPartitioningsEndpointV2, - // getPartitioningMainFlowEndpointV2, - // getFlowCheckpointsEndpointV2, + getPartitioningMainFlowEndpointV2, + getFlowCheckpointsEndpointV2, healthEndpoint ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(None))