Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

00321 incorrect handling of bad content type #322

Merged
merged 2 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@
import io.helidon.http.HeaderValues;
import io.helidon.http.HttpException;
import io.helidon.http.HttpMediaType;
import io.helidon.http.HttpMediaTypes;
import io.helidon.http.Status;
import io.helidon.http.WritableHeaders;
import io.helidon.http.http2.FlowControl;
import io.helidon.http.http2.Http2Flag;
import io.helidon.http.http2.Http2FrameData;
import io.helidon.http.http2.Http2FrameHeader;
Expand All @@ -54,14 +54,14 @@
import io.helidon.http.http2.Http2StreamState;
import io.helidon.http.http2.Http2StreamWriter;
import io.helidon.http.http2.Http2WindowUpdate;
import io.helidon.http.http2.StreamFlowControl;
import io.helidon.webserver.http2.spi.Http2SubProtocolSelector;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Delayed;
import java.util.concurrent.Flow;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -92,7 +92,7 @@ final class PbjProtocolHandler implements Http2SubProtocolSelector.SubProtocolHa
private final Http2Headers headers;
private final Http2StreamWriter streamWriter;
private final int streamId;
private final StreamFlowControl flowControl;
private final FlowControl.Outbound flowControl;
private final AtomicReference<Http2StreamState> currentStreamState;

/**
Expand All @@ -116,11 +116,11 @@ final class PbjProtocolHandler implements Http2SubProtocolSelector.SubProtocolHa
* future.
*
* <p>This member isn't final because it is set in the {@link #init()} method. It should not be
* set at any other time.
* set at any other time, although it is initialized to avoid any possible NPE.
*
* <p>Method calls on this object are thread-safe.
*/
private ScheduledFuture<?> deadlineFuture;
private Future<?> deadlineFuture = CompletableFuture.completedFuture(null);

/**
* The bytes of the next incoming message. This is created dynamically as a message is received,
Expand Down Expand Up @@ -176,7 +176,7 @@ enum ReadState {
@NonNull final Http2Headers headers,
@NonNull final Http2StreamWriter streamWriter,
final int streamId,
@NonNull final StreamFlowControl flowControl,
@NonNull final FlowControl.Outbound flowControl,
@NonNull final Http2StreamState currentStreamState,
@NonNull final PbjConfig config,
@NonNull final PbjMethodRoute route,
Expand Down Expand Up @@ -209,8 +209,8 @@ public void init() {
// In addition, "application/grpc" is interpreted as "application/grpc+proto".
final var requestHeaders = headers.httpHeaders();
final var requestContentType =
requestHeaders.contentType().orElse(HttpMediaTypes.PLAINTEXT_UTF_8);
final var ct = requestContentType.text();
requestHeaders.contentType().orElse(null);
final var ct = requestContentType == null ? "" : requestContentType.text();
final var contentType =
switch (ct) {
case APPLICATION_GRPC, APPLICATION_GRPC_PROTO -> APPLICATION_GRPC_PROTO;
Expand All @@ -220,7 +220,8 @@ public void init() {
yield ct;
}
throw new HttpException(
"Unsupported", Status.UNSUPPORTED_MEDIA_TYPE_415);
"invalid gRPC request content-type \"" + ct + "\"",
Status.UNSUPPORTED_MEDIA_TYPE_415);
}
};

Expand Down Expand Up @@ -262,7 +263,6 @@ public void init() {
// If the grpc-timeout header is present, determine when that timeout would occur, or
// default to a future that is so far in the future it will never happen.
final var timeout = requestHeaders.value(GRPC_TIMEOUT);

deadlineFuture =
timeout.map(this::scheduleDeadline).orElse(new NoopScheduledFuture<>());

Expand Down Expand Up @@ -306,6 +306,7 @@ public void init() {
new TrailerOnlyBuilder()
.httpStatus(httpException.status())
.grpcStatus(GrpcStatus.INVALID_ARGUMENT)
.statusMessage(httpException.getMessage())
.send();
error();
} catch (final Exception unknown) {
Expand Down Expand Up @@ -566,7 +567,7 @@ private void sendResponseHeaders(
http2Headers,
streamId,
Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS),
flowControl.outbound());
flowControl);
}

/**
Expand Down Expand Up @@ -630,7 +631,7 @@ protected void send(
streamId,
Http2Flag.HeaderFlags.create(
Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM),
flowControl.outbound());
flowControl);
}
}

Expand Down Expand Up @@ -701,7 +702,7 @@ public void onNext(@NonNull final Bytes response) {
streamId);

streamWriter.writeData(
new Http2FrameData(header, bufferData), flowControl.outbound());
new Http2FrameData(header, bufferData), flowControl);
} catch (final Exception e) {
LOGGER.log(ERROR, "Failed to respond to grpc request: " + route.method(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public SubProtocolResult subProtocol(
headers,
streamWriter,
streamId,
flowControl,
flowControl.outbound(),
currentStreamState,
config,
route,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.pbj.grpc.helidon;

import com.hedera.pbj.runtime.grpc.GrpcException;
import com.hedera.pbj.runtime.grpc.GrpcStatus;
import com.hedera.pbj.runtime.grpc.Pipeline;
import greeter.HelloReply;
import greeter.HelloRequest;
import java.util.ArrayList;
import java.util.concurrent.Flow;

final class GreeterServiceImpl implements GreeterService {
GrpcStatus errorToThrow = null;

@Override
public HelloReply sayHello(HelloRequest request) {
if (errorToThrow != null) {
throw new GrpcException(errorToThrow);
}

return HelloReply.newBuilder().setMessage("Hello " + request.getName()).build();
}

// Streams of stuff coming from the client, with a single response.
@Override
public Pipeline<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies) {
final var names = new ArrayList<String>();
return new Pipeline<>() {
@Override
public void clientEndStreamReceived() {
onComplete();
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE); // turn off flow control
}

@Override
public void onNext(HelloRequest item) {
names.add(item.getName());
}

@Override
public void onError(Throwable throwable) {
replies.onError(throwable);
}

@Override
public void onComplete() {
final var reply =
HelloReply.newBuilder()
.setMessage("Hello " + String.join(", ", names))
.build();
replies.onNext(reply);
replies.onComplete();
}
};
}

@Override
public void sayHelloStreamReply(
HelloRequest request, Flow.Subscriber<? super HelloReply> replies) {
for (int i = 0; i < 10; i++) {
replies.onNext(HelloReply.newBuilder().setMessage("Hello!").build());
}

replies.onComplete();
}

@Override
public Pipeline<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies) {
// Here we receive info from the client. In this case, it is a stream of requests with
// names. We will respond with a stream of replies.
return new Pipeline<>() {
@Override
public void clientEndStreamReceived() {
onComplete();
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE); // turn off flow control
}

@Override
public void onNext(HelloRequest item) {
replies.onNext(
HelloReply.newBuilder().setMessage("Hello " + item.getName()).build());
}

@Override
public void onError(Throwable throwable) {
replies.onError(throwable);
}

@Override
public void onComplete() {
replies.onComplete();
}
};
}
}
Loading
Loading