From 81fcf01e7b8d19570cacfd6d99271b2aa415c01d Mon Sep 17 00:00:00 2001 From: Richard Bair Date: Fri, 15 Nov 2024 11:43:40 -0800 Subject: [PATCH 1/2] Create deadlineFuture before checking the content type. Signed-off-by: Richard Bair --- .../pbj/grpc/helidon/PbjProtocolHandler.java | 35 +-- .../pbj/grpc/helidon/PbjProtocolSelector.java | 2 +- .../pbj/grpc/helidon/GreeterServiceImpl.java | 120 +++++++++ .../grpc/helidon/PbjProtocolHandlerTest.java | 244 ++++++++++++++++++ .../com/hedera/pbj/grpc/helidon/PbjTest.java | 95 ------- 5 files changed, 383 insertions(+), 113 deletions(-) create mode 100644 pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterServiceImpl.java create mode 100644 pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandlerTest.java diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java index 28f8b8a4..d5f56021 100644 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java @@ -42,9 +42,9 @@ import io.helidon.http.HeaderValues; import io.helidon.http.HttpException; import io.helidon.http.HttpMediaType; -import io.helidon.http.HttpMediaTypes; import io.helidon.http.Status; import io.helidon.http.WritableHeaders; +import io.helidon.http.http2.FlowControl; import io.helidon.http.http2.Http2Flag; import io.helidon.http.http2.Http2FrameData; import io.helidon.http.http2.Http2FrameHeader; @@ -92,7 +92,7 @@ final class PbjProtocolHandler implements Http2SubProtocolSelector.SubProtocolHa private final Http2Headers headers; private final Http2StreamWriter streamWriter; private final int streamId; - private final StreamFlowControl flowControl; + private final FlowControl.Outbound flowControl; private final AtomicReference currentStreamState; /** @@ -176,7 +176,7 @@ enum ReadState { @NonNull final Http2Headers headers, @NonNull final Http2StreamWriter streamWriter, final int streamId, - @NonNull final StreamFlowControl flowControl, + @NonNull final FlowControl.Outbound flowControl, @NonNull final Http2StreamState currentStreamState, @NonNull final PbjConfig config, @NonNull final PbjMethodRoute route, @@ -201,16 +201,22 @@ public void init() { route.requestCounter().increment(); try { + // If the grpc-timeout header is present, determine when that timeout would occur, or + // default to a future that is so far in the future it will never happen. + final var requestHeaders = headers.httpHeaders(); + final var timeout = requestHeaders.value(GRPC_TIMEOUT); + deadlineFuture = + timeout.map(this::scheduleDeadline).orElse(new NoopScheduledFuture<>()); + // If Content-Type does not begin with "application/grpc", gRPC servers SHOULD respond // with HTTP status of 415 (Unsupported Media Type). This will prevent other HTTP/2 // clients from interpreting a gRPC error response, which uses status 200 (OK), as // successful. // See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md // In addition, "application/grpc" is interpreted as "application/grpc+proto". - final var requestHeaders = headers.httpHeaders(); final var requestContentType = - requestHeaders.contentType().orElse(HttpMediaTypes.PLAINTEXT_UTF_8); - final var ct = requestContentType.text(); + requestHeaders.contentType().orElse(null); + final var ct = requestContentType == null ? "" : requestContentType.text(); final var contentType = switch (ct) { case APPLICATION_GRPC, APPLICATION_GRPC_PROTO -> APPLICATION_GRPC_PROTO; @@ -220,7 +226,8 @@ public void init() { yield ct; } throw new HttpException( - "Unsupported", Status.UNSUPPORTED_MEDIA_TYPE_415); + "invalid gRPC request content-type \"" + ct + "\"", + Status.UNSUPPORTED_MEDIA_TYPE_415); } }; @@ -259,13 +266,6 @@ public void init() { // then we should pick one and use it in the response. Otherwise, we should not have // any compression. - // If the grpc-timeout header is present, determine when that timeout would occur, or - // default to a future that is so far in the future it will never happen. - final var timeout = requestHeaders.value(GRPC_TIMEOUT); - - deadlineFuture = - timeout.map(this::scheduleDeadline).orElse(new NoopScheduledFuture<>()); - // At this point, the request itself is valid. Maybe it will still fail to be handled by // the service interface, but as far as the protocol is concerned, this was a valid // request. Send the headers back to the client (the messages and trailers will be sent @@ -306,6 +306,7 @@ public void init() { new TrailerOnlyBuilder() .httpStatus(httpException.status()) .grpcStatus(GrpcStatus.INVALID_ARGUMENT) + .statusMessage(httpException.getMessage()) .send(); error(); } catch (final Exception unknown) { @@ -566,7 +567,7 @@ private void sendResponseHeaders( http2Headers, streamId, Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS), - flowControl.outbound()); + flowControl); } /** @@ -630,7 +631,7 @@ protected void send( streamId, Http2Flag.HeaderFlags.create( Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM), - flowControl.outbound()); + flowControl); } } @@ -701,7 +702,7 @@ public void onNext(@NonNull final Bytes response) { streamId); streamWriter.writeData( - new Http2FrameData(header, bufferData), flowControl.outbound()); + new Http2FrameData(header, bufferData), flowControl); } catch (final Exception e) { LOGGER.log(ERROR, "Failed to respond to grpc request: " + route.method(), e); } diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolSelector.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolSelector.java index bc076381..7d5ba3ed 100644 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolSelector.java +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolSelector.java @@ -140,7 +140,7 @@ public SubProtocolResult subProtocol( headers, streamWriter, streamId, - flowControl, + flowControl.outbound(), currentStreamState, config, route, diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterServiceImpl.java b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterServiceImpl.java new file mode 100644 index 00000000..43470057 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterServiceImpl.java @@ -0,0 +1,120 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.pbj.grpc.helidon; + +import com.hedera.pbj.runtime.grpc.GrpcException; +import com.hedera.pbj.runtime.grpc.GrpcStatus; +import com.hedera.pbj.runtime.grpc.Pipeline; +import greeter.HelloReply; +import greeter.HelloRequest; +import java.util.ArrayList; +import java.util.concurrent.Flow; + +final class GreeterServiceImpl implements GreeterService { + GrpcStatus errorToThrow = null; + + @Override + public HelloReply sayHello(HelloRequest request) { + if (errorToThrow != null) { + throw new GrpcException(errorToThrow); + } + + return HelloReply.newBuilder().setMessage("Hello " + request.getName()).build(); + } + + // Streams of stuff coming from the client, with a single response. + @Override + public Pipeline sayHelloStreamRequest( + Flow.Subscriber replies) { + final var names = new ArrayList(); + return new Pipeline<>() { + @Override + public void clientEndStreamReceived() { + onComplete(); + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + subscription.request(Long.MAX_VALUE); // turn off flow control + } + + @Override + public void onNext(HelloRequest item) { + names.add(item.getName()); + } + + @Override + public void onError(Throwable throwable) { + replies.onError(throwable); + } + + @Override + public void onComplete() { + final var reply = + HelloReply.newBuilder() + .setMessage("Hello " + String.join(", ", names)) + .build(); + replies.onNext(reply); + replies.onComplete(); + } + }; + } + + @Override + public void sayHelloStreamReply( + HelloRequest request, Flow.Subscriber replies) { + for (int i = 0; i < 10; i++) { + replies.onNext(HelloReply.newBuilder().setMessage("Hello!").build()); + } + + replies.onComplete(); + } + + @Override + public Pipeline sayHelloStreamBidi( + Flow.Subscriber replies) { + // Here we receive info from the client. In this case, it is a stream of requests with + // names. We will respond with a stream of replies. + return new Pipeline<>() { + @Override + public void clientEndStreamReceived() { + onComplete(); + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + subscription.request(Long.MAX_VALUE); // turn off flow control + } + + @Override + public void onNext(HelloRequest item) { + replies.onNext( + HelloReply.newBuilder().setMessage("Hello " + item.getName()).build()); + } + + @Override + public void onError(Throwable throwable) { + replies.onError(throwable); + } + + @Override + public void onComplete() { + replies.onComplete(); + } + }; + } +} \ No newline at end of file diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandlerTest.java b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandlerTest.java new file mode 100644 index 00000000..77fa8b69 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandlerTest.java @@ -0,0 +1,244 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.pbj.grpc.helidon; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.entry; + +import com.hedera.pbj.grpc.helidon.config.PbjConfig; +import com.hedera.pbj.runtime.grpc.GrpcStatus; +import edu.umd.cs.findbugs.annotations.NonNull; +import io.helidon.common.uri.UriEncoding; +import io.helidon.http.Header; +import io.helidon.http.HeaderNames; +import io.helidon.http.Status; +import io.helidon.http.WritableHeaders; +import io.helidon.http.http2.FlowControl; +import io.helidon.http.http2.Http2Flag; +import io.helidon.http.http2.Http2FrameData; +import io.helidon.http.http2.Http2Headers; +import io.helidon.http.http2.Http2StreamState; +import io.helidon.http.http2.Http2StreamWriter; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Delayed; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +class PbjProtocolHandlerTest { + private Http2Headers headers; + private StreamWriterStub streamWriter; + private int streamId; + private OutboundFlowControlStub flowControl; + private Http2StreamState currentStreamState; + private PbjConfigStub config; + private PbjMethodRoute route; + private DeadlineDetectorStub deadlineDetector; + + @BeforeAll + static void beforeAll() { + } + + @BeforeEach + void setUp() { + headers = Http2Headers.create(WritableHeaders.create()); + streamWriter = new StreamWriterStub(); + streamId = 1; + flowControl = new OutboundFlowControlStub(); + currentStreamState = Http2StreamState.OPEN; + config = new PbjConfigStub(); + route = new PbjMethodRoute(new GreeterServiceImpl(), GreeterService.GreeterMethod.sayHello); + deadlineDetector = new DeadlineDetectorStub(); + + // FUTURE: We need an easy way to test counters. When we have them, we can test this. +// assumeThat(route.requestCounter().count()).isEqualTo(0); +// assumeThat(route.failedGrpcRequestCounter().count()).isEqualTo(0); +// assumeThat(route.failedHttpRequestCounter().count()).isEqualTo(0); +// assumeThat(route.failedUnknownRequestCounter().count()).isEqualTo(0); + } + + /** + * If the content-type is missing, or does not start with "application/grpc", the server should respond with a 415 + * Unsupported Media Type and the stream state should end up CLOSED. See + * + */ + @ValueSource(strings = {"", "text/plain", "application/json"}) + @ParameterizedTest + void unsupportedContentType(String contentType) { + final var h = WritableHeaders.create(); + if (!contentType.isBlank()) h.add(HeaderNames.CONTENT_TYPE, contentType); + headers = Http2Headers.create(h); + final var handler = new PbjProtocolHandler(headers, streamWriter, streamId, flowControl, currentStreamState, config, route, deadlineDetector); + handler.init(); + // Even though the request failed, it was made, and should be counted +// assertThat(route.requestCounter().count()).isEqualTo(1); + // And since it failed the failed counter should be incremented +// assertThat(route.failedGrpcRequestCounter().count()).isEqualTo(0); +// assertThat(route.failedHttpRequestCounter().count()).isEqualTo(1); +// assertThat(route.failedUnknownRequestCounter().count()).isEqualTo(0); + + // Check the HTTP2 response header frame was error 415 + assertThat(streamWriter.writtenHeaders).hasSize(1); + assertThat(streamWriter.writtenDataFrames).isEmpty(); + final var responseHeaderFrame = streamWriter.writtenHeaders.getFirst(); + assertThat(responseHeaderFrame.status()).isEqualTo(Status.UNSUPPORTED_MEDIA_TYPE_415); + + // I verified with the go GRPC server its behavior in this scenario. The following headers should be + // available in the response + // Content-Type: application/grpc + // Grpc-Message: invalid gRPC request content-type "" + // Grpc-Status: 3 + final var responseHeaders = responseHeaderFrame.httpHeaders().stream() + .collect(Collectors.toMap(Header::name, Header::values)); + assertThat(responseHeaders).contains( + entry("grpc-status", "" + GrpcStatus.INVALID_ARGUMENT.ordinal()), + entry("grpc-message", UriEncoding.encodeUri("invalid gRPC request content-type \"" + contentType + "\"")), + entry("Content-Type", "application/grpc"), + entry("grpc-accept-encoding", "identity")); + + // The stream should be closed + assertThat(handler.streamState()).isEqualTo(Http2StreamState.CLOSED); + } + + private static final class OutboundFlowControlStub implements FlowControl.Outbound { + + @Override + public long incrementStreamWindowSize(int increment) { + return 0; + } + + @Override + public Http2FrameData[] cut(Http2FrameData frame) { + return new Http2FrameData[0]; + } + + @Override + public void blockTillUpdate() { + + } + + @Override + public int maxFrameSize() { + return 0; + } + + @Override + public void decrementWindowSize(int decrement) { + + } + + @Override + public void resetStreamWindowSize(int size) { + + } + + @Override + public int getRemainingWindowSize() { + return 0; + } + } + + private static final class StreamWriterStub implements Http2StreamWriter { + private final List writtenDataFrames = new ArrayList<>(); + private final List writtenHeaders = new ArrayList<>(); + + + @Override + public void write(Http2FrameData frame) { + writtenDataFrames.add(frame); + } + + @Override + public void writeData(Http2FrameData frame, FlowControl.Outbound flowControl) { + writtenDataFrames.add(frame); + } + + @Override + public int writeHeaders(Http2Headers headers, int streamId, Http2Flag.HeaderFlags flags, FlowControl.Outbound flowControl) { + writtenHeaders.add(headers); + return 0; + } + + @Override + public int writeHeaders(Http2Headers headers, int streamId, Http2Flag.HeaderFlags flags, Http2FrameData dataFrame, FlowControl.Outbound flowControl) { + writtenHeaders.add(headers); + writtenDataFrames.add(dataFrame); + return 0; + } + } + + private static final class PbjConfigStub implements PbjConfig { + + @Override + public int maxMessageSizeBytes() { + return 0; + } + + @Override + public String name() { + return ""; + } + } + + private static final class DeadlineDetectorStub implements DeadlineDetector { + @NonNull + @Override + public ScheduledFuture scheduleDeadline(long deadlineNanos, @NonNull Runnable onDeadlineExceeded) { + return new ScheduledFuture<>() { + @Override + public long getDelay(@NonNull TimeUnit unit) { + return 0; + } + + @Override + public int compareTo(@NonNull Delayed o) { + return 0; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return false; + } + + @Override + public Object get() { + return null; + } + + @Override + public Object get(long timeout, @NonNull TimeUnit unit) { + return null; + } + }; + } + } +} diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java index 95705827..b06a4c1a 100644 --- a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java +++ b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java @@ -791,101 +791,6 @@ private byte[] messageBytesJson(HelloRequest req) { } } - private static final class GreeterServiceImpl implements GreeterService { - GrpcStatus errorToThrow = null; - - @Override - public HelloReply sayHello(HelloRequest request) { - if (errorToThrow != null) { - throw new GrpcException(errorToThrow); - } - - return HelloReply.newBuilder().setMessage("Hello " + request.getName()).build(); - } - - // Streams of stuff coming from the client, with a single response. - @Override - public Pipeline sayHelloStreamRequest( - Flow.Subscriber replies) { - final var names = new ArrayList(); - return new Pipeline<>() { - @Override - public void clientEndStreamReceived() { - onComplete(); - } - - @Override - public void onSubscribe(Flow.Subscription subscription) { - subscription.request(Long.MAX_VALUE); // turn off flow control - } - - @Override - public void onNext(HelloRequest item) { - names.add(item.getName()); - } - - @Override - public void onError(Throwable throwable) { - replies.onError(throwable); - } - - @Override - public void onComplete() { - final var reply = - HelloReply.newBuilder() - .setMessage("Hello " + String.join(", ", names)) - .build(); - replies.onNext(reply); - replies.onComplete(); - } - }; - } - - @Override - public void sayHelloStreamReply( - HelloRequest request, Flow.Subscriber replies) { - for (int i = 0; i < 10; i++) { - replies.onNext(HelloReply.newBuilder().setMessage("Hello!").build()); - } - - replies.onComplete(); - } - - @Override - public Pipeline sayHelloStreamBidi( - Flow.Subscriber replies) { - // Here we receive info from the client. In this case, it is a stream of requests with - // names. We will respond with a stream of replies. - return new Pipeline<>() { - @Override - public void clientEndStreamReceived() { - onComplete(); - } - - @Override - public void onSubscribe(Flow.Subscription subscription) { - subscription.request(Long.MAX_VALUE); // turn off flow control - } - - @Override - public void onNext(HelloRequest item) { - replies.onNext( - HelloReply.newBuilder().setMessage("Hello " + item.getName()).build()); - } - - @Override - public void onError(Throwable throwable) { - replies.onError(throwable); - } - - @Override - public void onComplete() { - replies.onComplete(); - } - }; - } - } - private interface GreeterAdapter extends GreeterService { @Override default HelloReply sayHello(HelloRequest request) { From c521bc0cebdb421d3607d19ee0cc563293e18fe2 Mon Sep 17 00:00:00 2001 From: Richard Bair Date: Fri, 15 Nov 2024 11:48:24 -0800 Subject: [PATCH 2/2] A slightly better approach Signed-off-by: Richard Bair --- .../pbj/grpc/helidon/PbjProtocolHandler.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java index d5f56021..b0717228 100644 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java @@ -54,7 +54,6 @@ import io.helidon.http.http2.Http2StreamState; import io.helidon.http.http2.Http2StreamWriter; import io.helidon.http.http2.Http2WindowUpdate; -import io.helidon.http.http2.StreamFlowControl; import io.helidon.webserver.http2.spi.Http2SubProtocolSelector; import java.util.List; import java.util.Objects; @@ -62,6 +61,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Delayed; import java.util.concurrent.Flow; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -116,11 +116,11 @@ final class PbjProtocolHandler implements Http2SubProtocolSelector.SubProtocolHa * future. * *

This member isn't final because it is set in the {@link #init()} method. It should not be - * set at any other time. + * set at any other time, although it is initialized to avoid any possible NPE. * *

Method calls on this object are thread-safe. */ - private ScheduledFuture deadlineFuture; + private Future deadlineFuture = CompletableFuture.completedFuture(null); /** * The bytes of the next incoming message. This is created dynamically as a message is received, @@ -201,19 +201,13 @@ public void init() { route.requestCounter().increment(); try { - // If the grpc-timeout header is present, determine when that timeout would occur, or - // default to a future that is so far in the future it will never happen. - final var requestHeaders = headers.httpHeaders(); - final var timeout = requestHeaders.value(GRPC_TIMEOUT); - deadlineFuture = - timeout.map(this::scheduleDeadline).orElse(new NoopScheduledFuture<>()); - // If Content-Type does not begin with "application/grpc", gRPC servers SHOULD respond // with HTTP status of 415 (Unsupported Media Type). This will prevent other HTTP/2 // clients from interpreting a gRPC error response, which uses status 200 (OK), as // successful. // See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md // In addition, "application/grpc" is interpreted as "application/grpc+proto". + final var requestHeaders = headers.httpHeaders(); final var requestContentType = requestHeaders.contentType().orElse(null); final var ct = requestContentType == null ? "" : requestContentType.text(); @@ -266,6 +260,12 @@ public void init() { // then we should pick one and use it in the response. Otherwise, we should not have // any compression. + // If the grpc-timeout header is present, determine when that timeout would occur, or + // default to a future that is so far in the future it will never happen. + final var timeout = requestHeaders.value(GRPC_TIMEOUT); + deadlineFuture = + timeout.map(this::scheduleDeadline).orElse(new NoopScheduledFuture<>()); + // At this point, the request itself is valid. Maybe it will still fail to be handled by // the service interface, but as far as the protocol is concerned, this was a valid // request. Send the headers back to the client (the messages and trailers will be sent