diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 7a15b7f2..b121387f 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1318,7 +1318,9 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry, isReco var didSnap bool for _, e := range ce.Entries { if e.Type == EntrySnapshot { - mset.processSnapshot(e.Data) + if !isRecovering { + mset.processSnapshot(e.Data) + } didSnap = true } else if e.Type == EntryRemovePeer { js.mu.RLock() @@ -3239,7 +3241,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, subject, reply strin } } - ca := &consumerAssignment{Group: rg, Stream: stream, Name: oname, Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now()} + ca := &consumerAssignment{Group: rg, Stream: stream, Name: oname, Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()} cc.meta.Propose(encodeAddConsumerAssignment(ca)) } @@ -3563,8 +3565,27 @@ func (mset *Stream) processSnapshot(buf []byte) { js := s.getJetStream() + var sub *subscription + var err error + + const activityInterval = 5 * time.Second + notActive := time.NewTimer(activityInterval) + defer notActive.Stop() + + defer func() { + if sub != nil { + s.sysUnsubscribe(sub) + } + }() + RETRY: + // If we have a sub clear that here. + if sub != nil { + s.sysUnsubscribe(sub) + sub = nil + } + // Grab sync request again on failures. if sreq == nil { mset.mu.Lock() @@ -3576,11 +3597,11 @@ RETRY: } } - msgsC := make(chan []byte, 1024) + msgsC := make(chan []byte, 8*1024) // Send our catchup request here. reply := syncReplySubject() - sub, err := s.sysSubscribe(reply, func(_ *subscription, _ *client, _, reply string, msg []byte) { + sub, err = s.sysSubscribe(reply, func(_ *subscription, _ *client, _, reply string, msg []byte) { // Make copies - https://github.com/go101/go101/wiki // TODO(dlc) - Since we are using a buffer from the inbound client/route. if len(msg) > 0 { @@ -3594,7 +3615,6 @@ RETRY: if err != nil { return } - defer s.sysUnsubscribe(sub) b, _ := json.Marshal(sreq) s.sendInternalMsgLocked(subject, reply, nil, b) @@ -3603,10 +3623,6 @@ RETRY: last := sreq.LastSeq sreq = nil - const activityInterval = 5 * time.Second - notActive := time.NewTimer(activityInterval) - defer notActive.Stop() - // Run our own select loop here. for qch, lch := n.QuitC(), n.LeadChangeC(); ; { select { @@ -3616,7 +3632,6 @@ RETRY: if len(msg) == 0 { goto RETRY } - if lseq, err := mset.processCatchupMsg(msg); err == nil { if lseq >= last { return @@ -3626,6 +3641,7 @@ RETRY: } case <-notActive.C: s.Warnf("Catchup for stream '%s > %s' stalled", mset.account(), mset.Name()) + notActive.Reset(activityInterval) goto RETRY case <-s.quitCh: return