From 5edd1ce397711e56d2f30d8696bdd57d0c8ee117 Mon Sep 17 00:00:00 2001
From: ohkinozomu <nozomunoise@gmail.com>
Date: Fri, 9 Aug 2024 19:32:36 +0900
Subject: [PATCH 1/2] testCluster: Add etcd integration tests

---
 cluster/agent_test.go     | 137 ++++++++++++++++++++++++++++++++++++--
 cluster/raft/etcd/peer.go |   3 +-
 2 files changed, 134 insertions(+), 6 deletions(-)

diff --git a/cluster/agent_test.go b/cluster/agent_test.go
index eddb9bc..847df72 100644
--- a/cluster/agent_test.go
+++ b/cluster/agent_test.go
@@ -134,6 +134,128 @@ func TestCluster_Hashicorp_Memberlist(t *testing.T) {
 	testCluster(t, conf1, conf2, conf3)
 }
 
+func TestCluster_Etcd_Serf(t *testing.T) {
+	bindPort1, err := utils.GetFreePort()
+	require.NoError(t, err, "Failed to get free port for node1")
+	raftPort1, err := utils.GetFreePort()
+	require.NoError(t, err, "Failed to get free port for node1 Raft")
+
+	bindPort2, err := utils.GetFreePort()
+	require.NoError(t, err, "Failed to get free port for node2")
+	raftPort2, err := utils.GetFreePort()
+	require.NoError(t, err, "Failed to get free port for node2 Raft")
+
+	bindPort3, err := utils.GetFreePort()
+	require.NoError(t, err, "Failed to get free port for node3")
+	raftPort3, err := utils.GetFreePort()
+	require.NoError(t, err, "Failed to get free port for node3 Raft")
+
+	members := []string{
+		"127.0.0.1:" + strconv.Itoa(bindPort1),
+		"127.0.0.1:" + strconv.Itoa(bindPort2),
+		"127.0.0.1:" + strconv.Itoa(bindPort3),
+	}
+
+	conf1 := &config.Cluster{
+		NodeName:      "1",
+		RaftImpl:      config.RaftImplEtcd,
+		BindAddr:      "127.0.0.1",
+		BindPort:      bindPort1,
+		RaftPort:      raftPort1,
+		RaftBootstrap: true,
+		RaftDir:       t.TempDir(),
+		GrpcEnable:    false,
+		Members:       members,
+		DiscoveryWay:  config.DiscoveryWaySerf,
+		NodesFileDir:  t.TempDir(),
+	}
+	conf2 := &config.Cluster{
+		NodeName:      "2",
+		RaftImpl:      config.RaftImplEtcd,
+		BindAddr:      "127.0.0.1",
+		BindPort:      bindPort2,
+		RaftPort:      raftPort2,
+		RaftBootstrap: false,
+		RaftDir:       t.TempDir(),
+		GrpcEnable:    false,
+		Members:       members,
+		DiscoveryWay:  config.DiscoveryWaySerf,
+		NodesFileDir:  t.TempDir(),
+	}
+	conf3 := &config.Cluster{
+		NodeName:      "3",
+		RaftImpl:      config.RaftImplEtcd,
+		BindAddr:      "127.0.0.1",
+		BindPort:      bindPort3,
+		RaftPort:      raftPort3,
+		RaftBootstrap: false,
+		RaftDir:       t.TempDir(),
+		GrpcEnable:    false,
+		Members:       members,
+		DiscoveryWay:  config.DiscoveryWaySerf,
+		NodesFileDir:  t.TempDir(),
+	}
+	testCluster(t, conf1, conf2, conf3)
+}
+
+func TestCluster_Etcd_Memberlist(t *testing.T) {
+	bindPort1, err := utils.GetFreePort()
+	require.NoError(t, err, "Failed to get free port for node1")
+
+	bindPort2, err := utils.GetFreePort()
+	require.NoError(t, err, "Failed to get free port for node2")
+
+	bindPort3, err := utils.GetFreePort()
+	require.NoError(t, err, "Failed to get free port for node3")
+
+	members := []string{
+		"127.0.0.1:" + strconv.Itoa(bindPort1),
+		"127.0.0.1:" + strconv.Itoa(bindPort2),
+		"127.0.0.1:" + strconv.Itoa(bindPort3),
+	}
+
+	conf1 := &config.Cluster{
+		NodeName:      "1",
+		RaftImpl:      config.RaftImplEtcd,
+		BindAddr:      "127.0.0.1",
+		BindPort:      bindPort1,
+		RaftPort:      mlist.GetRaftPortFromBindPort(bindPort1),
+		RaftBootstrap: true,
+		RaftDir:       t.TempDir(),
+		GrpcEnable:    false,
+		Members:       members,
+		DiscoveryWay:  config.DiscoveryWayMemberlist,
+		NodesFileDir:  t.TempDir(),
+	}
+	conf2 := &config.Cluster{
+		NodeName:      "2",
+		RaftImpl:      config.RaftImplEtcd,
+		BindAddr:      "127.0.0.1",
+		BindPort:      bindPort2,
+		RaftPort:      mlist.GetRaftPortFromBindPort(bindPort2),
+		RaftBootstrap: false,
+		RaftDir:       t.TempDir(),
+		GrpcEnable:    false,
+		Members:       members,
+		DiscoveryWay:  config.DiscoveryWayMemberlist,
+		NodesFileDir:  t.TempDir(),
+	}
+	conf3 := &config.Cluster{
+		NodeName:      "3",
+		RaftImpl:      config.RaftImplEtcd,
+		BindAddr:      "127.0.0.1",
+		BindPort:      bindPort3,
+		RaftPort:      mlist.GetRaftPortFromBindPort(bindPort3),
+		RaftBootstrap: false,
+		RaftDir:       t.TempDir(),
+		GrpcEnable:    false,
+		Members:       members,
+		DiscoveryWay:  config.DiscoveryWayMemberlist,
+		NodesFileDir:  t.TempDir(),
+	}
+	testCluster(t, conf1, conf2, conf3)
+}
+
 func testCluster(t *testing.T, conf1 *config.Cluster, conf2 *config.Cluster, conf3 *config.Cluster) {
 	log.Init(log.DefaultOptions())
 
@@ -141,11 +263,15 @@ func testCluster(t *testing.T, conf1 *config.Cluster, conf2 *config.Cluster, con
 	err := agent1.Start()
 	require.NoError(t, err, "Agent start failed for node: %s", conf1.NodeName)
 
+	time.Sleep(1 * time.Second)
+
 	agent2 := NewAgent(conf2)
 	err = agent2.Start()
 	defer agent2.Stop()
 	require.NoError(t, err, "Agent start failed for node: %s", conf2.NodeName)
 
+	time.Sleep(1 * time.Second)
+
 	agent3 := NewAgent(conf3)
 	err = agent3.Start()
 	defer agent3.Stop()
@@ -157,9 +283,9 @@ func testCluster(t *testing.T, conf1 *config.Cluster, conf2 *config.Cluster, con
 	_, leader2 := agent2.raftPeer.GetLeader()
 	_, leader3 := agent3.raftPeer.GetLeader()
 
-	require.Equal(t, leader1, "node1")
-	require.Equal(t, leader2, "node1")
-	require.Equal(t, leader3, "node1")
+	require.Equal(t, leader1, conf1.NodeName)
+	require.Equal(t, leader2, conf1.NodeName)
+	require.Equal(t, leader3, conf1.NodeName)
 
 	members1 := agent1.GetMemberList()
 	members2 := agent2.GetMemberList()
@@ -177,13 +303,14 @@ func testCluster(t *testing.T, conf1 *config.Cluster, conf2 *config.Cluster, con
 	_, newLeader3 := agent3.raftPeer.GetLeader()
 
 	// Check that either agent2 or agent3 becomes the new leader
-	if newLeader2 == "node2" || newLeader2 == "node3" {
+	if newLeader2 == conf2.NodeName || newLeader3 == conf3.NodeName {
 		require.Equal(t, newLeader2, newLeader3, "Leaders should be the same for agent2 and agent3")
 	} else {
 		require.Fail(t, "New leader should be either node2 or node3")
 	}
 
 	// Restart agent1 and verify it is a follower
+	t.Log("Restarting agent1")
 	restartedAgent1 := NewAgent(conf1)
 	err = restartedAgent1.Start()
 	require.NoError(t, err, "Agent restart failed for node: %s", conf1.NodeName)
@@ -198,7 +325,7 @@ func testCluster(t *testing.T, conf1 *config.Cluster, conf2 *config.Cluster, con
 	require.Equal(t, leaderAfterRestart2, leaderAfterRestart1)
 	require.Equal(t, leaderAfterRestart3, leaderAfterRestart1)
 
-	require.NotEqual(t, leaderAfterRestart1, "node1", "After restart, node1 should not be the leader")
+	require.NotEqual(t, leaderAfterRestart1, conf1.NodeName, "After restart, node1 should not be the leader")
 
 	t.Log("Test completed successfully")
 }
diff --git a/cluster/raft/etcd/peer.go b/cluster/raft/etcd/peer.go
index 0241b68..a14ab0c 100644
--- a/cluster/raft/etcd/peer.go
+++ b/cluster/raft/etcd/peer.go
@@ -368,7 +368,6 @@ func (p *Peer) serveChannels() {
 			return
 
 		case <-p.stopC:
-			p.Stop()
 			return
 		}
 	}
@@ -603,6 +602,8 @@ func (p *Peer) writeError(err error) {
 
 func (p *Peer) Stop() {
 	p.stopHTTP()
+	close(p.proposeC)
+	close(p.confChangeC)
 	close(p.commitC)
 	close(p.errorC)
 	p.node.Stop()

From d26aa3420f390700545ae9a139f7f1183ff4521e Mon Sep 17 00:00:00 2001
From: Wind <573966@qq.com>
Date: Tue, 13 Aug 2024 11:39:47 +0800
Subject: [PATCH 2/2] Update peer.go

---
 cluster/raft/etcd/peer.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/cluster/raft/etcd/peer.go b/cluster/raft/etcd/peer.go
index a14ab0c..48f9708 100644
--- a/cluster/raft/etcd/peer.go
+++ b/cluster/raft/etcd/peer.go
@@ -368,6 +368,7 @@ func (p *Peer) serveChannels() {
 			return
 
 		case <-p.stopC:
+			//p.Stop()
 			return
 		}
 	}