diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 55d9f365..36361f18 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -6065,6 +6065,34 @@ func TestJetStreamClusterLeafNodeSPOFMigrateLeaders(t *testing.T) { _, err = nc.Request(dsubj, nil, 500*time.Millisecond) return err }) + + nc, _ = jsClientConnect(t, lnc.randomServer()) + defer nc.Close() + + // Now make sure the consumer, or any other asset, can not become a leader on this node while the leafnode + // is disconnected. + csd := fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "d") + for i := 0; i < 10; i++ { + nc.Request(csd, nil, time.Second) + lnc.waitOnConsumerLeader(globalAccountName, "TEST", "d") + if lnc.consumerLeader(globalAccountName, "TEST", "d") == cl { + t.Fatalf("Consumer leader should not migrate to server without a leafnode connection") + } + } + + // Now make sure once leafnode is back we can have leaders on this server. + cl.reEnableLeafnodes() + checkLeafNodeConnectedCount(t, cl, 2) + + // Make sure we can migrate back to this server now that we are connected. + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + nc.Request(csd, nil, time.Second) + lnc.waitOnConsumerLeader(globalAccountName, "TEST", "d") + if lnc.consumerLeader(globalAccountName, "TEST", "d") == cl { + return nil + } + return fmt.Errorf("Not this server yet") + }) } func TestJetStreamClusterStreamCatchupWithTruncateAndPriorSnapshot(t *testing.T) { diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index a5eb1ced..9b9a8230 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -1027,6 +1027,14 @@ func (s *Server) closeAndDisableLeafnodes() { } } +// Helper function to re-enable leafnode connections. +func (s *Server) reEnableLeafnodes() { + s.mu.Lock() + // Re-enable leafnodes. + s.leafDisableConnect = false + s.mu.Unlock() +} + // Helper to set the remote migrate feature. func (s *Server) setJetStreamMigrateOnRemoteLeaf() { s.mu.Lock() diff --git a/server/leafnode.go b/server/leafnode.go index 7bd046d4..89acd7f7 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -528,10 +528,45 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool) // We have a connection here to a remote server. // Go ahead and create our leaf node and return. s.createLeafNode(conn, rURL, remote, nil) + + // Clear any observer states if we had them. + s.clearObserverState(remote) + return } } +// This will clear any observer state such that stream or consumer assets on this server can become leaders again. +func (s *Server) clearObserverState(remote *leafNodeCfg) { + s.mu.RLock() + accName := remote.LocalAccount + s.mu.RUnlock() + + acc, err := s.LookupAccount(accName) + if err != nil { + s.Warnf("Error looking up account [%s] checking for JetStream clear observer state on a leafnode", accName) + return + } + + // Walk all streams looking for any clustered stream, skip otherwise. + for _, mset := range acc.streams() { + node := mset.raftNode() + if node == nil { + // Not R>1 + continue + } + // Check consumers + for _, o := range mset.getConsumers() { + if n := o.raftNode(); n != nil { + // Ensure we can become a leader again. + n.SetObserver(false) + } + } + // Ensure we can not become a leader again. + node.SetObserver(false) + } +} + // Check to see if we should migrate any assets from this account. func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) { s.mu.RLock() @@ -544,7 +579,7 @@ func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) { acc, err := s.LookupAccount(accName) if err != nil { - s.Debugf("Error looking up account [%s] checking for JetStream migration on a leafnode", accName) + s.Warnf("Error looking up account [%s] checking for JetStream migration on a leafnode", accName) return } @@ -558,14 +593,20 @@ func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) { } // Collect any consumers for _, o := range mset.getConsumers() { - if n := o.raftNode(); n != nil && n.Leader() { - n.StepDown() + if n := o.raftNode(); n != nil { + if n.Leader() { + n.StepDown() + } + // Ensure we can not become a leader while in this state. + n.SetObserver(true) } } // Stepdown if this stream was leader. if node.Leader() { node.StepDown() } + // Ensure we can not become a leader while in this state. + node.SetObserver(true) } } @@ -1340,7 +1381,7 @@ func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, c c.mergeDenyPermissionsLocked(both, denyAllJs) // When a remote with a system account is present in a server, unless otherwise disabled, the server will be // started in observer mode. Now that it is clear that this not used, turn the observer mode off. - if solicited && meta != nil && meta.isObserver() { + if solicited && meta != nil && meta.IsObserver() { meta.setObserver(false, extNotExtended) c.Debugf("Turning JetStream metadata controller Observer Mode off") // Take note that the domain was not extended to avoid this state from startup. @@ -1361,7 +1402,7 @@ func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, c myRemoteDomain, srvDecorated()) // In an extension use case, pin leadership to server remotes connect to. // Therefore, server with a remote that are not already in observer mode, need to be put into it. - if solicited && meta != nil && !meta.isObserver() { + if solicited && meta != nil && !meta.IsObserver() { meta.setObserver(true, extExtended) c.Debugf("Turning JetStream metadata controller Observer Mode on - System Account Connected") // Take note that the domain was not extended to avoid this state next startup. diff --git a/server/raft.go b/server/raft.go index 7f0ad3fb..88077360 100644 --- a/server/raft.go +++ b/server/raft.go @@ -51,6 +51,8 @@ type RaftNode interface { GroupLeader() string HadPreviousLeader() bool StepDown(preferred ...string) error + SetObserver(isObserver bool) + IsObserver() bool Campaign() error ID() string Group() string @@ -1663,12 +1665,17 @@ func (n *raft) electTimer() *time.Timer { return n.elect } -func (n *raft) isObserver() bool { +func (n *raft) IsObserver() bool { n.RLock() defer n.RUnlock() return n.observer } +// Sets the state to observer only. +func (n *raft) SetObserver(isObserver bool) { + n.setObserver(isObserver, extUndetermined) +} + func (n *raft) setObserver(isObserver bool, extSt extensionState) { n.Lock() defer n.Unlock() @@ -1717,7 +1724,7 @@ func (n *raft) runAsFollower() { if n.outOfResources() { n.resetElectionTimeoutWithLock() n.debug("Not switching to candidate, no resources") - } else if n.isObserver() { + } else if n.IsObserver() { n.resetElectWithLock(48 * time.Hour) n.debug("Not switching to candidate, observer only") } else if n.isCatchingUp() {