diff --git a/server/consumer.go b/server/consumer.go
index 0b51b67f..def22b41 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -690,9 +690,6 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
 		return nil, NewJSConsumerBadDurableNameError()
 	}
 
-	// Select starting sequence number
-	o.selectStartingSeqNo()
-
 	// Setup our storage if not a direct consumer.
 	if !config.Direct {
 		store, err := mset.store.ConsumerStore(o.name, config)
@@ -704,6 +701,14 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
 		o.store = store
 	}
 
+	if o.store != nil && o.store.HasState() {
+		// Restore our saved state.
+		o.readStoredState(0)
+	} else {
+		// Select starting sequence number
+		o.selectStartingSeqNo()
+	}
+
 	// Now register with mset and create the ack subscription.
 	// Check if we already have this one registered.
 	if eo, ok := mset.consumers[o.name]; ok {
@@ -883,10 +888,22 @@ func (o *consumer) setLeader(isLeader bool) {
 		o.mu.Lock()
 		o.rdq, o.rdqi = nil, nil
 
+		// Restore our saved state. During non-leader status we just update our underlying store.
-		hadState, _ := o.readStoredState(lseq)
-		// Setup initial pending and proper start sequence.
-		o.setInitialPendingAndStart(hadState)
+		o.readStoredState(lseq)
+
+		// Setup initial num pending.
+		o.streamNumPending()
+
+		// Cleanup lss when we take over in clustered mode.
+		if o.hasSkipListPending() && o.sseq >= o.lss.resume {
+			o.lss = nil
+		}
+
+		// Update the group on our starting sequence if we are starting but we skipped some in the stream.
+		if o.dseq == 1 && o.sseq > 1 {
+			o.updateSkipped()
+		}
 
 		// Do info sub.
 		if o.infoSub == nil && jsa != nil {
@@ -1929,19 +1946,18 @@ func (o *consumer) checkRedelivered(slseq uint64) {
 
 // This will restore the state from disk.
 // Lock should be held.
-func (o *consumer) readStoredState(slseq uint64) (hadState bool, err error) {
+func (o *consumer) readStoredState(slseq uint64) error {
 	if o.store == nil {
-		return false, nil
+		return nil
 	}
 	state, err := o.store.State()
-	if err == nil && state != nil && (state.Delivered.Consumer != 0 || state.Delivered.Stream != 0) {
+	if err == nil {
 		o.applyState(state)
-		hadState = true
 		if len(o.rdc) > 0 {
 			o.checkRedelivered(slseq)
 		}
 	}
-	return hadState, err
+	return err
 }
 
 // Apply the consumer stored state.
@@ -1950,8 +1966,12 @@ func (o *consumer) applyState(state *ConsumerState) {
 		return
 	}
 
+	// If o.sseq is greater don't update. Don't go backwards on o.sseq.
+	if o.sseq <= state.Delivered.Stream {
+		o.sseq = state.Delivered.Stream + 1
+	}
 	o.dseq = state.Delivered.Consumer + 1
-	o.sseq = state.Delivered.Stream + 1
+
 	o.adflr = state.AckFloor.Consumer
 	o.asflr = state.AckFloor.Stream
 	o.pending = state.Pending
@@ -3138,6 +3158,10 @@ func (o *consumer) numPending() uint64 {
 	if o.npsm == 0 {
 		o.streamNumPending()
 	}
+	// This can wrap based on possibly having a dec before the inc. Account for that here.
+	if o.npc&(1<<63) != 0 {
+		return 0
+	}
 	return o.npc
 }
 
@@ -3667,6 +3691,11 @@ func (o *consumer) selectStartingSeqNo() {
 	o.adflr = o.dseq - 1
 	// Set ack store floor to store-1
 	o.asflr = o.sseq - 1
+
+	// Set our starting sequence state.
+	if o.store != nil && o.sseq > 0 {
+		o.store.SetStarting(o.sseq - 1)
+	}
 }
 
 // Test whether a config represents a durable subscriber.
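Reviewer note on the numPending guard added above (the o.npc&(1<<63) check): o.npc is an unsigned counter, and a decrement can land before its matching increment, briefly wrapping the counter to a huge value whose top bit is set. A minimal standalone Go sketch of the same detection trick, with illustrative names rather than the server's types:

package main

import "fmt"

// pending mimics an unsigned num-pending counter whose decrement may
// race ahead of the matching increment (a dec before the inc).
type pending struct{ npc uint64 }

func (p *pending) inc() { p.npc++ }
func (p *pending) dec() { p.npc-- }

// num reports the count, treating a wrapped value (top bit set) as zero,
// the same idea as the o.npc&(1<<63) guard in numPending above.
func (p *pending) num() uint64 {
	if p.npc&(1<<63) != 0 {
		return 0
	}
	return p.npc
}

func main() {
	var p pending
	p.dec()              // decrement arrives first: npc wraps to 1<<64 - 1
	fmt.Println(p.num()) // prints 0, not 18446744073709551615
	p.inc()              // the late increment re-balances the counter
	fmt.Println(p.num()) // prints 0
}

Once the late increment arrives the counter re-balances, so reporting 0 for the transient wrapped value is safe.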
@@ -4001,67 +4030,6 @@ func (o *consumer) requestNextMsgSubject() string {
 	return o.nextMsgSubj
 }
 
-// Will set the initial pending and start sequence.
-// mset lock should be held.
-func (o *consumer) setInitialPendingAndStart(hadState bool) {
-	mset := o.mset
-	if mset == nil || mset.store == nil {
-		return
-	}
-
-	// !filtered means we want all messages.
-	filtered, dp := o.cfg.FilterSubject != _EMPTY_, o.cfg.DeliverPolicy
-	if filtered {
-		// Check to see if we directly match the configured stream.
-		// Many clients will always send a filtered subject.
-		cfg := &mset.cfg
-		if len(cfg.Subjects) == 1 && cfg.Subjects[0] == o.cfg.FilterSubject {
-			filtered = false
-		}
-	}
-
-	if filtered || dp == DeliverLastPerSubject || dp == DeliverNew {
-		// Here we are filtered.
-		if dp == DeliverLastPerSubject && o.hasSkipListPending() && o.sseq < o.lss.resume {
-			if !hadState {
-				o.sseq = o.lss.seqs[0]
-			}
-		} else if ss := mset.store.FilteredState(o.sseq, o.cfg.FilterSubject); ss.Msgs > 0 {
-			// See if we should update our starting sequence.
-			if dp == DeliverLast || dp == DeliverLastPerSubject {
-				if !hadState {
-					o.sseq = ss.Last
-				}
-			} else if dp == DeliverNew {
-				// If our original is larger we will ignore, we don't want to go backwards with DeliverNew.
-				// If its greater, we need to adjust pending.
-				if ss.Last >= o.sseq {
-					if !hadState {
-						o.sseq = ss.Last + 1
-					}
-				}
-			} else if !hadState {
-				// DeliverAll, DeliverByStartSequence, DeliverByStartTime
-				o.sseq = ss.First
-			}
-			// Cleanup lss when we take over in clustered mode.
-			if dp == DeliverLastPerSubject && o.hasSkipListPending() && o.sseq >= o.lss.resume {
-				o.lss = nil
-			}
-		}
-		o.updateSkipped()
-	}
-
-	// Update our persisted state if something has changed.
-	if store := o.store; store != nil {
-		if state, _ := store.State(); state != nil {
-			if o.dseq-1 > state.Delivered.Consumer || o.sseq-1 > state.Delivered.Stream {
-				o.writeStoreStateUnlocked()
-			}
-		}
-	}
-}
-
 func (o *consumer) decStreamPending(sseq uint64, subj string) {
 	o.mu.Lock()
 	// Update our cached num pending. Only do so if we think deliverMsg has not done so.
diff --git a/server/filestore.go b/server/filestore.go
index cd552641..f3a30662 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -5446,6 +5446,26 @@ func (o *consumerFileStore) flushLoop() {
 	}
 }
 
+// SetStarting sets our starting stream sequence.
+func (o *consumerFileStore) SetStarting(sseq uint64) error {
+	o.mu.Lock()
+	o.state.Delivered.Stream = sseq
+	buf, err := o.encodeState()
+	o.mu.Unlock()
+	if err != nil {
+		return err
+	}
+	return o.writeState(buf)
+}
+
+// HasState returns if this store has a recorded state.
+func (o *consumerFileStore) HasState() bool {
+	o.mu.Lock()
+	_, err := os.Stat(o.ifn)
+	o.mu.Unlock()
+	return err == nil
+}
+
 // UpdateDelivered is called whenever a new message has been delivered.
 func (o *consumerFileStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) error {
 	o.mu.Lock()
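Reviewer note: SetStarting and HasState above are the file-store implementations of the two methods added to the ConsumerStore interface (see server/store.go further down). A rough in-memory sketch of the contract they imply, with hypothetical names and fields, assuming SetStarting only seeds Delivered.Stream and HasState only reports whether prior state exists:

package example

import "sync"

// toyConsumerStore is a hypothetical implementation illustrating only the
// two new ConsumerStore methods; field names are made up for the sketch.
type toyConsumerStore struct {
	mu        sync.Mutex
	delivered uint64 // stands in for state.Delivered.Stream
	persisted bool   // whether any state has ever been recorded
}

// SetStarting seeds the starting stream sequence, as selectStartingSeqNo
// now does via o.store.SetStarting(o.sseq - 1) in the consumer.go hunk.
func (s *toyConsumerStore) SetStarting(sseq uint64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.delivered = sseq
	return nil
}

// HasState reports whether prior state exists. The file store answers this
// with os.Stat on its state file; the memory store always answers false.
func (s *toyConsumerStore) HasState() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.persisted
}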
diff --git a/server/jetstream.go b/server/jetstream.go
index 0291b37c..e9f10ce2 100644
--- a/server/jetstream.go
+++ b/server/jetstream.go
@@ -1272,7 +1272,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
 				}
 				lseq := e.mset.lastSeq()
 				obs.mu.Lock()
-				_, err = obs.readStoredState(lseq)
+				err = obs.readStoredState(lseq)
 				obs.mu.Unlock()
 				if err != nil {
 					s.Warnf(" Error restoring consumer %q state: %v", cfg.Name, err)
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 689a3b9f..7fc1350e 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -3598,7 +3598,9 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
 			o.mu.Lock()
 			if !o.isLeader() {
 				var le = binary.LittleEndian
-				o.sseq = le.Uint64(buf[1:])
+				if sseq := le.Uint64(buf[1:]); sseq > o.sseq {
+					o.sseq = sseq
+				}
 			}
 			o.mu.Unlock()
 		case addPendingRequest:
@@ -5943,8 +5945,7 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) {
 	for _, o := range mset.consumers {
 		o.mu.Lock()
 		if o.isLeader() {
-			// This expects mset lock to be held.
-			o.setInitialPendingAndStart(false)
+			o.streamNumPending()
 		}
 		o.mu.Unlock()
 	}
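Reviewer note: the applyConsumerEntries hunk makes a follower treat a replicated skip update as a floor rather than an absolute assignment, so a stale or reordered entry can no longer move o.sseq backwards. A standalone sketch of that decode-and-advance pattern; the one-op-byte-plus-little-endian-uint64 layout mirrors what the hunk reads from buf[1:], and the op value here is made up:

package main

import (
	"encoding/binary"
	"fmt"
)

// applySkip decodes a replicated [op byte | little-endian uint64] entry and
// only moves the local stream sequence forward, mirroring the sseq > o.sseq
// guard added in applyConsumerEntries.
func applySkip(buf []byte, local uint64) uint64 {
	if sseq := binary.LittleEndian.Uint64(buf[1:]); sseq > local {
		return sseq
	}
	return local
}

func main() {
	entry := make([]byte, 9)
	entry[0] = 1 // hypothetical op marker, not the server's constant
	binary.LittleEndian.PutUint64(entry[1:], 5)

	fmt.Println(applySkip(entry, 3))  // 5: the entry advances us
	fmt.Println(applySkip(entry, 11)) // 11: a stale entry is ignored
}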
diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go
index 809ec554..31f18ae1 100644
--- a/server/jetstream_cluster_test.go
+++ b/server/jetstream_cluster_test.go
@@ -3408,7 +3408,7 @@ func TestJetStreamClusterPeerRemovalAPI(t *testing.T) {
 		t.Fatalf("Expected advisory about %s being removed, got %+v", rs.Name(), adv)
 	}
 
-	checkFor(t, 2*time.Second, 250*time.Millisecond, func() error {
+	checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
 		for _, s := range ml.JetStreamClusterPeers() {
 			if s == rs.Name() {
 				return fmt.Errorf("Still in the peer list")
@@ -10731,3 +10731,87 @@ func TestJetStreamClusterStreamRepublish(t *testing.T) {
 		lseq[m.Subject] = seq
 	}
 }
+
+func TestJetStreamClusterConsumerDeliverNewNotConsumingBeforeStepDownOrRestart(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "JSC", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	inbox := nats.NewInbox()
+	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
+		DeliverSubject: inbox,
+		Durable:        "dur",
+		AckPolicy:      nats.AckExplicitPolicy,
+		DeliverPolicy:  nats.DeliverNewPolicy,
+		FilterSubject:  "foo",
+	})
+	require_NoError(t, err)
+
+	c.waitOnConsumerLeader(globalAccountName, "TEST", "dur")
+
+	for i := 0; i < 10; i++ {
+		sendStreamMsg(t, nc, "foo", "msg")
+	}
+
+	checkCount := func(expected int) {
+		t.Helper()
+		checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
+			ci, err := js.ConsumerInfo("TEST", "dur")
+			if err != nil {
+				return err
+			}
+			if n := int(ci.NumPending); n != expected {
+				return fmt.Errorf("Expected %v pending, got %v", expected, n)
+			}
+			return nil
+		})
+	}
+	checkCount(10)
+
+	resp, err := nc.Request(fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "dur"), nil, time.Second)
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	var cdResp JSApiConsumerLeaderStepDownResponse
+	if err := json.Unmarshal(resp.Data, &cdResp); err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if cdResp.Error != nil {
+		t.Fatalf("Unexpected error: %+v", cdResp.Error)
+	}
+
+	c.waitOnConsumerLeader(globalAccountName, "TEST", "dur")
+	checkCount(10)
+
+	// Also check that the state survives a restart of all servers.
+	nc.Close()
+	c.stopAll()
+	c.restartAll()
+
+	c.waitOnConsumerLeader(globalAccountName, "TEST", "dur")
+
+	nc, js = jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	checkCount(10)
+
+	// Make sure messages can be consumed
+	sub := natsSubSync(t, nc, inbox)
+	for i := 0; i < 10; i++ {
+		msg, err := sub.NextMsg(time.Second)
+		if err != nil {
+			t.Fatalf("i=%v next msg error: %v", i, err)
+		}
+		msg.AckSync()
+	}
+	checkCount(0)
+}
diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go
index ebcafcd9..b2d6d094 100644
--- a/server/jetstream_helpers_test.go
+++ b/server/jetstream_helpers_test.go
@@ -31,11 +31,11 @@ import (
 
 func init() {
 	// Speed up raft for tests.
-	hbInterval = 200 * time.Millisecond
+	hbInterval = 50 * time.Millisecond
 	minElectionTimeout = 1 * time.Second
 	maxElectionTimeout = 3 * time.Second
 	lostQuorumInterval = time.Second
-	lostQuorumCheck = hbInterval
+	lostQuorumCheck = 4 * hbInterval
 }
 
 // Used to setup superclusters for tests.
diff --git a/server/jetstream_jwt_test.go b/server/jetstream_jwt_test.go
index 83d571ae..45ec44fb 100644
--- a/server/jetstream_jwt_test.go
+++ b/server/jetstream_jwt_test.go
@@ -287,7 +287,7 @@ func TestJetStreamJWTMove(t *testing.T) {
 	require_NoError(t, err)
 	require_Equal(t, ci.Cluster.Name, "C1")
 
-	checkFor(t, 10*time.Second, 10*time.Millisecond, func() error {
+	checkFor(t, 15*time.Second, 50*time.Millisecond, func() error {
 		if si, err := js.StreamInfo("MOVE-ME"); err != nil {
 			return err
 		} else if si.Cluster.Name != "C2" {
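Reviewer note: this change also relaxes a few test timeouts (2s to 5s for peer removal, 10s to 15s for the JWT move, both above). They all funnel through checkFor, which polls a condition until it returns nil or the total wait expires, then fails the test. Roughly the following sketch, written under assumed semantics rather than copied from the suite:

package example

import (
	"testing"
	"time"
)

// checkForLike polls cond until it returns nil or totalWait elapses,
// sleeping sleepDur between attempts; on timeout it fails the test with
// the last error. A re-sketch of the checkFor helper used above.
func checkForLike(t testing.TB, totalWait, sleepDur time.Duration, cond func() error) {
	t.Helper()
	deadline := time.Now().Add(totalWait)
	var err error
	for time.Now().Before(deadline) {
		if err = cond(); err == nil {
			return
		}
		time.Sleep(sleepDur)
	}
	t.Fatalf("Timed out after %v: %v", totalWait, err)
}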
diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go
index 0d580f99..036fece7 100644
--- a/server/jetstream_super_cluster_test.go
+++ b/server/jetstream_super_cluster_test.go
@@ -1569,7 +1569,7 @@ func TestJetStreamSuperClusterConsumerDeliverNewBug(t *testing.T) {
 	require_NoError(t, err)
 
 	// Put messages in..
-	num := 200
+	num := 100
 	for i := 0; i < num; i++ {
 		js.PublishAsync("T", []byte("OK"))
 	}
@@ -1586,7 +1586,7 @@ func TestJetStreamSuperClusterConsumerDeliverNewBug(t *testing.T) {
 	})
 	require_NoError(t, err)
 
-	if ci.Delivered.Consumer != 0 || ci.Delivered.Stream != 200 {
+	if ci.Delivered.Consumer != 0 || ci.Delivered.Stream != 100 {
 		t.Fatalf("Incorrect consumer delivered info: %+v", ci.Delivered)
 	}
 
@@ -1603,7 +1603,7 @@ func TestJetStreamSuperClusterConsumerDeliverNewBug(t *testing.T) {
 	ci, err = js.ConsumerInfo("T", "d")
 	require_NoError(t, err)
 
-	if ci.Delivered.Consumer != 0 || ci.Delivered.Stream != 200 {
+	if ci.Delivered.Consumer != 0 || ci.Delivered.Stream != 100 {
 		t.Fatalf("Incorrect consumer delivered info: %+v", ci.Delivered)
 	}
 	if ci.NumPending != 0 {
diff --git a/server/jetstream_test.go b/server/jetstream_test.go
index 7a3c8023..329d95e7 100644
--- a/server/jetstream_test.go
+++ b/server/jetstream_test.go
@@ -17540,6 +17540,78 @@ func TestJetStreamStreamRepublishCycle(t *testing.T) {
 	expectFail()
 }
 
+func TestJetStreamConsumerDeliverNewNotConsumingBeforeRestart(t *testing.T) {
+	s := RunBasicJetStreamServer()
+	if config := s.JetStreamConfig(); config != nil {
+		defer removeDir(t, config.StoreDir)
+	}
+	defer s.Shutdown()
+
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+	})
+	require_NoError(t, err)
+
+	inbox := nats.NewInbox()
+	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
+		DeliverSubject: inbox,
+		Durable:        "dur",
+		AckPolicy:      nats.AckExplicitPolicy,
+		DeliverPolicy:  nats.DeliverNewPolicy,
+		FilterSubject:  "foo",
+	})
+	require_NoError(t, err)
+
+	for i := 0; i < 10; i++ {
+		sendStreamMsg(t, nc, "foo", "msg")
+	}
+
+	checkCount := func(expected int) {
+		t.Helper()
+		checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
+			ci, err := js.ConsumerInfo("TEST", "dur")
+			if err != nil {
+				return err
+			}
+			if n := int(ci.NumPending); n != expected {
+				return fmt.Errorf("Expected %v pending, got %v", expected, n)
+			}
+			return nil
+		})
+	}
+	checkCount(10)
+
+	time.Sleep(300 * time.Millisecond)
+
+	// Check server restart
+	nc.Close()
+	sd := s.JetStreamConfig().StoreDir
+	s.Shutdown()
+	// Restart.
+	s = RunJetStreamServerOnPort(-1, sd)
+	defer s.Shutdown()
+
+	nc, js = jsClientConnect(t, s)
+	defer nc.Close()
+
+	checkCount(10)
+
+	// Make sure messages can be consumed
+	sub := natsSubSync(t, nc, inbox)
+	for i := 0; i < 10; i++ {
+		msg, err := sub.NextMsg(time.Second)
+		if err != nil {
+			t.Fatalf("i=%v next msg error: %v", i, err)
+		}
+		msg.AckSync()
+	}
+	checkCount(0)
+}
+
 ///////////////////////////////////////////////////////////////////////////
 // Simple JetStream Benchmarks
 ///////////////////////////////////////////////////////////////////////////
diff --git a/server/memstore.go b/server/memstore.go
index a31779f4..a6adb84f 100644
--- a/server/memstore.go
+++ b/server/memstore.go
@@ -986,6 +986,19 @@ func (o *consumerMemStore) Update(state *ConsumerState) error {
 	return nil
 }
 
+// SetStarting sets our starting stream sequence.
+func (o *consumerMemStore) SetStarting(sseq uint64) error {
+	o.mu.Lock()
+	o.state.Delivered.Stream = sseq
+	o.mu.Unlock()
+	return nil
+}
+
+// HasState returns if this store has a recorded state.
+func (o *consumerMemStore) HasState() bool {
+	return false
+}
+
 func (o *consumerMemStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) error {
 	o.mu.Lock()
 	defer o.mu.Unlock()
diff --git a/server/store.go b/server/store.go
index e9ee6644..ed5786eb 100644
--- a/server/store.go
+++ b/server/store.go
@@ -170,6 +170,8 @@ type SnapshotResult struct {
 
 // ConsumerStore stores state on consumers for streams.
 type ConsumerStore interface {
+	SetStarting(sseq uint64) error
+	HasState() bool
 	UpdateDelivered(dseq, sseq, dc uint64, ts int64) error
 	UpdateAcks(dseq, sseq uint64) error
 	UpdateConfig(cfg *ConsumerConfig) error
diff --git a/server/stream.go b/server/stream.go
index a90b7931..a65c665e 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -4307,7 +4307,7 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error
 		obs.setCreatedTime(cfg.Created)
 	}
 	obs.mu.Lock()
-	_, err = obs.readStoredState(lseq)
+	err = obs.readStoredState(lseq)
 	obs.mu.Unlock()
 	if err != nil {
 		mset.stop(true, false)