Skip to content

Commit

Permalink
Refactor GrpcStatus to implement GrpcExceptionHandlerFunction (line#5571
Browse files Browse the repository at this point in the history
)

Motivation:

- Closes line#5550 

Modifications:

- `GrpcStatus` implements `GrpcExceptionHandlerFunction`
- Rename `GrpcStatus` to `DefaultGrpcExceptionHandlerFunction`

Result:

- Closes line#5550
- GrpcClientBuilder uses `DefaultGrpcExceptionHandlerFunction` as default
- GrpcService can use `DefaultGrpcExceptionHandlerFunction` for its `exceptionHandler`

<!--
Visit this URL to learn more about how to write a pull request description:
https://armeria.dev/community/developer-guide#how-to-write-pull-request-description
-->
  • Loading branch information
jaeseung-bae authored and Dogacel committed Jun 8, 2024
1 parent 55d5eef commit 505ae9a
Show file tree
Hide file tree
Showing 21 changed files with 341 additions and 225 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import com.linecorp.armeria.common.grpc.GrpcSerializationFormats;
import com.linecorp.armeria.common.grpc.protocol.ArmeriaMessageDeframer;
import com.linecorp.armeria.common.grpc.protocol.ArmeriaMessageFramer;
import com.linecorp.armeria.internal.common.grpc.UnwrappingGrpcExceptionHandleFunction;
import com.linecorp.armeria.unsafe.grpc.GrpcUnsafeBufferUtil;

import io.grpc.CallCredentials;
Expand Down Expand Up @@ -418,7 +419,8 @@ public <T> T build(Class<T> clientType) {
option(INTERCEPTORS.newValue(clientInterceptors));
}
if (exceptionHandler != null) {
option(EXCEPTION_HANDLER.newValue(exceptionHandler));
option(EXCEPTION_HANDLER.newValue(new UnwrappingGrpcExceptionHandleFunction(exceptionHandler.orElse(
GrpcExceptionHandlerFunction.of()))));
}

final Object client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import com.linecorp.armeria.common.grpc.protocol.ArmeriaMessageFramer;
import com.linecorp.armeria.internal.client.grpc.NullCallCredentials;
import com.linecorp.armeria.internal.client.grpc.NullGrpcClientStubFactory;
import com.linecorp.armeria.internal.common.grpc.GrpcStatus;
import com.linecorp.armeria.internal.common.grpc.UnwrappingGrpcExceptionHandleFunction;
import com.linecorp.armeria.unsafe.grpc.GrpcUnsafeBufferUtil;

