mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
[FIXED] JetStream: servers may be reported as orphaned
In some situations, a server may report that a remote server is detected as orphaned (and the node is marked as offline). This is because the orphaned detection relies on conns update to be received, however, servers would suppress the update if an account does not have any connections attached. This PR ensures that the update is sent regardless if the account is JS configured (not necessarily enabled at the moment). Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -1663,8 +1663,9 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj ...string) {
|
||||
Bytes: atomic.LoadInt64(&a.outBytes)},
|
||||
SlowConsumers: atomic.LoadInt64(&a.slowConsumers),
|
||||
}
|
||||
// Set timer to fire again unless we are at zero.
|
||||
if localConns == 0 {
|
||||
// Set timer to fire again unless we are at zero, but only if the account
|
||||
// is not configured for JetStream.
|
||||
if localConns == 0 && !a.jetStreamConfiguredNoLock() {
|
||||
clearTimer(&a.ctmr)
|
||||
} else {
|
||||
// Check to see if we have an HB running and update.
|
||||
|
||||
@@ -1586,7 +1586,12 @@ func (a *Account) jetStreamConfigured() bool {
|
||||
return false
|
||||
}
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
jsc := a.jetStreamConfiguredNoLock()
|
||||
a.mu.RUnlock()
|
||||
return jsc
|
||||
}
|
||||
|
||||
func (a *Account) jetStreamConfiguredNoLock() bool {
|
||||
return len(a.jsLimits) > 0
|
||||
}
|
||||
|
||||
|
||||
@@ -11674,3 +11674,44 @@ func TestJetStreamClusterStreamCatchupWithTruncateAndPriorSnapshot(t *testing.T)
|
||||
// With bug we would fail here.
|
||||
c.waitOnStreamCurrent(rs, "$G", "TEST")
|
||||
}
|
||||
|
||||
func TestJetStreamClusterNoOrphanedDueToNoConnection(t *testing.T) {
|
||||
orgEventsHBInterval := eventsHBInterval
|
||||
eventsHBInterval = 500 * time.Millisecond
|
||||
defer func() { eventsHBInterval = orgEventsHBInterval }()
|
||||
|
||||
c := createJetStreamClusterExplicit(t, "R3F", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
s := c.randomServer()
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"foo"},
|
||||
Replicas: 3,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
checkSysServers := func() {
|
||||
t.Helper()
|
||||
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
||||
for _, s := range c.servers {
|
||||
s.mu.RLock()
|
||||
num := len(s.sys.servers)
|
||||
s.mu.RUnlock()
|
||||
if num != 2 {
|
||||
return fmt.Errorf("Expected server %q to have 2 servers, got %v", s, num)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
checkSysServers()
|
||||
nc.Close()
|
||||
|
||||
time.Sleep(7 * eventsHBInterval)
|
||||
checkSysServers()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user