Merge pull request #1887 from nats-io/fixes

Noticed stream snapshots were being processed on recovery
This commit is contained in:
Derek Collison
2021-02-04 13:47:56 -07:00
committed by GitHub

View File

@@ -1318,7 +1318,9 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry, isReco
var didSnap bool
for _, e := range ce.Entries {
if e.Type == EntrySnapshot {
mset.processSnapshot(e.Data)
if !isRecovering {
mset.processSnapshot(e.Data)
}
didSnap = true
} else if e.Type == EntryRemovePeer {
js.mu.RLock()
@@ -3239,7 +3241,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, subject, reply strin
}
}
ca := &consumerAssignment{Group: rg, Stream: stream, Name: oname, Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now()}
ca := &consumerAssignment{Group: rg, Stream: stream, Name: oname, Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()}
cc.meta.Propose(encodeAddConsumerAssignment(ca))
}
@@ -3563,8 +3565,27 @@ func (mset *Stream) processSnapshot(buf []byte) {
js := s.getJetStream()
var sub *subscription
var err error
const activityInterval = 5 * time.Second
notActive := time.NewTimer(activityInterval)
defer notActive.Stop()
defer func() {
if sub != nil {
s.sysUnsubscribe(sub)
}
}()
RETRY:
// If we have a sub clear that here.
if sub != nil {
s.sysUnsubscribe(sub)
sub = nil
}
// Grab sync request again on failures.
if sreq == nil {
mset.mu.Lock()
@@ -3576,11 +3597,11 @@ RETRY:
}
}
msgsC := make(chan []byte, 1024)
msgsC := make(chan []byte, 8*1024)
// Send our catchup request here.
reply := syncReplySubject()
sub, err := s.sysSubscribe(reply, func(_ *subscription, _ *client, _, reply string, msg []byte) {
sub, err = s.sysSubscribe(reply, func(_ *subscription, _ *client, _, reply string, msg []byte) {
// Make copies - https://github.com/go101/go101/wiki
// TODO(dlc) - Since we are using a buffer from the inbound client/route.
if len(msg) > 0 {
@@ -3594,7 +3615,6 @@ RETRY:
if err != nil {
return
}
defer s.sysUnsubscribe(sub)
b, _ := json.Marshal(sreq)
s.sendInternalMsgLocked(subject, reply, nil, b)
@@ -3603,10 +3623,6 @@ RETRY:
last := sreq.LastSeq
sreq = nil
const activityInterval = 5 * time.Second
notActive := time.NewTimer(activityInterval)
defer notActive.Stop()
// Run our own select loop here.
for qch, lch := n.QuitC(), n.LeadChangeC(); ; {
select {
@@ -3616,7 +3632,6 @@ RETRY:
if len(msg) == 0 {
goto RETRY
}
if lseq, err := mset.processCatchupMsg(msg); err == nil {
if lseq >= last {
return
@@ -3626,6 +3641,7 @@ RETRY:
}
case <-notActive.C:
s.Warnf("Catchup for stream '%s > %s' stalled", mset.account(), mset.Name())
notActive.Reset(activityInterval)
goto RETRY
case <-s.quitCh:
return