From 15355d783bb2602dba115523d426f6f2bd708a8a Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 8 Feb 2021 06:26:33 -0800 Subject: [PATCH 1/4] For larger superclusters we send out our server information after a short delay on startup. For determing leaderless make sure the raft node has been running for long enough. Signed-off-by: Derek Collison --- server/events.go | 5 +++-- server/jetstream_cluster.go | 14 ++++++++++++-- server/jetstream_cluster_test.go | 2 +- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/server/events.go b/server/events.go index 90522c14..44d1e5d7 100644 --- a/server/events.go +++ b/server/events.go @@ -1,4 +1,4 @@ -// Copyright 2018-2020 The NATS Authors +// Copyright 2018-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -567,7 +567,8 @@ func (s *Server) heartbeatStatsz() { // This should be wrapChk() to setup common locking. func (s *Server) startStatszTimer() { - s.sys.stmr = time.AfterFunc(s.sys.statsz, s.wrapChk(s.heartbeatStatsz)) + // Send out the first one only after a second. + s.sys.stmr = time.AfterFunc(time.Second, s.wrapChk(s.heartbeatStatsz)) } // Start a ticker that will fire periodically and check for orphaned servers. diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 091399f8..f70674e9 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -539,7 +539,14 @@ func (js *jetStream) isGroupLeaderless(rg *raftGroup) bool { if rg.node == nil { return false } - return rg.node.GroupLeader() == _EMPTY_ + // If we don't have a leader. + if rg.node.GroupLeader() == _EMPTY_ { + // Make sure we have been running for enough time. + if time.Since(rg.node.Created()) > lostQuorumInterval { + return true + } + } + return false } func (s *Server) JetStreamIsStreamAssigned(account, stream string) bool { @@ -747,6 +754,7 @@ type writeableStreamAssignment struct { func (js *jetStream) metaSnapshot() []byte { var streams []writeableStreamAssignment + js.mu.RLock() cc := js.cluster for _, asa := range cc.streams { @@ -764,13 +772,15 @@ func (js *jetStream) metaSnapshot() []byte { streams = append(streams, wsa) } } - js.mu.RUnlock() if len(streams) == 0 { + js.mu.RUnlock() return nil } b, _ := json.Marshal(streams) + js.mu.RUnlock() + return s2.EncodeBetter(nil, b) } diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 4f80abd3..03954a30 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -3097,7 +3097,7 @@ func TestJetStreamClusterStepDown(t *testing.T) { } checkFor(t, 2*time.Second, 50*time.Millisecond, func() error { - ci, err := sub.ConsumerInfo() + ci, err := js.ConsumerInfo("TEST", "cat") if err != nil { return fmt.Errorf("Could not fetch consumer info: %v", err) } From 594defa11bde8823278d0657e2ff03e854e7baec Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 8 Feb 2021 06:32:05 -0800 Subject: [PATCH 2/4] Fix for crash in test Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index f70674e9..5e709fbb 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -610,6 +610,10 @@ func (cc *jetStreamCluster) isStreamLeader(account, stream string) bool { if cc == nil { return true } + if cc.meta == nil { + return false + } + var sa *streamAssignment if as := cc.streams[account]; as != nil { sa = as[stream] @@ -639,6 +643,10 @@ func (cc *jetStreamCluster) isConsumerLeader(account, stream, consumer string) b if cc == nil { return true } + if cc.meta == nil { + return false + } + var sa *streamAssignment if as := cc.streams[account]; as != nil { sa = as[stream] From 7ae8d66c6d877084cfd6717a1b88c299fd5e8ad4 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 8 Feb 2021 06:46:57 -0800 Subject: [PATCH 3/4] Do ramp down on server stats on startup Signed-off-by: Derek Collison --- server/events.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/server/events.go b/server/events.go index 44d1e5d7..9eba3c75 100644 --- a/server/events.go +++ b/server/events.go @@ -94,6 +94,7 @@ type internal struct { orphMax time.Duration chkOrph time.Duration statsz time.Duration + cstatsz time.Duration shash string inboxPre string } @@ -560,15 +561,22 @@ func (s *Server) sendStatsz(subj string) { // This should be wrapChk() to setup common locking. func (s *Server) heartbeatStatsz() { if s.sys.stmr != nil { - s.sys.stmr.Reset(s.sys.statsz) + // Increase after startup to our max. + s.sys.cstatsz *= 4 + if s.sys.cstatsz > s.sys.statsz { + s.sys.cstatsz = s.sys.statsz + } + s.sys.stmr.Reset(s.sys.cstatsz) } s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID)) } // This should be wrapChk() to setup common locking. func (s *Server) startStatszTimer() { + // We will start by sending out more of these and trail off to the statsz being the max. + s.sys.cstatsz = time.Second // Send out the first one only after a second. - s.sys.stmr = time.AfterFunc(time.Second, s.wrapChk(s.heartbeatStatsz)) + s.sys.stmr = time.AfterFunc(s.sys.cstatsz, s.wrapChk(s.heartbeatStatsz)) } // Start a ticker that will fire periodically and check for orphaned servers. From c99608fa65a97991bb0cbd3686b9759951bdda15 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 8 Feb 2021 06:57:26 -0800 Subject: [PATCH 4/4] Fix for some test flappers Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 2 +- server/jetstream_cluster_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 5e709fbb..821ad0ca 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1936,7 +1936,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember, func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { js.mu.Lock() s, cc := js.srv, js.cluster - if s == nil || cc == nil { + if s == nil || cc == nil || cc.meta == nil { // TODO(dlc) - debug at least js.mu.Unlock() return diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 03954a30..3bc3f1ce 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -2563,7 +2563,7 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) { } // Consumer too. Since we do not know if the consumer leader was not the one shutdown // we should wait for a bit for the system to detect. - adv, _ = csub.NextMsg(time.Second) + adv, _ = csub.NextMsg(5 * time.Second) if adv == nil { t.Fatalf("Expected to receive a consumer quorum lost advisory") } @@ -2989,7 +2989,7 @@ func TestJetStreamClusterRemovePeer(t *testing.T) { // Now check consumer info as well. checkFor(t, 2*time.Second, 50*time.Millisecond, func() error { - ci, err := sub.ConsumerInfo() + ci, err := js.ConsumerInfo("TEST", "cat") if err != nil { return fmt.Errorf("Could not fetch consumer info: %v", err) }