Skip to content

Commit

Permalink
Merge branch 'dev' into dev_wenjun_removeUnusedUtils
Browse files Browse the repository at this point in the history
  • Loading branch information
Gallardot authored Dec 19, 2024
2 parents e20dbac + 235695c commit 4ddda6d
Show file tree
Hide file tree
Showing 14 changed files with 168 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@

long timeout() default -1;

RpcMethodRetryStrategy retry() default @RpcMethodRetryStrategy;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.extract.base;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.net.ConnectException;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RpcMethodRetryStrategy {

/**
* The maximum number of retries. Default is 3, which means that the method is retried at most 3 times, including the first call.
*/
int maxRetryTimes() default 3;

/**
* The interval between retries, in milliseconds. If the value is less than or equal to 0, no interval is set.
*/
long retryInterval() default 0;

/**
* Which exception to retry.
* <p> Default is {@link ConnectException}.
*/
Class<? extends Throwable>[] retryFor() default {ConnectException.class};

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@
public class SyncRequestDto {

private Host serverHost;

private Transporter transporter;

private long timeoutMillis;

private RpcMethodRetryStrategy retryStrategy;

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -51,7 +52,13 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
}
ClientMethodInvoker methodInvoker = methodInvokerMap.computeIfAbsent(
method.toGenericString(), m -> new SyncClientMethodInvoker(serverHost, method, nettyRemotingClient));
return methodInvoker.invoke(proxy, method, args);
try {
return methodInvoker.invoke(proxy, method, args);
} catch (UndeclaredThrowableException undeclaredThrowableException) {
throw undeclaredThrowableException.getCause();
} catch (Throwable throwable) {
throw throwable;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@

import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.extract.base.IRpcResponse;
import org.apache.dolphinscheduler.extract.base.RpcMethodRetryStrategy;
import org.apache.dolphinscheduler.extract.base.SyncRequestDto;
import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
import org.apache.dolphinscheduler.extract.base.exception.RemotingException;
import org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException;
import org.apache.dolphinscheduler.extract.base.exception.RemoteException;
import org.apache.dolphinscheduler.extract.base.exception.RemoteTimeoutException;
import org.apache.dolphinscheduler.extract.base.future.ResponseFuture;
import org.apache.dolphinscheduler.extract.base.metrics.ClientSyncDurationMetrics;
import org.apache.dolphinscheduler.extract.base.metrics.ClientSyncExceptionMetrics;
Expand All @@ -34,6 +35,7 @@
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -110,59 +112,79 @@ public void initChannel(SocketChannel ch) {
isStarted.compareAndSet(false, true);
}

public IRpcResponse sendSync(SyncRequestDto syncRequestDto) throws RemotingException {
long start = System.currentTimeMillis();

public IRpcResponse sendSync(final SyncRequestDto syncRequestDto) throws RemoteException {
final Host host = syncRequestDto.getServerHost();
final Transporter transporter = syncRequestDto.getTransporter();
final long timeoutMillis = syncRequestDto.getTimeoutMillis() < 0 ? clientConfig.getConnectTimeoutMillis()
final long timeoutMillis = syncRequestDto.getTimeoutMillis() < 0 ? clientConfig.getDefaultRpcTimeoutMillis()
: syncRequestDto.getTimeoutMillis();
final long opaque = transporter.getHeader().getOpaque();

try {
final Channel channel = getOrCreateChannel(host);
if (channel == null) {
throw new RemotingException(String.format("connect to : %s fail", host));
}
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis);
channel.writeAndFlush(transporter).addListener(future -> {
if (future.isSuccess()) {
responseFuture.setSendOk(true);
return;
} else {
responseFuture.setSendOk(false);
final RpcMethodRetryStrategy retryStrategy = syncRequestDto.getRetryStrategy();

int maxRetryTimes = retryStrategy.maxRetryTimes();
int currentExecuteTimes = 1;

while (true) {
final long start = System.currentTimeMillis();
try {
return doSendSync(transporter, host, timeoutMillis);
} catch (Exception ex) {
ClientSyncExceptionMetrics clientSyncExceptionMetrics =
ClientSyncExceptionMetrics.of(syncRequestDto, ex);
RpcMetrics.recordClientSyncRequestException(clientSyncExceptionMetrics);

if (currentExecuteTimes < maxRetryTimes
&& Arrays.stream(retryStrategy.retryFor()).anyMatch(e -> e.isInstance(ex))) {
currentExecuteTimes++;
if (retryStrategy.retryInterval() > 0) {
ThreadUtils.sleep(retryStrategy.retryInterval());
}
continue;
}
responseFuture.setCause(future.cause());
responseFuture.putResponse(null);
log.error("Send Sync request {} to host {} failed", transporter, host, responseFuture.getCause());
});
/*
* sync wait for result
*/
IRpcResponse iRpcResponse = responseFuture.waitResponse();
if (iRpcResponse == null) {
if (responseFuture.isSendOK()) {
throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());

if (ex instanceof RemoteException) {
throw (RemoteException) ex;
} else {
throw new RemotingException(host.toString(), responseFuture.getCause());
throw new RemoteException("Call method to " + host + " failed", ex);
}
} finally {
ClientSyncDurationMetrics clientSyncDurationMetrics = ClientSyncDurationMetrics
.of(syncRequestDto)
.withMilliseconds(System.currentTimeMillis() - start);
RpcMetrics.recordClientSyncRequestDuration(clientSyncDurationMetrics);
}
return iRpcResponse;
} catch (Exception ex) {
ClientSyncExceptionMetrics clientSyncExceptionMetrics = ClientSyncExceptionMetrics
.of(syncRequestDto)
.withThrowable(ex);
RpcMetrics.recordClientSyncRequestException(clientSyncExceptionMetrics);
if (ex instanceof RemotingException) {
throw (RemotingException) ex;
}
}

private IRpcResponse doSendSync(final Transporter transporter,
final Host serverHost,
long timeoutMills) throws RemoteException, InterruptedException {
final Channel channel = getOrCreateChannel(serverHost);
if (channel == null) {
throw new RemoteException(String.format("connect to : %s fail", serverHost));
}
final ResponseFuture responseFuture = new ResponseFuture(transporter.getHeader().getOpaque(), timeoutMills);
channel.writeAndFlush(transporter).addListener(future -> {
if (future.isSuccess()) {
responseFuture.setSendOk(true);
return;
} else {
throw new RemotingException(ex);
responseFuture.setSendOk(false);
}
} finally {
ClientSyncDurationMetrics clientSyncDurationMetrics = ClientSyncDurationMetrics
.of(syncRequestDto)
.withMilliseconds(System.currentTimeMillis() - start);
RpcMetrics.recordClientSyncRequestDuration(clientSyncDurationMetrics);
responseFuture.setCause(future.cause());
responseFuture.putResponse(null);
log.error("Send Sync request {} to host {} failed", transporter, serverHost, responseFuture.getCause());
});
/*
* sync wait for result
*/
final IRpcResponse iRpcResponse = responseFuture.waitResponse();
if (iRpcResponse != null) {
return iRpcResponse;
}
if (responseFuture.isSendOK()) {
throw new RemoteTimeoutException(serverHost.toString(), timeoutMills, responseFuture.getCause());
} else {
throw new RemoteException(serverHost.toString(), responseFuture.getCause());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ class SyncClientMethodInvoker extends AbstractClientMethodInvoker {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcMethod sync = method.getAnnotation(RpcMethod.class);
Transporter transporter = new Transporter();
transporter.setBody(JsonSerializer.serialize(StandardRpcRequest.of(args)));
transporter.setHeader(TransporterHeader.of(methodIdentifier));
final Transporter transporter = Transporter.of(
TransporterHeader.of(methodIdentifier),
JsonSerializer.serialize(StandardRpcRequest.of(args)));

SyncRequestDto syncRequestDto = SyncRequestDto.builder()
final SyncRequestDto syncRequestDto = SyncRequestDto.builder()
.timeoutMillis(sync.timeout())
.retryStrategy(sync.retry())
.transporter(transporter)
.serverHost(serverHost)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@
/**
* timeout exception
*/
public class RemotingTimeoutException extends RemotingException {
public class RemoteTimeoutException extends RemoteException {

public RemotingTimeoutException(String message) {
public RemoteTimeoutException(String message) {
super(message);
}

public RemotingTimeoutException(String address, long timeoutMillis) {
public RemoteTimeoutException(String address, long timeoutMillis) {
this(address, timeoutMillis, null);
}

public RemotingTimeoutException(String address, long timeoutMillis, Throwable cause) {
public RemoteTimeoutException(String address, long timeoutMillis, Throwable cause) {
super(String.format("wait response on the channel %s timeout %s", address, timeoutMillis), cause);
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public class ResponseFuture {

private final long opaque;

// remove the timeout
private final long timeoutMillis;

private final CountDownLatch latch = new CountDownLatch(1);
Expand Down
Loading

0 comments on commit 4ddda6d

Please sign in to comment.