Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert from Flow.Subscriber to Pipeline #325

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public void init() {
// Setup the subscribers. The "outgoing" subscriber will send messages to the client.
// This is given to the "open" method on the service to allow it to send messages to
// the client.
final Flow.Subscriber<? super Bytes> outgoing = new SendToClientSubscriber();
final Pipeline<? super Bytes> outgoing = new SendToClientSubscriber();
pipeline = route.service().open(route.method(), options, outgoing);
} catch (final GrpcException grpcException) {
route.failedGrpcRequestCounter().increment();
Expand Down Expand Up @@ -676,10 +676,10 @@ protected void send(
}

/**
* The implementation of {@link Flow.Subscriber} used to send messages to the client. It
* The implementation of {@link Pipeline} used to send messages to the client. It
* receives bytes from the handlers to send to the client.
*/
private final class SendToClientSubscriber implements Flow.Subscriber<Bytes> {
private final class SendToClientSubscriber implements Pipeline<Bytes> {
@Override
public void onSubscribe(@NonNull final Flow.Subscription subscription) {
// FUTURE: Add support for flow control
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
/**
* A handler for the case where the path is not found.
*/
public final class RouteNotFoundHandler
final class RouteNotFoundHandler
implements Http2SubProtocolSelector.SubProtocolHandler {
private final Http2StreamWriter streamWriter;
private final int streamId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;

/**
* This service doesn't rely on any PBJ objects, because the build right now doesn't have a good way
Expand All @@ -47,15 +46,13 @@ enum GreeterMethod implements Method {
HelloReply sayHello(HelloRequest request);

// A stream of messages coming from the client, with a single response from the server.
Pipeline<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies);
Pipeline<? super HelloRequest> sayHelloStreamRequest(Pipeline<? super HelloReply> replies);

// A single request from the client, with a stream of responses from the server.
void sayHelloStreamReply(HelloRequest request, Flow.Subscriber<? super HelloReply> replies);
void sayHelloStreamReply(HelloRequest request, Pipeline<? super HelloReply> replies);

// A bidirectional stream of requests and responses between the client and the server.
Pipeline<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies);
Pipeline<? super HelloRequest> sayHelloStreamBidi(Pipeline<? super HelloReply> replies);

@NonNull
default String serviceName() {
Expand All @@ -77,7 +74,7 @@ default List<Method> methods() {
default Pipeline<? super Bytes> open(
final @NonNull Method method,
final @NonNull RequestOptions options,
final @NonNull Flow.Subscriber<? super Bytes> replies) {
final @NonNull Pipeline<? super Bytes> replies) {

final var m = (GreeterMethod) method;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public HelloReply sayHello(HelloRequest request) {
// Streams of stuff coming from the client, with a single response.
@Override
public Pipeline<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies) {
Pipeline<? super HelloReply> replies) {
final var names = new ArrayList<String>();
return new Pipeline<>() {
@Override
Expand Down Expand Up @@ -76,7 +76,7 @@ public void onComplete() {

@Override
public void sayHelloStreamReply(
HelloRequest request, Flow.Subscriber<? super HelloReply> replies) {
HelloRequest request, Pipeline<? super HelloReply> replies) {
for (int i = 0; i < 10; i++) {
replies.onNext(HelloReply.newBuilder().setMessage("Hello!").build());
}
Expand All @@ -86,7 +86,7 @@ public void sayHelloStreamReply(

@Override
public Pipeline<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies) {
Pipeline<? super HelloReply> replies) {
// Here we receive info from the client. In this case, it is a stream of requests with
// names. We will respond with a stream of replies.
return new Pipeline<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ void exceptionThrownWhileOpening() {
public Pipeline<? super Bytes> open(
@NonNull Method method,
@NonNull RequestOptions options,
@NonNull Flow.Subscriber<? super Bytes> replies) {
@NonNull Pipeline<? super Bytes> replies) {
throw ex;
}
};
Expand Down Expand Up @@ -799,17 +799,17 @@ default HelloReply sayHello(HelloRequest request) {

@Override
default Pipeline<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies) {
Pipeline<? super HelloReply> replies) {
return null;
}

@Override
default void sayHelloStreamReply(
HelloRequest request, Flow.Subscriber<? super HelloReply> replies) {}
HelloRequest request, Pipeline<? super HelloReply> replies) {}

@Override
default Pipeline<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies) {
Pipeline<? super HelloReply> replies) {
return null;
}
}
Expand All @@ -826,20 +826,20 @@ public HelloReply sayHello(HelloRequest request) {
@Override
@NonNull
public Pipeline<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies) {
Pipeline<? super HelloReply> replies) {
return svc.sayHelloStreamRequest(replies);
}

@Override
public void sayHelloStreamReply(
HelloRequest request, Flow.Subscriber<? super HelloReply> replies) {
HelloRequest request, Pipeline<? super HelloReply> replies) {
svc.sayHelloStreamReply(request, replies);
}

@Override
@NonNull
public Pipeline<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies) {
Pipeline<? super HelloReply> replies) {
return svc.sayHelloStreamBidi(replies);
}

Expand All @@ -848,7 +848,7 @@ public Pipeline<? super HelloRequest> sayHelloStreamBidi(
public Pipeline<? super Bytes> open(
@NonNull Method method,
@NonNull RequestOptions options,
@NonNull Flow.Subscriber<? super Bytes> replies) {
@NonNull Pipeline<? super Bytes> replies) {
return svc.open(method, options, replies);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.pbj.runtime.grpc;

import java.util.concurrent.Flow;
Expand All @@ -11,5 +27,5 @@ public interface Pipeline<T> extends Flow.Subscriber<T> {
/**
* Called when an END_STREAM frame is received from the client.
*/
void clientEndStreamReceived();
default void clientEndStreamReceived() { };
}
Loading
Loading