diff --git a/server/events_test.go b/server/events_test.go
index 743a9626..e3a12857 100644
--- a/server/events_test.go
+++ b/server/events_test.go
@@ -1515,6 +1515,7 @@ func TestAccountClaimsUpdatesWithServiceImports(t *testing.T) {
 		nc.Publish(claimUpdateSubj, []byte(ajwt2))
 	}
 	nc.Flush()
+	time.Sleep(50 * time.Millisecond)
 
 	if startSubs < s.NumSubscriptions() {
 		t.Fatalf("Subscriptions leaked: %d vs %d", startSubs, s.NumSubscriptions())
diff --git a/server/jetstream_api.go b/server/jetstream_api.go
index ec318b24..0786b97d 100644
--- a/server/jetstream_api.go
+++ b/server/jetstream_api.go
@@ -894,10 +894,17 @@ func (s *Server) sendAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply
 const errRespDelay = 500 * time.Millisecond
 
 func (s *Server) sendDelayedAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string, rg *raftGroup) {
+	js := s.getJetStream()
+	if js == nil {
+		return
+	}
 	var quitCh <-chan struct{}
+	js.mu.RLock()
 	if rg != nil && rg.node != nil {
 		quitCh = rg.node.QuitC()
 	}
+	js.mu.RUnlock()
+
 	s.startGoRoutine(func() {
 		defer s.grWG.Done()
 		select {
@@ -1971,18 +1978,24 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *
 		return
 	}
 
-	// Call actual stepdown.
-	if mset != nil {
+	if mset == nil {
+		resp.Success = true
+		s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
+		return
+	}
+
+	// Call actual stepdown. Do this in a Go routine.
+	go func() {
 		if node := mset.raftNode(); node != nil {
 			mset.setLeader(false)
 			// TODO (mh) eventually make sure all go routines exited and all channels are cleared
 			time.Sleep(250 * time.Millisecond)
 			node.StepDown()
 		}
-	}
 
-	resp.Success = true
-	s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
+		resp.Success = true
+		s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
+	}()
 }
 
 // Request to have a consumer leader stepdown.
@@ -2077,16 +2090,23 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _
 		return
 	}
 
-	// Call actual stepdown.
-	if n := o.raftNode(); n != nil {
+	n := o.raftNode()
+	if n == nil {
+		resp.Success = true
+		s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
+		return
+	}
+
+	// Call actual stepdown. Do this in a Go routine.
+	go func() {
 		o.setLeader(false)
 		// TODO (mh) eventually make sure all go routines exited and all channels are cleared
 		time.Sleep(250 * time.Millisecond)
 		n.StepDown()
-	}
 
-	resp.Success = true
-	s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
+		resp.Success = true
+		s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
+	}()
 }
 
 // Request to remove a peer from a clustered stream.
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 5cf4f7dd..1f6c163a 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -436,63 +436,43 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool {
 
 // isStreamHealthy will determine if the stream is up to date or very close.
 // For R1 it will make sure the stream is present on this server.
-// Read lock should be held.
-func (js *jetStream) isStreamHealthy(account, stream string) bool {
+func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
+	js.mu.RLock()
+	defer js.mu.RUnlock()
+
 	cc := js.cluster
 	if cc == nil {
 		// Non-clustered mode
 		return true
 	}
-	as := cc.streams[account]
-	if as == nil {
-		return false
-	}
-	sa := as[stream]
-	if sa == nil {
-		return false
-	}
 	rg := sa.Group
 	if rg == nil {
 		return false
 	}
-
 	if rg.node == nil || rg.node.Healthy() {
 		// Check if we are processing a snapshot and are catching up.
-		acc, err := cc.s.LookupAccount(account)
-		if err != nil {
-			return false
+		if mset, err := acc.lookupStream(sa.Config.Name); err == nil && !mset.isCatchingUp() {
+			return true
 		}
-		mset, err := acc.lookupStream(stream)
-		if err != nil {
-			return false
-		}
-		if mset.isCatchingUp() {
-			return false
-		}
-		// Success.
-		return true
 	}
-
 	return false
 }
 
 // isConsumerCurrent will determine if the consumer is up to date.
 // For R1 it will make sure the consunmer is present on this server.
-// Read lock should be held.
-func (js *jetStream) isConsumerCurrent(account, stream, consumer string) bool {
+func (js *jetStream) isConsumerCurrent(mset *stream, consumer string, ca *consumerAssignment) bool {
+	js.mu.RLock()
+	defer js.mu.RUnlock()
+
 	cc := js.cluster
 	if cc == nil {
 		// Non-clustered mode
 		return true
 	}
-	acc, err := cc.s.LookupAccount(account)
-	if err != nil {
-		return false
-	}
-	mset, err := acc.lookupStream(stream)
-	if err != nil {
-		return false
-	}
+	// A nil stream (e.g. the caller's lookup failed) can not have a current consumer.
+	if mset == nil {
+		return false
+	}
 	o := mset.lookupConsumer(consumer)
 	if o == nil {
 		return false
diff --git a/server/monitor.go b/server/monitor.go
index 801f3f69..11d85a8e 100644
--- a/server/monitor.go
+++ b/server/monitor.go
@@ -1,4 +1,4 @@
-// Copyright 2013-2022 The NATS Authors
+// Copyright 2013-2023 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -3142,33 +3142,55 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
 	// If they are assigned to this server check their status.
 	ourID := meta.ID()
 
-	// TODO(dlc) - Might be better here to not hold the lock the whole time.
+	// Copy the meta layer so we do not need to hold the js read lock for an extended period of time.
 	js.mu.RLock()
-	defer js.mu.RUnlock()
-
+	streams := make(map[string]map[string]*streamAssignment, len(cc.streams))
 	for acc, asa := range cc.streams {
+		nasa := make(map[string]*streamAssignment)
 		for stream, sa := range asa {
 			if sa.Group.isMember(ourID) {
-				// Make sure we can look up
-				if !js.isStreamHealthy(acc, stream) {
-					health.Status = na
-					health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", acc, stream)
-					return health
-				}
-				// Now check consumers.
+				csa := sa.copyGroup()
+				csa.consumers = make(map[string]*consumerAssignment)
 				for consumer, ca := range sa.consumers {
 					if ca.Group.isMember(ourID) {
-						if !js.isConsumerCurrent(acc, stream, consumer) {
-							health.Status = na
-							health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer)
-							return health
-						}
+						csa.consumers[consumer] = ca.copyGroup()
 					}
 				}
+				nasa[stream] = csa
+			}
+		}
+		streams[acc] = nasa
+	}
+	js.mu.RUnlock()
+
+	// Use our copy to traverse so we do not need to hold the js lock.
+	for accName, asa := range streams {
+		acc, err := s.LookupAccount(accName)
+		if err != nil && len(asa) > 0 {
+			health.Status = na
+			health.Error = fmt.Sprintf("JetStream can not lookup account %q: %v", accName, err)
+			return health
+		}
+
+		for stream, sa := range asa {
+			// Make sure we can look up
+			if !js.isStreamHealthy(acc, sa) {
+				health.Status = na
+				health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", accName, stream)
+				return health
+			}
+			// A failed lookup leaves mset nil, which isConsumerCurrent reports as not current.
+			mset, _ := acc.lookupStream(stream)
+			// Now check consumers.
+			for consumer, ca := range sa.consumers {
+				if !js.isConsumerCurrent(mset, consumer, ca) {
+					health.Status = na
+					health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", accName, stream, consumer)
+					return health
+				}
 			}
 		}
 	}
-
 	// Success.
 	return health
 }
diff --git a/server/opts.go b/server/opts.go
index c456ff7f..eb26359a 100644
--- a/server/opts.go
+++ b/server/opts.go
@@ -1161,7 +1161,6 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error
 		hdel_set := false
 		dir := _EMPTY_
 		dirType := _EMPTY_
-		limit := int64(0)
 		ttl := time.Duration(0)
 		sync := time.Duration(0)
 
diff --git a/server/sublist.go b/server/sublist.go
index 1a1f353f..47d45999 100644
--- a/server/sublist.go
+++ b/server/sublist.go
@@ -1,4 +1,4 @@
-// Copyright 2016-2020 The NATS Authors
+// Copyright 2016-2023 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -1533,13 +1533,13 @@ func (s *Sublist) ReverseMatch(subject string) *SublistResult {
 
 	result := &SublistResult{}
 
-	s.Lock()
+	s.RLock()
 	reverseMatchLevel(s.root, tokens, nil, result)
 	// Check for empty result.
 	if len(result.psubs) == 0 && len(result.qsubs) == 0 {
 		result = emptyResult
 	}
-	s.Unlock()
+	s.RUnlock()
 
 	return result
 }
@@ -1557,9 +1557,23 @@ func reverseMatchLevel(l *level, toks []string, n *node, results *SublistResult)
 				for _, n := range l.nodes {
 					reverseMatchLevel(n.next, toks[i+1:], n, results)
 				}
+				if l.pwc != nil {
+					reverseMatchLevel(l.pwc.next, toks[i+1:], n, results)
+				}
+				if l.fwc != nil {
+					getAllNodes(l, results)
+				}
 				return
 			}
 		}
