diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index cb3aae71..7ddc4dbb 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4271,17 +4271,15 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { } // Check our state if we are under an interest based stream. o.checkStateForInterestStream() - } else if !recovering { - if err := js.applyConsumerEntries(o, ce, isLeader); err == nil { - ne, nb := n.Applied(ce.Index) - ce.ReturnToPool() - // If we have at least min entries to compact, go ahead and snapshot/compact. - if nb > 0 && ne >= compactNumMin || nb > compactSizeMin { - doSnapshot(false) - } - } else { - s.Warnf("Error applying consumer entries to '%s > %s'", ca.Client.serviceAccount(), ca.Name) + } else if err := js.applyConsumerEntries(o, ce, isLeader); err == nil { + ne, nb := n.Applied(ce.Index) + ce.ReturnToPool() + // If we have at least min entries to compact, go ahead and snapshot/compact. + if nb > 0 && ne >= compactNumMin || nb > compactSizeMin { + doSnapshot(false) } + } else { + s.Warnf("Error applying consumer entries to '%s > %s'", ca.Client.serviceAccount(), ca.Name) } } aq.recycle(&ces) @@ -4290,6 +4288,16 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { js.setConsumerAssignmentRecovering(ca) } + // Synchronize everyone to our state. + if isLeader && n != nil { + // Only send out if we have state. + if _, _, applied := n.Progress(); applied > 0 { + if snap, err := o.store.EncodedState(); err == nil { + n.SendSnapshot(snap) + } + } + } + // Process the change. if err := js.processConsumerLeaderChange(o, isLeader); err == nil && isLeader { doSnapshot(true) @@ -4389,26 +4397,27 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLeader bool) error { for _, e := range ce.Entries { if e.Type == EntrySnapshot { - // No-op needed? - state, err := decodeConsumerState(e.Data) - if err != nil { - if mset, node := o.streamAndNode(); mset != nil && node != nil { - s := js.srv - s.Errorf("JetStream cluster could not decode consumer snapshot for '%s > %s > %s' [%s]", - mset.account(), mset.name(), o, node.Group()) + if !isLeader { + // No-op needed? + state, err := decodeConsumerState(e.Data) + if err != nil { + if mset, node := o.streamAndNode(); mset != nil && node != nil { + s := js.srv + s.Errorf("JetStream cluster could not decode consumer snapshot for '%s > %s > %s' [%s]", + mset.account(), mset.name(), o, node.Group()) + } + panic(err.Error()) } - panic(err.Error()) - } - - if err = o.store.Update(state); err != nil { - o.mu.RLock() - s, acc, mset, name := o.srv, o.acc, o.mset, o.name - o.mu.RUnlock() - if s != nil && mset != nil { - s.Warnf("Consumer '%s > %s > %s' error on store update from snapshot entry: %v", acc, mset.name(), name, err) + if err = o.store.Update(state); err != nil { + o.mu.RLock() + s, acc, mset, name := o.srv, o.acc, o.mset, o.name + o.mu.RUnlock() + if s != nil && mset != nil { + s.Warnf("Consumer '%s > %s > %s' error on store update from snapshot entry: %v", acc, mset.name(), name, err) + } + } else { + o.checkStateForInterestStream() } - } else { - o.checkStateForInterestStream() } } else if e.Type == EntryRemovePeer { diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 775ea9c5..a1c8353b 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -5784,9 +5784,6 @@ func TestJetStreamClusterConsumerDeliverNewMaxRedeliveriesAndServerRestart(t *te t.Fatalf("Expected timeout, got msg=%+v err=%v", msg, err) } - // Give a chance to things to be persisted - time.Sleep(300 * time.Millisecond) - // Check server restart nc.Close() c.stopAll()