mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
Merge branch 'main' into dev
This commit is contained in:
@@ -6065,6 +6065,34 @@ func TestJetStreamClusterLeafNodeSPOFMigrateLeaders(t *testing.T) {
|
||||
_, err = nc.Request(dsubj, nil, 500*time.Millisecond)
|
||||
return err
|
||||
})
|
||||
|
||||
nc, _ = jsClientConnect(t, lnc.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
// Now make sure the consumer, or any other asset, can not become a leader on this node while the leafnode
|
||||
// is disconnected.
|
||||
csd := fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "d")
|
||||
for i := 0; i < 10; i++ {
|
||||
nc.Request(csd, nil, time.Second)
|
||||
lnc.waitOnConsumerLeader(globalAccountName, "TEST", "d")
|
||||
if lnc.consumerLeader(globalAccountName, "TEST", "d") == cl {
|
||||
t.Fatalf("Consumer leader should not migrate to server without a leafnode connection")
|
||||
}
|
||||
}
|
||||
|
||||
// Now make sure once leafnode is back we can have leaders on this server.
|
||||
cl.reEnableLeafnodes()
|
||||
checkLeafNodeConnectedCount(t, cl, 2)
|
||||
|
||||
// Make sure we can migrate back to this server now that we are connected.
|
||||
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
|
||||
nc.Request(csd, nil, time.Second)
|
||||
lnc.waitOnConsumerLeader(globalAccountName, "TEST", "d")
|
||||
if lnc.consumerLeader(globalAccountName, "TEST", "d") == cl {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Not this server yet")
|
||||
})
|
||||
}
|
||||
|
||||
func TestJetStreamClusterStreamCatchupWithTruncateAndPriorSnapshot(t *testing.T) {
|
||||
|
||||
@@ -1027,6 +1027,14 @@ func (s *Server) closeAndDisableLeafnodes() {
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to re-enable leafnode connections.
|
||||
func (s *Server) reEnableLeafnodes() {
|
||||
s.mu.Lock()
|
||||
// Re-enable leafnodes.
|
||||
s.leafDisableConnect = false
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
// Helper to set the remote migrate feature.
|
||||
func (s *Server) setJetStreamMigrateOnRemoteLeaf() {
|
||||
s.mu.Lock()
|
||||
|
||||
@@ -528,10 +528,45 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool)
|
||||
// We have a connection here to a remote server.
|
||||
// Go ahead and create our leaf node and return.
|
||||
s.createLeafNode(conn, rURL, remote, nil)
|
||||
|
||||
// Clear any observer states if we had them.
|
||||
s.clearObserverState(remote)
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// This will clear any observer state such that stream or consumer assets on this server can become leaders again.
|
||||
func (s *Server) clearObserverState(remote *leafNodeCfg) {
|
||||
s.mu.RLock()
|
||||
accName := remote.LocalAccount
|
||||
s.mu.RUnlock()
|
||||
|
||||
acc, err := s.LookupAccount(accName)
|
||||
if err != nil {
|
||||
s.Warnf("Error looking up account [%s] checking for JetStream clear observer state on a leafnode", accName)
|
||||
return
|
||||
}
|
||||
|
||||
// Walk all streams looking for any clustered stream, skip otherwise.
|
||||
for _, mset := range acc.streams() {
|
||||
node := mset.raftNode()
|
||||
if node == nil {
|
||||
// Not R>1
|
||||
continue
|
||||
}
|
||||
// Check consumers
|
||||
for _, o := range mset.getConsumers() {
|
||||
if n := o.raftNode(); n != nil {
|
||||
// Ensure we can become a leader again.
|
||||
n.SetObserver(false)
|
||||
}
|
||||
}
|
||||
// Ensure we can not become a leader again.
|
||||
node.SetObserver(false)
|
||||
}
|
||||
}
|
||||
|
||||
// Check to see if we should migrate any assets from this account.
|
||||
func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) {
|
||||
s.mu.RLock()
|
||||
@@ -544,7 +579,7 @@ func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) {
|
||||
|
||||
acc, err := s.LookupAccount(accName)
|
||||
if err != nil {
|
||||
s.Debugf("Error looking up account [%s] checking for JetStream migration on a leafnode", accName)
|
||||
s.Warnf("Error looking up account [%s] checking for JetStream migration on a leafnode", accName)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -558,14 +593,20 @@ func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) {
|
||||
}
|
||||
// Collect any consumers
|
||||
for _, o := range mset.getConsumers() {
|
||||
if n := o.raftNode(); n != nil && n.Leader() {
|
||||
n.StepDown()
|
||||
if n := o.raftNode(); n != nil {
|
||||
if n.Leader() {
|
||||
n.StepDown()
|
||||
}
|
||||
// Ensure we can not become a leader while in this state.
|
||||
n.SetObserver(true)
|
||||
}
|
||||
}
|
||||
// Stepdown if this stream was leader.
|
||||
if node.Leader() {
|
||||
node.StepDown()
|
||||
}
|
||||
// Ensure we can not become a leader while in this state.
|
||||
node.SetObserver(true)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1340,7 +1381,7 @@ func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, c
|
||||
c.mergeDenyPermissionsLocked(both, denyAllJs)
|
||||
// When a remote with a system account is present in a server, unless otherwise disabled, the server will be
|
||||
// started in observer mode. Now that it is clear that this not used, turn the observer mode off.
|
||||
if solicited && meta != nil && meta.isObserver() {
|
||||
if solicited && meta != nil && meta.IsObserver() {
|
||||
meta.setObserver(false, extNotExtended)
|
||||
c.Debugf("Turning JetStream metadata controller Observer Mode off")
|
||||
// Take note that the domain was not extended to avoid this state from startup.
|
||||
@@ -1361,7 +1402,7 @@ func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, c
|
||||
myRemoteDomain, srvDecorated())
|
||||
// In an extension use case, pin leadership to server remotes connect to.
|
||||
// Therefore, server with a remote that are not already in observer mode, need to be put into it.
|
||||
if solicited && meta != nil && !meta.isObserver() {
|
||||
if solicited && meta != nil && !meta.IsObserver() {
|
||||
meta.setObserver(true, extExtended)
|
||||
c.Debugf("Turning JetStream metadata controller Observer Mode on - System Account Connected")
|
||||
// Take note that the domain was not extended to avoid this state next startup.
|
||||
|
||||
@@ -51,6 +51,8 @@ type RaftNode interface {
|
||||
GroupLeader() string
|
||||
HadPreviousLeader() bool
|
||||
StepDown(preferred ...string) error
|
||||
SetObserver(isObserver bool)
|
||||
IsObserver() bool
|
||||
Campaign() error
|
||||
ID() string
|
||||
Group() string
|
||||
@@ -1663,12 +1665,17 @@ func (n *raft) electTimer() *time.Timer {
|
||||
return n.elect
|
||||
}
|
||||
|
||||
func (n *raft) isObserver() bool {
|
||||
func (n *raft) IsObserver() bool {
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
return n.observer
|
||||
}
|
||||
|
||||
// Sets the state to observer only.
|
||||
func (n *raft) SetObserver(isObserver bool) {
|
||||
n.setObserver(isObserver, extUndetermined)
|
||||
}
|
||||
|
||||
func (n *raft) setObserver(isObserver bool, extSt extensionState) {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
@@ -1717,7 +1724,7 @@ func (n *raft) runAsFollower() {
|
||||
if n.outOfResources() {
|
||||
n.resetElectionTimeoutWithLock()
|
||||
n.debug("Not switching to candidate, no resources")
|
||||
} else if n.isObserver() {
|
||||
} else if n.IsObserver() {
|
||||
n.resetElectWithLock(48 * time.Hour)
|
||||
n.debug("Not switching to candidate, observer only")
|
||||
} else if n.isCatchingUp() {
|
||||
|
||||
Reference in New Issue
Block a user