+		// If the sub tree has a fwc at this position, match as well.
+		if l.fwc != nil {
+			getAllNodes(l, results)
+			return
+		}
+		if l.pwc != nil {
+			reverseMatchLevel(l.pwc.next, toks[i+1:], n, results)
+		}
 		n = l.nodes[t]
 		if n == nil {
 			break
diff --git a/server/sublist_test.go b/server/sublist_test.go
index a302405a..b9e12a26 100644
--- a/server/sublist_test.go
+++ b/server/sublist_test.go
@@ -1,4 +1,4 @@
-// Copyright 2016-2020 The NATS Authors
+// Copyright 2016-2023 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -1478,6 +1478,20 @@ func TestSublistReverseMatch(t *testing.T) {
 	verifyMember(r.psubs, fooBarBazSub, t)
 }
 
+func TestSublistReverseMatchWider(t *testing.T) {
+	s := NewSublistWithCache()
+	sub := newSub("uplink.*.*.>")
+	s.Insert(sub)
+
+	r := s.ReverseMatch("uplink.1.*.*.>")
+	verifyLen(r.psubs, 1, t)
+	verifyMember(r.psubs, sub, t)
+
+	r = s.ReverseMatch("uplink.1.2.3.>")
+	verifyLen(r.psubs, 1, t)
+	verifyMember(r.psubs, sub, t)
+}
+
 func TestSublistMatchWithEmptyTokens(t *testing.T) {
 	for _, test := range []struct {
 		name string