From 39a0cfccca735cdb6f13ec9b17d2ac60567982cc Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 11 Jul 2022 16:15:01 -0600 Subject: [PATCH] [FIXED] JetStream: servers may be reported as orphaned In some situations, a server may report that a remote server is detected as orphaned (and the node is marked as offline). This is because the orphaned detection relies on conns update to be received, however, servers would suppress the update if an account does not have any connections attached. This PR ensures that the update is sent regardless if the account is JS configured (not necessarily enabled at the moment). Signed-off-by: Ivan Kozlovic --- server/events.go | 5 ++-- server/jetstream.go | 7 +++++- server/jetstream_cluster_test.go | 41 ++++++++++++++++++++++++++++++++ 3 files changed, 50 insertions(+), 3 deletions(-) diff --git a/server/events.go b/server/events.go index e160da54..55dfaefd 100644 --- a/server/events.go +++ b/server/events.go @@ -1663,8 +1663,9 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj ...string) { Bytes: atomic.LoadInt64(&a.outBytes)}, SlowConsumers: atomic.LoadInt64(&a.slowConsumers), } - // Set timer to fire again unless we are at zero. - if localConns == 0 { + // Set timer to fire again unless we are at zero, but only if the account + // is not configured for JetStream. + if localConns == 0 && !a.jetStreamConfiguredNoLock() { clearTimer(&a.ctmr) } else { // Check to see if we have an HB running and update. diff --git a/server/jetstream.go b/server/jetstream.go index 111fb555..cd2420e6 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1586,7 +1586,12 @@ func (a *Account) jetStreamConfigured() bool { return false } a.mu.RLock() - defer a.mu.RUnlock() + jsc := a.jetStreamConfiguredNoLock() + a.mu.RUnlock() + return jsc +} + +func (a *Account) jetStreamConfiguredNoLock() bool { return len(a.jsLimits) > 0 } diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index a44d69b1..04ddf18a 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -11674,3 +11674,44 @@ func TestJetStreamClusterStreamCatchupWithTruncateAndPriorSnapshot(t *testing.T) // With bug we would fail here. c.waitOnStreamCurrent(rs, "$G", "TEST") } + +func TestJetStreamClusterNoOrphanedDueToNoConnection(t *testing.T) { + orgEventsHBInterval := eventsHBInterval + eventsHBInterval = 500 * time.Millisecond + defer func() { eventsHBInterval = orgEventsHBInterval }() + + c := createJetStreamClusterExplicit(t, "R3F", 3) + defer c.shutdown() + + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + checkSysServers := func() { + t.Helper() + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + for _, s := range c.servers { + s.mu.RLock() + num := len(s.sys.servers) + s.mu.RUnlock() + if num != 2 { + return fmt.Errorf("Expected server %q to have 2 servers, got %v", s, num) + } + } + return nil + }) + } + + checkSysServers() + nc.Close() + + time.Sleep(7 * eventsHBInterval) + checkSysServers() +}