From 7547093b24ac34bf12199ad922a910ccecf9261f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 6 Apr 2023 08:38:30 -0700 Subject: [PATCH 1/6] _EMPTY_ Signed-off-by: Derek Collison --- server/opts.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/opts.go b/server/opts.go index 4d07f46a..7c34bc21 100644 --- a/server/opts.go +++ b/server/opts.go @@ -1132,8 +1132,8 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error del := false hdel := false hdel_set := false - dir := "" - dirType := "" + dir := _EMPTY_ + dirType := _EMPTY_ limit := int64(0) ttl := time.Duration(0) sync := time.Duration(0) From 0d2269b1e9557db0a198d3e6d29390279b8eed4a Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 6 Apr 2023 10:27:36 -0700 Subject: [PATCH 2/6] Move stepdowns of streams and consumers to not be inline Signed-off-by: Derek Collison --- server/jetstream_api.go | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index cfe06442..8229b5a0 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1967,18 +1967,24 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ * return } - // Call actual stepdown. - if mset != nil { + if mset == nil { + resp.Success = true + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) + return + } + + // Call actual stepdown. Do this in a Go routine. + go func() { if node := mset.raftNode(); node != nil { mset.setLeader(false) // TODO (mh) eventually make sure all go routines exited and all channels are cleared time.Sleep(250 * time.Millisecond) node.StepDown() } - } - resp.Success = true - s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) + resp.Success = true + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) + }() } // Request to have a consumer leader stepdown. @@ -2073,16 +2079,23 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _ return } - // Call actual stepdown. - if n := o.raftNode(); n != nil { + n := o.raftNode() + if n == nil { + resp.Success = true + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) + return + } + + // Call actual stepdown. Do this in a Go routine. + go func() { o.setLeader(false) // TODO (mh) eventually make sure all go routines exited and all channels are cleared time.Sleep(250 * time.Millisecond) n.StepDown() - } - resp.Success = true - s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) + resp.Success = true + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) + }() } // Request to remove a peer from a clustered stream. From dee5229f9b9fb7fe792dca5cc66f673f3eaece37 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 6 Apr 2023 10:37:31 -0700 Subject: [PATCH 3/6] Fix data race Signed-off-by: Derek Collison --- server/jetstream_api.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index cfe06442..7a7ce2c1 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -890,10 +890,17 @@ func (s *Server) sendAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply const errRespDelay = 500 * time.Millisecond func (s *Server) sendDelayedAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string, rg *raftGroup) { + js := s.getJetStream() + if js == nil { + return + } var quitCh <-chan struct{} + js.mu.RLock() if rg != nil && rg.node != nil { quitCh = rg.node.QuitC() } + js.mu.RUnlock() + s.startGoRoutine(func() { defer s.grWG.Done() select { From a60f6238424843ddf544ed493cc2d1cde9e84ab3 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 6 Apr 2023 10:39:30 -0700 Subject: [PATCH 4/6] Fix flapping test since claims not processed inline now Signed-off-by: Derek Collison --- server/events_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server/events_test.go b/server/events_test.go index 28bc4c9c..1c416570 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -1515,6 +1515,7 @@ func TestAccountClaimsUpdatesWithServiceImports(t *testing.T) { nc.Publish(claimUpdateSubj, []byte(ajwt2)) } nc.Flush() + time.Sleep(50 * time.Millisecond) if startSubs < s.NumSubscriptions() { t.Fatalf("Subscriptions leaked: %d vs %d", startSubs, s.NumSubscriptions()) From c16915bff47325c32beddddf0d1b11841f19c723 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 6 Apr 2023 11:56:55 -0700 Subject: [PATCH 5/6] For checking the health of jetstream, do not hold the lock as we traverse the streams and consumers. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 46 ++++++++----------------------- server/monitor.go | 55 +++++++++++++++++++++++++------------ 2 files changed, 49 insertions(+), 52 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 924cf4f2..956648c4 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -436,63 +436,39 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool { // isStreamHealthy will determine if the stream is up to date or very close. // For R1 it will make sure the stream is present on this server. -// Read lock should be held. -func (js *jetStream) isStreamHealthy(account, stream string) bool { +func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool { + js.mu.RLock() + defer js.mu.RUnlock() + cc := js.cluster if cc == nil { // Non-clustered mode return true } - as := cc.streams[account] - if as == nil { - return false - } - sa := as[stream] - if sa == nil { - return false - } rg := sa.Group if rg == nil { return false } - - if rg.node == nil || rg.node.Healthy() { + if rg := sa.Group; rg != nil && (rg.node == nil || rg.node.Healthy()) { // Check if we are processing a snapshot and are catching up. - acc, err := cc.s.LookupAccount(account) - if err != nil { - return false + if mset, err := acc.lookupStream(sa.Config.Name); err == nil && !mset.isCatchingUp() { + return true } - mset, err := acc.lookupStream(stream) - if err != nil { - return false - } - if mset.isCatchingUp() { - return false - } - // Success. - return true } - return false } // isConsumerCurrent will determine if the consumer is up to date. // For R1 it will make sure the consunmer is present on this server. -// Read lock should be held. -func (js *jetStream) isConsumerCurrent(account, stream, consumer string) bool { +func (js *jetStream) isConsumerCurrent(mset *stream, consumer string, ca *consumerAssignment) bool { + js.mu.RLock() + defer js.mu.RUnlock() + cc := js.cluster if cc == nil { // Non-clustered mode return true } - acc, err := cc.s.LookupAccount(account) - if err != nil { - return false - } - mset, err := acc.lookupStream(stream) - if err != nil { - return false - } o := mset.lookupConsumer(consumer) if o == nil { return false diff --git a/server/monitor.go b/server/monitor.go index 4adbffb8..2e72861f 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1,4 +1,4 @@ -// Copyright 2013-2022 The NATS Authors +// Copyright 2013-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -3126,33 +3126,54 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { // If they are assigned to this server check their status. ourID := meta.ID() - // TODO(dlc) - Might be better here to not hold the lock the whole time. + // Copy the meta layer so we do not need to hold the js read lock for an extended period of time. js.mu.RLock() - defer js.mu.RUnlock() - + streams := make(map[string]map[string]*streamAssignment, len(cc.streams)) for acc, asa := range cc.streams { + nasa := make(map[string]*streamAssignment) for stream, sa := range asa { if sa.Group.isMember(ourID) { - // Make sure we can look up - if !js.isStreamHealthy(acc, stream) { - health.Status = na - health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", acc, stream) - return health - } - // Now check consumers. + csa := sa.copyGroup() + csa.consumers = make(map[string]*consumerAssignment) for consumer, ca := range sa.consumers { if ca.Group.isMember(ourID) { - if !js.isConsumerCurrent(acc, stream, consumer) { - health.Status = na - health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer) - return health - } + csa.consumers[consumer] = ca.copyGroup() } } + nasa[stream] = csa + } + } + streams[acc] = nasa + } + js.mu.RUnlock() + + // Use our copy to traverse so we do not need to hold the js lock. + for accName, asa := range streams { + acc, err := s.LookupAccount(accName) + if err != nil && len(asa) > 0 { + health.Status = na + health.Error = fmt.Sprintf("JetStream can not lookup account %q: %v", accName, err) + return health + } + + for stream, sa := range asa { + // Make sure we can look up + if !js.isStreamHealthy(acc, sa) { + health.Status = na + health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", accName, stream) + return health + } + mset, _ := acc.lookupStream(stream) + // Now check consumers. + for consumer, ca := range sa.consumers { + if !js.isConsumerCurrent(mset, consumer, ca) { + health.Status = na + health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer) + return health + } } } } - // Success. return health } From dbde8aba361c696b7404a690fafb6946a483c55f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 6 Apr 2023 14:27:04 -0700 Subject: [PATCH 6/6] Make sure on reverse match to compensate for wider target subjects Signed-off-by: Derek Collison --- server/sublist.go | 19 ++++++++++++++++--- server/sublist_test.go | 16 +++++++++++++++- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/server/sublist.go b/server/sublist.go index a77f9f98..bcf271d8 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -1,4 +1,4 @@ -// Copyright 2016-2020 The NATS Authors +// Copyright 2016-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -1529,13 +1529,13 @@ func (s *Sublist) ReverseMatch(subject string) *SublistResult { result := &SublistResult{} - s.Lock() + s.RLock() reverseMatchLevel(s.root, tokens, nil, result) // Check for empty result. if len(result.psubs) == 0 && len(result.qsubs) == 0 { result = emptyResult } - s.Unlock() + s.RUnlock() return result } @@ -1553,9 +1553,22 @@ func reverseMatchLevel(l *level, toks []string, n *node, results *SublistResult) for _, n := range l.nodes { reverseMatchLevel(n.next, toks[i+1:], n, results) } + if l.pwc != nil { + reverseMatchLevel(l.pwc.next, toks[i+1:], n, results) + } + if l.fwc != nil { + getAllNodes(l, results) + } return } } + // If the sub tree has a fwc at this position, match as well. + if l.fwc != nil { + getAllNodes(l, results) + return + } else if l.pwc != nil { + reverseMatchLevel(l.pwc.next, toks[i+1:], n, results) + } n = l.nodes[t] if n == nil { break diff --git a/server/sublist_test.go b/server/sublist_test.go index a302405a..b9e12a26 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -1,4 +1,4 @@ -// Copyright 2016-2020 The NATS Authors +// Copyright 2016-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -1478,6 +1478,20 @@ func TestSublistReverseMatch(t *testing.T) { verifyMember(r.psubs, fooBarBazSub, t) } +func TestSublistReverseMatchWider(t *testing.T) { + s := NewSublistWithCache() + sub := newSub("uplink.*.*.>") + s.Insert(sub) + + r := s.ReverseMatch("uplink.1.*.*.>") + verifyLen(r.psubs, 1, t) + verifyMember(r.psubs, sub, t) + + r = s.ReverseMatch("uplink.1.2.3.>") + verifyLen(r.psubs, 1, t) + verifyMember(r.psubs, sub, t) +} + func TestSublistMatchWithEmptyTokens(t *testing.T) { for _, test := range []struct { name string