Skip to content

Commit

Permalink
Work on nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
HttpMarco committed Nov 28, 2024
1 parent b72d110 commit b92af6a
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 19 deletions.
6 changes: 6 additions & 0 deletions src/main/java/dev/httpmarco/netline/Net.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dev.httpmarco.netline;

import dev.httpmarco.netline.client.NetClient;
import dev.httpmarco.netline.node.NetNode;
import dev.httpmarco.netline.server.NetServer;
import org.jetbrains.annotations.Contract;
import org.jetbrains.annotations.NotNull;
Expand All @@ -21,4 +22,9 @@ public final class Net {
public @NotNull NetServer server() {
return new NetServer();
}

@Contract(value = " -> new", pure = true)
public @NotNull NetNode node() {
return new NetNode();
}
}
2 changes: 1 addition & 1 deletion src/main/java/dev/httpmarco/netline/client/NetClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

@Log4j2
@Accessors(fluent = true)
public final class NetClient extends AbstractNetCompImpl<NetClientConfig> implements NetChannel {
public class NetClient extends AbstractNetCompImpl<NetClientConfig> implements NetChannel {

private Bootstrap bootstrap;
@Setter(AccessLevel.PACKAGE)
Expand Down
81 changes: 81 additions & 0 deletions src/main/java/dev/httpmarco/netline/node/NetNode.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package dev.httpmarco.netline.node;

import dev.httpmarco.netline.NetChannel;
import dev.httpmarco.netline.NetCompHandler;
import dev.httpmarco.netline.client.NetClient;
import dev.httpmarco.netline.server.AbstractNetServer;
import io.netty5.channel.Channel;
import lombok.Getter;
import lombok.experimental.Accessors;
import org.jetbrains.annotations.Contract;
import org.jetbrains.annotations.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@Getter
@Accessors(fluent = true)
public final class NetNode extends AbstractNetServer<NetNodeConfig> {

private final List<NetClient> clients = new ArrayList<>();
private NetNodeState state = NetNodeState.INITIALIZING;

public NetNode() {
super(new NetNodeConfig());
}

@Contract(pure = true)
@Override
public @Nullable CompletableFuture<Void> onClose() {
return null;
}

@Override
public void onBindFail(Throwable throwable) {

}

@Override
public CompletableFuture<Void> onBindSuccess() {
var future = new CompletableFuture<Void>();
this.state = NetNodeState.BIND_CLUSTER;

if(this.config().bindings().isEmpty()) {
// we are the only node
this.state = NetNodeState.READY;
return CompletableFuture.completedFuture(null);
} else {
CompletableFuture.allOf(config().bindings().stream().map(binding -> {
var netNodeClient = new NetNodeClient(binding);
clients.add(netNodeClient);
return netNodeClient.boot();
}).toArray(CompletableFuture[]::new)).whenComplete((unused, throwable) -> {
this.state = NetNodeState.SYNC_CLUSTER;
// todo

this.state = NetNodeState.READY;
future.complete(null);
});
}
return future;
}

@Contract(pure = true)
@Override
public @Nullable NetCompHandler handler() {
return null;
}

@Contract(pure = true)
@Override
public @Nullable NetChannel findChannel(Channel channel) {
return null;
}

@Contract(pure = true)
@Override
public @Nullable NetChannel generateChannel(Channel channel, @Nullable String id) {
return null;
}
}
16 changes: 16 additions & 0 deletions src/main/java/dev/httpmarco/netline/node/NetNodeBinding.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package dev.httpmarco.netline.node;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.experimental.Accessors;

@Getter
@Accessors(fluent = true)
@AllArgsConstructor
public final class NetNodeBinding {

private final String id;
private final String hostname;
private final int port;

}
17 changes: 17 additions & 0 deletions src/main/java/dev/httpmarco/netline/node/NetNodeClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package dev.httpmarco.netline.node;

import dev.httpmarco.netline.client.NetClient;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.experimental.Accessors;
import org.jetbrains.annotations.NotNull;

@Getter
@Accessors(fluent = true)
@AllArgsConstructor
public class NetNodeClient extends NetClient {

public NetNodeClient(@NotNull NetNodeBinding binding) {
this.config().id(binding.id()).hostname(binding.hostname()).port(binding.port());
}
}
9 changes: 9 additions & 0 deletions src/main/java/dev/httpmarco/netline/node/NetNodeConfig.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
package dev.httpmarco.netline.node;

import dev.httpmarco.netline.config.CompConfig;
import lombok.Getter;
import lombok.experimental.Accessors;

import java.util.ArrayList;
import java.util.List;

@Getter
@Accessors(fluent = true)
public final class NetNodeConfig extends CompConfig {

private List<NetNodeBinding> bindings = new ArrayList<>();

public NetNodeConfig() {
super("0.0.0.0", 9091, true, 5);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package dev.httpmarco.netline.server;

import dev.httpmarco.netline.NetChannel;
import dev.httpmarco.netline.NetCompHandler;
import dev.httpmarco.netline.channel.NetChannelInitializer;
import dev.httpmarco.netline.config.CompConfig;
Expand All @@ -22,7 +21,6 @@ public AbstractNetServer(C config) {
super(1, config);
}


@Override
public CompletableFuture<Void> boot() {
var future = new CompletableFuture<Void>();
Expand All @@ -42,8 +40,7 @@ public CompletableFuture<Void> boot() {

bootstrap.bind(config().hostname(), config().port()).addListener(it -> {
if(it.isSuccess()) {
this.onBindSuccess();
future.complete(null);
this.onBindSuccess().whenComplete((unused, throwable) -> future.complete(null));
return;
}
this.onBindFail(it.cause());
Expand All @@ -64,7 +61,7 @@ public CompletableFuture<Void> boot() {

public abstract void onBindFail(Throwable throwable);

public abstract void onBindSuccess();
public abstract CompletableFuture<Void> onBindSuccess();

public abstract NetCompHandler handler();
}
9 changes: 6 additions & 3 deletions src/main/java/dev/httpmarco/netline/server/NetServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ public void send(String id, Packet packet) {
public void send(Predicate<NetClientChannel> predicate, Packet packet) {
this.clients.stream().filter(predicate).forEach(channel -> channel.send(packet));
}

@Override
public CompletableFuture<Void> onClose() {
public @NotNull CompletableFuture<Void> onClose() {
var future = CompletableFuture.allOf(this.clients.stream().map(NetClientChannel::close).toArray(CompletableFuture[]::new));
this.state = NetServerState.CLOSED;
return future;
Expand All @@ -73,12 +74,14 @@ public void onBindFail(Throwable throwable) {
}

@Override
public void onBindSuccess() {
public @NotNull CompletableFuture<Void> onBindSuccess() {
this.state = NetServerState.BOOTED;
return CompletableFuture.completedFuture(null);
}

@Contract(" -> new")
@Override
public NetCompHandler handler() {
public @NotNull NetCompHandler handler() {
return new NetServerHandler(this);
}

Expand Down
34 changes: 24 additions & 10 deletions src/test/java/dev/httpmarco/netline/tests/NodeTest.java
Original file line number Diff line number Diff line change
@@ -1,28 +1,42 @@
package dev.httpmarco.netline.tests;

import dev.httpmarco.netline.Net;
import dev.httpmarco.netline.node.NetNode;
import dev.httpmarco.netline.node.NetNodeBinding;
import dev.httpmarco.netline.node.NetNodeState;
import org.junit.jupiter.api.*;

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@DisplayName("5 - Node cluster test")
public class NodeTest {
@DisplayName("6 - Node cluster test")
public final class NodeTest {

/*
private static NetNode node;
private static NetNode nodeA;
private static NetNode nodeB;

@BeforeAll
public static void beforeHandling() {
node = new NetNode();
nodeA = Net.line().node();
nodeA.config(it -> it.bindings().add(new NetNodeBinding("nodeA", "localhost", 9093)));
nodeB = Net.line().node();
nodeB.config(it -> it.port(9093));
}

@Test
@Order(1)
@DisplayName("5.1 Start first node")
@DisplayName("6.1 Start first node")
public void testState() {
assert node.state() == NetNodeState.INITIALIZING;
node.bootSync();
assert node.state() == NetNodeState.READY;
assert nodeA.state() == NetNodeState.INITIALIZING;
nodeA.bootSync();
assert nodeA.state() == NetNodeState.READY;
}

@Test
@Order(2)
@DisplayName("6.2 Start second node")
public void testSecondNode() {
assert nodeB.state() == NetNodeState.INITIALIZING;
nodeB.bootSync();
assert nodeB.state() == NetNodeState.READY;
}

*/
}

0 comments on commit b92af6a

Please sign in to comment.