Skip to content

Commit

Permalink
Convert from Flow.Subscriber to Pipeline (#325)
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Bair <rbair23@users.noreply.github.com>
  • Loading branch information
rbair23 authored Nov 15, 2024
1 parent 383b410 commit d7c41ea
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public void init() {
// Setup the subscribers. The "outgoing" subscriber will send messages to the client.
// This is given to the "open" method on the service to allow it to send messages to
// the client.
final Flow.Subscriber<? super Bytes> outgoing = new SendToClientSubscriber();
final Pipeline<? super Bytes> outgoing = new SendToClientSubscriber();
pipeline = route.service().open(route.method(), options, outgoing);
} catch (final GrpcException grpcException) {
route.failedGrpcRequestCounter().increment();
Expand Down Expand Up @@ -676,10 +676,10 @@ protected void send(
}

/**
* The implementation of {@link Flow.Subscriber} used to send messages to the client. It
* The implementation of {@link Pipeline} used to send messages to the client. It
* receives bytes from the handlers to send to the client.
*/
private final class SendToClientSubscriber implements Flow.Subscriber<Bytes> {
private final class SendToClientSubscriber implements Pipeline<Bytes> {
@Override
public void onSubscribe(@NonNull final Flow.Subscription subscription) {
// FUTURE: Add support for flow control
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
/**
* A handler for the case where the path is not found.
*/
public final class RouteNotFoundHandler
final class RouteNotFoundHandler
implements Http2SubProtocolSelector.SubProtocolHandler {
private final Http2StreamWriter streamWriter;
private final int streamId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;

/**
* This service doesn't rely on any PBJ objects, because the build right now doesn't have a good way
Expand All @@ -47,15 +46,13 @@ enum GreeterMethod implements Method {
HelloReply sayHello(HelloRequest request);

// A stream of messages coming from the client, with a single response from the server.
Pipeline<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies);
Pipeline<? super HelloRequest> sayHelloStreamRequest(Pipeline<? super HelloReply> replies);

// A single request from the client, with a stream of responses from the server.
void sayHelloStreamReply(HelloRequest request, Flow.Subscriber<? super HelloReply> replies);
void sayHelloStreamReply(HelloRequest request, Pipeline<? super HelloReply> replies);

// A bidirectional stream of requests and responses between the client and the server.
Pipeline<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies);
Pipeline<? super HelloRequest> sayHelloStreamBidi(Pipeline<? super HelloReply> replies);

@NonNull
default String serviceName() {
Expand All @@ -77,7 +74,7 @@ default List<Method> methods() {
default Pipeline<? super Bytes> open(
final @NonNull Method method,
final @NonNull RequestOptions options,
final @NonNull Flow.Subscriber<? super Bytes> replies) {
final @NonNull Pipeline<? super Bytes> replies) {

final var m = (GreeterMethod) method;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public HelloReply sayHello(HelloRequest request) {
// Streams of stuff coming from the client, with a single response.
@Override
public Pipeline<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies) {
Pipeline<? super HelloReply> replies) {
final var names = new ArrayList<String>();
return new Pipeline<>() {
@Override
Expand Down Expand Up @@ -76,7 +76,7 @@ public void onComplete() {

@Override
public void sayHelloStreamReply(
HelloRequest request, Flow.Subscriber<? super HelloReply> replies) {
HelloRequest request, Pipeline<? super HelloReply> replies) {
for (int i = 0; i < 10; i++) {
replies.onNext(HelloReply.newBuilder().setMessage("Hello!").build());
}
Expand All @@ -86,7 +86,7 @@ public void sayHelloStreamReply(

@Override
public Pipeline<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies) {
Pipeline<? super HelloReply> replies) {
// Here we receive info from the client. In this case, it is a stream of requests with
// names. We will respond with a stream of replies.
return new Pipeline<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ void exceptionThrownWhileOpening() {
public Pipeline<? super Bytes> open(
@NonNull Method method,
@NonNull RequestOptions options,
@NonNull Flow.Subscriber<? super Bytes> replies) {
@NonNull Pipeline<? super Bytes> replies) {
throw ex;
}
};
Expand Down Expand Up @@ -799,17 +799,17 @@ default HelloReply sayHello(HelloRequest request) {

@Override
default Pipeline<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies) {
Pipeline<? super HelloReply> replies) {
return null;
}

@Override
default void sayHelloStreamReply(
HelloRequest request, Flow.Subscriber<? super HelloReply> replies) {}
HelloRequest request, Pipeline<? super HelloReply> replies) {}

@Override
default Pipeline<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies) {
Pipeline<? super HelloReply> replies) {
return null;
}
}
Expand All @@ -826,20 +826,20 @@ public HelloReply sayHello(HelloRequest request) {
@Override
@NonNull
public Pipeline<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies) {
Pipeline<? super HelloReply> replies) {
return svc.sayHelloStreamRequest(replies);
}

@Override
public void sayHelloStreamReply(
HelloRequest request, Flow.Subscriber<? super HelloReply> replies) {
HelloRequest request, Pipeline<? super HelloReply> replies) {
svc.sayHelloStreamReply(request, replies);
}

@Override
@NonNull
public Pipeline<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies) {
Pipeline<? super HelloReply> replies) {
return svc.sayHelloStreamBidi(replies);
}

Expand All @@ -848,7 +848,7 @@ public Pipeline<? super HelloRequest> sayHelloStreamBidi(
public Pipeline<? super Bytes> open(
@NonNull Method method,
@NonNull RequestOptions options,
@NonNull Flow.Subscriber<? super Bytes> replies) {
@NonNull Pipeline<? super Bytes> replies) {
return svc.open(method, options, replies);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.pbj.runtime.grpc;

import java.util.concurrent.Flow;
Expand All @@ -11,5 +27,5 @@ public interface Pipeline<T> extends Flow.Subscriber<T> {
/**
* Called when an END_STREAM frame is received from the client.
*/
void clientEndStreamReceived();
default void clientEndStreamReceived() { };
}
Loading

0 comments on commit d7c41ea

Please sign in to comment.