mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Duplicate leader elect and lost advisories to the system account as well.
Also suppress lost quorums to at most once every 10 secs. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -217,6 +217,7 @@ type Consumer struct {
|
||||
ca *consumerAssignment
|
||||
node RaftNode
|
||||
infoSub *subscription
|
||||
lqsent time.Time
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -2606,3 +2607,10 @@ func (o *Consumer) decStreamPending(sseq uint64, subj string) {
|
||||
}
|
||||
o.mu.Unlock()
|
||||
}
|
||||
|
||||
// account returns the consumer's account (o.acc).
// The field is read while holding the consumer's read lock.
func (o *Consumer) account() *Account {
|
||||
o.mu.RLock()
|
||||
a := o.acc
|
||||
o.mu.RUnlock()
|
||||
return a
|
||||
}
|
||||
|
||||
@@ -1265,32 +1265,12 @@ func (js *jetStream) processStreamLeaderChange(mset *Stream, sa *streamAssignmen
|
||||
|
||||
if isLeader {
|
||||
s.Noticef("JetStream cluster new stream leader for '%s > %s'", sa.Client.Account, stream)
|
||||
if node := mset.raftNode(); node != nil {
|
||||
s.publishAdvisory(mset.account(), JSAdvisoryStreamLeaderElectedPre+"."+stream, &JSStreamLeaderElectedAdvisory{
|
||||
TypedEvent: TypedEvent{
|
||||
Type: JSStreamLeaderElectedAdvisoryType,
|
||||
ID: nuid.Next(),
|
||||
Time: time.Now().UTC(),
|
||||
},
|
||||
Stream: stream,
|
||||
Leader: s.serverNameForNode(node.GroupLeader()),
|
||||
Replicas: s.replicas(node),
|
||||
})
|
||||
}
|
||||
s.sendStreamLeaderElectAdvisory(mset)
|
||||
} else {
|
||||
// We are stepping down. Make sure if we are doing so because we have lost quorum that
|
||||
// we send the appropriate advisories.
|
||||
// We are stepping down.
|
||||
// Make sure if we are doing so because we have lost quorum that we send the appropriate advisories.
|
||||
if node := mset.raftNode(); node != nil && !node.Quorum() {
|
||||
s.Warnf("JetStream cluster stream '%s > %s' has lost quorum, stalled.", sa.Client.Account, stream)
|
||||
s.publishAdvisory(mset.account(), JSAdvisoryStreamQuorumLostPre+"."+stream, &JSStreamQuorumLostAdvisory{
|
||||
TypedEvent: TypedEvent{
|
||||
Type: JSStreamQuorumLostAdvisoryType,
|
||||
ID: nuid.Next(),
|
||||
Time: time.Now().UTC(),
|
||||
},
|
||||
Stream: stream,
|
||||
Replicas: s.replicas(node),
|
||||
})
|
||||
s.sendStreamLostQuorumAdvisory(mset)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1316,6 +1296,83 @@ func (js *jetStream) processStreamLeaderChange(mset *Stream, sa *streamAssignmen
|
||||
s.sendAPIResponse(client, acc, _EMPTY_, reply, _EMPTY_, s.jsonResponse(&resp))
|
||||
}
|
||||
|
||||
// lostQuorumAdvInterval is the minimum time that must elapse between two
// lost quorum advisories for the same stream or consumer.
// Fixed value ok for now.
|
||||
const lostQuorumAdvInterval = 10 * time.Second
|
||||
|
||||
// shouldSendLostQuorum determines if we should send a lost quorum advisory for
// this stream. We throttle these after the first one: it returns true only when
// at least lostQuorumAdvInterval has elapsed since mset.lqsent, and in that case
// records the current time in mset.lqsent. Otherwise it returns false and leaves
// mset.lqsent untouched.
// Acquires mset.mu for the duration of the call.
|
||||
func (mset *Stream) shouldSendLostQuorum() bool {
|
||||
mset.mu.Lock()
|
||||
defer mset.mu.Unlock()
|
||||
// A zero lqsent (nothing sent yet) is far enough in the past to pass this check.
if time.Since(mset.lqsent) >= lostQuorumAdvInterval {
|
||||
mset.lqsent = time.Now()
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// sendStreamLostQuorumAdvisory logs a warning and publishes a
// JSStreamQuorumLostAdvisory for mset on the subject
// JSAdvisoryStreamQuorumLostPre + "." + <stream name>.
//
// It is a no-op when mset is nil, when mset has no raft node, or when
// mset.shouldSendLostQuorum() reports that an advisory was sent too recently.
// The advisory is published to the stream's own account unless that account is
// the system account, and then published again with a nil account after the
// account name has been placed in the advisory.
func (s *Server) sendStreamLostQuorumAdvisory(mset *Stream) {
|
||||
if mset == nil {
|
||||
return
|
||||
}
|
||||
node, stream, acc := mset.raftNode(), mset.Name(), mset.account()
|
||||
if node == nil {
|
||||
return
|
||||
}
|
||||
// Throttle: this call also records the send time when it returns true.
if !mset.shouldSendLostQuorum() {
|
||||
return
|
||||
}
|
||||
|
||||
s.Warnf("JetStream cluster stream '%s > %s' has NO quorum, stalled.", acc.GetName(), stream)
|
||||
|
||||
subj := JSAdvisoryStreamQuorumLostPre + "." + stream
|
||||
adv := &JSStreamQuorumLostAdvisory{
|
||||
TypedEvent: TypedEvent{
|
||||
Type: JSStreamQuorumLostAdvisoryType,
|
||||
ID: nuid.Next(),
|
||||
Time: time.Now().UTC(),
|
||||
},
|
||||
Stream: stream,
|
||||
Replicas: s.replicas(node),
|
||||
}
|
||||
|
||||
// Send to the user's account if not the system account.
|
||||
if acc != s.SystemAccount() {
|
||||
s.publishAdvisory(acc, subj, adv)
|
||||
}
|
||||
// Now do system level one. Place account info in adv, and nil account means system.
// NOTE(review): adv is mutated after the publish above; this presumably relies on
// publishAdvisory having already encoded adv before returning — confirm.
|
||||
adv.Account = acc.GetName()
|
||||
s.publishAdvisory(nil, subj, adv)
|
||||
}
|
||||
|
||||
// sendStreamLeaderElectAdvisory publishes a JSStreamLeaderElectedAdvisory for
// mset on the subject JSAdvisoryStreamLeaderElectedPre + "." + <stream name>.
//
// It is a no-op when mset is nil or has no raft node. Unlike the lost quorum
// advisory, there is no throttling here. The advisory is published to the
// stream's own account unless that account is the system account, and then
// published again with a nil account after the account name has been placed
// in the advisory.
func (s *Server) sendStreamLeaderElectAdvisory(mset *Stream) {
|
||||
if mset == nil {
|
||||
return
|
||||
}
|
||||
node, stream, acc := mset.raftNode(), mset.Name(), mset.account()
|
||||
if node == nil {
|
||||
return
|
||||
}
|
||||
subj := JSAdvisoryStreamLeaderElectedPre + "." + stream
|
||||
adv := &JSStreamLeaderElectedAdvisory{
|
||||
TypedEvent: TypedEvent{
|
||||
Type: JSStreamLeaderElectedAdvisoryType,
|
||||
ID: nuid.Next(),
|
||||
Time: time.Now().UTC(),
|
||||
},
|
||||
Stream: stream,
|
||||
// Leader is the server name resolved from the raft group's current leader.
Leader: s.serverNameForNode(node.GroupLeader()),
|
||||
Replicas: s.replicas(node),
|
||||
}
|
||||
|
||||
// Send to the user's account if not the system account.
|
||||
if acc != s.SystemAccount() {
|
||||
s.publishAdvisory(acc, subj, adv)
|
||||
}
|
||||
// Now do system level one. Place account info in adv, and nil account means system.
// NOTE(review): adv is mutated after the publish above; this presumably relies on
// publishAdvisory having already encoded adv before returning — confirm.
|
||||
adv.Account = acc.GetName()
|
||||
s.publishAdvisory(nil, subj, adv)
|
||||
}
|
||||
|
||||
// Will lookup a stream assignment.
|
||||
// Lock should be held.
|
||||
func (js *jetStream) streamAssignment(account, stream string) (sa *streamAssignment) {
|
||||
@@ -2033,34 +2090,12 @@ func (js *jetStream) processConsumerLeaderChange(o *Consumer, ca *consumerAssign
|
||||
|
||||
if isLeader {
|
||||
s.Noticef("JetStream cluster new consumer leader for '%s > %s > %s'", ca.Client.Account, stream, consumer)
|
||||
if node := o.raftNode(); node != nil {
|
||||
s.publishAdvisory(acc, JSAdvisoryConsumerLeaderElectedPre+"."+stream+"."+consumer, &JSConsumerLeaderElectedAdvisory{
|
||||
TypedEvent: TypedEvent{
|
||||
Type: JSConsumerLeaderElectedAdvisoryType,
|
||||
ID: nuid.Next(),
|
||||
Time: time.Now().UTC(),
|
||||
},
|
||||
Stream: stream,
|
||||
Consumer: consumer,
|
||||
Leader: s.serverNameForNode(node.GroupLeader()),
|
||||
Replicas: s.replicas(node),
|
||||
})
|
||||
}
|
||||
s.sendConsumerLeaderElectAdvisory(o)
|
||||
} else {
|
||||
// We are stepping down. Make sure if we are doing so because we have lost quorum that
|
||||
// we send the appropriate advisories.
|
||||
// We are stepping down.
|
||||
// Make sure if we are doing so because we have lost quorum that we send the appropriate advisories.
|
||||
if node := o.raftNode(); node != nil && !node.Quorum() {
|
||||
s.Warnf("JetStream cluster consumer '%s > %s >%s' has lost quorum, stalled.", ca.Client.Account, stream, consumer)
|
||||
s.publishAdvisory(acc, JSAdvisoryConsumerQuorumLostPre+"."+stream+"."+consumer, &JSConsumerQuorumLostAdvisory{
|
||||
TypedEvent: TypedEvent{
|
||||
Type: JSConsumerQuorumLostAdvisoryType,
|
||||
ID: nuid.Next(),
|
||||
Time: time.Now().UTC(),
|
||||
},
|
||||
Stream: stream,
|
||||
Consumer: consumer,
|
||||
Replicas: s.replicas(node),
|
||||
})
|
||||
s.sendConsumerLostQuorumAdvisory(o)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2080,6 +2115,83 @@ func (js *jetStream) processConsumerLeaderChange(o *Consumer, ca *consumerAssign
|
||||
s.sendAPIResponse(client, acc, _EMPTY_, reply, _EMPTY_, s.jsonResponse(&resp))
|
||||
}
|
||||
|
||||
// shouldSendLostQuorum determines if we should send a lost quorum advisory for
// this consumer. We throttle these after the first one: it returns true only
// when at least lostQuorumAdvInterval has elapsed since o.lqsent, and in that
// case records the current time in o.lqsent. Otherwise it returns false and
// leaves o.lqsent untouched.
// Acquires o.mu for the duration of the call.
|
||||
func (o *Consumer) shouldSendLostQuorum() bool {
|
||||
o.mu.Lock()
|
||||
defer o.mu.Unlock()
|
||||
// A zero lqsent (nothing sent yet) is far enough in the past to pass this check.
if time.Since(o.lqsent) >= lostQuorumAdvInterval {
|
||||
o.lqsent = time.Now()
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// sendConsumerLostQuorumAdvisory logs a warning and publishes a
// JSConsumerQuorumLostAdvisory for o on the subject
// JSAdvisoryConsumerQuorumLostPre + "." + <stream name> + "." + <consumer name>.
//
// It is a no-op when o is nil, when o has no raft node, or when
// o.shouldSendLostQuorum() reports that an advisory was sent too recently.
// The advisory is published to the consumer's own account unless that account
// is the system account, and then published again with a nil account after the
// account name has been placed in the advisory.
func (s *Server) sendConsumerLostQuorumAdvisory(o *Consumer) {
|
||||
if o == nil {
|
||||
return
|
||||
}
|
||||
node, stream, consumer, acc := o.raftNode(), o.Stream(), o.Name(), o.account()
|
||||
if node == nil {
|
||||
return
|
||||
}
|
||||
// Throttle: this call also records the send time when it returns true.
if !o.shouldSendLostQuorum() {
|
||||
return
|
||||
}
|
||||
|
||||
// NOTE(review): ">%s" below lacks the space used in the "'%s > %s > %s'" format
// of the new consumer leader notice — looks like a typo; confirm and fix.
s.Warnf("JetStream cluster consumer '%s > %s >%s' has NO quorum, stalled.", acc.GetName(), stream, consumer)
|
||||
|
||||
subj := JSAdvisoryConsumerQuorumLostPre + "." + stream + "." + consumer
|
||||
adv := &JSConsumerQuorumLostAdvisory{
|
||||
TypedEvent: TypedEvent{
|
||||
Type: JSConsumerQuorumLostAdvisoryType,
|
||||
ID: nuid.Next(),
|
||||
Time: time.Now().UTC(),
|
||||
},
|
||||
Stream: stream,
|
||||
Consumer: consumer,
|
||||
Replicas: s.replicas(node),
|
||||
}
|
||||
|
||||
// Send to the user's account if not the system account.
|
||||
if acc != s.SystemAccount() {
|
||||
s.publishAdvisory(acc, subj, adv)
|
||||
}
|
||||
// Now do system level one. Place account info in adv, and nil account means system.
// NOTE(review): adv is mutated after the publish above; this presumably relies on
// publishAdvisory having already encoded adv before returning — confirm.
|
||||
adv.Account = acc.GetName()
|
||||
s.publishAdvisory(nil, subj, adv)
|
||||
}
|
||||
|
||||
// sendConsumerLeaderElectAdvisory publishes a JSConsumerLeaderElectedAdvisory
// for o on the subject
// JSAdvisoryConsumerLeaderElectedPre + "." + <stream name> + "." + <consumer name>.
//
// It is a no-op when o is nil or has no raft node. Unlike the lost quorum
// advisory, there is no throttling here. The advisory is published to the
// consumer's own account unless that account is the system account, and then
// published again with a nil account after the account name has been placed
// in the advisory.
func (s *Server) sendConsumerLeaderElectAdvisory(o *Consumer) {
|
||||
if o == nil {
|
||||
return
|
||||
}
|
||||
node, stream, consumer, acc := o.raftNode(), o.Stream(), o.Name(), o.account()
|
||||
if node == nil {
|
||||
return
|
||||
}
|
||||
|
||||
subj := JSAdvisoryConsumerLeaderElectedPre + "." + stream + "." + consumer
|
||||
adv := &JSConsumerLeaderElectedAdvisory{
|
||||
TypedEvent: TypedEvent{
|
||||
Type: JSConsumerLeaderElectedAdvisoryType,
|
||||
ID: nuid.Next(),
|
||||
Time: time.Now().UTC(),
|
||||
},
|
||||
Stream: stream,
|
||||
Consumer: consumer,
|
||||
// Leader is the server name resolved from the raft group's current leader.
Leader: s.serverNameForNode(node.GroupLeader()),
|
||||
Replicas: s.replicas(node),
|
||||
}
|
||||
|
||||
// Send to the user's account if not the system account.
|
||||
if acc != s.SystemAccount() {
|
||||
s.publishAdvisory(acc, subj, adv)
|
||||
}
|
||||
// Now do system level one. Place account info in adv, and nil account means system.
// NOTE(review): adv is mutated after the publish above; this presumably relies on
// publishAdvisory having already encoded adv before returning — confirm.
|
||||
adv.Account = acc.GetName()
|
||||
s.publishAdvisory(nil, subj, adv)
|
||||
}
|
||||
|
||||
type streamAssignmentResult struct {
|
||||
Account string `json:"account"`
|
||||
Stream string `json:"stream"`
|
||||
|
||||
@@ -154,6 +154,7 @@ const JSStreamLeaderElectedAdvisoryType = "io.nats.jetstream.advisory.v1.stream_
|
||||
// JSStreamLeaderElectedAdvisory indicates that a stream has elected a new leader.
|
||||
type JSStreamLeaderElectedAdvisory struct {
|
||||
TypedEvent
|
||||
Account string `json:"account,omitempty"`
|
||||
Stream string `json:"stream"`
|
||||
Leader string `json:"leader"`
|
||||
Replicas []*PeerInfo `json:"replicas"`
|
||||
@@ -166,6 +167,7 @@ const JSStreamQuorumLostAdvisoryType = "io.nats.jetstream.advisory.v1.stream_quo
|
||||
// JSStreamQuorumLostAdvisory indicates that a stream has lost quorum and is stalled.
|
||||
type JSStreamQuorumLostAdvisory struct {
|
||||
// TypedEvent carries the advisory's Type, ID and Time.
TypedEvent
|
||||
// Account is the name of the account that owns the stream. It is omitted
// from the JSON encoding when empty.
Account string `json:"account,omitempty"`
|
||||
// Stream is the name of the stream that lost quorum.
Stream string `json:"stream"`
|
||||
// Replicas holds peer information for the stream's raft group.
Replicas []*PeerInfo `json:"replicas"`
|
||||
}
|
||||
@@ -176,6 +178,7 @@ const JSConsumerLeaderElectedAdvisoryType = "io.nats.jetstream.advisory.v1.consu
|
||||
// JSConsumerLeaderElectedAdvisory indicates that a consumer has elected a new leader.
|
||||
type JSConsumerLeaderElectedAdvisory struct {
|
||||
TypedEvent
|
||||
Account string `json:"account,omitempty"`
|
||||
Stream string `json:"stream"`
|
||||
Consumer string `json:"consumer"`
|
||||
Leader string `json:"leader"`
|
||||
@@ -189,6 +192,7 @@ const JSConsumerQuorumLostAdvisoryType = "io.nats.jetstream.advisory.v1.consumer
|
||||
// JSConsumerQuorumLostAdvisory indicates that a consumer has lost quorum and is stalled.
|
||||
type JSConsumerQuorumLostAdvisory struct {
|
||||
TypedEvent
|
||||
Account string `json:"account,omitempty"`
|
||||
Stream string `json:"stream"`
|
||||
Consumer string `json:"consumer"`
|
||||
Replicas []*PeerInfo `json:"replicas"`
|
||||
|
||||
@@ -131,6 +131,7 @@ type Stream struct {
|
||||
infoSub *subscription
|
||||
clseq uint64
|
||||
clfs uint64
|
||||
lqsent time.Time
|
||||
}
|
||||
|
||||
// Headers for published messages.
|
||||
@@ -884,12 +885,8 @@ func (mset *Stream) unsubscribeInternal(subject string) error {
|
||||
if c == nil {
|
||||
return fmt.Errorf("invalid stream")
|
||||
}
|
||||
if !c.srv.eventsEnabled() {
|
||||
return ErrNoSysAccount
|
||||
}
|
||||
|
||||
var sid []byte
|
||||
|
||||
c.mu.Lock()
|
||||
for _, sub := range c.subs {
|
||||
if subject == string(sub.subject) {
|
||||
|
||||
@@ -201,7 +201,6 @@ func TestJetStreamClusterMultiReplicaStreams(t *testing.T) {
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
// FIXME(dlc) - This should be default.
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"foo", "bar"},
|
||||
@@ -2393,7 +2392,8 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
|
||||
if len(lqa.Replicas) != 2 {
|
||||
t.Fatalf("Expected reports for both replicas, only got %d", len(lqa.Replicas))
|
||||
}
|
||||
// Consumer too.
|
||||
// Consumer too. Since we do not know whether the consumer leader was the one shut down,
|
||||
// we should wait a bit for the system to detect the lost quorum.
|
||||
adv, _ = csub.NextMsg(time.Second)
|
||||
if adv == nil {
|
||||
t.Fatalf("Expected to receive a consumer quorum lost advisory")
|
||||
@@ -2406,6 +2406,10 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
|
||||
t.Fatalf("Expected reports for both replicas, only got %d", len(clqa.Replicas))
|
||||
}
|
||||
|
||||
// Check to make sure we do not rapid fire these.
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
checkSubsPending(t, csub, 0)
|
||||
|
||||
// Now let's take out the other non meta-leader server.
|
||||
// We should get same error for general API calls.
|
||||
c.randomNonLeader().Shutdown()
|
||||
@@ -2520,7 +2524,7 @@ func TestJetStreamClusterStreamPerf(t *testing.T) {
|
||||
var jsClusterTempl = `
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: %s
|
||||
jetstream: {max_mem_store: 2GB, max_file_store: 1GB, store_dir: "%s"}
|
||||
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"}
|
||||
cluster {
|
||||
name: %s
|
||||
listen: 127.0.0.1:%d
|
||||
|
||||
Reference in New Issue
Block a user