Improve CAS operations by holding the lock past the store. Use a separate lock for the consumer list and storage updates.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-08-24 17:13:26 -07:00
parent 14a7bf89df
commit d04763eb7d
3 changed files with 197 additions and 113 deletions

View File

@@ -4150,7 +4150,8 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) {
// If it was pending process it like an ack.
// TODO(dlc) - we could do a term here instead with a reason to generate the advisory.
if wasPending {
o.processAckMsg(sseq, p.Sequence, rdc, false)
// We could have lock for stream so do this in a go routine.
go o.processTerm(sseq, p.Sequence, rdc)
}
}

View File

@@ -20,6 +20,7 @@ import (
"bufio"
"bytes"
"compress/gzip"
"context"
"encoding/binary"
"encoding/json"
"fmt"
@@ -5444,3 +5445,74 @@ func TestNoRaceJetStreamFileStoreLargeKVAccessTiming(t *testing.T) {
t.Fatalf("Took too long to look up last key by subject vs first: %v vs %v", base, slow)
}
}
// TestNoRaceJetStreamKVLock spins up 100 goroutines that all compete for the
// same KV-based lock: acquire via Create (CAS on a missing key), release via
// Delete with LastRevision. While a lock is held the atomic tracker must read
// exactly 1, and exactly 0 after release; anything else means mutual exclusion
// was violated. Fix: violations previously only logged via t.Logf, so the test
// could never fail — report them with t.Errorf instead.
func TestNoRaceJetStreamKVLock(t *testing.T) {
	s := RunBasicJetStreamServer()
	if config := s.JetStreamConfig(); config != nil {
		defer removeDir(t, config.StoreDir)
	}
	defer s.Shutdown()

	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	_, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "LOCKS"})
	require_NoError(t, err)

	// Bound the whole contention run; cancel also serves as the abort signal
	// when an invariant violation is detected.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	var wg sync.WaitGroup
	start := make(chan bool)

	// tracker counts current lock holders; must only ever be 0 or 1.
	var tracker int64

	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			// Each contender uses its own connection.
			nc, js := jsClientConnect(t, s)
			defer nc.Close()
			kv, err := js.KeyValue("LOCKS")
			require_NoError(t, err)

			<-start

			for {
				// Create only succeeds when the key is absent — this is the lock acquire.
				last, err := kv.Create("MY_LOCK", []byte("Z"))
				if err != nil {
					select {
					case <-time.After(10 * time.Millisecond):
						continue
					case <-ctx.Done():
						return
					}
				}

				// We hold the lock; tracker must go 0 -> 1.
				if v := atomic.AddInt64(&tracker, 1); v != 1 {
					t.Errorf("TRACKER NOT 1 -> %d\n", v)
					cancel()
				}
				time.Sleep(10 * time.Millisecond)
				// Releasing; tracker must go 1 -> 0.
				if v := atomic.AddInt64(&tracker, -1); v != 0 {
					t.Errorf("TRACKER NOT 0 AFTER RELEASE -> %d\n", v)
					cancel()
				}

				// Release the lock via CAS delete on our revision. A failure here is
				// best-effort noise (the run will time out), so just log it.
				err = kv.Delete("MY_LOCK", nats.LastRevision(last))
				if err != nil {
					t.Logf("Could not unlock for last %d: %v", last, err)
				}

				if ctx.Err() != nil {
					return
				}
			}
		}()
	}

	close(start)
	wg.Wait()
}

View File