import io.grpc.CallCredentials;
Expand Down Expand Up @@ -174,8 +174,8 @@ public final class GrpcClientOptions {
* to a gRPC {@link Status}.
*/
public static final ClientOption<GrpcExceptionHandlerFunction> EXCEPTION_HANDLER =
ClientOption.define("EXCEPTION_HANDLER",
(ctx, cause, metadata) -> GrpcStatus.fromThrowable(cause));
ClientOption.define("EXCEPTION_HANDLER", new UnwrappingGrpcExceptionHandleFunction(
GrpcExceptionHandlerFunction.of()));

/**
* Sets whether to respect the marshaller specified in gRPC {@link MethodDescriptor}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common.grpc;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;

import com.google.protobuf.InvalidProtocolBufferException;

import com.linecorp.armeria.client.UnprocessedRequestException;
import com.linecorp.armeria.client.circuitbreaker.FailFastException;
import com.linecorp.armeria.common.ClosedSessionException;
import com.linecorp.armeria.common.ContentTooLargeException;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.TimeoutException;
import com.linecorp.armeria.common.stream.ClosedStreamException;
import com.linecorp.armeria.server.RequestTimeoutException;

import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;

enum DefaultGrpcExceptionHandlerFunction implements GrpcExceptionHandlerFunction {
INSTANCE;

/**
* Converts the {@link Throwable} to a {@link Status}, taking into account exceptions specific to Armeria as
* well and the protocol package.
*/
@Override
public Status apply(RequestContext ctx, Throwable cause, Metadata metadata) {
final Status s = Status.fromThrowable(cause);
if (s.getCode() != Code.UNKNOWN) {
return s;
}

if (cause instanceof ClosedSessionException || cause instanceof ClosedChannelException) {
// ClosedChannelException is used any time the Netty channel is closed. Proper error
// processing requires remembering the error that occurred before this one and using it
// instead.
return s;
}
if (cause instanceof ClosedStreamException || cause instanceof RequestTimeoutException) {
return Status.CANCELLED.withCause(cause);
}
if (cause instanceof InvalidProtocolBufferException) {
return Status.INVALID_ARGUMENT.withCause(cause);
}
if (cause instanceof UnprocessedRequestException ||
cause instanceof IOException ||
cause instanceof FailFastException) {
return Status.UNAVAILABLE.withCause(cause);
}
if (cause instanceof Http2Exception) {
if (cause instanceof Http2Exception.StreamException &&
((Http2Exception.StreamException) cause).error() == Http2Error.CANCEL) {
return Status.CANCELLED;
}
return Status.INTERNAL.withCause(cause);
}
if (cause instanceof TimeoutException) {
return Status.DEADLINE_EXCEEDED.withCause(cause);
}
if (cause instanceof ContentTooLargeException) {
return Status.RESOURCE_EXHAUSTED.withCause(cause);
}
return s;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ static GrpcExceptionHandlerFunctionBuilder builder() {
return new GrpcExceptionHandlerFunctionBuilder();
}

/**
* Returns the default {@link GrpcExceptionHandlerFunction}.
*/
@UnstableApi
static GrpcExceptionHandlerFunction of() {
return DefaultGrpcExceptionHandlerFunction.INSTANCE;
}

/**
* Maps the specified {@link Throwable} to a gRPC {@link Status},
* and mutates the specified {@link Metadata}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,9 @@ public void start(Listener<O> responseListener, Metadata metadata) {
prepareHeaders(compressor, metadata, remainingNanos);

final BiFunction<ClientRequestContext, Throwable, HttpResponse> errorResponseFactory =
(unused, cause) -> HttpResponse.ofFailure(
GrpcStatus.fromThrowable(exceptionHandler, ctx, cause, metadata)
.withDescription(cause.getMessage())
.asRuntimeException());
(unused, cause) -> HttpResponse.ofFailure(exceptionHandler.apply(ctx, cause, metadata)
.withDescription(cause.getMessage())
.asRuntimeException());
final HttpResponse res = initContextAndExecuteWithFallback(
httpClient, ctx, endpointGroup, HttpResponse::of, errorResponseFactory);

Expand Down Expand Up @@ -455,7 +454,7 @@ public void onNext(DeframedMessage message) {
});
} catch (Throwable t) {
final Metadata metadata = new Metadata();
close(GrpcStatus.fromThrowable(exceptionHandler, ctx, t, metadata), metadata);
close(exceptionHandler.apply(ctx, t, metadata), metadata);
}
}

Expand Down Expand Up @@ -512,7 +511,7 @@ private void prepareHeaders(Compressor compressor, Metadata metadata, long remai

private void closeWhenListenerThrows(Throwable t) {
final Metadata metadata = new Metadata();
closeWhenEos(GrpcStatus.fromThrowable(exceptionHandler, ctx, t, metadata), metadata);
closeWhenEos(exceptionHandler.apply(ctx, t, metadata), metadata);
}

private void closeWhenEos(Status status, Metadata metadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@

package com.linecorp.armeria.internal.common.grpc;

import static java.util.Objects.requireNonNull;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.nio.channels.ClosedChannelException;
import java.util.Base64;

import org.slf4j.Logger;
Expand All @@ -44,34 +40,19 @@
import com.google.common.base.Strings;
import com.google.protobuf.InvalidProtocolBufferException;

import com.linecorp.armeria.client.UnprocessedRequestException;
import com.linecorp.armeria.client.circuitbreaker.FailFastException;
import com.linecorp.armeria.common.ClosedSessionException;
import com.linecorp.armeria.common.ContentTooLargeException;
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.TimeoutException;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction;
import com.linecorp.armeria.common.grpc.GrpcStatusFunction;
import com.linecorp.armeria.common.grpc.StackTraceElementProto;
import com.linecorp.armeria.common.grpc.StatusCauseException;
import com.linecorp.armeria.common.grpc.ThrowableProto;
import com.linecorp.armeria.common.grpc.protocol.ArmeriaStatusException;
import com.linecorp.armeria.common.grpc.protocol.DeframedMessage;
import com.linecorp.armeria.common.grpc.protocol.GrpcHeaderNames;
import com.linecorp.armeria.common.grpc.protocol.StatusMessageEscaper;
import com.linecorp.armeria.common.stream.ClosedStreamException;
import com.linecorp.armeria.common.stream.StreamMessage;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.server.RequestTimeoutException;

import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;

/**
* Utilities for handling {@link Status} in Armeria.
Expand All @@ -80,134 +61,6 @@ public final class GrpcStatus {

private static final Logger logger = LoggerFactory.getLogger(GrpcStatus.class);

/**
* Converts the {@link Throwable} to a {@link Status}, taking into account exceptions specific to Armeria as
* well and the protocol package.
*/
public static Status fromThrowable(Throwable t) {
t = peelAndUnwrap(requireNonNull(t, "t"));
return statusFromThrowable(t);
}

/**
* Converts the {@link Throwable} to a {@link Status}.
* If the specified {@code statusFunction} returns {@code null},
* the built-in exception mapping rule, which takes into account exceptions specific to Armeria as well
* and the protocol package, is used by default.
*/
public static Status fromThrowable(@Nullable GrpcStatusFunction statusFunction, RequestContext ctx,
Throwable t, Metadata metadata) {
final GrpcExceptionHandlerFunction exceptionHandler =
statusFunction != null ? statusFunction::apply : null;
return fromThrowable(exceptionHandler, ctx, t, metadata);
}

/**
* Converts the {@link Throwable} to a {@link Status}.
* If the specified {@link GrpcExceptionHandlerFunction} returns {@code null},
* the built-in exception mapping rule, which takes into account exceptions specific to Armeria as well
* and the protocol package, is used by default.
*/
public static Status fromThrowable(@Nullable GrpcExceptionHandlerFunction exceptionHandler,
RequestContext ctx, Throwable t, Metadata metadata) {
t = peelAndUnwrap(requireNonNull(t, "t"));

if (exceptionHandler != null) {
final Status status = exceptionHandler.apply(ctx, t, metadata);
if (status != null) {
return status;
}
}

return statusFromThrowable(t);
}

private static Status statusFromThrowable(Throwable t) {
final Status s = Status.fromThrowable(t);
if (s.getCode() != Code.UNKNOWN) {
return s;
}

if (t instanceof ClosedSessionException || t instanceof ClosedChannelException) {
// ClosedChannelException is used any time the Netty channel is closed. Proper error
// processing requires remembering the error that occurred before this one and using it
// instead.
return s;
}
if (t instanceof ClosedStreamException || t instanceof RequestTimeoutException) {
return Status.CANCELLED.withCause(t);
}
if (t instanceof InvalidProtocolBufferException) {
return Status.INVALID_ARGUMENT.withCause(t);
}
if (t instanceof UnprocessedRequestException ||
t instanceof IOException ||
t instanceof FailFastException) {
return Status.UNAVAILABLE.withCause(t);
}
if (t instanceof Http2Exception) {
if (t instanceof Http2Exception.StreamException &&
((Http2Exception.StreamException) t).error() == Http2Error.CANCEL) {
return Status.CANCELLED;
}
return Status.INTERNAL.withCause(t);
}
if (t instanceof TimeoutException) {
return Status.DEADLINE_EXCEEDED.withCause(t);
}
if (t instanceof ContentTooLargeException) {
return Status.RESOURCE_EXHAUSTED.withCause(t);
}
return s;
}

/**
* Converts the specified {@link Status} to a new user-specified {@link Status}
* using the specified {@link GrpcStatusFunction}.
* Returns the given {@link Status} as is if the {@link GrpcStatusFunction} returns {@code null}.
*/
public static Status fromStatusFunction(@Nullable GrpcStatusFunction statusFunction,
RequestContext ctx, Status status, Metadata metadata) {
final GrpcExceptionHandlerFunction exceptionHandler =
statusFunction != null ? statusFunction::apply : null;
return fromExceptionHandler(exceptionHandler, ctx, status, metadata);
}

/**
* Converts the specified {@link Status} to a new user-specified {@link Status}
* using the specified {@link GrpcExceptionHandlerFunction}.
* Returns the given {@link Status} as is if the {@link GrpcExceptionHandlerFunction} returns {@code null}.
*/
public static Status fromExceptionHandler(@Nullable GrpcExceptionHandlerFunction exceptionHandler,
RequestContext ctx, Status status, Metadata metadata) {
requireNonNull(status, "status");

if (exceptionHandler != null) {
final Throwable cause = status.getCause();
if (cause != null) {
final Throwable unwrapped = peelAndUnwrap(cause);
final Status newStatus = exceptionHandler.apply(ctx, unwrapped, metadata);
if (newStatus != null) {
return newStatus;
}
}
}
return status;
}

private static Throwable peelAndUnwrap(Throwable t) {
t = Exceptions.peel(t);
Throwable cause = t;
while (cause != null) {
if (cause instanceof ArmeriaStatusException) {
t = StatusExceptionConverter.toGrpc((ArmeriaStatusException) cause);
break;
}
cause = cause.getCause();
}
return t;
}

/**
* Maps gRPC {@link Status} to {@link HttpStatus}. If there is no matched rule for the specified
* {@link Status}, the mapping rules defined in upstream Google APIs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public final class HttpStreamDeframer extends ArmeriaMessageDeframer {
private final RequestContext ctx;
private final DecompressorRegistry decompressorRegistry;
private final TransportStatusListener transportStatusListener;
@Nullable
private final GrpcExceptionHandlerFunction exceptionHandler;

@Nullable
Expand All @@ -56,7 +55,7 @@ public HttpStreamDeframer(
DecompressorRegistry decompressorRegistry,
RequestContext ctx,
TransportStatusListener transportStatusListener,
@Nullable GrpcExceptionHandlerFunction exceptionHandler,
GrpcExceptionHandlerFunction exceptionHandler,
int maxMessageLength, boolean grpcWebText, boolean server) {
super(maxMessageLength, ctx.alloc(), grpcWebText);
this.ctx = requireNonNull(ctx, "ctx");
Expand Down Expand Up @@ -121,9 +120,8 @@ public void processHeaders(HttpHeaders headers, StreamDecoderOutput<DeframedMess
decompressor(ForwardingDecompressor.forGrpc(decompressor));
} catch (Throwable t) {
final Metadata metadata = new Metadata();
transportStatusListener.transportReportStatus(
GrpcStatus.fromThrowable(exceptionHandler, ctx, t, metadata),
metadata);
transportStatusListener.transportReportStatus(exceptionHandler.apply(ctx, t, metadata),
metadata);
return;
}
}
Expand All @@ -150,8 +148,7 @@ public void processTrailers(HttpHeaders headers, StreamDecoderOutput<DeframedMes
@Override
public void processOnError(Throwable cause) {
final Metadata metadata = new Metadata();
transportStatusListener.transportReportStatus(
GrpcStatus.fromThrowable(exceptionHandler, ctx, cause, metadata), metadata);
transportStatusListener.transportReportStatus(exceptionHandler.apply(ctx, cause, metadata), metadata);
}

@Override
Expand Down
Loading

0 comments on commit 505ae9a

Please sign in to comment.