From d04763eb7d6775a19e7a720aa074b92f57b4a399 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 24 Aug 2022 17:13:26 -0700 Subject: [PATCH] CAS operations improved, hold lock past store. Use separate lock for consumer list and storage updates. Signed-off-by: Derek Collison --- server/consumer.go | 3 +- server/norace_test.go | 72 +++++++++++++ server/stream.go | 235 ++++++++++++++++++++++-------------------- 3 files changed, 197 insertions(+), 113 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 0ca84539..30c65cc9 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4150,7 +4150,8 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) { // If it was pending process it like an ack. // TODO(dlc) - we could do a term here instead with a reason to generate the advisory. if wasPending { - o.processAckMsg(sseq, p.Sequence, rdc, false) + // We could have lock for stream so do this in a go routine. + go o.processTerm(sseq, p.Sequence, rdc) } } diff --git a/server/norace_test.go b/server/norace_test.go index c0ba0347..ff730b4d 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -20,6 +20,7 @@ import ( "bufio" "bytes" "compress/gzip" + "context" "encoding/binary" "encoding/json" "fmt" @@ -5444,3 +5445,74 @@ func TestNoRaceJetStreamFileStoreLargeKVAccessTiming(t *testing.T) { t.Fatalf("Took too long to look up last key by subject vs first: %v vs %v", base, slow) } } + +func TestNoRaceJetStreamKVLock(t *testing.T) { + s := RunBasicJetStreamServer() + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "LOCKS"}) + require_NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + var wg sync.WaitGroup + start := make(chan bool) + + var tracker int64 + + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + kv, err := js.KeyValue("LOCKS") + require_NoError(t, err) + + <-start + + for { + last, err := kv.Create("MY_LOCK", []byte("Z")) + if err != nil { + select { + case <-time.After(10 * time.Millisecond): + continue + case <-ctx.Done(): + return + } + } + + if v := atomic.AddInt64(&tracker, 1); v != 1 { + t.Logf("TRACKER NOT 1 -> %d\n", v) + cancel() + } + + time.Sleep(10 * time.Millisecond) + if v := atomic.AddInt64(&tracker, -1); v != 0 { + t.Logf("TRACKER NOT 0 AFTER RELEASE -> %d\n", v) + cancel() + } + + err = kv.Delete("MY_LOCK", nats.LastRevision(last)) + if err != nil { + t.Logf("Could not unlock for last %d: %v", last, err) + } + + if ctx.Err() != nil { + return + } + } + }() + } + + close(start) + wg.Wait() +} diff --git a/server/stream.go b/server/stream.go index df603e0f..cece5836 100644 --- a/server/stream.go +++ b/server/stream.go @@ -219,6 +219,10 @@ type stream struct { // For republishing. tr *transform + // For processing consumers as a list without main stream lock. + clsMu sync.RWMutex + cList []*consumer + // TODO(dlc) - Hide everything below behind two pointers. // Clustered mode. sa *streamAssignment @@ -1534,23 +1538,17 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) // Purge will remove all messages from the stream and underlying store based on the request. func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err error) { - mset.mu.Lock() - if mset.client == nil { - mset.mu.Unlock() + mset.mu.RLock() + if mset.client == nil || mset.store == nil { + mset.mu.RUnlock() return 0, errors.New("invalid stream") } if mset.cfg.Sealed { + mset.mu.RUnlock() return 0, errors.New("sealed stream") } - var _obs [4]*consumer - obs := _obs[:0] - for _, o := range mset.consumers { - if preq != nil && !o.isFilteredMatch(preq.Subject) { - continue - } - obs = append(obs, o) - } - mset.mu.Unlock() + store := mset.store + mset.mu.RUnlock() if preq != nil { purged, err = mset.store.PurgeEx(preq.Subject, preq.Sequence, preq.Keep) @@ -1563,19 +1561,24 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err // Purge consumers. var state StreamState - mset.store.FastState(&state) - fseq := state.FirstSeq - lseq := state.LastSeq + store.FastState(&state) + fseq, lseq := state.FirstSeq, state.LastSeq // Check for filtered purge. if preq != nil && preq.Subject != _EMPTY_ { - ss := mset.store.FilteredState(state.FirstSeq, preq.Subject) + ss := store.FilteredState(state.FirstSeq, preq.Subject) fseq = ss.First } - for _, o := range obs { + mset.clsMu.RLock() + for _, o := range mset.cList { + if preq != nil && !o.isFilteredMatch(preq.Subject) { + continue + } o.purge(fseq, lseq) } + mset.clsMu.RUnlock() + return purged, nil } @@ -3203,21 +3206,12 @@ func (mset *stream) storeUpdates(md, bd int64, seq uint64, subj string) { // If we have a single negative update then we will process our consumers for stream pending. // Purge and Store handled separately inside individual calls. if md == -1 && seq > 0 { - // We need to pull these out here and release the lock, even and RLock. RLocks are allowed to - // be reentrant, however once anyone signals interest in a write lock any subsequent RLocks - // will block. decStreamPending can try to re-acquire the RLock for this stream. - var _cl [8]*consumer - cl := _cl[:0] - - mset.mu.RLock() - for _, o := range mset.consumers { - cl = append(cl, o) - } - mset.mu.RUnlock() - - for _, o := range cl { + // We use our consumer list mutex here instead of the main stream lock since it may be held already. + mset.clsMu.RLock() + for _, o := range mset.cList { o.decStreamPending(seq, subj) } + mset.clsMu.RUnlock() } if mset.jsa != nil { @@ -3686,7 +3680,6 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, if sm != nil { fseq = sm.seq } - // If seq passed in is zero that signals we expect no msg to be present. if err == ErrStoreMsgNotFound && seq == 0 { fseq, err = 0, nil } @@ -3773,8 +3766,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, if canRespond { resp.PubAck = &PubAck{Stream: name} resp.Error = NewJSStreamMessageExceedsMaximumError() - b, _ := json.Marshal(resp) - mset.outq.sendMsg(reply, b) + response, _ = json.Marshal(resp) + mset.outq.sendMsg(reply, response) } return ErrMaxPayload } @@ -3785,8 +3778,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, if canRespond { resp.PubAck = &PubAck{Stream: name} resp.Error = NewJSStreamHeaderExceedsMaximumError() - b, _ := json.Marshal(resp) - mset.outq.sendMsg(reply, b) + response, _ = json.Marshal(resp) + mset.outq.sendMsg(reply, response) } return ErrMaxPayload } @@ -3799,8 +3792,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, if canRespond { resp.PubAck = &PubAck{Stream: name} resp.Error = NewJSInsufficientResourcesError() - b, _ := json.Marshal(resp) - mset.outq.sendMsg(reply, b) + response, _ = json.Marshal(resp) + mset.outq.sendMsg(reply, response) } // Stepdown regardless. if node := mset.raftNode(); node != nil { @@ -3818,12 +3811,14 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } else if mset.numFilter > 0 { // Assume no interest and check to disqualify. noInterest = true - for _, o := range mset.consumers { + mset.clsMu.RLock() + for _, o := range mset.cList { if o.cfg.FilterSubject == _EMPTY_ || subjectIsSubsetMatch(subject, o.cfg.FilterSubject) { noInterest = false break } } + mset.clsMu.RUnlock() } } @@ -3836,17 +3831,16 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, if noInterest { mset.lseq = store.SkipMsg() mset.lmsgId = msgId - mset.mu.Unlock() - + // If we have a msgId make sure to save. + if msgId != _EMPTY_ { + mset.storeMsgIdLocked(&ddentry{msgId, seq, ts}) + } if canRespond { response = append(pubAck, strconv.FormatUint(mset.lseq, 10)...) response = append(response, '}') mset.outq.sendMsg(reply, response) } - // If we have a msgId make sure to save. - if msgId != _EMPTY_ { - mset.storeMsgId(&ddentry{msgId, seq, ts}) - } + mset.mu.Unlock() return nil } @@ -3870,11 +3864,6 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } republish := tsubj != _EMPTY_ && isLeader - // We hold the lock to this point to make sure nothing gets between us since we check for pre-conditions. - // Currently can not hold while calling store b/c we have inline storage update calls that may need the lock. - // Note that upstream that sets seq/ts should be serialized as much as possible. - mset.mu.Unlock() - // If we are republishing grab last sequence for this exact subject. Aids in gap detection for lightweight clients. if republish { var smv StoreMsg @@ -3894,7 +3883,6 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, if err != nil { // If we did not succeed put those values back and increment clfs in case we are clustered. - mset.mu.Lock() var state StreamState mset.store.FastState(&state) mset.lseq = state.LastSeq @@ -3914,8 +3902,12 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, resp.PubAck = &PubAck{Stream: name} resp.Error = NewJSStreamStoreFailedError(err, Unless(err)) response, _ = json.Marshal(resp) + mset.outq.sendMsg(reply, response) } - } else if exceeded, apiErr := jsa.limitsExceeded(stype, tierName); exceeded { + return err + } + + if exceeded, apiErr := jsa.limitsExceeded(stype, tierName); exceeded { s.RateLimitWarnf("JetStream resource limits exceeded for account: %q", accName) if canRespond { resp.PubAck = &PubAck{Stream: name} @@ -3925,67 +3917,71 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, resp.Error = apiErr } response, _ = json.Marshal(resp) + mset.outq.sendMsg(reply, response) } // If we did not succeed put those values back. - mset.mu.Lock() var state StreamState mset.store.FastState(&state) mset.lseq = state.LastSeq mset.lmsgId = olmsgId mset.mu.Unlock() store.RemoveMsg(seq) - seq = 0 - } else { - // No errors, this is the normal path. - // If we have a msgId make sure to save. - if msgId != _EMPTY_ { - mset.storeMsgId(&ddentry{msgId, seq, ts}) - } - if rollupSub { - mset.purge(&JSApiStreamPurgeRequest{Subject: subject, Keep: 1}) - } else if rollupAll { - mset.purge(&JSApiStreamPurgeRequest{Keep: 1}) - } - if canRespond { - response = append(pubAck, strconv.FormatUint(seq, 10)...) - response = append(response, '}') - } - // Check for republish. - if republish { - var rpMsg []byte - if len(hdr) == 0 { - const ht = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Last-Sequence: %d\r\n\r\n" - const htho = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Last-Sequence: %d\r\nNats-Msg-Size: %d\r\n\r\n" - if !thdrsOnly { - hdr = []byte(fmt.Sprintf(ht, name, subject, seq, tlseq)) - rpMsg = copyBytes(msg) - } else { - hdr = []byte(fmt.Sprintf(htho, name, subject, seq, tlseq, len(msg))) - } + return nil + } + + // If we have a msgId make sure to save. + if msgId != _EMPTY_ { + mset.storeMsgIdLocked(&ddentry{msgId, seq, ts}) + } + + // If here we succeeded in storing the message. + mset.mu.Unlock() + + // No errors, this is the normal path. + if rollupSub { + mset.purge(&JSApiStreamPurgeRequest{Subject: subject, Keep: 1}) + } else if rollupAll { + mset.purge(&JSApiStreamPurgeRequest{Keep: 1}) + } + + // Check for republish. + if republish { + var rpMsg []byte + if len(hdr) == 0 { + const ht = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Last-Sequence: %d\r\n\r\n" + const htho = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Last-Sequence: %d\r\nNats-Msg-Size: %d\r\n\r\n" + if !thdrsOnly { + hdr = []byte(fmt.Sprintf(ht, name, subject, seq, tlseq)) + rpMsg = copyBytes(msg) } else { - // Slow path. - hdr = genHeader(hdr, JSStream, name) - hdr = genHeader(hdr, JSSubject, subject) - hdr = genHeader(hdr, JSSequence, strconv.FormatUint(seq, 10)) - hdr = genHeader(hdr, JSLastSequence, strconv.FormatUint(tlseq, 10)) - if !thdrsOnly { - rpMsg = copyBytes(msg) - } else { - hdr = genHeader(hdr, JSMsgSize, strconv.Itoa(len(msg))) - } + hdr = []byte(fmt.Sprintf(htho, name, subject, seq, tlseq, len(msg))) + } + } else { + // Slow path. + hdr = genHeader(hdr, JSStream, name) + hdr = genHeader(hdr, JSSubject, subject) + hdr = genHeader(hdr, JSSequence, strconv.FormatUint(seq, 10)) + hdr = genHeader(hdr, JSLastSequence, strconv.FormatUint(tlseq, 10)) + if !thdrsOnly { + rpMsg = copyBytes(msg) + } else { + hdr = genHeader(hdr, JSMsgSize, strconv.Itoa(len(msg))) } - mset.outq.send(newJSPubMsg(tsubj, _EMPTY_, _EMPTY_, copyBytes(hdr), rpMsg, nil, seq)) } + mset.outq.send(newJSPubMsg(tsubj, _EMPTY_, _EMPTY_, copyBytes(hdr), rpMsg, nil, seq)) } // Send response here. if canRespond { + response = append(pubAck, strconv.FormatUint(seq, 10)...) + response = append(response, '}') mset.outq.sendMsg(reply, response) } - if err == nil && seq > 0 && numConsumers > 0 { - mset.mu.RLock() - for _, o := range mset.consumers { + // Signal consumers for new messages. + if numConsumers > 0 { + mset.clsMu.RLock() + for _, o := range mset.cList { o.mu.Lock() if o.isLeader() && o.isFilteredMatch(subject) { if seq > o.npcm { @@ -3995,10 +3991,10 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } o.mu.Unlock() } - mset.mu.RUnlock() + mset.clsMu.RUnlock() } - return err + return nil } // Internal message for use by jetstream subsystem. @@ -4258,6 +4254,9 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { for _, o := range mset.consumers { obs = append(obs, o) } + mset.clsMu.Lock() + mset.consumers, mset.cList = nil, nil + mset.clsMu.Unlock() // Check if we are a mirror. if mset.mirror != nil && mset.mirror.sub != nil { @@ -4271,8 +4270,8 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { mset.cancelSourceConsumer(si.iname) } } - mset.mu.Unlock() + for _, o := range obs { // Third flag says do not broadcast a signal. // TODO(dlc) - If we have an err here we don't want to stop @@ -4403,16 +4402,11 @@ func (mset *stream) getMsg(seq uint64) (*StoredMsg, error) { }, nil } -// getConsumers will return all the current consumers for this stream. +// getConsumers will return a copy of all the current consumers for this stream. func (mset *stream) getConsumers() []*consumer { - mset.mu.RLock() - defer mset.mu.RUnlock() - - var obs []*consumer - for _, o := range mset.consumers { - obs = append(obs, o) - } - return obs + mset.clsMu.RLock() + defer mset.clsMu.RUnlock() + return append([]*consumer(nil), mset.cList...) } // Lock should be held for this one. @@ -4422,11 +4416,11 @@ func (mset *stream) numPublicConsumers() int { // This returns all consumers that are not DIRECT. func (mset *stream) getPublicConsumers() []*consumer { - mset.mu.RLock() - defer mset.mu.RUnlock() + mset.clsMu.RLock() + defer mset.clsMu.RUnlock() var obs []*consumer - for _, o := range mset.consumers { + for _, o := range mset.cList { if !o.cfg.Direct { obs = append(obs, o) } @@ -4441,6 +4435,18 @@ func (mset *stream) numConsumers() int { return len(mset.consumers) } +// Lock should be held +// Don't expect this to be called at high rates. +func (mset *stream) updateConsumerList() { + mset.clsMu.Lock() + defer mset.clsMu.Unlock() + mset.cList = make([]*consumer, 0, len(mset.consumers)) + for _, o := range mset.consumers { + mset.cList = append(mset.cList, o) + } +} + +// Lock should be held. func (mset *stream) setConsumer(o *consumer) { mset.consumers[o.name] = o if o.cfg.FilterSubject != _EMPTY_ { @@ -4449,8 +4455,11 @@ func (mset *stream) setConsumer(o *consumer) { if o.cfg.Direct { mset.directs++ } + // Now update consumers list as well + mset.updateConsumerList() } +// Lock should be held. func (mset *stream) removeConsumer(o *consumer) { if o.cfg.FilterSubject != _EMPTY_ && mset.numFilter > 0 { mset.numFilter-- @@ -4459,6 +4468,8 @@ func (mset *stream) removeConsumer(o *consumer) { mset.directs-- } delete(mset.consumers, o.name) + // Now update consumers list as well + mset.updateConsumerList() } // lookupConsumer will retrieve a consumer by name. @@ -4469,11 +4480,11 @@ func (mset *stream) lookupConsumer(name string) *consumer { } func (mset *stream) numDirectConsumers() (num int) { - mset.mu.RLock() - defer mset.mu.RUnlock() + mset.clsMu.RLock() + defer mset.clsMu.RUnlock() // Consumers that are direct are not recorded at the store level. - for _, o := range mset.consumers { + for _, o := range mset.cList { o.mu.RLock() if o.cfg.Direct { num++