@@ -219,6 +219,10 @@ type stream struct {
// For republishing.
tr *transform
// For processing consumers as a list without main stream lock.
clsMu sync.RWMutex
cList []*consumer
// TODO(dlc) - Hide everything below behind two pointers.
// Clustered mode.
sa *streamAssignment
@@ -1534,23 +1538,17 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
// Purge will remove all messages from the stream and underlying store based on the request.
func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err error) {
mset.mu.Lock()
if mset.client == nil {
mset.mu.Unlock()
mset.mu.RLock()
if mset.client == nil || mset.store == nil {
mset.mu.RUnlock()
return 0, errors.New("invalid stream")
}
if mset.cfg.Sealed {
mset.mu.RUnlock()
return 0, errors.New("sealed stream")
}
var _obs [4]*consumer
obs := _obs[:0]
for _, o := range mset.consumers {
if preq != nil && !o.isFilteredMatch(preq.Subject) {
continue
}
obs = append(obs, o)
}
mset.mu.Unlock()
store := mset.store
mset.mu.RUnlock()
if preq != nil {
purged, err = mset.store.PurgeEx(preq.Subject, preq.Sequence, preq.Keep)
@@ -1563,19 +1561,24 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
// Purge consumers.
var state StreamState
mset.store.FastState(&state)
fseq := state.FirstSeq
lseq := state.LastSeq
store.FastState(&state)
fseq, lseq := state.FirstSeq, state.LastSeq
// Check for filtered purge.
if preq != nil && preq.Subject != _EMPTY_ {
ss := mset.store.FilteredState(state.FirstSeq, preq.Subject)
ss := store.FilteredState(state.FirstSeq, preq.Subject)
fseq = ss.First
}
for _, o := range obs {
mset.clsMu.RLock()
for _, o := range mset.cList {
if preq != nil && !o.isFilteredMatch(preq.Subject) {
continue
}
o.purge(fseq, lseq)
}
mset.clsMu.RUnlock()
return purged, nil
}
@@ -3203,21 +3206,12 @@ func (mset *stream) storeUpdates(md, bd int64, seq uint64, subj string) {
// If we have a single negative update then we will process our consumers for stream pending.
// Purge and Store handled separately inside individual calls.
if md == -1 && seq > 0 {
// We need to pull these out here and release the lock, even and RLock. RLocks are allowed to
// be reentrant, however once anyone signals interest in a write lock any subsequent RLocks
// will block. decStreamPending can try to re-acquire the RLock for this stream.
var _cl [8]*consumer
cl := _cl[:0]
mset.mu.RLock()
for _, o := range mset.consumers {
cl = append(cl, o)
}
mset.mu.RUnlock()
for _, o := range cl {
// We use our consumer list mutex here instead of the main stream lock since it may be held already.
mset.clsMu.RLock()
for _, o := range mset.cList {
o.decStreamPending(seq, subj)
}
mset.clsMu.RUnlock()
}
if mset.jsa != nil {
@@ -3686,7 +3680,6 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
if sm != nil {
fseq = sm.seq
}
// If seq passed in is zero that signals we expect no msg to be present.
if err == ErrStoreMsgNotFound && seq == 0 {
fseq, err = 0, nil
}
@@ -3773,8 +3766,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamMessageExceedsMaximumError()
b, _ := json.Marshal(resp)
mset.outq.sendMsg(reply, b)
response, _ = json.Marshal(resp)
mset.outq.sendMsg(reply, response)
}
return ErrMaxPayload
}
@@ -3785,8 +3778,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamHeaderExceedsMaximumError()
b, _ := json.Marshal(resp)
mset.outq.sendMsg(reply, b)
response, _ = json.Marshal(resp)
mset.outq.sendMsg(reply, response)
}
return ErrMaxPayload
}
@@ -3799,8 +3792,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSInsufficientResourcesError()
b, _ := json.Marshal(resp)
mset.outq.sendMsg(reply, b)
response, _ = json.Marshal(resp)
mset.outq.sendMsg(reply, response)
}
// Stepdown regardless.
if node := mset.raftNode(); node != nil {
@@ -3818,12 +3811,14 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
} else if mset.numFilter > 0 {
// Assume no interest and check to disqualify.
noInterest = true
for _, o := range mset.consumers {
mset.clsMu.RLock()
for _, o := range mset.cList {
if o.cfg.FilterSubject == _EMPTY_ || subjectIsSubsetMatch(subject, o.cfg.FilterSubject) {
noInterest = false
break
}
}
mset.clsMu.RUnlock()
}
}
@@ -3836,17 +3831,16 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
if noInterest {
mset.lseq = store.SkipMsg()
mset.lmsgId = msgId
mset.mu.Unlock()
// If we have a msgId make sure to save.
if msgId != _EMPTY_ {
mset.storeMsgIdLocked(&ddentry{msgId, seq, ts})
}
if canRespond {
response = append(pubAck, strconv.FormatUint(mset.lseq, 10)...)
response = append(response, '}')
mset.outq.sendMsg(reply, response)
}
// If we have a msgId make sure to save.
if msgId != _EMPTY_ {
mset.storeMsgId(&ddentry{msgId, seq, ts})
}
mset.mu.Unlock()
return nil
}
@@ -3870,11 +3864,6 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
republish := tsubj != _EMPTY_ && isLeader
// We hold the lock to this point to make sure nothing gets between us since we check for pre-conditions.
// Currently can not hold while calling store b/c we have inline storage update calls that may need the lock.
// Note that upstream that sets seq/ts should be serialized as much as possible.
mset.mu.Unlock()
// If we are republishing grab last sequence for this exact subject. Aids in gap detection for lightweight clients.
if republish {
var smv StoreMsg
@@ -3894,7 +3883,6 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
if err != nil {
// If we did not succeed put those values back and increment clfs in case we are clustered.
mset.mu.Lock()
var state StreamState
mset.store.FastState(&state)
mset.lseq = state.LastSeq
@@ -3914,8 +3902,12 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamStoreFailedError(err, Unless(err))
response, _ = json.Marshal(resp)
mset.outq.sendMsg(reply, response)
}
} else if exceeded, apiErr := jsa.limitsExceeded(stype, tierName); exceeded {
return err
}
if exceeded, apiErr := jsa.limitsExceeded(stype, tierName); exceeded {
s.RateLimitWarnf("JetStream resource limits exceeded for account: %q", accName)
if canRespond {
resp.PubAck = &PubAck{Stream: name}
@@ -3925,67 +3917,71 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
resp.Error = apiErr
}
response, _ = json.Marshal(resp)
mset.outq.sendMsg(reply, response)
}
// If we did not succeed put those values back.
mset.mu.Lock()
var state StreamState
mset.store.FastState(&state)
mset.lseq = state.LastSeq
mset.lmsgId = olmsgId
mset.mu.Unlock()
store.RemoveMsg(seq)
seq = 0
} else {
// No errors, this is the normal path.
// If we have a msgId make sure to save.
if msgId != _EMPTY_ {
mset.storeMsgId(&ddentry{msgId, seq, ts})
}
if rollupSub {
mset.purge(&JSApiStreamPurgeRequest{Subject: subject, Keep: 1})
} else if rollupAll {
mset.purge(&JSApiStreamPurgeRequest{Keep: 1})
}
if canRespond {
response = append(pubAck, strconv.FormatUint(seq, 10)...)
response = append(response, '}')
}
// Check for republish.
if republish {
var rpMsg []byte
if len(hdr) == 0 {
const ht = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Last-Sequence: %d\r\n\r\n"
const htho = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Last-Sequence: %d\r\nNats-Msg-Size: %d\r\n\r\n"
if !thdrsOnly {
hdr = []byte(fmt.Sprintf(ht, name, subject, seq, tlseq))
rpMsg = copyBytes(msg)
} else {
hdr = []byte(fmt.Sprintf(htho, name, subject, seq, tlseq, len(msg)))
}
return nil
}
// If we have a msgId make sure to save.
if msgId != _EMPTY_ {
mset.storeMsgIdLocked(&ddentry{msgId, seq, ts})
}
// If here we succeeded in storing the message.
mset.mu.Unlock()
// No errors, this is the normal path.
if rollupSub {
mset.purge(&JSApiStreamPurgeRequest{Subject: subject, Keep: 1})
} else if rollupAll {
mset.purge(&JSApiStreamPurgeRequest{Keep: 1})
}
// Check for republish.
if republish {
var rpMsg []byte
if len(hdr) == 0 {
const ht = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Last-Sequence: %d\r\n\r\n"
const htho = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Last-Sequence: %d\r\nNats-Msg-Size: %d\r\n\r\n"
if !thdrsOnly {
hdr = []byte(fmt.Sprintf(ht, name, subject, seq, tlseq))
rpMsg = copyBytes(msg)
} else {
// Slow path.
hdr = genHeader(hdr, JSStream, name)
hdr = genHeader(hdr, JSSubject, subject)
hdr = genHeader(hdr, JSSequence, strconv.FormatUint(seq, 10))
hdr = genHeader(hdr, JSLastSequence, strconv.FormatUint(tlseq, 10))
if !thdrsOnly {
rpMsg = copyBytes(msg)
} else {
hdr = genHeader(hdr, JSMsgSize, strconv.Itoa(len(msg)))
}
hdr = []byte(fmt.Sprintf(htho, name, subject, seq, tlseq, len(msg)))
}
} else {
// Slow path.
hdr = genHeader(hdr, JSStream, name)
hdr = genHeader(hdr, JSSubject, subject)
hdr = genHeader(hdr, JSSequence, strconv.FormatUint(seq, 10))
hdr = genHeader(hdr, JSLastSequence, strconv.FormatUint(tlseq, 10))
if !thdrsOnly {
rpMsg = copyBytes(msg)
} else {
hdr = genHeader(hdr, JSMsgSize, strconv.Itoa(len(msg)))
}
mset.outq.send(newJSPubMsg(tsubj, _EMPTY_, _EMPTY_, copyBytes(hdr), rpMsg, nil, seq))
}
mset.outq.send(newJSPubMsg(tsubj, _EMPTY_, _EMPTY_, copyBytes(hdr), rpMsg, nil, seq))
}
// Send response here.
if canRespond {
response = append(pubAck, strconv.FormatUint(seq, 10)...)
response = append(response, '}')
mset.outq.sendMsg(reply, response)
}
if err == nil && seq > 0 && numConsumers > 0 {
mset.mu.RLock()
for _, o := range mset.consumers {
// Signal consumers for new messages.
if numConsumers > 0 {
mset.clsMu.RLock()
for _, o := range mset.cList {
o.mu.Lock()
if o.isLeader() && o.isFilteredMatch(subject) {
if seq > o.npcm {
@@ -3995,10 +3991,10 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
o.mu.Unlock()
}
mset.mu.RUnlock()
mset.clsMu.RUnlock()
}
return err
return nil
}
// Internal message for use by jetstream subsystem.
@@ -4258,6 +4254,9 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
for _, o := range mset.consumers {
obs = append(obs, o)
}
mset.clsMu.Lock()
mset.consumers, mset.cList = nil, nil
mset.clsMu.Unlock()
// Check if we are a mirror.
if mset.mirror != nil && mset.mirror.sub != nil {
@@ -4271,8 +4270,8 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
mset.cancelSourceConsumer(si.iname)
}
}
mset.mu.Unlock()
for _, o := range obs {
// Third flag says do not broadcast a signal.
// TODO(dlc) - If we have an err here we don't want to stop
@@ -4403,16 +4402,11 @@ func (mset *stream) getMsg(seq uint64) (*StoredMsg, error) {
}, nil
}
// getConsumers will return all the current consumers for this stream.
// getConsumers will return a copy of all the current consumers for this stream.
func (mset *stream) getConsumers() []*consumer {
mset.mu.RLock()
defer mset.mu.RUnlock()
var obs []*consumer
for _, o := range mset.consumers {
obs = append(obs, o)
}
return obs
mset.clsMu.RLock()
defer mset.clsMu.RUnlock()
return append([]*consumer(nil), mset.cList...)
}
// Lock should be held for this one.
@@ -4422,11 +4416,11 @@ func (mset *stream) numPublicConsumers() int {
// This returns all consumers that are not DIRECT.
func (mset *stream) getPublicConsumers() []*consumer {
mset.mu.RLock()
defer mset.mu.RUnlock()
mset.clsMu.RLock()
defer mset.clsMu.RUnlock()
var obs []*consumer
for _, o := range mset.consumers {
for _, o := range mset.cList {
if !o.cfg.Direct {
obs = append(obs, o)
}
@@ -4441,6 +4435,18 @@ func (mset *stream) numConsumers() int {
return len(mset.consumers)
}
// Lock should be held
// Don't expect this to be called at high rates.
func (mset *stream) updateConsumerList() {
	mset.clsMu.Lock()
	defer mset.clsMu.Unlock()

	// Rebuild the snapshot from the canonical consumers map so readers can
	// range over cList under clsMu without needing the main stream lock.
	cl := make([]*consumer, 0, len(mset.consumers))
	for _, o := range mset.consumers {
		cl = append(cl, o)
	}
	mset.cList = cl
}
// Lock should be held.
func (mset *stream) setConsumer(o *consumer) {
mset.consumers[o.name] = o
if o.cfg.FilterSubject != _EMPTY_ {
@@ -4449,8 +4455,11 @@ func (mset *stream) setConsumer(o *consumer) {
if o.cfg.Direct {
mset.directs++
}
// Now update consumers list as well
mset.updateConsumerList()
}
// Lock should be held.
func (mset *stream) removeConsumer(o *consumer) {
if o.cfg.FilterSubject != _EMPTY_ && mset.numFilter > 0 {
mset.numFilter--
@@ -4459,6 +4468,8 @@ func (mset *stream) removeConsumer(o *consumer) {
mset.directs--
}
delete(mset.consumers, o.name)
// Now update consumers list as well
mset.updateConsumerList()
}
// lookupConsumer will retrieve a consumer by name.
@@ -4469,11 +4480,11 @@ func (mset *stream) lookupConsumer(name string) *consumer {
}
func (mset *stream) numDirectConsumers() (num int) {
mset.mu.RLock()
defer mset.mu.RUnlock()
mset.clsMu.RLock()
defer mset.clsMu.RUnlock()
// Consumers that are direct are not recorded at the store level.
for _, o := range mset.consumers {
for _, o := range mset.cList {
o.mu.RLock()
if o.cfg.Direct {
num++