diff --git a/server/events.go b/server/events.go index e160da54..55dfaefd 100644 --- a/server/events.go +++ b/server/events.go @@ -1663,8 +1663,9 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj ...string) { Bytes: atomic.LoadInt64(&a.outBytes)}, SlowConsumers: atomic.LoadInt64(&a.slowConsumers), } - // Set timer to fire again unless we are at zero. - if localConns == 0 { + // Set timer to fire again unless we are at zero, but only if the account + // is not configured for JetStream. + if localConns == 0 && !a.jetStreamConfiguredNoLock() { clearTimer(&a.ctmr) } else { // Check to see if we have an HB running and update. diff --git a/server/jetstream.go b/server/jetstream.go index 111fb555..cd2420e6 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1586,7 +1586,12 @@ func (a *Account) jetStreamConfigured() bool { return false } a.mu.RLock() - defer a.mu.RUnlock() + jsc := a.jetStreamConfiguredNoLock() + a.mu.RUnlock() + return jsc +} + +func (a *Account) jetStreamConfiguredNoLock() bool { return len(a.jsLimits) > 0 } diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index a44d69b1..04ddf18a 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -11674,3 +11674,44 @@ func TestJetStreamClusterStreamCatchupWithTruncateAndPriorSnapshot(t *testing.T) // With bug we would fail here. c.waitOnStreamCurrent(rs, "$G", "TEST") } + +func TestJetStreamClusterNoOrphanedDueToNoConnection(t *testing.T) { + orgEventsHBInterval := eventsHBInterval + eventsHBInterval = 500 * time.Millisecond + defer func() { eventsHBInterval = orgEventsHBInterval }() + + c := createJetStreamClusterExplicit(t, "R3F", 3) + defer c.shutdown() + + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + checkSysServers := func() { + t.Helper() + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + for _, s := range c.servers { + s.mu.RLock() + num := len(s.sys.servers) + s.mu.RUnlock() + if num != 2 { + return fmt.Errorf("Expected server %q to have 2 servers, got %v", s, num) + } + } + return nil + }) + } + + checkSysServers() + nc.Close() + + time.Sleep(7 * eventsHBInterval) + checkSysServers() +}