diff --git a/server/raft.go b/server/raft.go index 6241290e..a6b914ea 100644 --- a/server/raft.go +++ b/server/raft.go @@ -347,7 +347,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig) (RaftNode, error if cfg == nil { return nil, errNilCfg } - s.mu.Lock() + s.mu.RLock() if s.sys == nil { s.mu.Unlock() return nil, ErrNoSysAccount @@ -356,7 +356,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig) (RaftNode, error sacc := s.sys.account hash := s.sys.shash pub := s.info.ID - s.mu.Unlock() + s.mu.RUnlock() ps, err := readPeerState(cfg.Store) if err != nil { diff --git a/server/raft_test.go b/server/raft_test.go index 89c983b9..bddce36f 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -1,4 +1,4 @@ -// Copyright 2021 The NATS Authors +// Copyright 2021-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -17,8 +17,55 @@ import ( "math" "math/rand" "testing" + "time" ) +func TestNRGSimple(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + rg := c.createRaftGroup("TEST", 3, newStateAdder) + rg.waitOnLeader() + // Do several state transitions. + rg.randomMember().(*stateAdder).proposeDelta(11) + rg.randomMember().(*stateAdder).proposeDelta(11) + rg.randomMember().(*stateAdder).proposeDelta(-22) + // Wait for all members to have the correct state. + rg.waitOnTotal(t, 0) +} + +func TestNRGSnapshotAndRestart(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + rg := c.createRaftGroup("TEST", 3, newStateAdder) + rg.waitOnLeader() + + var expectedTotal int64 + + leader := rg.leader().(*stateAdder) + sm := rg.nonLeader().(*stateAdder) + + for i := 0; i < 1000; i++ { + delta := rand.Int63n(222) + expectedTotal += delta + leader.proposeDelta(delta) + + if i == 250 { + // Let some things catchup. + time.Sleep(50 * time.Millisecond) + // Snapshot leader and stop and snapshot a member. + leader.snapshot(t) + sm.snapshot(t) + sm.stop() + } + } + // Restart. + sm.restart() + // Wait for all members to have the correct state. + rg.waitOnTotal(t, expectedTotal) +} + func TestNRGAppendEntryEncode(t *testing.T) { ae := &appendEntry{ term: 1,