Merge pull request #1908 from nats-io/offbyone

Set pindex to wrong setting on snapshot restore with no WAL
This commit is contained in:
Derek Collison
2021-02-13 08:51:21 -07:00
committed by GitHub
2 changed files with 38 additions and 4 deletions

View File

@@ -8822,7 +8822,7 @@ func TestJetStreamPubPerf(t *testing.T) {
nc := clientConnectToServer(t, s)
defer nc.Close()
toSend := 5000000
toSend := 5_000_000
numProducers := 1
payload := []byte("Hello World")
@@ -8870,7 +8870,7 @@ func TestJetStreamPubWithAsyncResponsePerf(t *testing.T) {
msetConfig := StreamConfig{
Name: "sr33",
Storage: MemoryStorage,
Storage: FileStorage,
Subjects: []string{"foo"},
}
@@ -8881,7 +8881,7 @@ func TestJetStreamPubWithAsyncResponsePerf(t *testing.T) {
nc := clientConnectToServer(t, s)
defer nc.Close()
toSend := 1000000
toSend := 1_000_000
payload := []byte("Hello World")
start := time.Now()
@@ -8895,6 +8895,38 @@ func TestJetStreamPubWithAsyncResponsePerf(t *testing.T) {
fmt.Printf("%.0f msgs/sec\n", float64(toSend)/tt.Seconds())
}
func TestJetStreamPubWithSyncPerf(t *testing.T) {
// Comment out to run, holding place for now.
//t.SkipNow()
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{Name: "foo"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
toSend := 1_000_000
payload := []byte("Hello World")
start := time.Now()
for i := 0; i < toSend; i++ {
js.Publish("foo", payload)
}
tt := time.Since(start)
fmt.Printf("time is %v\n", tt)
fmt.Printf("%.0f msgs/sec\n", float64(toSend)/tt.Seconds())
}
func TestJetStreamConsumerPerf(t *testing.T) {
// Comment out to run, holding place for now.
t.SkipNow()

View File

@@ -360,6 +360,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
n.commit = first.commit
}
}
// Replay the log.
// Since doing this in place we need to make sure we have enough room on the applyc.
needed := state.Msgs + 1 // 1 is for nil to mark end of replay.
@@ -788,7 +789,7 @@ func (n *raft) setupLastSnapshot() {
n.pterm = snap.lastTerm
n.commit = snap.lastIndex
n.applyc <- &CommittedEntry{n.commit, []*Entry{&Entry{EntrySnapshot, snap.data}}}
n.wal.Compact(snap.lastIndex)
n.wal.Compact(snap.lastIndex + 1)
}
n.Unlock()
}
@@ -2256,6 +2257,7 @@ func (n *raft) storeToWAL(ae *appendEntry) error {
if ae.pindex != seq-1 {
fmt.Printf("[%s] n is %+v\n\n", n.s, n)
fmt.Printf("[%s] n.catchup is %+v\n", n.s, n.catchup)
fmt.Printf("[%s] n.wal is %+v\n", n.s, n.wal.State())
panic(fmt.Sprintf("[%s-%s] Placed an entry at the wrong index, ae is %+v, seq is %d, n.pindex is %d\n\n", n.s, n.group, ae, seq, n.pindex))
}