Skip to content

Commit

Permalink
!all node renames: node -> endpoint; unique node -> node (#1077)
Browse files Browse the repository at this point in the history
  • Loading branch information
ktoso authored Oct 27, 2022
1 parent 970bfdd commit 7b8017c
Show file tree
Hide file tree
Showing 136 changed files with 3,823 additions and 2,888 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ if args.count >= 3 {
print("parsing port")
let port = Int(args[2])!
print("Joining")
system.cluster.join(node: Node(systemName: "System", host: host, port: port))
system.cluster.join(endpoint: Cluster.Endpoint(systemName: "System", host: host, port: port))
}

_Thread.sleep(.seconds(120))
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public final class MultiNodeClusterSingletonTests: MultiNodeTestSuite {
}

distributed func greet(name: String) -> String {
"\(self.greeting) \(name)! (from node: \(self.id.uniqueNode), id: \(self.id.detailedDescription))"
"\(self.greeting) \(name)! (from node: \(self.id.node), id: \(self.id.detailedDescription))"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public final class MultiNodeReceptionistTests: MultiNodeTestSuite {
let expectedCount = Nodes.allCases.count
var discovered: Set<DistributedEcho> = []
for try await actor in await multiNode.system.receptionist.listing(of: .init(DistributedEcho.self)) {
multiNode.log.notice("Discovered \(actor.id) from \(actor.id.uniqueNode)")
multiNode.log.notice("Discovered \(actor.id) from \(actor.id.node)")
discovered.insert(actor)

if discovered.count == expectedCount {
Expand Down Expand Up @@ -79,7 +79,7 @@ public final class MultiNodeReceptionistTests: MultiNodeTestSuite {
}

distributed func echo(name: String) -> String {
"echo: \(self.greeting)\(name)! (from node: \(self.id.uniqueNode), id: \(self.id.detailedDescription))"
"echo: \(self.greeting)\(name)! (from node: \(self.id.node), id: \(self.id.detailedDescription))"
}
}
}
10 changes: 5 additions & 5 deletions Protos/ActorID.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ option optimize_for = SPEED;
option swift_prefix = "_Proto";

message ActorID {
UniqueNode node = 1;
ClusterNode node = 1;
ActorPath path = 2;
uint32 incarnation = 3;
map<string, bytes> metadata = 4;
Expand All @@ -28,12 +28,12 @@ message ActorPath {
repeated string segments = 1;
}

message UniqueNode {
Node node = 1;
uint64 nid = 2;
message ClusterNode {
ClusterEndpoint endpoint = 1;
uint64 nid = 2;
}

message Node {
message ClusterEndpoint {
string protocol = 1;
string system = 2;
string hostname = 3;
Expand Down
6 changes: 3 additions & 3 deletions Protos/Clocks/VersionVector.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ message ActorIdentity {

message VersionReplicaID {
oneof value {
ActorID actorID = 1;
UniqueNode uniqueNode = 2;
uint64 uniqueNodeID = 3;
ActorID actorID = 1;
ClusterNode node = 2;
uint64 nodeID = 3;
}
}

Expand Down
4 changes: 2 additions & 2 deletions Protos/Cluster/Cluster.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ message ClusterInbound {
}

message ClusterRestInPeace {
UniqueNode targetNode = 1;
UniqueNode fromNode = 2;
ClusterNode targetNode = 1;
ClusterNode fromNode = 2;
}
4 changes: 2 additions & 2 deletions Protos/Cluster/ClusterEvents.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ message ClusterEvent {
}

message ClusterMembershipChange {
UniqueNode node = 1;
ClusterNode node = 1;

ClusterMemberStatus fromStatus = 2;
ClusterMemberStatus toStatus = 3;
ClusterMemberStatus toStatus = 3;
}

message ClusterLeadershipChange {
Expand Down
20 changes: 10 additions & 10 deletions Protos/Cluster/Membership.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,19 @@ import "Clocks/VersionVector.proto";

message ClusterMembership {
repeated ClusterMember members = 1;
UniqueNode leaderNode = 2;
ClusterNode leaderNode = 2;
}

message ClusterMember {
UniqueNode node = 1;
ClusterMemberStatus status = 2;
ClusterNode node = 1;
ClusterMemberStatus status = 2;
ClusterMemberReachability reachability = 3;
uint32 upNumber = 4;
uint32 upNumber = 4;
}

enum ClusterMemberReachability {
CLUSTER_MEMBER_REACHABILITY_UNSPECIFIED = 0;
CLUSTER_MEMBER_REACHABILITY_REACHABLE = 1;
CLUSTER_MEMBER_REACHABILITY_REACHABLE = 1;
CLUSTER_MEMBER_REACHABILITY_UNREACHABLE = 2;
}

Expand All @@ -51,13 +51,13 @@ enum ClusterMemberStatus {
// ==== Membership Gossip ----------------------------------------------------------------------------------------------

message ClusterMembershipGossip {
// Membership contains full UniqueNode renderings, and the owner and seen table refer to them by UniqueNode.ID
// Membership contains full ClusterNode renderings, and the owner and seen table refer to them by ClusterNode.ID
// this saves us space (by avoiding to render the unique node explicitly many times for each member/seen-entry).
ClusterMembership membership = 1;

// The following fields will use compressed UniqueNode encoding and ONLY serialize them as their uniqueNodeID.
// During deserialization the fields can be resolved against the membership to obtain full UniqueNode values if necessary.
uint64 ownerUniqueNodeID = 2;
// The following fields will use compressed ClusterNode encoding and ONLY serialize them as their nodeID.
// During deserialization the fields can be resolved against the membership to obtain full ClusterNode values if necessary.
uint64 ownerClusterNodeID = 2;
ClusterMembershipSeenTable seenTable = 3;
}

Expand All @@ -66,6 +66,6 @@ message ClusterMembershipSeenTable {
}

message ClusterMembershipSeenTableRow {
uint64 uniqueNodeID = 1;
uint64 nodeID = 1;
VersionVector version = 2;
}
2 changes: 1 addition & 1 deletion Protos/Cluster/SWIM/SWIM.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ message SWIMStatus {

Type type = 1;
uint64 incarnation = 2;
repeated UniqueNode suspectedBy = 3;
repeated ClusterNode suspectedBy = 3;
}

message SWIMMember {
Expand Down
24 changes: 12 additions & 12 deletions Protos/WireProtocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import "Serialization/Serialization.proto";
// ==== Handshake ------------------------------------------------------------------------------------------------------

message HandshakeOffer {
ProtocolVersion version = 1;
UniqueNode originNode = 2;
Node targetNode = 3;
ProtocolVersion version = 1;
ClusterNode originNode = 2;
ClusterEndpoint targetEndpoint = 3;
// In the future we may want to add additional information
// about certain capabilities here. E.g. when a node supports
// faster transport like InfiniBand and the likes, so we can
Expand All @@ -43,14 +43,14 @@ message HandshakeResponse {

message HandshakeAccept {
ProtocolVersion version = 1;
UniqueNode originNode = 2;
UniqueNode targetNode = 3;
ClusterNode originNode = 2;
ClusterNode targetNode = 3;
}

message HandshakeReject {
ProtocolVersion version = 1;
UniqueNode originNode = 2;
UniqueNode targetNode = 3;
ClusterNode originNode = 2;
ClusterNode targetNode = 3;
string reason = 4;
}

Expand All @@ -66,16 +66,16 @@ message Envelope {
// System messages have to be reliable, therefore they need to be acknowledged
// by the receiving node.
message SystemEnvelope {
uint64 sequenceNr = 1;
UniqueNode from = 2;
uint64 sequenceNr = 1;
ClusterNode from = 2;

Manifest manifest = 3;
bytes payload = 4;
Manifest manifest = 3;
bytes payload = 4;
}

message SystemAck {
uint64 sequenceNr = 1;
UniqueNode from = 2;
ClusterNode from = 2;
}

// The version is represented as 4 bytes:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ final class DistributedDiningPhilosophers {
print("~~~~~~~ started \(systems.count) actor systems ~~~~~~~")

// TODO: Joining to be simplified by having "seed nodes" (that a node should join)
systemA.cluster.join(node: systemB.settings.node)
systemA.cluster.join(node: systemC.settings.node)
systemC.cluster.join(node: systemB.settings.node)
systemA.cluster.join(endpoint: systemB.settings.endpoint)
systemA.cluster.join(endpoint: systemC.settings.endpoint)
systemC.cluster.join(endpoint: systemB.settings.endpoint)

print("waiting for cluster to form...")
try await self.ensureCluster(systems, within: .seconds(10))
Expand Down Expand Up @@ -71,7 +71,7 @@ final class DistributedDiningPhilosophers {
}

private func ensureCluster(_ systems: [ClusterSystem], within: Duration) async throws {
let nodes = Set(systems.map(\.settings.uniqueBindNode))
let nodes = Set(systems.map(\.settings.bindNode))

try await withThrowingTaskGroup(of: Void.self) { group in
for system in systems {
Expand Down
Loading

0 comments on commit 7b8017c

Please sign in to comment.