Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How To Properly Add and Sync A New Node in an Existing Raft Network #357

Open
candostyavuz opened this issue Jun 7, 2024 · 8 comments
Open

Comments

@candostyavuz
Copy link

candostyavuz commented Jun 7, 2024

I am attempting to add a new node into an existing Raft cluster to increase the number of nodes in the network. My process involves using Dragonboat's functions like SyncGetShardMembership, SyncRequestAddNonVoting, and SyncRequestAddReplica. However, I'm facing issues with correctly syncing and propagating the new node information across the existing cluster.
I see no information about this very important concept in docs, examples or in discussions. So, I really be grateful if you can provide me a flow that allow me to dynamically add / remove nodes in a running Raft network.

  1. What is the recommended procedure to ensure that a new node addition is recognized and correctly synced across an existing Raft cluster? Including Leader recognition and state sync.
  2. When I register the new node with SyncRequestAddReplica and then start the node with related Dragonboat start method, does it handle state sync for the new node automatically or else what should I do to ensure the new node successfully synchronized with the network and connect to existing leader?
  3. Are there specific steps or configurations that should be followed or adjusted to improve the reliability of node additions?
  4. How can we verify that the new node has been fully integrated into the cluster from all nodes' perspectives?

Current Approach and Problems

  • Retrieve Current Membership: I use SyncGetShardMembership to fetch the current members of the cluster.
  • Case-1: Add Node as Non-Voting: Initially, I add the new node as a non-voting member using SyncRequestAddNonVoting. The nodeID and raftAddress for the new node are derived from our internal configurations and passed appropriately. In this scenario membership config has not been updated when SyncGetShardMembership is called and logged again afterwards.
  • Case-2: Add Node Directly: When I execute SyncRequestAddReplica directly, the membership is updated and new node info is added but only one node, which probably processed that operation. How should I propagate this updated member list to the other nodes? Isn't SyncRequestAddReplica handling this consensus automatically?

In both scenarios, when starting new node with join flag = true and initial members list as empty by using StartOnDiskReplica function, the new node has absolutely no info about who the leader is or it can't fetch the state info about existing network. It feels like there's no automatic synchronization mechanism when we register & start the new node.

Here are some function I am using to implement the logic I described above:

func (s *Service) AddNodeToCluster(nodeInfo uuid.UUID, raftAddress string) error {
	nodeID := common.Uint64FromUUID(nodeInfo)

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
	defer cancel()

	membershipInfo, err := s.nh.SyncGetShardMembership(ctx, s.config.ClusterID)
	if err != nil {
		return fmt.Errorf("failed to get cluster membership: %v", err)
	}

	err = s.nh.SyncRequestAddReplica(ctx, s.config.ClusterID, nodeID, raftAddress, membershipInfo.ConfigChangeID)
	if err != nil {
		return fmt.Errorf("failed to add node %s to the Raft cluster: %v", nodeInfo, err)
	}

	// Refresh the configuration to update s.config.Members
	if err := s.RefreshCurrentMembers(); err != nil {
		return fmt.Errorf("failed to refresh configuration: %v", err)
	}

	return nil
}

func (s *Service) RefreshCurrentMembers() error {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
	defer cancel()

	membershipInfo, err := s.nh.SyncGetShardMembership(ctx, s.config.ClusterID)
	if err != nil {
		return fmt.Errorf("failed to get shard membership: %v", err)
	}

	newMembers := make(map[uint64]string)
	for id, addr := range membershipInfo.Nodes {
		newMembers[id] = addr
	}

	s.lock.Lock()
	s.currentMembers = newMembers
	s.lock.Unlock()

	return nil
}

Leader logic which runs in go routine with time ticker:

func (s *Service) Leader() (uint64, bool) {
	if s.nh == nil {
		log.Warnf("raft is not started yet")
		return 0, false
	}
	id, _, isValid, err := s.nh.GetLeaderID(s.config.ClusterID)
	if err != nil {
		log.Warnf("failed to determine leader due to error: %s", err)
		return 0, false
	} else if !isValid {
		log.Warnf("leader is not found, maybe under rebalancing")
		return 0, false
	}
	for memberID := range s.currentMembers {
		fmt.Println("memberID: ", memberID)
		if memberID == id {
			return id, true
		}
	}
	log.Errorf("can't find leader in config, that's not normal")
	return 0, false
}

To be more specific, the new node gives can't find leader in config, that's not normal error after registered & started with steps described above...

I appreciate any guidance or insights into how to correctly manage node additions in a Raft cluster using Dragonboat, ensuring consistency and stability throughout the cluster. Thanks in advance 🙌

@kevburnsjr
Copy link
Contributor

