diff --git a/server/jetstream.go b/server/jetstream.go index e9f10ce2..0504e38d 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -94,23 +94,24 @@ type JetStreamAPIStats struct { // This is for internal accounting for JetStream for this server. type jetStream struct { // These are here first because of atomics on 32bit systems. - apiInflight int64 - apiTotal int64 - apiErrors int64 - memReserved int64 - storeReserved int64 - memUsed int64 - storeUsed int64 - clustered int32 - mu sync.RWMutex - srv *Server - config JetStreamConfig - cluster *jetStreamCluster - accounts map[string]*jsAccount - apiSubs *Sublist - standAlone bool - disabled bool - oos bool + apiInflight int64 + apiTotal int64 + apiErrors int64 + memReserved int64 + storeReserved int64 + memUsed int64 + storeUsed int64 + clustered int32 + mu sync.RWMutex + srv *Server + config JetStreamConfig + cluster *jetStreamCluster + accounts map[string]*jsAccount + apiSubs *Sublist + metaRecovering bool + standAlone bool + disabled bool + oos bool } type remoteUsage struct { diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index c5474087..375994c7 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -813,6 +813,30 @@ func (cc *jetStreamCluster) isConsumerLeader(account, stream, consumer string) b return false } +// Mark that the meta layer is recovering. +func (js *jetStream) setMetaRecovering() { + js.mu.Lock() + defer js.mu.Unlock() + if js.cluster != nil { + // metaRecovering + js.metaRecovering = true + } +} + +// Mark that the meta layer is no longer recovering. +func (js *jetStream) clearMetaRecovering() { + js.mu.Lock() + defer js.mu.Unlock() + js.metaRecovering = false +} + +// Return whether the meta layer is recovering. +func (js *jetStream) isMetaRecovering() bool { + js.mu.RLock() + defer js.mu.RUnlock() + return js.metaRecovering +} + // During recovery track any stream and consumer delete operations. type recoveryRemovals struct { streams map[string]*streamAssignment @@ -841,17 +865,16 @@ func (js *jetStream) monitorCluster() { isLeader bool lastSnap []byte lastSnapTime time.Time - isRecovering bool beenLeader bool ) // Set to true to start. - isRecovering = true + js.setMetaRecovering() // Snapshotting function. doSnapshot := func() { // Suppress during recovery. - if isRecovering { + if js.isMetaRecovering() { return } if snap := js.metaSnapshot(); !bytes.Equal(lastSnap, snap) { @@ -878,7 +901,8 @@ func (js *jetStream) monitorCluster() { for _, cei := range ces { if cei == nil { // Signals we have replayed all of our metadata. - isRecovering = false + js.clearMetaRecovering() + // Process any removes that are still valid after recovery. for _, sa := range rm.streams { js.processStreamRemoval(sa) @@ -893,7 +917,7 @@ func (js *jetStream) monitorCluster() { } ce := cei.(*CommittedEntry) // FIXME(dlc) - Deal with errors. - if didSnap, didRemoval, err := js.applyMetaEntries(ce.Entries, isRecovering, rm); err == nil { + if didSnap, didRemoval, err := js.applyMetaEntries(ce.Entries, rm); err == nil { _, nb := n.Applied(ce.Index) if js.hasPeerEntries(ce.Entries) || didSnap || (didRemoval && time.Since(lastSnapTime) > 2*time.Second) { // Since we received one make sure we have our own since we do not store @@ -1042,7 +1066,7 @@ func (js *jetStream) metaSnapshot() []byte { return s2.EncodeBetter(nil, b) } -func (js *jetStream) applyMetaSnapshot(buf []byte, isRecovering bool) error { +func (js *jetStream) applyMetaSnapshot(buf []byte) error { if len(buf) == 0 { return nil } @@ -1111,6 +1135,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, isRecovering bool) error { } } } + isRecovering := js.metaRecovering js.mu.Unlock() // Do removals first. @@ -1125,12 +1150,9 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, isRecovering bool) error { if isRecovering { js.setStreamAssignmentRecovering(sa) } - // Let the processConsumerAssignment add them in first. - consumers := sa.consumers - sa.consumers = nil js.processStreamAssignment(sa) // We can simply add the consumers. - for _, ca := range consumers { + for _, ca := range sa.consumers { if isRecovering { js.setConsumerAssignmentRecovering(ca) } @@ -1388,11 +1410,13 @@ func (js *jetStream) hasPeerEntries(entries []*Entry) bool { return false } -func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool, rm *recoveryRemovals) (bool, bool, error) { +func (js *jetStream) applyMetaEntries(entries []*Entry, rm *recoveryRemovals) (bool, bool, error) { var didSnap, didRemove bool + isRecovering := js.isMetaRecovering() + for _, e := range entries { if e.Type == EntrySnapshot { - js.applyMetaSnapshot(e.Data, isRecovering) + js.applyMetaSnapshot(e.Data) didSnap = true } else if e.Type == EntryRemovePeer { if !isRecovering { @@ -3181,9 +3205,9 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state } // Check if we already had a consumer assignment and its still pending. cca, oca := ca, o.consumerAssignment() - o.mu.Lock() + o.mu.RLock() leader := o.isLeader() - o.mu.Unlock() + o.mu.RUnlock() var sendState bool js.mu.Lock() @@ -3288,14 +3312,17 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state if !alreadyRunning { s.startGoRoutine(func() { js.monitorConsumer(o, ca) }) } - if wasExisting && (o.isLeader() || (!didCreate && rg.node.GroupLeader() == _EMPTY_)) { - // Process if existing as an update. - js.mu.RLock() - client, subject, reply := ca.Client, ca.Subject, ca.Reply - js.mu.RUnlock() - var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}} - resp.ConsumerInfo = o.info() - s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) + // Only send response if not recovering. + if !js.isMetaRecovering() { + if wasExisting && (o.isLeader() || (!didCreate && rg.node.GroupLeader() == _EMPTY_)) { + // Process if existing as an update. + js.mu.RLock() + client, subject, reply := ca.Client, ca.Subject, ca.Reply + js.mu.RUnlock() + var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}} + resp.ConsumerInfo = o.info() + s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) + } } } } @@ -4037,7 +4064,7 @@ func (js *jetStream) processLeaderChange(isLeader bool) { } // If we have been signaled to check the streams, this is for a bug that left stream - // assignments with no sync subject after and update and no way to sync/catchup outside of the RAFT layer. + // assignments with no sync subject after an update and no way to sync/catchup outside of the RAFT layer. if isLeader && js.cluster.streamsCheck { cc := js.cluster for acc, asa := range cc.streams {