mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Direct access to JetStream resources would be affected if across a leafnode that was down.
This allows a solciting leafnode config to ask that any JetStream cluster assets that are a current leader have the leader stepdown. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -495,8 +495,13 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool)
|
||||
if url != rURL.Host {
|
||||
ipStr = fmt.Sprintf(" (%s)", url)
|
||||
}
|
||||
s.Debugf("Trying to connect as leafnode to remote server on %q%s", rURL.Host, ipStr)
|
||||
conn, err = natsDialTimeout("tcp", url, dialTimeout)
|
||||
if s.isLeafNodeEnabled() {
|
||||
s.Debugf("Trying to connect as leafnode to remote server on %q%s", rURL.Host, ipStr)
|
||||
conn, err = natsDialTimeout("tcp", url, dialTimeout)
|
||||
} else {
|
||||
s.Debugf("Will not attempt to connect to remote server on %q%s, leafnodes currently disabled", rURL.Host, ipStr)
|
||||
err = ErrLeafNodeDisabled
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
attempts++
|
||||
@@ -509,6 +514,8 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool)
|
||||
case <-s.quitCh:
|
||||
return
|
||||
case <-time.After(reconnectDelay):
|
||||
// Check if we should migrate any JetStream assets while this remote is down.
|
||||
s.checkJetStreamMigrate(remote)
|
||||
continue
|
||||
}
|
||||
}
|
||||
@@ -524,6 +531,50 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool)
|
||||
}
|
||||
}
|
||||
|
||||
// Check to see if we should migrate any assets from this account.
|
||||
func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) {
|
||||
s.mu.RLock()
|
||||
accName, shouldMigrate := remote.LocalAccount, remote.JetStreamClusterMigrate
|
||||
s.mu.RUnlock()
|
||||
|
||||
if !shouldMigrate {
|
||||
return
|
||||
}
|
||||
|
||||
acc, err := s.LookupAccount(accName)
|
||||
if err != nil {
|
||||
s.Debugf("Error looking up account [%s] checking for JetStream migration on a leafnode", accName)
|
||||
return
|
||||
}
|
||||
|
||||
// Walk all streams looking for any clustered stream, skip otherwise.
|
||||
// If we are the leader force stepdown.
|
||||
for _, mset := range acc.streams() {
|
||||
node := mset.raftNode()
|
||||
if node == nil {
|
||||
// Not R>1
|
||||
continue
|
||||
}
|
||||
// Collect any consumers
|
||||
for _, o := range mset.getConsumers() {
|
||||
if n := o.raftNode(); n != nil && n.Leader() {
|
||||
n.StepDown()
|
||||
}
|
||||
}
|
||||
// Stepdown if this stream was leader.
|
||||
if node.Leader() {
|
||||
node.StepDown()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Helper for checking.
|
||||
func (s *Server) isLeafNodeEnabled() bool {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.leafNodeEnabled
|
||||
}
|
||||
|
||||
// Save off the tlsName for when we use TLS and mix hostnames and IPs. IPs usually
|
||||
// come from the server we connect to.
|
||||
//
|
||||
|
||||
Reference in New Issue
Block a user