diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 848d4c62..b2293d40 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -586,6 +586,7 @@ func (js *jetStream) isGroupLeaderless(rg *raftGroup) bool {
 			return true
 		}
 	}
+
 	return false
 }
 
@@ -1295,17 +1296,18 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {
 
 	// Make sure we do not leave the apply channel to fill up and block the raft layer.
 	defer func() {
-		if n.State() != Closed {
-			if n.Leader() {
-				n.StepDown()
-			}
-			// Drain the commit channel..
-			for len(ach) > 0 {
-				select {
-				case <-ach:
-				default:
-					return
-				}
+		if n.State() == Closed {
+			return
+		}
+		if n.Leader() {
+			n.StepDown()
+		}
+		// Drain the commit channel.
+		for len(ach) > 0 {
+			select {
+			case <-ach:
+			default:
+				return
 			}
 		}
 	}()
@@ -1529,11 +1531,35 @@ func (mset *stream) resetClusteredState() bool {
 		js.mu.Lock()
 		sa.Group.node = nil
 		js.mu.Unlock()
-		go js.processClusterCreateStream(acc, sa)
+		go js.restartClustered(acc, sa)
 	}
 	return true
 }
 
+// This will reset the stream and its consumers.
+// Should be done in a separate goroutine.
+func (js *jetStream) restartClustered(acc *Account, sa *streamAssignment) {
+	js.processClusterCreateStream(acc, sa)
+
+	// Check consumers.
+	js.mu.Lock()
+	var consumers []*consumerAssignment
+	if cc := js.cluster; cc != nil && cc.meta != nil {
+		ourID := cc.meta.ID()
+		for _, ca := range sa.consumers {
+			if rg := ca.Group; rg != nil && rg.isMember(ourID) {
+				rg.node = nil // Erase group raft/node state.
+				consumers = append(consumers, ca)
+			}
+		}
+	}
+	js.mu.Unlock()
+
+	for _, ca := range consumers {
+		js.processClusterCreateConsumer(ca, nil)
+	}
+}
+
 func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isRecovering bool) error {
 	for _, e := range ce.Entries {
 		if e.Type == EntryNormal {
@@ -1561,7 +1587,9 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 
 				// We can skip if we know this is less than what we already have.
 				if lseq < last {
-					s.Debugf("Apply stream entries skipping message with sequence %d with last of %d", lseq, last)
+					if !isRecovering {
+						s.Debugf("Apply stream entries skipping message with sequence %d with last of %d", lseq, last)
+					}
 					continue
 				}
 
diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go
index 02e07790..9e26ed00 100644
--- a/server/jetstream_cluster_test.go
+++ b/server/jetstream_cluster_test.go
@@ -20,8 +20,10 @@ import (
 	"fmt"
 	"io/ioutil"
 	"math/rand"
+	"os"
 	"path"
 	"reflect"
+	"runtime"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -7763,7 +7765,7 @@ func TestJetStreamPullConsumerLeakedSubs(t *testing.T) {
 	defer sub.Unsubscribe()
 
 	// Load up a bunch of requests.
-	numRequests := 20 //100_000
+	numRequests := 20
 	for i := 0; i < numRequests; i++ {
 		js.PublishAsync("Domains.Domain", []byte("QUESTION"))
 	}
@@ -8078,6 +8080,175 @@ func TestJetStreamDeadlockOnVarz(t *testing.T) {
 	wg.Wait()
 }
 
+// Make sure that when we try to hard reset a stream's state in a cluster we also re-create the consumers.
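+// We do the reset by hand via resetClusteredState() and then verify that the durable's consumer
+// leader is re-elected, its info is reachable again, and no consumer goroutines were lost.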
+func TestJetStreamClusterStreamReset(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	// Client based API
+	s := c.randomServer()
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "TEST",
+		Subjects:  []string{"foo.*"},
+		Replicas:  2,
+		Retention: nats.WorkQueuePolicy,
+	})
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+
+	numRequests := 20
+	for i := 0; i < numRequests; i++ {
+		js.Publish("foo.created", []byte("REQ"))
+	}
+
+	// Durable.
+	sub, err := js.SubscribeSync("foo.created", nats.Durable("d1"))
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	defer sub.Unsubscribe()
+
+	si, err := js.StreamInfo("TEST")
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if si.State.Msgs != uint64(numRequests) {
+		t.Fatalf("Expected %d msgs, got bad state: %+v", numRequests, si.State)
+	}
+	// Let things settle a bit.
+	time.Sleep(250 * time.Millisecond)
+
+	// Grab the number of goroutines.
+	base := runtime.NumGoroutine()
+
+	// Grab a server that is the consumer leader for the durable.
+	cl := c.consumerLeader("$G", "TEST", "d1")
+	mset, err := cl.GlobalAccount().lookupStream("TEST")
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	// Do a hard reset here by hand.
+	mset.resetClusteredState()
+	// Wait until the leader is elected.
+	c.waitOnConsumerLeader("$G", "TEST", "d1")
+
+	// So we do not wait 10s per call in checkFor.
+	js2, _ := nc.JetStream(nats.MaxWait(100 * time.Millisecond))
+	// Make sure we can get the consumer info eventually.
+	checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
+		_, err := js2.ConsumerInfo("TEST", "d1")
+		return err
+	})
+
+	// Check the number of goroutines again. If consumers were not re-created we will have lost some.
+	if after := runtime.NumGoroutine(); base > after {
+		t.Fatalf("Expected at least %d goroutines, got %d", base, after)
+	}
+}
+
+// Issue #2397
+func TestJetStreamClusterStreamCatchupNoState(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R2S", 2)
+	defer c.shutdown()
+
+	// Client based API
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo.*"},
+		Replicas: 2,
+	})
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+
+	// Hold onto servers.
+	sl := c.streamLeader("$G", "TEST")
+	if sl == nil {
+		t.Fatalf("Did not get a server")
+	}
+	nsl := c.randomNonStreamLeader("$G", "TEST")
+	if nsl == nil {
+		t.Fatalf("Did not get a server")
+	}
+	// Grab low level stream and raft node.
+	mset, err := nsl.GlobalAccount().lookupStream("TEST")
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	node := mset.raftNode()
+	if node == nil {
+		t.Fatalf("Could not get the stream's raft node")
+	}
+	gname := node.Group()
+
+	numRequests := 100
+	for i := 0; i < numRequests; i++ {
+		// This will force a snapshot which will prune the normal log.
+		// We will remove the snapshot to simulate the error condition.
+		if i == 10 {
+			if err := node.InstallSnapshot(mset.stateSnapshot()); err != nil {
+				t.Fatalf("Error installing snapshot: %v", err)
+			}
+		}
+		js.Publish("foo.created", []byte("REQ"))
+	}
+
+	config := nsl.JetStreamConfig()
+	if config == nil {
+		t.Fatalf("No config")
+	}
+	lconfig := sl.JetStreamConfig()
+	if lconfig == nil {
+		t.Fatalf("No config")
+	}
+
+	nc.Close()
+	c.stopAll()
+	// Remove all state for the non-leader by truncating the message store files.
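+	// 1.blk, 1.idx and 1.fss are the filestore's message block, index and
+	// per-subject state files for the stream's first block.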
+	for _, fn := range []string{"1.blk", "1.idx", "1.fss"} {
+		fname := path.Join(config.StoreDir, "$G", "streams", "TEST", "msgs", fn)
+		fd, err := os.OpenFile(fname, os.O_RDWR, defaultFilePerms)
+		if err != nil {
+			continue
+		}
+		fd.Truncate(0)
+		fd.Close()
+	}
+	// For both servers, make sure we have no raft snapshots.
+	snapDir := path.Join(lconfig.StoreDir, "$SYS", "_js_", gname, "snapshots")
+	os.RemoveAll(snapDir)
+	snapDir = path.Join(config.StoreDir, "$SYS", "_js_", gname, "snapshots")
+	os.RemoveAll(snapDir)
+
+	// Now restart.
+	c.restartAll()
+	for _, cs := range c.servers {
+		c.waitOnStreamCurrent(cs, "$G", "TEST")
+	}
+
+	nc, js = jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	if _, err := js.Publish("foo.created", []byte("REQ")); err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+
+	si, err := js.StreamInfo("TEST")
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if si.State.LastSeq != 101 {
+		t.Fatalf("Bad state after restart: %+v", si.State)
+	}
+}
+
 // Support functions
 
 // Used to setup superclusters for tests.
diff --git a/server/stream.go b/server/stream.go
index 03fe86a7..719c922a 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -2600,8 +2600,9 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
 	// For clustering the lower layers will pass our expected lseq. If it is present check for that here.
 	if lseq > 0 && lseq != (mset.lseq+mset.clfs) {
 		isMisMatch := true
-		// If our first message for this mirror, see if we have to adjust our starting sequence.
-		if mset.cfg.Mirror != nil {
+		// We may be able to recover here if we have no state whatsoever, or we are a mirror.
+		// See if we have to adjust our starting sequence.
+		if mset.lseq == 0 || mset.cfg.Mirror != nil {
 			var state StreamState
 			mset.store.FastState(&state)
 			if state.FirstSeq == 0 {