Data synchronization is asynchronous. You will know that it's complete when you can read index on the shard from the new node. I use a function like this to block until the shard is ready after calling StartOnDiskReplica.

// readIndex blocks until it can read from a shard, indicating that the local replica is up to date.
func (a *Agent) readIndex(ctx context.Context, shardID uint64) (err error) {
	var rs *dragonboat.RequestState
	for {
		rs, err = a.host.ReadIndex(shardID, time.Second)
		if err != nil || rs == nil {
			a.log.Infof(`[%05x] Error reading shard index: %s: %v`, shardID, a.HostID(), err)
			select {
			case <-ctx.Done():
				return
			case <-a.clock.After(time.Second):
			}
			continue
		}
		res := <-rs.ResultC()
		rs.Release()
		if !res.Completed() {
			a.log.Infof(`[%05x] Waiting for other nodes`, shardID)
			select {
			case <-ctx.Done():
				return
			case <-a.clock.After(time.Second):
			}
			continue
		}
		break
	}
	return
}

Dragonboat v4 has a new WaitReady replica config option that effectively does the same thing.

@candostyavuz
Copy link
Author

candostyavuz commented Jun 7, 2024

Thank you so much for the response!

So is it the general flow, can you confirm please?

  1. Add node as non-voting first with: SyncRequestAddNonVoting
  2. I can start the node with StartReplica (or with StartOnDiskReplica in my case) with join=true & initialMemberList is empty
  3. Need to wait until ReadIndex completes is operations (or WaitReady in v4)
  4. Update membership with SyncRequestAddReplica
  5. At this stage I should be able to see the new node added in membership config and I should be able to query the new node with SyncGetShardMembership in each existing initial nodes now
  6. the new node is fully added and synchronized, it will recognize the current leader and process the proposals like the other followers (?)

Am I correct in the steps above? thanks in advance!

@kevburnsjr
Copy link
Contributor

I haven't explored the transition from non-voting to member but that sounds about right.
It should be a quick membership transition with no extra state to replicate.

@candostyavuz
Copy link
Author

candostyavuz commented Jun 9, 2024

Hey @kevburnsjr ,
I tried your solution and the new node got stuck in the loop where it waits for a reply from ReadIndex.

Here is how I tested it on a raft cluster with 3 initial members:

  1. Added the new node as non-voting member
  2. One of the initial nodes acknowledged me that non-voting member is added and it waits to connect to that node host
  3. Created a separate docker-compose.yaml for new node and the new node is started it with following operations:
	err = nh.StartOnDiskReplica(initialMembers, !isInitialMember, func(clusterId, nodeId uint64) statemachine.IOnDiskStateMachine {
		log.Infof("creating disk state machine for cluster=%d and node=%d", clusterId, nodeId)
		if clusterId != s.config.ClusterID {
			log.Fatalf("not compatible cluster version %d != %d", clusterId, s.config.ClusterID)
		}
		return s
	}, nhc)
	if err != nil {
		return errors.Wrapf(err, "failed to start on-disk raft cluster")
	}

	if err := s.readIndex(context.Background(), s.config.ClusterID); err != nil {
		return errors.Wrap(err, "failed to sync new node to the current state of the cluster")
	}
  1. Connection is established in networking layer, pinged new node from all other nodes and it was successful. But synchronization was still not happenning.
    Here is my readIndex func:
func (s *Service) readIndex(ctx context.Context, shardID uint64) error {
	for {
		rs, err := s.nh.ReadIndex(shardID, time.Second)
		fmt.Println("ReadIndex - rs: ", rs)
		if err != nil || rs == nil {
			log.Infof("[%05x] Error reading shard index: %v", shardID, err)
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(time.Second):
			}
			continue
		}
		res := <-rs.ResultC()
		fmt.Println("readIndex - res: ", res)

		rs.Release()
		if !res.Completed() {
			log.Infof("[%05x] Waiting for other nodes", shardID)
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(time.Second):
			}
			continue
		}
		break
	}
	return nil
}

Here is the response from new node:

readIndex - res:  {0 {0 []} [] {0 0} false false}
time="2024-06-09T16:02:12Z" level=info msg="[00000] Waiting for other nodes"
ReadIndex - rs:  &{0 0 0 0 367 {0 0} 0 {0} {0} <nil> <nil> 0x4000bc7440 0x40010aa000 0x40003e6870 false <nil>}
time="2024-06-09T16:02:14Z" level=info msg="[00000] Waiting for other nodes"
readIndex - res:  {0 {0 []} [] {0 0} false false}
ReadIndex - rs:  &{0 0 0 0 378 {0 0} 0 {0} {0} <nil> <nil> 0x4000bc7380 0x40010aa000 0x40003e6870 false <nil>}
time="2024-06-09T16:02:16Z" level=info msg="[00000] Waiting for other nodes"
readIndex - res:  {0 {0 []} [] {0 0} false false}
ReadIndex - rs:  &{0 0 0 0 388 {0 0} 0 {0} {0} <nil> <nil> 0x4000bc7380 0x40010aa000 0x40003e6870 false <nil>}
readIndex - res:  {0 {0 []} [] {0 0} false false}
time="2024-06-09T16:02:18Z" level=info msg="[00000] Waiting for other nodes"
ReadIndex - rs:  &{0 0 0 0 399 {0 0} 0 {0} {0} <nil> <nil> 0x4000bc7380 0x40010aa000 0x40003e6870 false <nil>}
readIndex - res:  {0 {0 []} [] {0 0} false false}
time="2024-06-09T16:02:21Z" level=info msg="[00000] Waiting for other nodes"

This loop kept forever going on with same message.

Anything I might be missing? Do you have any repository that I can check how this process is handled from start to end?

Thanks in advance 🙌

@kevburnsjr
Copy link
Contributor

It's hard to tell what might be wrong without a full example.
Is the rest of this code in a repository that I can view?

The readIndex example I gave you was copy/pasted from my project Zongzi which is adds a registry. It is designed to simplify these sorts of multi-node operations. See (*Agent).Start() where all startup scenarios are covered.

@lni
Copy link
Owner

lni commented Jun 10, 2024

@candostyavuz

Once the replica is added to the shard, you can start it on the designated NodeHost, after that everything else is transparent to your application meaning raft logs will be appended to the new nodes and snapshot will be sent if necessary.

In general, there is no extra step required, some would add replica as non-vote first and upgrade it to a full member later, this is extensively discussed in the raft thesis, please read the thesis for more details.

To make sure the recently added replica is working as expected, you can - issue a ReadIndex on that node, if you can successfully complete it then it means all previous logs have been appended and applied onto your new replica. this is a pretty strong guarantee as it also implies that the new replica is recognized by the leader replica, which in turn requires a consensus from all replicas in the shard.

@candostyavuz
Copy link
Author

candostyavuz commented Jun 10, 2024

@lni @kevburnsjr Thank you so much for your explanations and examples, I managed to resolve most of my issues.
For now, it seems that new node has successfully synchronized and joined the cluster. I haven't tested adding more than 1 node yet but hopefully the process goes smoothly.

I really appreciate it!

@candostyavuz
Copy link
Author

candostyavuz commented Jun 11, 2024

@lni @kevburnsjr
Sometimes there are issues occurring during the state sync of new added node, do you know the main reason for this?

panic: not initial recovery but snapshot shrunk

2024-06-11 16:29:59.163950 I | rsm: [00000:70460] opened disk SM, index 0
2024-06-11 16:29:59.164460 C | rsm: [00000:70460], ss.OnDiskIndex (106) > s.onDiskInitIndex (0)
panic: [00000:70460], ss.OnDiskIndex (106) > s.onDiskInitIndex (0)

goroutine 423 [running]:
github.com/lni/goutils/logutil/capnslog.(*PackageLogger).Panicf(0x1400128c060, {0x1039581df?, 0x10?}, {0x14001412630?, 0x14000591070?, 0x14000054dd0?})
	/Users/candostyavuz/go/pkg/mod/github.com/lni/goutils@v1.4.0/logutil/capnslog/pkg_logger.go:88 +0xb4
github.com/lni/dragonboat/v4/logger.(*capnsLog).Panicf(0x14000591050?, {0x1039581df?, 0x10?}, {0x14001412630?, 0x103cefc01?, 0x140003e0098?})
	/Users/candostyavuz/go/pkg/mod/github.com/lni/dragonboat/v4@v4.0.0-20231222133740-1d6e2d76cd57/logger/capnslogger.go:74 +0x28
github.com/lni/dragonboat/v4/logger.(*dragonboatLogger).Panicf(0x140003e00b0?, {0x1039581df, 0x30}, {0x14001412630, 0x3, 0x3})
	/Users/candostyavuz/go/pkg/mod/github.com/lni/dragonboat/v4@v4.0.0-20231222133740-1d6e2d76cd57/logger/logger.go:135 +0x54
github.com/lni/dragonboat/v4/internal/rsm.(*StateMachine).checkPartialSnapshotApplyOnDiskSM(0x14000572400, {{0x140001f4d20, 0xd1}, 0x0, 0x6a, 0x2, {0x65, 0x14001412390, 0x0, 0x0, ...}, ...}, ...)

or sometimes:

panic: invalid commitTo index 105, lastIndex() 0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants