diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java index b0717228..d9358747 100644 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java @@ -292,7 +292,7 @@ public void init() { // Setup the subscribers. The "outgoing" subscriber will send messages to the client. // This is given to the "open" method on the service to allow it to send messages to // the client. - final Flow.Subscriber outgoing = new SendToClientSubscriber(); + final Pipeline outgoing = new SendToClientSubscriber(); pipeline = route.service().open(route.method(), options, outgoing); } catch (final GrpcException grpcException) { route.failedGrpcRequestCounter().increment(); @@ -676,10 +676,10 @@ protected void send( } /** - * The implementation of {@link Flow.Subscriber} used to send messages to the client. It + * The implementation of {@link Pipeline} used to send messages to the client. It * receives bytes from the handlers to send to the client. */ - private final class SendToClientSubscriber implements Flow.Subscriber { + private final class SendToClientSubscriber implements Pipeline { @Override public void onSubscribe(@NonNull final Flow.Subscription subscription) { // FUTURE: Add support for flow control diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/RouteNotFoundHandler.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/RouteNotFoundHandler.java index 39cce5f3..0694adfa 100644 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/RouteNotFoundHandler.java +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/RouteNotFoundHandler.java @@ -19,7 +19,7 @@ /** * A handler for the case where the path is not found. */ -public final class RouteNotFoundHandler +final class RouteNotFoundHandler implements Http2SubProtocolSelector.SubProtocolHandler { private final Http2StreamWriter streamWriter; private final int streamId; diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterService.java b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterService.java index ae4b4caf..7cd7d1a0 100644 --- a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterService.java +++ b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterService.java @@ -28,7 +28,6 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; -import java.util.concurrent.Flow; /** * This service doesn't rely on any PBJ objects, because the build right now doesn't have a good way @@ -47,15 +46,13 @@ enum GreeterMethod implements Method { HelloReply sayHello(HelloRequest request); // A stream of messages coming from the client, with a single response from the server. - Pipeline sayHelloStreamRequest( - Flow.Subscriber replies); + Pipeline sayHelloStreamRequest(Pipeline replies); // A single request from the client, with a stream of responses from the server. - void sayHelloStreamReply(HelloRequest request, Flow.Subscriber replies); + void sayHelloStreamReply(HelloRequest request, Pipeline replies); // A bidirectional stream of requests and responses between the client and the server. - Pipeline sayHelloStreamBidi( - Flow.Subscriber replies); + Pipeline sayHelloStreamBidi(Pipeline replies); @NonNull default String serviceName() { @@ -77,7 +74,7 @@ default List methods() { default Pipeline open( final @NonNull Method method, final @NonNull RequestOptions options, - final @NonNull Flow.Subscriber replies) { + final @NonNull Pipeline replies) { final var m = (GreeterMethod) method; try { diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterServiceImpl.java b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterServiceImpl.java index 43470057..1b3523ac 100644 --- a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterServiceImpl.java +++ b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterServiceImpl.java @@ -39,7 +39,7 @@ public HelloReply sayHello(HelloRequest request) { // Streams of stuff coming from the client, with a single response. @Override public Pipeline sayHelloStreamRequest( - Flow.Subscriber replies) { + Pipeline replies) { final var names = new ArrayList(); return new Pipeline<>() { @Override @@ -76,7 +76,7 @@ public void onComplete() { @Override public void sayHelloStreamReply( - HelloRequest request, Flow.Subscriber replies) { + HelloRequest request, Pipeline replies) { for (int i = 0; i < 10; i++) { replies.onNext(HelloReply.newBuilder().setMessage("Hello!").build()); } @@ -86,7 +86,7 @@ public void sayHelloStreamReply( @Override public Pipeline sayHelloStreamBidi( - Flow.Subscriber replies) { + Pipeline replies) { // Here we receive info from the client. In this case, it is a stream of requests with // names. We will respond with a stream of replies. return new Pipeline<>() { diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java index b06a4c1a..0403f311 100644 --- a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java +++ b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java @@ -513,7 +513,7 @@ void exceptionThrownWhileOpening() { public Pipeline open( @NonNull Method method, @NonNull RequestOptions options, - @NonNull Flow.Subscriber replies) { + @NonNull Pipeline replies) { throw ex; } }; @@ -799,17 +799,17 @@ default HelloReply sayHello(HelloRequest request) { @Override default Pipeline sayHelloStreamRequest( - Flow.Subscriber replies) { + Pipeline replies) { return null; } @Override default void sayHelloStreamReply( - HelloRequest request, Flow.Subscriber replies) {} + HelloRequest request, Pipeline replies) {} @Override default Pipeline sayHelloStreamBidi( - Flow.Subscriber replies) { + Pipeline replies) { return null; } } @@ -826,20 +826,20 @@ public HelloReply sayHello(HelloRequest request) { @Override @NonNull public Pipeline sayHelloStreamRequest( - Flow.Subscriber replies) { + Pipeline replies) { return svc.sayHelloStreamRequest(replies); } @Override public void sayHelloStreamReply( - HelloRequest request, Flow.Subscriber replies) { + HelloRequest request, Pipeline replies) { svc.sayHelloStreamReply(request, replies); } @Override @NonNull public Pipeline sayHelloStreamBidi( - Flow.Subscriber replies) { + Pipeline replies) { return svc.sayHelloStreamBidi(replies); } @@ -848,7 +848,7 @@ public Pipeline sayHelloStreamBidi( public Pipeline open( @NonNull Method method, @NonNull RequestOptions options, - @NonNull Flow.Subscriber replies) { + @NonNull Pipeline replies) { return svc.open(method, options, replies); } } diff --git a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipeline.java b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipeline.java index 3360cf9f..6d03dcee 100644 --- a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipeline.java +++ b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipeline.java @@ -1,3 +1,19 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.hedera.pbj.runtime.grpc; import java.util.concurrent.Flow; @@ -11,5 +27,5 @@ public interface Pipeline extends Flow.Subscriber { /** * Called when an END_STREAM frame is received from the client. */ - void clientEndStreamReceived(); + default void clientEndStreamReceived() { }; } diff --git a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipelines.java b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipelines.java index a25f2bae..bb712a75 100644 --- a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipelines.java +++ b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipelines.java @@ -46,6 +46,7 @@ public void clientEndStreamReceived() { } private Flow.Subscription subscription; + @Override public void onSubscribe(@NonNull final Flow.Subscription subscription) { this.subscription = requireNonNull(subscription); @@ -119,6 +120,7 @@ public static ServerStreamingBuilder serverStreaming() { /** * A builder for constructing the pipeline for a unary gRPC service method. + * * @param The type of the request message. * @param The type of the response message. */ @@ -159,7 +161,7 @@ public interface UnaryBuilder { * @return This builder. */ @NonNull - UnaryBuilder respondTo(@NonNull Flow.Subscriber replies); + UnaryBuilder respondTo(@NonNull Pipeline replies); /** * Builds the pipeline and returns it. The returned pipeline receives the incoming messages, and contains @@ -217,7 +219,7 @@ public interface BidiStreamingBuilder { * @return This builder. */ @NonNull - BidiStreamingBuilder respondTo(@NonNull Flow.Subscriber replies); + BidiStreamingBuilder respondTo(@NonNull Pipeline replies); /** * Builds the pipeline and returns it. The returned pipeline receives the incoming messages, and contains @@ -256,8 +258,7 @@ public interface ClientStreamingBuilder { * @return This builder. */ @NonNull - ClientStreamingBuilder method( - @NonNull ClientStreamingMethod method); + ClientStreamingBuilder method(@NonNull ClientStreamingMethod method); /** * Configures a lambda for mapping from the response message type to {@link Bytes}. This must be specified. @@ -277,7 +278,7 @@ ClientStreamingBuilder method( * @return This builder. */ @NonNull - ClientStreamingBuilder respondTo(@NonNull Flow.Subscriber replies); + ClientStreamingBuilder respondTo(@NonNull Pipeline replies); /** * Builds the pipeline and returns it. The returned pipeline receives the incoming messages, and contains @@ -333,7 +334,7 @@ public interface ServerStreamingBuilder { * @return This builder. */ @NonNull - ServerStreamingBuilder respondTo(@NonNull Flow.Subscriber replies); + ServerStreamingBuilder respondTo(@NonNull Pipeline replies); /** * Builds the pipeline and returns it. The returned pipeline receives the incoming messages, and contains @@ -373,7 +374,7 @@ public interface ExceptionalFunction { */ @FunctionalInterface public interface ClientStreamingMethod - extends ExceptionalFunction, Flow.Subscriber> {} + extends ExceptionalFunction, Pipeline> {} /** * A function that handles a server streaming gRPC service method. A single request is received from the client, @@ -390,7 +391,7 @@ public interface ServerStreamingMethod { * @param replies The subscriber to send responses to. * @throws Exception If an error occurs during processing. */ - void apply(@NonNull T request, @NonNull Flow.Subscriber replies) throws Exception; + void apply(@NonNull T request, @NonNull Pipeline replies) throws Exception; } /** @@ -401,12 +402,12 @@ public interface ServerStreamingMethod { * @param The type of the response message. */ public interface BidiStreamingMethod - extends ExceptionalFunction, Flow.Subscriber> {} + extends ExceptionalFunction, Pipeline> {} /** * A convenient base class for the different builders. All builders have to hold state for request and * response mapping functions, as well as the subscriber to send responses to, so we have a base class. - * This class also implements the {@link Flow.Subscriber} and {@link Flow.Subscription} interfaces, to + * This class also implements the {@link Pipeline} and {@link Flow.Subscription} interfaces, to * reduce the overall number of instances created. * *

A {@link Flow.Subscription} is provided to each subscriber at the time they subscribe. Technically @@ -419,7 +420,7 @@ public interface BidiStreamingMethod private abstract static class PipelineBuilderImpl implements Pipeline, Flow.Subscription { protected ExceptionalFunction requestMapper; protected ExceptionalFunction responseMapper; - protected Flow.Subscriber replies; + protected Pipeline replies; private Flow.Subscription sourceSubscription; protected boolean completed = false; @@ -503,7 +504,7 @@ public UnaryBuilder mapResponse(@NonNull final ExceptionalFunction respondTo(@NonNull final Flow.Subscriber replies) { + public UnaryBuilder respondTo(@NonNull final Pipeline replies) { this.replies = requireNonNull(replies); return this; } @@ -557,32 +558,36 @@ public void clientEndStreamReceived() { private static final class BidiStreamingBuilderImpl extends PipelineBuilderImpl implements BidiStreamingBuilder { private BidiStreamingMethod method; - private Flow.Subscriber incoming; + private Pipeline incoming; @Override @NonNull - public BidiStreamingBuilderImpl mapRequest(@NonNull final ExceptionalFunction mapper) { + public BidiStreamingBuilderImpl mapRequest( + @NonNull final ExceptionalFunction mapper) { this.requestMapper = mapper; return this; } @Override @NonNull - public BidiStreamingBuilderImpl method(@NonNull final BidiStreamingMethod method) { + public BidiStreamingBuilderImpl method( + @NonNull final BidiStreamingMethod method) { this.method = method; return this; } @Override @NonNull - public BidiStreamingBuilderImpl mapResponse(@NonNull final ExceptionalFunction mapper) { + public BidiStreamingBuilderImpl mapResponse( + @NonNull final ExceptionalFunction mapper) { this.responseMapper = mapper; return this; } @Override @NonNull - public BidiStreamingBuilderImpl respondTo(@NonNull final Flow.Subscriber replies) { + public BidiStreamingBuilderImpl respondTo( + @NonNull final Pipeline replies) { this.replies = replies; return this; } @@ -640,38 +645,43 @@ public void clientEndStreamReceived() { /** * The implementation of the {@link ClientStreamingBuilder} interface. + * * @param The type of the request message. * @param The type of the response message. */ private static final class ClientStreamingBuilderImpl extends PipelineBuilderImpl implements ClientStreamingBuilder { private ClientStreamingMethod method; - private Flow.Subscriber incoming; + private Pipeline incoming; @Override @NonNull - public ClientStreamingBuilderImpl mapRequest(@NonNull final ExceptionalFunction mapper) { + public ClientStreamingBuilderImpl mapRequest( + @NonNull final ExceptionalFunction mapper) { this.requestMapper = mapper; return this; } @Override @NonNull - public ClientStreamingBuilderImpl method(@NonNull final ClientStreamingMethod method) { + public ClientStreamingBuilderImpl method( + @NonNull final ClientStreamingMethod method) { this.method = method; return this; } @Override @NonNull - public ClientStreamingBuilderImpl mapResponse(@NonNull final ExceptionalFunction mapper) { + public ClientStreamingBuilderImpl mapResponse( + @NonNull final ExceptionalFunction mapper) { this.responseMapper = mapper; return this; } @Override @NonNull - public ClientStreamingBuilderImpl respondTo(@NonNull final Flow.Subscriber replies) { + public ClientStreamingBuilderImpl respondTo( + @NonNull final Pipeline replies) { this.replies = replies; return this; } @@ -684,7 +694,8 @@ public Pipeline build() { throw new IllegalStateException("The method must be specified."); } replies.onSubscribe(this); - final var responseConverter = new MapSubscriber(replies, item -> responseMapper.apply(item)); + final var responseConverter = + new MapSubscriber(replies, item -> responseMapper.apply(item)); try { incoming = method.apply(responseConverter); @@ -730,32 +741,36 @@ public void clientEndStreamReceived() { private static final class ServerStreamingBuilderImpl extends PipelineBuilderImpl implements ServerStreamingBuilder { private ServerStreamingMethod method; - private Flow.Subscriber responseConverter; + private Pipeline responseConverter; @Override @NonNull - public ServerStreamingBuilderImpl mapRequest(@NonNull final ExceptionalFunction mapper) { + public ServerStreamingBuilderImpl mapRequest( + @NonNull final ExceptionalFunction mapper) { this.requestMapper = mapper; return this; } @Override @NonNull - public ServerStreamingBuilderImpl method(@NonNull final ServerStreamingMethod method) { + public ServerStreamingBuilderImpl method( + @NonNull final ServerStreamingMethod method) { this.method = method; return this; } @Override @NonNull - public ServerStreamingBuilderImpl mapResponse(@NonNull final ExceptionalFunction mapper) { + public ServerStreamingBuilderImpl mapResponse( + @NonNull final ExceptionalFunction mapper) { this.responseMapper = mapper; return this; } @Override @NonNull - public ServerStreamingBuilderImpl respondTo(@NonNull final Flow.Subscriber replies) { + public ServerStreamingBuilderImpl respondTo( + @NonNull final Pipeline replies) { this.replies = replies; return this; } @@ -805,8 +820,8 @@ public void clientEndStreamReceived() { * @param The type of the input. * @param The type of the output. */ - private record MapSubscriber(Flow.Subscriber next, ExceptionalFunction mapper) - implements Flow.Subscriber, Flow.Subscription { + private record MapSubscriber(Pipeline next, ExceptionalFunction mapper) + implements Pipeline, Flow.Subscription { private MapSubscriber { next.onSubscribe(this); diff --git a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/ServiceInterface.java b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/ServiceInterface.java index a7e11d67..f3d9a4cf 100644 --- a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/ServiceInterface.java +++ b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/ServiceInterface.java @@ -18,10 +18,8 @@ import com.hedera.pbj.runtime.io.buffer.Bytes; import edu.umd.cs.findbugs.annotations.NonNull; -import edu.umd.cs.findbugs.annotations.Nullable; import java.util.List; import java.util.Optional; -import java.util.concurrent.Flow; /** * Defines a common interface for all implementations of a gRPC {@code service}. PBJ will generate a sub-interface @@ -68,16 +66,12 @@ * register it with your webserver in whatever way is appropriate for your webserver. */ public interface ServiceInterface { - /** - * Represents the metadata of a method in a gRPC service. - */ + /** Represents the metadata of a method in a gRPC service. */ interface Method { String name(); } - /** - * The options that are passed to the service when a new connection is opened. - */ + /** The options that are passed to the service when a new connection is opened. */ interface RequestOptions { /** A constant for the gRPC content type "application/grpc". */ String APPLICATION_GRPC = "application/grpc"; @@ -136,12 +130,14 @@ interface RequestOptions { * implementation is provided by the generated PBJ code, which will handle the dispatching of messages to the * appropriate methods in the correct way (unary, server-side streaming, etc.). * - * @param method The method that was called by the client. - * @param opts Any options from the request, such as the content type. + * @param method The method that was called by the client. + * @param opts Any options from the request, such as the content type. * @param responses The subscriber used by the service to push responses back to the client. */ @NonNull Pipeline open( - @NonNull Method method, @NonNull RequestOptions opts, @NonNull Flow.Subscriber responses) + @NonNull Method method, + @NonNull RequestOptions opts, + @NonNull Pipeline responses) throws GrpcException; } diff --git a/pbj-core/pbj-runtime/src/test/java/com/hedera/pbj/runtime/grpc/PipelinesTest.java b/pbj-core/pbj-runtime/src/test/java/com/hedera/pbj/runtime/grpc/PipelinesTest.java index 7acd3900..e91e7d49 100644 --- a/pbj-core/pbj-runtime/src/test/java/com/hedera/pbj/runtime/grpc/PipelinesTest.java +++ b/pbj-core/pbj-runtime/src/test/java/com/hedera/pbj/runtime/grpc/PipelinesTest.java @@ -1,3 +1,19 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.hedera.pbj.runtime.grpc; import static org.assertj.core.api.Assertions.assertThat; @@ -65,13 +81,12 @@ void noopOnCompleteDoesNothing() { noop.onComplete(); assertThat(noop).isNotNull(); // if we get here, all is well. } - } @Nested @ExtendWith(MockitoExtension.class) class UnaryTest { - @Mock Flow.Subscriber replies; + @Mock Pipeline replies; @Mock Flow.Subscription subscription; @Test @@ -184,8 +199,8 @@ void positive() { @Nested @ExtendWith(MockitoExtension.class) class BidiTest { - @Mock Flow.Subscriber client; - @Mock Flow.Subscriber replies; + @Mock Pipeline client; + @Mock Pipeline replies; @Mock Flow.Subscription subscription; @Test @@ -341,7 +356,7 @@ void positive() { @Nested @ExtendWith(MockitoExtension.class) class ServerStreamingTest { - @Mock Flow.Subscriber replies; + @Mock Pipeline replies; @Mock Flow.Subscription subscription; @Test @@ -470,7 +485,7 @@ void positive() { @Nested @ExtendWith(MockitoExtension.class) class ClientStreamingTest { - @Mock Flow.Subscriber replies; + @Mock Pipeline replies; @Mock Flow.Subscription subscription; @Test @@ -595,11 +610,11 @@ void positive() { verify(replies).onNext(Bytes.wrap("hello:world")); } - private static final class ConcatenatingHandler implements Flow.Subscriber { + private static final class ConcatenatingHandler implements Pipeline { private final List strings = new ArrayList<>(); - private final Flow.Subscriber sink; + private final Pipeline sink; - private ConcatenatingHandler(Flow.Subscriber sink) { + private ConcatenatingHandler(Pipeline sink) { this.sink = sink; }