diff --git a/server/events.go b/server/events.go index 90522c14..9eba3c75 100644 --- a/server/events.go +++ b/server/events.go @@ -1,4 +1,4 @@ -// Copyright 2018-2020 The NATS Authors +// Copyright 2018-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -94,6 +94,7 @@ type internal struct { orphMax time.Duration chkOrph time.Duration statsz time.Duration + cstatsz time.Duration shash string inboxPre string } @@ -560,14 +561,22 @@ func (s *Server) sendStatsz(subj string) { // This should be wrapChk() to setup common locking. func (s *Server) heartbeatStatsz() { if s.sys.stmr != nil { - s.sys.stmr.Reset(s.sys.statsz) + // Increase after startup to our max. + s.sys.cstatsz *= 4 + if s.sys.cstatsz > s.sys.statsz { + s.sys.cstatsz = s.sys.statsz + } + s.sys.stmr.Reset(s.sys.cstatsz) } s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID)) } // This should be wrapChk() to setup common locking. func (s *Server) startStatszTimer() { - s.sys.stmr = time.AfterFunc(s.sys.statsz, s.wrapChk(s.heartbeatStatsz)) + // We will start by sending out more of these and trail off to the statsz being the max. + s.sys.cstatsz = time.Second + // Send out the first one only after a second. + s.sys.stmr = time.AfterFunc(s.sys.cstatsz, s.wrapChk(s.heartbeatStatsz)) } // Start a ticker that will fire periodically and check for orphaned servers. diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 091399f8..821ad0ca 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -539,7 +539,14 @@ func (js *jetStream) isGroupLeaderless(rg *raftGroup) bool { if rg.node == nil { return false } - return rg.node.GroupLeader() == _EMPTY_ + // If we don't have a leader. + if rg.node.GroupLeader() == _EMPTY_ { + // Make sure we have been running for enough time. + if time.Since(rg.node.Created()) > lostQuorumInterval { + return true + } + } + return false } func (s *Server) JetStreamIsStreamAssigned(account, stream string) bool { @@ -603,6 +610,10 @@ func (cc *jetStreamCluster) isStreamLeader(account, stream string) bool { if cc == nil { return true } + if cc.meta == nil { + return false + } + var sa *streamAssignment if as := cc.streams[account]; as != nil { sa = as[stream] @@ -632,6 +643,10 @@ func (cc *jetStreamCluster) isConsumerLeader(account, stream, consumer string) b if cc == nil { return true } + if cc.meta == nil { + return false + } + var sa *streamAssignment if as := cc.streams[account]; as != nil { sa = as[stream] @@ -747,6 +762,7 @@ type writeableStreamAssignment struct { func (js *jetStream) metaSnapshot() []byte { var streams []writeableStreamAssignment + js.mu.RLock() cc := js.cluster for _, asa := range cc.streams { @@ -764,13 +780,15 @@ func (js *jetStream) metaSnapshot() []byte { streams = append(streams, wsa) } } - js.mu.RUnlock() if len(streams) == 0 { + js.mu.RUnlock() return nil } b, _ := json.Marshal(streams) + js.mu.RUnlock() + return s2.EncodeBetter(nil, b) } @@ -1918,7 +1936,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember, func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { js.mu.Lock() s, cc := js.srv, js.cluster - if s == nil || cc == nil { + if s == nil || cc == nil || cc.meta == nil { // TODO(dlc) - debug at least js.mu.Unlock() return diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 4f80abd3..3bc3f1ce 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -2563,7 +2563,7 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) { } // Consumer too. Since we do not know if the consumer leader was not the one shutdown // we should wait for a bit for the system to detect. - adv, _ = csub.NextMsg(time.Second) + adv, _ = csub.NextMsg(5 * time.Second) if adv == nil { t.Fatalf("Expected to receive a consumer quorum lost advisory") } @@ -2989,7 +2989,7 @@ func TestJetStreamClusterRemovePeer(t *testing.T) { // Now check consumer info as well. checkFor(t, 2*time.Second, 50*time.Millisecond, func() error { - ci, err := sub.ConsumerInfo() + ci, err := js.ConsumerInfo("TEST", "cat") if err != nil { return fmt.Errorf("Could not fetch consumer info: %v", err) } @@ -3097,7 +3097,7 @@ func TestJetStreamClusterStepDown(t *testing.T) { } checkFor(t, 2*time.Second, 50*time.Millisecond, func() error { - ci, err := sub.ConsumerInfo() + ci, err := js.ConsumerInfo("TEST", "cat") if err != nil { return fmt.Errorf("Could not fetch consumer info: %v", err) }