mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 10:10:42 -07:00
Merge pull request #1895 from nats-io/startfix
Fixes for supercluster startup and JetStream clustering.
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
// Copyright 2018-2020 The NATS Authors
|
||||
// Copyright 2018-2021 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -94,6 +94,7 @@ type internal struct {
|
||||
orphMax time.Duration
|
||||
chkOrph time.Duration
|
||||
statsz time.Duration
|
||||
cstatsz time.Duration
|
||||
shash string
|
||||
inboxPre string
|
||||
}
|
||||
@@ -560,14 +561,22 @@ func (s *Server) sendStatsz(subj string) {
|
||||
// This should be wrapChk() to setup common locking.
|
||||
func (s *Server) heartbeatStatsz() {
|
||||
if s.sys.stmr != nil {
|
||||
s.sys.stmr.Reset(s.sys.statsz)
|
||||
// Increase after startup to our max.
|
||||
s.sys.cstatsz *= 4
|
||||
if s.sys.cstatsz > s.sys.statsz {
|
||||
s.sys.cstatsz = s.sys.statsz
|
||||
}
|
||||
s.sys.stmr.Reset(s.sys.cstatsz)
|
||||
}
|
||||
s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID))
|
||||
}
|
||||
|
||||
// This should be wrapChk() to setup common locking.
|
||||
func (s *Server) startStatszTimer() {
|
||||
s.sys.stmr = time.AfterFunc(s.sys.statsz, s.wrapChk(s.heartbeatStatsz))
|
||||
// We will start by sending out more of these and trail off to the statsz being the max.
|
||||
s.sys.cstatsz = time.Second
|
||||
// Send out the first one only after a second.
|
||||
s.sys.stmr = time.AfterFunc(s.sys.cstatsz, s.wrapChk(s.heartbeatStatsz))
|
||||
}
|
||||
|
||||
// Start a ticker that will fire periodically and check for orphaned servers.
|
||||
|
||||
@@ -539,7 +539,14 @@ func (js *jetStream) isGroupLeaderless(rg *raftGroup) bool {
|
||||
if rg.node == nil {
|
||||
return false
|
||||
}
|
||||
return rg.node.GroupLeader() == _EMPTY_
|
||||
// If we don't have a leader.
|
||||
if rg.node.GroupLeader() == _EMPTY_ {
|
||||
// Make sure we have been running for enough time.
|
||||
if time.Since(rg.node.Created()) > lostQuorumInterval {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *Server) JetStreamIsStreamAssigned(account, stream string) bool {
|
||||
@@ -603,6 +610,10 @@ func (cc *jetStreamCluster) isStreamLeader(account, stream string) bool {
|
||||
if cc == nil {
|
||||
return true
|
||||
}
|
||||
if cc.meta == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
var sa *streamAssignment
|
||||
if as := cc.streams[account]; as != nil {
|
||||
sa = as[stream]
|
||||
@@ -632,6 +643,10 @@ func (cc *jetStreamCluster) isConsumerLeader(account, stream, consumer string) b
|
||||
if cc == nil {
|
||||
return true
|
||||
}
|
||||
if cc.meta == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
var sa *streamAssignment
|
||||
if as := cc.streams[account]; as != nil {
|
||||
sa = as[stream]
|
||||
@@ -747,6 +762,7 @@ type writeableStreamAssignment struct {
|
||||
|
||||
func (js *jetStream) metaSnapshot() []byte {
|
||||
var streams []writeableStreamAssignment
|
||||
|
||||
js.mu.RLock()
|
||||
cc := js.cluster
|
||||
for _, asa := range cc.streams {
|
||||
@@ -764,13 +780,15 @@ func (js *jetStream) metaSnapshot() []byte {
|
||||
streams = append(streams, wsa)
|
||||
}
|
||||
}
|
||||
js.mu.RUnlock()
|
||||
|
||||
if len(streams) == 0 {
|
||||
js.mu.RUnlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
b, _ := json.Marshal(streams)
|
||||
js.mu.RUnlock()
|
||||
|
||||
return s2.EncodeBetter(nil, b)
|
||||
}
|
||||
|
||||
@@ -1918,7 +1936,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember,
|
||||
func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
|
||||
js.mu.Lock()
|
||||
s, cc := js.srv, js.cluster
|
||||
if s == nil || cc == nil {
|
||||
if s == nil || cc == nil || cc.meta == nil {
|
||||
// TODO(dlc) - debug at least
|
||||
js.mu.Unlock()
|
||||
return
|
||||
|
||||
@@ -2563,7 +2563,7 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
|
||||
}
|
||||
// Consumer too. Since we do not know if the consumer leader was not the one shutdown
|
||||
// we should wait for a bit for the system to detect.
|
||||
adv, _ = csub.NextMsg(time.Second)
|
||||
adv, _ = csub.NextMsg(5 * time.Second)
|
||||
if adv == nil {
|
||||
t.Fatalf("Expected to receive a consumer quorum lost advisory")
|
||||
}
|
||||
@@ -2989,7 +2989,7 @@ func TestJetStreamClusterRemovePeer(t *testing.T) {
|
||||
|
||||
// Now check consumer info as well.
|
||||
checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
|
||||
ci, err := sub.ConsumerInfo()
|
||||
ci, err := js.ConsumerInfo("TEST", "cat")
|
||||
if err != nil {
|
||||
return fmt.Errorf("Could not fetch consumer info: %v", err)
|
||||
}
|
||||
@@ -3097,7 +3097,7 @@ func TestJetStreamClusterStepDown(t *testing.T) {
|
||||
}
|
||||
|
||||
checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
|
||||
ci, err := sub.ConsumerInfo()
|
||||
ci, err := js.ConsumerInfo("TEST", "cat")
|
||||
if err != nil {
|
||||
return fmt.Errorf("Could not fetch consumer info: %v", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user