From fb9491bd9e0e0c101b9f8ab2a275eadf78b9ba38 Mon Sep 17 00:00:00 2001 From: NavidJalali Date: Wed, 23 Oct 2024 16:54:34 +0200 Subject: [PATCH 1/7] write trailers directly into headers on failure --- .../apache/pekko/grpc/internal/GrpcEntityHelpers.scala | 5 ++++- .../pekko/grpc/internal/GrpcResponseHelpers.scala | 10 ++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/internal/GrpcEntityHelpers.scala b/runtime/src/main/scala/org/apache/pekko/grpc/internal/GrpcEntityHelpers.scala index 75a7da03..128b53f8 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/internal/GrpcEntityHelpers.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/internal/GrpcEntityHelpers.scala @@ -74,7 +74,10 @@ object GrpcEntityHelpers { TrailerFrame(trailers = statusHeaders(status)) def trailer(status: Status, metadata: Metadata): TrailerFrame = - TrailerFrame(trailers = statusHeaders(status) ++ metadataHeaders(metadata)) + TrailerFrame(trailers = trailers(status, metadata)) + + def trailers(status: Status, metadata: Metadata): List[HttpHeader] = + statusHeaders(status) ++ metadataHeaders(metadata) def statusHeaders(status: Status): List[HttpHeader] = List(headers.`Status`(status.getCode.value.toString)) ++ Option(status.getDescription).map(d => diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/internal/GrpcResponseHelpers.scala b/runtime/src/main/scala/org/apache/pekko/grpc/internal/GrpcResponseHelpers.scala index bb41a680..e686376d 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/internal/GrpcResponseHelpers.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/internal/GrpcResponseHelpers.scala @@ -107,6 +107,12 @@ object GrpcResponseHelpers { entity = HttpEntity.Chunked(writer.contentType, entity)) } - def status(trailer: Trailers)(implicit writer: GrpcProtocolWriter): HttpResponse = - response(Source.single(writer.encodeFrame(GrpcEntityHelpers.trailer(trailer.status, trailer.metadata)))) + def status(trailer: Trailers)(implicit writer: GrpcProtocolWriter): HttpResponse = { + HttpResponse( + headers = + headers.`Message-Encoding`(writer.messageEncoding.name) :: + GrpcEntityHelpers.trailers(trailer.status, trailer.metadata), + entity = HttpEntity.empty(writer.contentType) + ) + } } From b0170cfa23adcc65eb5a42857eee88fa86f2d808 Mon Sep 17 00:00:00 2001 From: NavidJalali Date: Sat, 26 Oct 2024 23:55:47 +0200 Subject: [PATCH 2/7] attempt fixing tests --- .../GrpcExceptionDefaultHandleSpec.scala | 16 ++++++++++------ project/PekkoHttpDependency.scala | 2 +- .../PekkoNettyGrpcClientGraphStage.scala | 1 + 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/GrpcExceptionDefaultHandleSpec.scala b/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/GrpcExceptionDefaultHandleSpec.scala index 2d84893f..b79d546a 100644 --- a/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/GrpcExceptionDefaultHandleSpec.scala +++ b/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/GrpcExceptionDefaultHandleSpec.scala @@ -17,7 +17,7 @@ import org.apache.pekko import pekko.actor.ActorSystem import pekko.grpc.internal.{ GrpcProtocolNative, GrpcRequestHelpers, Identity } import pekko.grpc.scaladsl.headers.`Status` -import pekko.http.scaladsl.model.{ AttributeKeys, HttpEntity, HttpRequest, HttpResponse } +import pekko.http.scaladsl.model.{ HttpEntity, HttpRequest, HttpResponse } import pekko.http.scaladsl.model.HttpEntity.{ Chunked, LastChunk, Strict } import pekko.grpc.GrpcProtocol import pekko.stream.scaladsl.{ Sink, Source } @@ -58,7 +58,7 @@ class GrpcExceptionDefaultHandleSpec case Seq(LastChunk("", List(`Status`("3")))) => // ok } case _: Strict => - response.attribute(AttributeKeys.trailer).get.headers.contains("grpc-status" -> "3") + response.headers.find(_.is("grpc-status")).map(_.value()) shouldBe Some("3") case other => fail(s"Unexpected [$other]") } @@ -131,14 +131,18 @@ class GrpcExceptionDefaultHandleSpec val reply = GreeterServiceHandler(ExampleImpl).apply(request).futureValue - val lastChunk = reply.entity.asInstanceOf[Chunked].chunks.runWith(Sink.last).futureValue.asInstanceOf[LastChunk] + reply.entity shouldBe a[Strict] + val strict = reply.entity.asInstanceOf[Strict] + strict.contentType.mediaType.toString shouldBe "application/grpc+proto" + strict.data.isEmpty shouldBe true + // Invalid argument is '3' https://github.com/grpc/grpc/blob/master/doc/statuscodes.md - val statusHeader = lastChunk.trailer.find { _.name == "grpc-status" } + val statusHeader = reply.headers.find(_.is("grpc-status")) statusHeader.map(_.value()) should be(Some("3")) - val statusMessageHeader = lastChunk.trailer.find { _.name == "grpc-message" } + val statusMessageHeader = reply.headers.find(_.is("grpc-message")) statusMessageHeader.map(_.value()) should be(Some("No name found")) - val metadata = MetadataBuilder.fromHeaders(lastChunk.trailer) + val metadata = MetadataBuilder.fromHeaders(reply.headers) metadata.getText("test-text") should be(Some("test-text-data")) metadata.getBinary("test-binary-bin") should be(Some(ByteString("test-binary-data"))) } diff --git a/project/PekkoHttpDependency.scala b/project/PekkoHttpDependency.scala index f7941bb8..d89422ad 100644 --- a/project/PekkoHttpDependency.scala +++ b/project/PekkoHttpDependency.scala @@ -22,5 +22,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency object PekkoHttpDependency extends PekkoDependency { override val checkProject: String = "pekko-http-testkit" override val module: Option[String] = Some("http") - override val currentVersion: String = "1.1.0" + override val currentVersion: String = "1.1.0+13-750e8f96-SNAPSHOT" } diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoNettyGrpcClientGraphStage.scala b/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoNettyGrpcClientGraphStage.scala index e129b5b3..aa75ead0 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoNettyGrpcClientGraphStage.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoNettyGrpcClientGraphStage.scala @@ -112,6 +112,7 @@ private final class PekkoNettyGrpcClientGraphStage[I, O]( override def onMessage(message: O): Unit = callback.invoke(message) override def onClose(status: Status, trailers: Metadata): Unit = { + if (!matVal.isCompleted) onHeaders(trailers) trailerPromise.success(trailers) callback.invoke(Closed(status, trailers)) } From cbba1df6b869eaf0eaa99941ff4c04010f4fc902 Mon Sep 17 00:00:00 2001 From: NavidJalali Date: Sun, 27 Oct 2024 12:54:57 +0100 Subject: [PATCH 3/7] do not add trailer if none exist --- .../pekko/grpc/interop/PekkoHttpServerProviderScala.scala | 8 +++----- project/PekkoHttpDependency.scala | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/interop-tests/src/test/scala/org/apache/pekko/grpc/interop/PekkoHttpServerProviderScala.scala b/interop-tests/src/test/scala/org/apache/pekko/grpc/interop/PekkoHttpServerProviderScala.scala index af3b3221..2dc30e01 100644 --- a/interop-tests/src/test/scala/org/apache/pekko/grpc/interop/PekkoHttpServerProviderScala.scala +++ b/interop-tests/src/test/scala/org/apache/pekko/grpc/interop/PekkoHttpServerProviderScala.scala @@ -118,11 +118,9 @@ object PekkoHttpServerProviderScala extends PekkoHttpServerProvider with Directi HttpEntity.LastChunk(last.extension, f(last.trailer)) })) case _ => - val origTrailers = response + response .attribute(AttributeKeys.trailer) - .map(_.headers) - .getOrElse(Vector.empty) - .map(e => RawHeader(e._1, e._2)) - response.addAttribute(AttributeKeys.trailer, Trailer(f(origTrailers))) + .map(trailer => Trailer(f(trailer.headers.map((RawHeader.apply _).tupled)))) + .fold(response)(response.addAttribute(AttributeKeys.trailer, _)) }) } diff --git a/project/PekkoHttpDependency.scala b/project/PekkoHttpDependency.scala index d89422ad..f7941bb8 100644 --- a/project/PekkoHttpDependency.scala +++ b/project/PekkoHttpDependency.scala @@ -22,5 +22,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency object PekkoHttpDependency extends PekkoDependency { override val checkProject: String = "pekko-http-testkit" override val module: Option[String] = Some("http") - override val currentVersion: String = "1.1.0+13-750e8f96-SNAPSHOT" + override val currentVersion: String = "1.1.0" } From 6f9e0caf0a5aecd56c33447c51db5ffdadb3a46a Mon Sep 17 00:00:00 2001 From: NavidJalali Date: Sun, 27 Oct 2024 12:55:17 +0100 Subject: [PATCH 4/7] remove useless generic type --- .../org/apache/pekko/grpc/internal/GrpcResponseHelpers.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/internal/GrpcResponseHelpers.scala b/runtime/src/main/scala/org/apache/pekko/grpc/internal/GrpcResponseHelpers.scala index e686376d..75cb3e2b 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/internal/GrpcResponseHelpers.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/internal/GrpcResponseHelpers.scala @@ -101,7 +101,7 @@ object GrpcResponseHelpers { response(GrpcEntityHelpers(e, trail, eHandler)) } - private def response[T](entity: Source[ChunkStreamPart, NotUsed])(implicit writer: GrpcProtocolWriter) = { + private def response(entity: Source[ChunkStreamPart, NotUsed])(implicit writer: GrpcProtocolWriter) = { HttpResponse( headers = immutable.Seq(headers.`Message-Encoding`(writer.messageEncoding.name)), entity = HttpEntity.Chunked(writer.contentType, entity)) From fdf4496458f702ac97f06b288b889d3d06d10b81 Mon Sep 17 00:00:00 2001 From: NavidJalali Date: Sun, 27 Oct 2024 13:53:17 +0100 Subject: [PATCH 5/7] apply fix to copy-pasted method --- .../grpc/interop/PekkoHttpServerProviderScala.scala | 3 ++- .../scala/org/apache/pekko/grpc/GrpcInteropSpec.scala | 9 ++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/interop-tests/src/test/scala/org/apache/pekko/grpc/interop/PekkoHttpServerProviderScala.scala b/interop-tests/src/test/scala/org/apache/pekko/grpc/interop/PekkoHttpServerProviderScala.scala index 2dc30e01..367bb43d 100644 --- a/interop-tests/src/test/scala/org/apache/pekko/grpc/interop/PekkoHttpServerProviderScala.scala +++ b/interop-tests/src/test/scala/org/apache/pekko/grpc/interop/PekkoHttpServerProviderScala.scala @@ -120,7 +120,8 @@ object PekkoHttpServerProviderScala extends PekkoHttpServerProvider with Directi case _ => response .attribute(AttributeKeys.trailer) - .map(trailer => Trailer(f(trailer.headers.map((RawHeader.apply _).tupled)))) + .map(trailer => f(trailer.headers.map((RawHeader.apply _).tupled))) + .flatMap(headers => if (headers.isEmpty) None else Some(Trailer(headers))) .fold(response)(response.addAttribute(AttributeKeys.trailer, _)) }) } diff --git a/sbt-plugin/src/sbt-test/gen-scala-server/00-interop/src/test/scala/org/apache/pekko/grpc/GrpcInteropSpec.scala b/sbt-plugin/src/sbt-test/gen-scala-server/00-interop/src/test/scala/org/apache/pekko/grpc/GrpcInteropSpec.scala index f2052823..d7686830 100644 --- a/sbt-plugin/src/sbt-test/gen-scala-server/00-interop/src/test/scala/org/apache/pekko/grpc/GrpcInteropSpec.scala +++ b/sbt-plugin/src/sbt-test/gen-scala-server/00-interop/src/test/scala/org/apache/pekko/grpc/GrpcInteropSpec.scala @@ -74,12 +74,11 @@ object PekkoHttpServerProviderScala extends PekkoHttpServerProvider { HttpEntity.LastChunk(last.extension, f(last.trailer)) })) case _ => - val origTrailers = response + response .attribute(AttributeKeys.trailer) - .map(_.headers) - .getOrElse(Vector.empty) - .map(e => RawHeader(e._1, e._2)) - response.addAttribute(AttributeKeys.trailer, Trailer(f(origTrailers))) + .map(trailer => f(trailer.headers.map((RawHeader.apply _).tupled))) + .flatMap(headers => if (headers.isEmpty) None else Some(Trailer(headers))) + .fold(response)(response.addAttribute(AttributeKeys.trailer, _)) }) } From b791d66abab4778d7cb178d4dc38e4dee3cf86e6 Mon Sep 17 00:00:00 2001 From: NavidJalali Date: Mon, 28 Oct 2024 12:31:59 +0100 Subject: [PATCH 6/7] complete headers with empty metadata --- .../PekkoNettyGrpcClientGraphStage.scala | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoNettyGrpcClientGraphStage.scala b/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoNettyGrpcClientGraphStage.scala index aa75ead0..5d0e37f3 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoNettyGrpcClientGraphStage.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoNettyGrpcClientGraphStage.scala @@ -16,13 +16,14 @@ package org.apache.pekko.grpc.internal import org.apache.pekko import pekko.annotation.InternalApi import pekko.dispatch.ExecutionContexts -import pekko.grpc.GrpcResponseMetadata +import pekko.grpc.{ javadsl, scaladsl, GrpcResponseMetadata } import pekko.stream import pekko.stream.{ Attributes => _, _ } import pekko.stream.stage._ import pekko.util.FutureConverters._ import io.grpc._ +import java.util.concurrent.CompletionStage import scala.concurrent.{ Future, Promise } @InternalApi @@ -92,27 +93,29 @@ private final class PekkoNettyGrpcClientGraphStage[I, O]( var call: ClientCall[I, O] = null val listener = new ClientCall.Listener[O] { - override def onReady(): Unit = - callback.invoke(ReadyForSending) - override def onHeaders(responseHeaders: Metadata): Unit = - matVal.success(new GrpcResponseMetadata { - private lazy val sMetadata = MetadataImpl.scalaMetadataFromGoogleGrpcMetadata(responseHeaders) - private lazy val jMetadata = MetadataImpl.javaMetadataFromGoogleGrpcMetadata(responseHeaders) - def headers = sMetadata - def getHeaders() = jMetadata + private def makeResponseMetadata(metadata: Metadata) = + new GrpcResponseMetadata { + private lazy val sMetadata = MetadataImpl.scalaMetadataFromGoogleGrpcMetadata(metadata) + private lazy val jMetadata = MetadataImpl.javaMetadataFromGoogleGrpcMetadata(metadata) + def headers: scaladsl.Metadata = sMetadata + def getHeaders(): javadsl.Metadata = jMetadata private lazy val sTrailers = trailerPromise.future.map(MetadataImpl.scalaMetadataFromGoogleGrpcMetadata)(ExecutionContexts.parasitic) private lazy val jTrailers = trailerPromise.future .map(MetadataImpl.javaMetadataFromGoogleGrpcMetadata)(ExecutionContexts.parasitic) .asJava - def trailers = sTrailers - def getTrailers() = jTrailers - }) + def trailers: Future[scaladsl.Metadata] = sTrailers + def getTrailers(): CompletionStage[javadsl.Metadata] = jTrailers + } + override def onReady(): Unit = + callback.invoke(ReadyForSending) + override def onHeaders(responseHeaders: Metadata): Unit = + matVal.success(makeResponseMetadata(responseHeaders)) override def onMessage(message: O): Unit = callback.invoke(message) override def onClose(status: Status, trailers: Metadata): Unit = { - if (!matVal.isCompleted) onHeaders(trailers) + matVal.trySuccess(makeResponseMetadata(new Metadata())) trailerPromise.success(trailers) callback.invoke(Closed(status, trailers)) } From 8e24da86eca9e2af2fb846c9c690548253f57998 Mon Sep 17 00:00:00 2001 From: NavidJalali Date: Mon, 28 Oct 2024 17:16:42 +0100 Subject: [PATCH 7/7] match both the strict case and the chunked case --- .../GrpcExceptionDefaultHandleSpec.scala | 37 +++++++++++++------ 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/GrpcExceptionDefaultHandleSpec.scala b/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/GrpcExceptionDefaultHandleSpec.scala index b79d546a..26ceff02 100644 --- a/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/GrpcExceptionDefaultHandleSpec.scala +++ b/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/GrpcExceptionDefaultHandleSpec.scala @@ -17,7 +17,7 @@ import org.apache.pekko import pekko.actor.ActorSystem import pekko.grpc.internal.{ GrpcProtocolNative, GrpcRequestHelpers, Identity } import pekko.grpc.scaladsl.headers.`Status` -import pekko.http.scaladsl.model.{ HttpEntity, HttpRequest, HttpResponse } +import pekko.http.scaladsl.model._ import pekko.http.scaladsl.model.HttpEntity.{ Chunked, LastChunk, Strict } import pekko.grpc.GrpcProtocol import pekko.stream.scaladsl.{ Sink, Source } @@ -131,18 +131,33 @@ class GrpcExceptionDefaultHandleSpec val reply = GreeterServiceHandler(ExampleImpl).apply(request).futureValue - reply.entity shouldBe a[Strict] - val strict = reply.entity.asInstanceOf[Strict] - strict.contentType.mediaType.toString shouldBe "application/grpc+proto" - strict.data.isEmpty shouldBe true + val trailer = reply.entity match { + case chunked: Chunked => + val lastChunkWide = chunked.chunks.runWith(Sink.last).futureValue + lastChunkWide shouldBe a[LastChunk] + val lastChunk = lastChunkWide.asInstanceOf[LastChunk] + val trailer = lastChunk.trailer - // Invalid argument is '3' https://github.com/grpc/grpc/blob/master/doc/statuscodes.md - val statusHeader = reply.headers.find(_.is("grpc-status")) - statusHeader.map(_.value()) should be(Some("3")) - val statusMessageHeader = reply.headers.find(_.is("grpc-message")) - statusMessageHeader.map(_.value()) should be(Some("No name found")) + val trailerAttribute = reply.attribute(AttributeKeys.trailer).map(_.headers) + trailerAttribute.isDefined shouldBe true + trailerAttribute.get should contain theSameElementsAs trailer.map { h => (h.name(), h.value()) } + + trailer + + case strict: Strict => + strict.contentType.mediaType.toString shouldBe "application/grpc+proto" + strict.data.isEmpty shouldBe true + reply.attribute(AttributeKeys.trailer) shouldBe None + + reply.headers + + case _ => fail(s"Unexpected entity [$reply]. Should be one of [Chunked, Strict]") + } + + trailer.collect { case header if header.is("grpc-status") => header.value() } shouldBe Seq("3") + trailer.collect { case header if header.is("grpc-message") => header.value() } shouldBe Seq("No name found") - val metadata = MetadataBuilder.fromHeaders(reply.headers) + val metadata = MetadataBuilder.fromHeaders(trailer) metadata.getText("test-text") should be(Some("test-text-data")) metadata.getBinary("test-binary-bin") should be(Some(ByteString("test-binary-data"))) }