mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Additional stability improvements for catchup.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
committed by
Ivan Kozlovic
parent
3407112292
commit
e635de7526
@@ -6399,11 +6399,11 @@ func (mset *stream) calculateSyncRequest(state *StreamState, snap *streamSnapsho
|
||||
func (mset *stream) processSnapshotDeletes(snap *streamSnapshot) {
|
||||
state := mset.state()
|
||||
|
||||
// Adjust if FirstSeq has moved.
|
||||
if snap.FirstSeq > state.FirstSeq && state.FirstSeq != 0 {
|
||||
// Always adjust if FirstSeq has moved beyond our state.
|
||||
if snap.FirstSeq > state.FirstSeq {
|
||||
mset.store.Compact(snap.FirstSeq)
|
||||
state = mset.store.State()
|
||||
mset.setLastSeq(snap.LastSeq)
|
||||
mset.setLastSeq(state.LastSeq)
|
||||
}
|
||||
// Range the deleted and delete if applicable.
|
||||
for _, dseq := range snap.Deleted {
|
||||
@@ -6536,11 +6536,10 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) {
|
||||
var ErrStreamStopped = errors.New("stream has been stopped")
|
||||
|
||||
defer func() {
|
||||
if e == ErrServerNotRunning || e == ErrStreamStopped {
|
||||
// Wipe our raft state if exiting with these errors.
|
||||
n.Wipe()
|
||||
// Don't bother resuming if server or stream is gone.
|
||||
if e != ErrStreamStopped && e != ErrServerNotRunning {
|
||||
n.ResumeApply()
|
||||
}
|
||||
n.ResumeApply()
|
||||
}()
|
||||
|
||||
// Set our catchup state.
|
||||
@@ -6561,7 +6560,7 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) {
|
||||
if gotMsgs || activityInterval == maxActivityInterval {
|
||||
return maxActivityInterval
|
||||
}
|
||||
activityInterval *= 2
|
||||
activityInterval *= 5
|
||||
if activityInterval > maxActivityInterval {
|
||||
activityInterval = maxActivityInterval
|
||||
}
|
||||
|
||||
@@ -12178,6 +12178,7 @@ func TestJetStreamClusterSnapshotBeforePurgeAndCatchup(t *testing.T) {
|
||||
|
||||
// We want to make sure we do not send unnecessary skip msgs when we know we do not have all of these messages.
|
||||
nc, _ = jsClientConnect(t, sl, nats.UserInfo("admin", "s3cr3t!"))
|
||||
defer nc.Close()
|
||||
sub, err := nc.SubscribeSync("$JSC.R.>")
|
||||
require_NoError(t, err)
|
||||
|
||||
@@ -12199,3 +12200,75 @@ func TestJetStreamClusterSnapshotBeforePurgeAndCatchup(t *testing.T) {
|
||||
t.Fatalf("Expected only 1 sync catchup msg to be sent signaling eof, but got %d", nmsgs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamClusterStreamResetWithLargeFirstSeq(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "JSC", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
cfg := &nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"foo"},
|
||||
MaxAge: 5 * time.Second,
|
||||
Replicas: 1,
|
||||
}
|
||||
_, err := js.AddStream(cfg)
|
||||
require_NoError(t, err)
|
||||
|
||||
// Fake a very large first seq.
|
||||
sl := c.streamLeader("$G", "TEST")
|
||||
mset, err := sl.GlobalAccount().lookupStream("TEST")
|
||||
require_NoError(t, err)
|
||||
|
||||
mset.mu.Lock()
|
||||
mset.store.Compact(1_000_000)
|
||||
mset.mu.Unlock()
|
||||
// Restart
|
||||
sl.Shutdown()
|
||||
sl = c.restartServer(sl)
|
||||
c.waitOnStreamLeader("$G", "TEST")
|
||||
|
||||
nc, js = jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
// Make sure we have the correct state after restart.
|
||||
si, err := js.StreamInfo("TEST")
|
||||
require_NoError(t, err)
|
||||
require_True(t, si.State.FirstSeq == 1_000_000)
|
||||
|
||||
// Now add in 10,000 messages.
|
||||
num := 10_000
|
||||
for i := 0; i < num; i++ {
|
||||
js.PublishAsync("foo", []byte("SNAP"))
|
||||
}
|
||||
select {
|
||||
case <-js.PublishAsyncComplete():
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("Did not receive completion signal")
|
||||
}
|
||||
|
||||
si, err = js.StreamInfo("TEST")
|
||||
require_NoError(t, err)
|
||||
require_True(t, si.State.FirstSeq == 1_000_000)
|
||||
require_True(t, si.State.LastSeq == uint64(1_000_000+num-1))
|
||||
|
||||
// We want to make sure we do not send unnecessary skip msgs when we know we do not have all of these messages.
|
||||
ncs, _ := jsClientConnect(t, sl, nats.UserInfo("admin", "s3cr3t!"))
|
||||
defer nc.Close()
|
||||
sub, err := ncs.SubscribeSync("$JSC.R.>")
|
||||
require_NoError(t, err)
|
||||
|
||||
// Now scale up to R3.
|
||||
cfg.Replicas = 3
|
||||
_, err = js.UpdateStream(cfg)
|
||||
require_NoError(t, err)
|
||||
nl := c.randomNonStreamLeader("$G", "TEST")
|
||||
c.waitOnStreamCurrent(nl, "$G", "TEST")
|
||||
|
||||
// Make sure we only sent the number of catchup msgs we expected.
|
||||
if nmsgs, _, _ := sub.Pending(); nmsgs != (cfg.Replicas-1)*(num+1) {
|
||||
t.Fatalf("Expected %d catchup msgs, but got %d", (cfg.Replicas-1)*(num+1), nmsgs)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user