From cc77d662bb0c126cec4df10020e8bd727cc6de52 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 13 Apr 2023 18:22:13 -0700 Subject: [PATCH] Make sure to process consumer entries on recovery in case state was not committed. Also sync other consumers when taking over as leader but no need to process snapshots when we are in fact the leader. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 65 +++++++++++++++++------------- server/jetstream_cluster_2_test.go | 3 -- 2 files changed, 37 insertions(+), 31 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index cb3aae71..7ddc4dbb 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4271,17 +4271,15 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { } // Check our state if we are under an interest based stream. o.checkStateForInterestStream() - } else if !recovering { - if err := js.applyConsumerEntries(o, ce, isLeader); err == nil { - ne, nb := n.Applied(ce.Index) - ce.ReturnToPool() - // If we have at least min entries to compact, go ahead and snapshot/compact. - if nb > 0 && ne >= compactNumMin || nb > compactSizeMin { - doSnapshot(false) - } - } else { - s.Warnf("Error applying consumer entries to '%s > %s'", ca.Client.serviceAccount(), ca.Name) + } else if err := js.applyConsumerEntries(o, ce, isLeader); err == nil { + ne, nb := n.Applied(ce.Index) + ce.ReturnToPool() + // If we have at least min entries to compact, go ahead and snapshot/compact. + if nb > 0 && ne >= compactNumMin || nb > compactSizeMin { + doSnapshot(false) } + } else { + s.Warnf("Error applying consumer entries to '%s > %s'", ca.Client.serviceAccount(), ca.Name) } } aq.recycle(&ces) @@ -4290,6 +4288,16 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { js.setConsumerAssignmentRecovering(ca) } + // Synchronize everyone to our state. + if isLeader && n != nil { + // Only send out if we have state. + if _, _, applied := n.Progress(); applied > 0 { + if snap, err := o.store.EncodedState(); err == nil { + n.SendSnapshot(snap) + } + } + } + // Process the change. if err := js.processConsumerLeaderChange(o, isLeader); err == nil && isLeader { doSnapshot(true) @@ -4389,26 +4397,27 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLeader bool) error { for _, e := range ce.Entries { if e.Type == EntrySnapshot { - // No-op needed? - state, err := decodeConsumerState(e.Data) - if err != nil { - if mset, node := o.streamAndNode(); mset != nil && node != nil { - s := js.srv - s.Errorf("JetStream cluster could not decode consumer snapshot for '%s > %s > %s' [%s]", - mset.account(), mset.name(), o, node.Group()) + if !isLeader { + // No-op needed? + state, err := decodeConsumerState(e.Data) + if err != nil { + if mset, node := o.streamAndNode(); mset != nil && node != nil { + s := js.srv + s.Errorf("JetStream cluster could not decode consumer snapshot for '%s > %s > %s' [%s]", + mset.account(), mset.name(), o, node.Group()) + } + panic(err.Error()) } - panic(err.Error()) - } - - if err = o.store.Update(state); err != nil { - o.mu.RLock() - s, acc, mset, name := o.srv, o.acc, o.mset, o.name - o.mu.RUnlock() - if s != nil && mset != nil { - s.Warnf("Consumer '%s > %s > %s' error on store update from snapshot entry: %v", acc, mset.name(), name, err) + if err = o.store.Update(state); err != nil { + o.mu.RLock() + s, acc, mset, name := o.srv, o.acc, o.mset, o.name + o.mu.RUnlock() + if s != nil && mset != nil { + s.Warnf("Consumer '%s > %s > %s' error on store update from snapshot entry: %v", acc, mset.name(), name, err) + } + } else { + o.checkStateForInterestStream() } - } else { - o.checkStateForInterestStream() } } else if e.Type == EntryRemovePeer { diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 775ea9c5..a1c8353b 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -5784,9 +5784,6 @@ func TestJetStreamClusterConsumerDeliverNewMaxRedeliveriesAndServerRestart(t *te t.Fatalf("Expected timeout, got msg=%+v err=%v", msg, err) } - // Give a chance to things to be persisted - time.Sleep(300 * time.Millisecond) - // Check server restart nc.Close() c.stopAll()