Skip to content

Commit

Permalink
perf: Trailers only support (#1973)
Browse files Browse the repository at this point in the history
An optimisation exists in the gRPC spec that allows responses that immediately
return an error to return it as a "Trailers-Only" response. This is where the
response has no body, rather, the `grpc-status`, `grpc-message` and
`grpc-status-details-bin` trailers are placed in the response headers frame.

When adding support for this, I found a bug in the Akka gRPC client where
trailers only responses were not correctly handled in streamed requests
(the error returned to the client code was an
`AbruptStageTerminationException`).
  • Loading branch information
jroper authored Sep 6, 2024
1 parent 8f09d84 commit ebf4411
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ package akka.grpc.scaladsl

import akka.actor.ActorSystem
import akka.grpc.internal.{ GrpcProtocolNative, GrpcRequestHelpers, Identity }
import akka.grpc.scaladsl.headers.`Status`
import akka.http.scaladsl.model.{ AttributeKeys, HttpEntity, HttpRequest, HttpResponse }
import akka.http.scaladsl.model.{ AttributeKeys, HttpEntity, HttpHeader, HttpRequest, HttpResponse }
import akka.http.scaladsl.model.HttpEntity.{ Chunked, LastChunk, Strict }
import akka.http.scaladsl.model.headers.RawHeader
import akka.stream.scaladsl.{ Sink, Source }
import akka.testkit.TestKit
import akka.util.ByteString
Expand Down Expand Up @@ -40,16 +40,7 @@ class GrpcExceptionHandlerSpec
.recoverWith(GrpcExceptionHandler.default)

val response = result.futureValue
response.entity match {
case Chunked(_, chunks) =>
chunks.runWith(Sink.seq).futureValue match {
case Seq(LastChunk("", List(`Status`("3")))) => // ok
}
case _: Strict =>
response.attribute(AttributeKeys.trailer).get.headers.contains("grpc-status" -> "3")
case other =>
fail(s"Unexpected [$other]")
}
trailersOnly(response).find(_.name() == "grpc-status").map(_.value()) shouldBe Some("3")
}

import example.myapp.helloworld.grpc.helloworld._
Expand Down Expand Up @@ -115,15 +106,12 @@ class GrpcExceptionHandlerSpec
val request = GrpcRequestHelpers(s"/${GreeterService.name}/SayHello", List.empty, Source.single(HelloRequest("")))

val reply = GreeterServiceHandler(ExampleImpl).apply(request).futureValue

val lastChunk = reply.entity.asInstanceOf[Chunked].chunks.runWith(Sink.last).futureValue.asInstanceOf[LastChunk]
val trailers = trailersOnly(reply)
// Invalid argument is '3' https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
val statusHeader = lastChunk.trailer.find { _.name == "grpc-status" }
statusHeader.map(_.value()) should be(Some("3"))
val statusMessageHeader = lastChunk.trailer.find { _.name == "grpc-message" }
statusMessageHeader.map(_.value()) should be(Some("No name found"))
trailers.find(_.name() == "grpc-status").map(_.value()) shouldBe Some("3")
trailers.find(_.name() == "grpc-message").map(_.value()) shouldBe Some("No name found")

val metadata = MetadataBuilder.fromHeaders(lastChunk.trailer)
val metadata = MetadataBuilder.fromHeaders(trailers)
metadata.getText("test-text") should be(Some("test-text-data"))
metadata.getBinary("test-binary-bin") should be(Some(ByteString("test-binary-data")))
}
Expand All @@ -150,4 +138,24 @@ class GrpcExceptionHandlerSpec
metadata.getBinary("test-binary-bin") should be(Some(ByteString("test-binary-data")))
}
}

private def trailersOnly(response: HttpResponse): Seq[HttpHeader] = {
if (response.header("grpc-status").isDefined) {
response.headers
} else {
response.entity match {
case Chunked(_, chunks) =>
chunks.runWith(Sink.seq).futureValue match {
case Seq(LastChunk("", trailers)) =>
trailers
}
case _: Strict =>
response.attribute(AttributeKeys.trailer).get.headers.map { case (key, value) => RawHeader(key, value) }
case other =>
fail(s"Unexpected [$other]")
}

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ private final class AkkaNettyGrpcClientGraphStage[I, O](
override def onMessage(message: O): Unit =
callback.invoke(message)
override def onClose(status: Status, trailers: Metadata): Unit = {
if (!matVal.isCompleted) {
// Trailers only response, first invoke onHeaders to setup the materialized value
onHeaders(trailers)
}
trailerPromise.success(trailers)
callback.invoke(Closed(status, trailers))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.grpc.GrpcProtocol.{ GrpcProtocolWriter, TrailerFrame }
import akka.grpc.scaladsl.{ headers, GrpcExceptionHandler }
import akka.grpc.{ ProtobufSerializer, Trailers }
import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart
import akka.http.scaladsl.model.{ HttpEntity, HttpResponse, Trailer }
import akka.http.scaladsl.model.{ HttpEntity, HttpResponse, StatusCodes, Trailer }
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import akka.util.ByteString
Expand Down Expand Up @@ -51,11 +51,7 @@ object GrpcResponseHelpers {
try writer.encodeDataToResponse(m.serialize(e), responseHeaders, TrailerOkAttribute)
catch {
case NonFatal(ex) =>
val trailers = GrpcEntityHelpers.handleException(ex, eHandler)
writer.encodeDataToResponse(
ByteString.empty,
responseHeaders,
Trailer(GrpcEntityHelpers.trailer(trailers.status, trailers.metadata).trailers))
status(GrpcEntityHelpers.handleException(ex, eHandler))
}
}

Expand Down Expand Up @@ -97,6 +93,12 @@ object GrpcResponseHelpers {
entity = HttpEntity.Chunked(writer.contentType, entity))
}

def status(trailer: Trailers)(implicit writer: GrpcProtocolWriter): HttpResponse =
response(Source.single(writer.encodeFrame(GrpcEntityHelpers.trailer(trailer.status, trailer.metadata))))
def status(trailer: Trailers)(implicit writer: GrpcProtocolWriter): HttpResponse = {
// This is the Trailers-Only optimisation (for sending immediate errors), which, for gRPC web and gRPC over HTTP2,
// are identical, it's just the 200 status code, content type, and then the trailers as headers.
HttpResponse(
status = StatusCodes.OK,
headers = GrpcEntityHelpers.trailer(trailer.status, trailer.metadata).trailers,
entity = HttpEntity(writer.contentType, ByteString.empty))
}
}

0 comments on commit ebf4411

Please sign in to comment.