Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

17 listener websocket connection #19

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.12</version>
<version>4.5.13</version>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.13</version>
<version>4.4.15</version>
</dependency>

<dependency>
Expand All @@ -102,7 +102,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
<version>31.1-jre</version>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
Expand Down
1 change: 0 additions & 1 deletion src/main/java/com/sirius/sdk/base/ReadOnlyChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
public interface ReadOnlyChannel {
/**
* Read message packet
* @param timeout Operation timeout is sec
* @return chunk of data stream
*/
CompletableFuture<byte[]> read();
Expand Down
176 changes: 176 additions & 0 deletions src/main/java/com/sirius/sdk/base/Retryer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package com.sirius.sdk.base;

import com.sirius.sdk.utils.Pair;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Retryer {

private static final Logger log = Logger.getLogger(Retryer.class.getName());

private final Policy policy;

private final Context context;

public Retryer(Policy policy, Context context) {
this.policy = policy;
this.context = context;
}

public static Builder builder() {
return new Builder();
}

public <R> R retry(Callable<R> callable) throws Exception {
while (policy.shouldRetry(context)) {
try {
return callable.call();
} catch (Exception e) {
context.setInvocationTimestamp(Instant.now());
log.log(Level.WARNING, "try " + context.getInvocationNumber() + " failed: " + e.getMessage());
context.incrementInvocationNumber();
context.setResult(Pair.pair(null, e));
Duration delay = policy.nextRunDelay(context);
if (delay != null && !delay.isZero()) {
try {
Thread.sleep(delay.toMillis());
} catch (InterruptedException ex) {
break;
}
}
}
}
if (context.getResult().second != null) {
throw context.getResult().second;
} else {
return (R) context.getResult().first;
}

}

public static class Context {

private final AtomicInteger invocationNumber = new AtomicInteger(0);

private final AtomicReference<Instant> invocationTimestamp = new AtomicReference<>();

private final AtomicReference<Pair<Object, Exception>> result = new AtomicReference<>();

public Pair<Object, Exception> getResult() {
return result.get();
}

public void setResult(Pair<Object, Exception> result) {
this.result.set(result);
}

public int getInvocationNumber() {
return invocationNumber.get();
}

public int incrementInvocationNumber() {
return invocationNumber.incrementAndGet();
}

public Instant getInvocationTimestamp() {
return invocationTimestamp.get();
}

public void setInvocationTimestamp(Instant invocationTimestamp) {
this.invocationTimestamp.set(invocationTimestamp);
}
}

public interface Policy {

int DEFAULT_RETRY_COUNT = 3;
Duration DEFAULT_DELAY = Duration.ZERO;

default boolean shouldRetry(Context context) {
return context.getInvocationNumber() < DEFAULT_RETRY_COUNT;
}

default Duration nextRunDelay(Context context) {
return DEFAULT_DELAY;
}

class Builder {

private int maxAttempts = Policy.DEFAULT_RETRY_COUNT;
private Duration waitDuration = Policy.DEFAULT_DELAY;
private Function<Integer, Duration> intervalFunction;

public Builder maxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
return this;
}

public Builder waitDuration(Duration waitDuration) {
this.waitDuration = waitDuration;
this.intervalFunction = null;
return this;
}

public Builder intervalFunction(Function<Integer, Duration> intervalFunction) {
this.intervalFunction = intervalFunction;
this.waitDuration = null;
return this;
}

private Policy build() {
return new Policy() {
@Override
public boolean shouldRetry(Context context) {
return context.getInvocationNumber() < maxAttempts;
}

@Override
public Duration nextRunDelay(Context context) {
if (waitDuration != null) {
return waitDuration;
} else if (intervalFunction != null) {
return intervalFunction.apply(context.getInvocationNumber());
} else {
return Policy.super.nextRunDelay(context);
}
}
};
}

}
}

public static class Builder {

private final Policy.Builder retryPolicy = new Policy.Builder();

public Builder maxAttempts(int maxAttempts) {
retryPolicy.maxAttempts(maxAttempts);
return this;
}

public Builder waitDuration(Duration waitDuration) {
retryPolicy.waitDuration(waitDuration);
return this;
}

public Builder intervalFunction(Function<Integer, Duration> intervalFunction) {
retryPolicy.intervalFunction(intervalFunction);
return this;
}

public Retryer build() {
return new Retryer(retryPolicy.build(), new Context());
}

}


}
Loading