mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Make sure to process consumer entries on recovery in case state was not committed.
Also sync other consumers when taking over as leader but no need to process snapshots when we are in fact the leader. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -4271,17 +4271,15 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
|
||||
}
|
||||
// Check our state if we are under an interest based stream.
|
||||
o.checkStateForInterestStream()
|
||||
} else if !recovering {
|
||||
if err := js.applyConsumerEntries(o, ce, isLeader); err == nil {
|
||||
ne, nb := n.Applied(ce.Index)
|
||||
ce.ReturnToPool()
|
||||
// If we have at least min entries to compact, go ahead and snapshot/compact.
|
||||
if nb > 0 && ne >= compactNumMin || nb > compactSizeMin {
|
||||
doSnapshot(false)
|
||||
}
|
||||
} else {
|
||||
s.Warnf("Error applying consumer entries to '%s > %s'", ca.Client.serviceAccount(), ca.Name)
|
||||
} else if err := js.applyConsumerEntries(o, ce, isLeader); err == nil {
|
||||
ne, nb := n.Applied(ce.Index)
|
||||
ce.ReturnToPool()
|
||||
// If we have at least min entries to compact, go ahead and snapshot/compact.
|
||||
if nb > 0 && ne >= compactNumMin || nb > compactSizeMin {
|
||||
doSnapshot(false)
|
||||
}
|
||||
} else {
|
||||
s.Warnf("Error applying consumer entries to '%s > %s'", ca.Client.serviceAccount(), ca.Name)
|
||||
}
|
||||
}
|
||||
aq.recycle(&ces)
|
||||
@@ -4290,6 +4288,16 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
|
||||
js.setConsumerAssignmentRecovering(ca)
|
||||
}
|
||||
|
||||
// Synchronize everyone to our state.
|
||||
if isLeader && n != nil {
|
||||
// Only send out if we have state.
|
||||
if _, _, applied := n.Progress(); applied > 0 {
|
||||
if snap, err := o.store.EncodedState(); err == nil {
|
||||
n.SendSnapshot(snap)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Process the change.
|
||||
if err := js.processConsumerLeaderChange(o, isLeader); err == nil && isLeader {
|
||||
doSnapshot(true)
|
||||
@@ -4389,26 +4397,27 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
|
||||
func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLeader bool) error {
|
||||
for _, e := range ce.Entries {
|
||||
if e.Type == EntrySnapshot {
|
||||
// No-op needed?
|
||||
state, err := decodeConsumerState(e.Data)
|
||||
if err != nil {
|
||||
if mset, node := o.streamAndNode(); mset != nil && node != nil {
|
||||
s := js.srv
|
||||
s.Errorf("JetStream cluster could not decode consumer snapshot for '%s > %s > %s' [%s]",
|
||||
mset.account(), mset.name(), o, node.Group())
|
||||
if !isLeader {
|
||||
// No-op needed?
|
||||
state, err := decodeConsumerState(e.Data)
|
||||
if err != nil {
|
||||
if mset, node := o.streamAndNode(); mset != nil && node != nil {
|
||||
s := js.srv
|
||||
s.Errorf("JetStream cluster could not decode consumer snapshot for '%s > %s > %s' [%s]",
|
||||
mset.account(), mset.name(), o, node.Group())
|
||||
}
|
||||
panic(err.Error())
|
||||
}
|
||||
panic(err.Error())
|
||||
}
|
||||
|
||||
if err = o.store.Update(state); err != nil {
|
||||
o.mu.RLock()
|
||||
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
|
||||
o.mu.RUnlock()
|
||||
if s != nil && mset != nil {
|
||||
s.Warnf("Consumer '%s > %s > %s' error on store update from snapshot entry: %v", acc, mset.name(), name, err)
|
||||
if err = o.store.Update(state); err != nil {
|
||||
o.mu.RLock()
|
||||
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
|
||||
o.mu.RUnlock()
|
||||
if s != nil && mset != nil {
|
||||
s.Warnf("Consumer '%s > %s > %s' error on store update from snapshot entry: %v", acc, mset.name(), name, err)
|
||||
}
|
||||
} else {
|
||||
o.checkStateForInterestStream()
|
||||
}
|
||||
} else {
|
||||
o.checkStateForInterestStream()
|
||||
}
|
||||
|
||||
} else if e.Type == EntryRemovePeer {
|
||||
|
||||
@@ -5784,9 +5784,6 @@ func TestJetStreamClusterConsumerDeliverNewMaxRedeliveriesAndServerRestart(t *te
|
||||
t.Fatalf("Expected timeout, got msg=%+v err=%v", msg, err)
|
||||
}
|
||||
|
||||
// Give a chance to things to be persisted
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
|
||||
// Check server restart
|
||||
nc.Close()
|
||||
c.stopAll()
|
||||
|
||||
Reference in New Issue
Block a user