When taking over, make sure to sync and reset CLFS for clustered streams.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-08-03 10:41:10 -07:00
parent 9de5e3e64d
commit 081140ee67
3 changed files with 218 additions and 7 deletions

View File

@@ -2291,9 +2291,13 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
case isLeader = <-lch:
if isLeader {
if sendSnapshot && mset != nil && n != nil {
n.SendSnapshot(mset.stateSnapshot())
sendSnapshot = false
if mset != nil && n != nil {
// Send a snapshot if being asked or if we are tracking
// a failed state so that followers sync.
if clfs := mset.clearCLFS(); clfs > 0 || sendSnapshot {
n.SendSnapshot(mset.stateSnapshot())
sendSnapshot = false
}
}
if isRestore {
acc, _ := s.LookupAccount(sa.Client.serviceAccount())
@@ -2714,15 +2718,14 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
// Grab last sequence and CLFS.
last, clfs := mset.lastSeqAndCLFS()
// We can skip if we know this is less than what we already have.
if lseq-clfs < last {
s.Debugf("Apply stream entries for '%s > %s' skipping message with sequence %d with last of %d",
mset.account(), mset.name(), lseq+1-clfs, last)
// Check for any preAcks in case we are interest based.
mset.mu.Lock()
seq := lseq + 1 - mset.clfs
mset.clearAllPreAcks(seq)
// Check for any preAcks in case we are interest based.
mset.clearAllPreAcks(lseq + 1 - mset.clfs)
mset.mu.Unlock()
continue
}

View File

@@ -4785,3 +4785,203 @@ func TestJetStreamAccountUsageDrifts(t *testing.T) {
checkAccount(sir1.State.Bytes, sir3.State.Bytes)
}
}
// TestJetStreamClusterStreamFailTracking publishes duplicate msg IDs (which
// bump CLFS tracking), then churns the cluster — leader stepdowns, a raft
// reset, and a server restart — and finally verifies that, after scaling the
// stream down to R1, all messages are delivered strictly in ID order.
func TestJetStreamClusterStreamFailTracking(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)

	m := nats.NewMsg("foo")
	m.Data = []byte("OK")

	b, bsz := 0, 5
	sendBatch := func() {
		for i := b * bsz; i < b*bsz+bsz; i++ {
			msgId := fmt.Sprintf("ID:%d", i)
			m.Header.Set(JSMsgId, msgId)
			// Send it twice on purpose: the duplicate is rejected by
			// msg-ID dedupe, which is what exercises failure tracking.
			js.PublishMsg(m)
			js.PublishMsg(m)
		}
		b++
	}

	sendBatch()

	_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second)
	require_NoError(t, err)
	c.waitOnStreamLeader(globalAccountName, "TEST")

	sendBatch()

	// Now stop one and restart.
	nl := c.randomNonStreamLeader(globalAccountName, "TEST")
	mset, err := nl.GlobalAccount().lookupStream("TEST")
	require_NoError(t, err)
	// Reset raft
	mset.resetClusteredState(nil)
	time.Sleep(100 * time.Millisecond)

	nl.Shutdown()
	nl.WaitForShutdown()

	sendBatch()

	nl = c.restartServer(nl)

	sendBatch()

	// Keep stepping down until the restarted server becomes leader.
	for {
		_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second)
		require_NoError(t, err)
		c.waitOnStreamLeader(globalAccountName, "TEST")
		if nl == c.streamLeader(globalAccountName, "TEST") {
			break
		}
	}

	sendBatch()

	_, err = js.UpdateStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 1,
	})
	require_NoError(t, err)

	// Make sure all in order.
	errCh := make(chan error, 100)
	var wg sync.WaitGroup
	wg.Add(1)

	expected, seen := b*bsz, 0

	sub, err := js.Subscribe("foo", func(msg *nats.Msg) {
		expectedID := fmt.Sprintf("ID:%d", seen)
		if v := msg.Header.Get(JSMsgId); v != expectedID {
			// Report the actual mismatch. Previously this sent the outer
			// `err` (nil here), which hid what went wrong on failure.
			errCh <- fmt.Errorf("expected msg ID %q, got %q", expectedID, v)
			wg.Done()
			msg.Sub.Unsubscribe()
			return
		}
		seen++
		if seen >= expected {
			wg.Done()
			msg.Sub.Unsubscribe()
		}
	})
	require_NoError(t, err)
	defer sub.Unsubscribe()

	wg.Wait()

	// Surface the first ordering error, if any.
	select {
	case oerr := <-errCh:
		t.Fatalf("Expected no errors, got %v", oerr)
	default:
	}
}
// TestJetStreamClusterStreamFailTrackingSnapshots verifies failure (CLFS)
// tracking survives a raft snapshot: a follower is stopped, the leader
// accumulates duplicate-publish failures and installs a snapshot, the
// follower restarts and is forced to become leader, and delivery order is
// then checked after scaling the stream down to R1.
func TestJetStreamClusterStreamFailTrackingSnapshots(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)

	m := nats.NewMsg("foo")
	m.Data = []byte("OK")

	// Send 1000, a dupe for every msgID.
	for i := 0; i < 1000; i++ {
		msgId := fmt.Sprintf("ID:%d", i)
		m.Header.Set(JSMsgId, msgId)
		// Send it twice on purpose.
		js.PublishMsg(m)
		js.PublishMsg(m)
	}

	// Now stop one.
	nl := c.randomNonStreamLeader(globalAccountName, "TEST")
	nl.Shutdown()
	nl.WaitForShutdown()

	// Now send more and make sure leader snapshots.
	for i := 1000; i < 2000; i++ {
		msgId := fmt.Sprintf("ID:%d", i)
		m.Header.Set(JSMsgId, msgId)
		// Send it twice on purpose.
		js.PublishMsg(m)
		js.PublishMsg(m)
	}

	sl := c.streamLeader(globalAccountName, "TEST")
	mset, err := sl.GlobalAccount().lookupStream("TEST")
	require_NoError(t, err)

	node := mset.raftNode()
	require_NotNil(t, node)
	// Force a snapshot so the restarted follower must catch up from it.
	node.InstallSnapshot(mset.stateSnapshot())

	// Now restart nl
	nl = c.restartServer(nl)
	c.waitOnServerCurrent(nl)

	// Move leader to NL
	for {
		_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second)
		require_NoError(t, err)
		c.waitOnStreamLeader(globalAccountName, "TEST")
		if nl == c.streamLeader(globalAccountName, "TEST") {
			break
		}
	}

	_, err = js.UpdateStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 1,
	})
	require_NoError(t, err)

	// Make sure all in order.
	errCh := make(chan error, 100)
	var wg sync.WaitGroup
	wg.Add(1)

	expected, seen := 2000, 0

	sub, err := js.Subscribe("foo", func(msg *nats.Msg) {
		expectedID := fmt.Sprintf("ID:%d", seen)
		if v := msg.Header.Get(JSMsgId); v != expectedID {
			// Report the actual mismatch. Previously this sent the outer
			// `err` (nil here), which hid what went wrong on failure.
			errCh <- fmt.Errorf("expected msg ID %q, got %q", expectedID, v)
			wg.Done()
			msg.Sub.Unsubscribe()
			return
		}
		seen++
		if seen >= expected {
			wg.Done()
			msg.Sub.Unsubscribe()
		}
	})
	require_NoError(t, err)
	defer sub.Unsubscribe()

	wg.Wait()

	// Surface the first ordering error, if any.
	select {
	case oerr := <-errCh:
		t.Fatalf("Expected no errors, got %v", oerr)
	default:
	}
}

View File

@@ -838,6 +838,14 @@ func (mset *stream) lastSeqAndCLFS() (uint64, uint64) {
return mset.lseq, mset.clfs
}
// clearCLFS returns the stream's current CLFS value and resets it to zero,
// both done atomically under the stream lock.
func (mset *stream) clearCLFS() uint64 {
	mset.mu.Lock()
	prev := mset.clfs
	mset.clfs = 0
	mset.mu.Unlock()
	return prev
}
func (mset *stream) lastSeq() uint64 {
mset.mu.RLock()
lseq := mset.lseq