When encountering errors for sequence mismatches that were benign we were returning an error and not processing the rest of the entries.
This would lead to more severe sequence mismatches later on that would cause stream resets.

Also added code to deal with server restarts and the clfs fixup states which should have been reset properly.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-11-02 14:38:22 -07:00
parent 1097ac9234
commit 1af3ab1b4e
4 changed files with 89 additions and 15 deletions

View File

@@ -1752,7 +1752,11 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
// Process the actual message here.
if err := mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts); err != nil {
return err
// Only return in place if we are going to reset stream or we are out of space.
if isClusterResetErr(err) || isOutOfSpaceErr(err) {
return err
}
s.Debugf("Apply stream entries error processing message: %v", err)
}
case deleteMsgOp:
md, err := decodeMsgDelete(buf[1:])

View File

@@ -9342,6 +9342,68 @@ func TestJetStreamClusterStreamUpdateMissingBeginning(t *testing.T) {
}
}
// Issue #2666
func TestJetStreamClusterKVMultipleConcurrentCreate(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
// Client based API
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST", History: 1, TTL: 150 * time.Millisecond, Replicas: 3})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
startCh := make(chan bool)
var wg sync.WaitGroup
for n := 0; n < 5; n++ {
wg.Add(1)
go func() {
defer wg.Done()
<-startCh
if r, err := kv.Create("name", []byte("dlc")); err == nil {
if _, err = kv.Update("name", []byte("rip"), r); err != nil {
t.Log("Unexpected Update error: ", err)
}
}
}()
}
// Wait for Go routines to start.
time.Sleep(100 * time.Millisecond)
close(startCh)
wg.Wait()
// Just make sure its there and picks up the phone.
if _, err := js.StreamInfo("KV_TEST"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Now make sure we do ok when servers are restarted and we need to deal with dangling clfs state.
// First non-leader.
rs := c.randomNonStreamLeader("$G", "KV_TEST")
rs.Shutdown()
rs = c.restartServer(rs)
c.waitOnStreamCurrent(rs, "$G", "KV_TEST")
if _, err := kv.Put("name", []byte("ik")); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Now the actual leader.
sl := c.streamLeader("$G", "KV_TEST")
sl.Shutdown()
sl = c.restartServer(sl)
c.waitOnStreamLeader("$G", "KV_TEST")
c.waitOnStreamCurrent(sl, "$G", "KV_TEST")
if _, err := kv.Put("name", []byte("mh")); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
time.Sleep(time.Second)
}
// Support functions
// Used to setup superclusters for tests.

View File

@@ -85,11 +85,6 @@ type WAL interface {
Delete() error
}
type LeadChange struct {
Leader bool
Previous string
}
type Peer struct {
ID string
Current bool

View File

@@ -201,6 +201,7 @@ type stream struct {
clMu sync.Mutex
clseq uint64
clfs uint64
leader string
lqsent time.Time
catchups map[string]uint64
}
@@ -528,6 +529,8 @@ func (mset *stream) setLeader(isLeader bool) error {
mset.mu.Unlock()
return err
}
// Clear and fixup state we had for last state.
mset.clfs = 0
} else {
// Stop responding to sync requests.
mset.stopClusterSubs()
@@ -535,6 +538,14 @@ func (mset *stream) setLeader(isLeader bool) error {
mset.unsubscribeToStream()
// Clear catchup state
mset.clearAllCatchupPeers()
// Check on any fixup state and optionally clear.
if mset.isClustered() && mset.leader != _EMPTY_ && mset.leader != mset.node.GroupLeader() {
mset.clfs = 0
}
}
// Track group leader.
if mset.isClustered() {
mset.leader = mset.node.GroupLeader()
}
mset.mu.Unlock()
return nil
@@ -2760,16 +2771,17 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
outq := mset.outq
// Dedupe detection.
msgId = getMsgId(hdr)
if dde := mset.checkMsgId(msgId); dde != nil {
mset.clfs++
mset.mu.Unlock()
if canRespond {
response := append(pubAck, strconv.FormatUint(dde.seq, 10)...)
response = append(response, ",\"duplicate\": true}"...)
outq.sendMsg(reply, response)
if msgId = getMsgId(hdr); msgId != _EMPTY_ {
if dde := mset.checkMsgId(msgId); dde != nil {
mset.clfs++
mset.mu.Unlock()
if canRespond {
response := append(pubAck, strconv.FormatUint(dde.seq, 10)...)
response = append(response, ",\"duplicate\": true}"...)
outq.sendMsg(reply, response)
}
return errMsgIdDuplicate
}
return errMsgIdDuplicate
}
// Expected stream.
@@ -2838,6 +2850,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Check for any rollups.
if rollup := getRollup(hdr); rollup != _EMPTY_ {
if !mset.cfg.AllowRollup || mset.cfg.DenyPurge {
mset.clfs++
mset.mu.Unlock()
if canRespond {
resp.PubAck = &PubAck{Stream: name}