mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-15 02:30:40 -07:00
Merge pull request #3162 from nats-io/restart_adv_3
Mark the meta layer's recovering state and use it to suppress API responses and API audits during restarts.
This commit is contained in:
@@ -94,23 +94,24 @@ type JetStreamAPIStats struct {
|
||||
// This is for internal accounting for JetStream for this server.
|
||||
type jetStream struct {
|
||||
// These are here first because of atomics on 32bit systems.
|
||||
apiInflight int64
|
||||
apiTotal int64
|
||||
apiErrors int64
|
||||
memReserved int64
|
||||
storeReserved int64
|
||||
memUsed int64
|
||||
storeUsed int64
|
||||
clustered int32
|
||||
mu sync.RWMutex
|
||||
srv *Server
|
||||
config JetStreamConfig
|
||||
cluster *jetStreamCluster
|
||||
accounts map[string]*jsAccount
|
||||
apiSubs *Sublist
|
||||
standAlone bool
|
||||
disabled bool
|
||||
oos bool
|
||||
apiInflight int64
|
||||
apiTotal int64
|
||||
apiErrors int64
|
||||
memReserved int64
|
||||
storeReserved int64
|
||||
memUsed int64
|
||||
storeUsed int64
|
||||
clustered int32
|
||||
mu sync.RWMutex
|
||||
srv *Server
|
||||
config JetStreamConfig
|
||||
cluster *jetStreamCluster
|
||||
accounts map[string]*jsAccount
|
||||
apiSubs *Sublist
|
||||
metaRecovering bool
|
||||
standAlone bool
|
||||
disabled bool
|
||||
oos bool
|
||||
}
|
||||
|
||||
type remoteUsage struct {
|
||||
|
||||
@@ -813,6 +813,30 @@ func (cc *jetStreamCluster) isConsumerLeader(account, stream, consumer string) b
|
||||
return false
|
||||
}
|
||||
|
||||
// Mark that the meta layer is recovering.
|
||||
func (js *jetStream) setMetaRecovering() {
|
||||
js.mu.Lock()
|
||||
defer js.mu.Unlock()
|
||||
if js.cluster != nil {
|
||||
// Only mark recovery when a cluster is present (js.cluster is non-nil).
|
||||
js.metaRecovering = true
|
||||
}
|
||||
}
|
||||
|
||||
// Mark that the meta layer is no longer recovering.
|
||||
func (js *jetStream) clearMetaRecovering() {
|
||||
js.mu.Lock()
|
||||
defer js.mu.Unlock()
|
||||
js.metaRecovering = false
|
||||
}
|
||||
|
||||
// Return whether the meta layer is recovering.
|
||||
func (js *jetStream) isMetaRecovering() bool {
|
||||
js.mu.RLock()
|
||||
defer js.mu.RUnlock()
|
||||
return js.metaRecovering
|
||||
}
|
||||
|
||||
// During recovery track any stream and consumer delete operations.
|
||||
type recoveryRemovals struct {
|
||||
streams map[string]*streamAssignment
|
||||
@@ -841,17 +865,16 @@ func (js *jetStream) monitorCluster() {
|
||||
isLeader bool
|
||||
lastSnap []byte
|
||||
lastSnapTime time.Time
|
||||
isRecovering bool
|
||||
beenLeader bool
|
||||
)
|
||||
|
||||
// Set to true to start.
|
||||
isRecovering = true
|
||||
js.setMetaRecovering()
|
||||
|
||||
// Snapshotting function.
|
||||
doSnapshot := func() {
|
||||
// Suppress during recovery.
|
||||
if isRecovering {
|
||||
if js.isMetaRecovering() {
|
||||
return
|
||||
}
|
||||
if snap := js.metaSnapshot(); !bytes.Equal(lastSnap, snap) {
|
||||
@@ -878,7 +901,8 @@ func (js *jetStream) monitorCluster() {
|
||||
for _, cei := range ces {
|
||||
if cei == nil {
|
||||
// Signals we have replayed all of our metadata.
|
||||
isRecovering = false
|
||||
js.clearMetaRecovering()
|
||||
|
||||
// Process any removes that are still valid after recovery.
|
||||
for _, sa := range rm.streams {
|
||||
js.processStreamRemoval(sa)
|
||||
@@ -893,7 +917,7 @@ func (js *jetStream) monitorCluster() {
|
||||
}
|
||||
ce := cei.(*CommittedEntry)
|
||||
// FIXME(dlc) - Deal with errors.
|
||||
if didSnap, didRemoval, err := js.applyMetaEntries(ce.Entries, isRecovering, rm); err == nil {
|
||||
if didSnap, didRemoval, err := js.applyMetaEntries(ce.Entries, rm); err == nil {
|
||||
_, nb := n.Applied(ce.Index)
|
||||
if js.hasPeerEntries(ce.Entries) || didSnap || (didRemoval && time.Since(lastSnapTime) > 2*time.Second) {
|
||||
// Since we received one make sure we have our own since we do not store
|
||||
@@ -1042,7 +1066,7 @@ func (js *jetStream) metaSnapshot() []byte {
|
||||
return s2.EncodeBetter(nil, b)
|
||||
}
|
||||
|
||||
func (js *jetStream) applyMetaSnapshot(buf []byte, isRecovering bool) error {
|
||||
func (js *jetStream) applyMetaSnapshot(buf []byte) error {
|
||||
if len(buf) == 0 {
|
||||
return nil
|
||||
}
|
||||
@@ -1111,6 +1135,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, isRecovering bool) error {
|
||||
}
|
||||
}
|
||||
}
|
||||
isRecovering := js.metaRecovering
|
||||
js.mu.Unlock()
|
||||
|
||||
// Do removals first.
|
||||
@@ -1125,12 +1150,9 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, isRecovering bool) error {
|
||||
if isRecovering {
|
||||
js.setStreamAssignmentRecovering(sa)
|
||||
}
|
||||
// Let the processConsumerAssignment add them in first.
|
||||
consumers := sa.consumers
|
||||
sa.consumers = nil
|
||||
js.processStreamAssignment(sa)
|
||||
// We can simply add the consumers.
|
||||
for _, ca := range consumers {
|
||||
for _, ca := range sa.consumers {
|
||||
if isRecovering {
|
||||
js.setConsumerAssignmentRecovering(ca)
|
||||
}
|
||||
@@ -1388,11 +1410,13 @@ func (js *jetStream) hasPeerEntries(entries []*Entry) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool, rm *recoveryRemovals) (bool, bool, error) {
|
||||
func (js *jetStream) applyMetaEntries(entries []*Entry, rm *recoveryRemovals) (bool, bool, error) {
|
||||
var didSnap, didRemove bool
|
||||
isRecovering := js.isMetaRecovering()
|
||||
|
||||
for _, e := range entries {
|
||||
if e.Type == EntrySnapshot {
|
||||
js.applyMetaSnapshot(e.Data, isRecovering)
|
||||
js.applyMetaSnapshot(e.Data)
|
||||
didSnap = true
|
||||
} else if e.Type == EntryRemovePeer {
|
||||
if !isRecovering {
|
||||
@@ -3181,9 +3205,9 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
|
||||
}
|
||||
// Check if we already had a consumer assignment and it's still pending.
|
||||
cca, oca := ca, o.consumerAssignment()
|
||||
o.mu.Lock()
|
||||
o.mu.RLock()
|
||||
leader := o.isLeader()
|
||||
o.mu.Unlock()
|
||||
o.mu.RUnlock()
|
||||
|
||||
var sendState bool
|
||||
js.mu.Lock()
|
||||
@@ -3288,14 +3312,17 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
|
||||
if !alreadyRunning {
|
||||
s.startGoRoutine(func() { js.monitorConsumer(o, ca) })
|
||||
}
|
||||
if wasExisting && (o.isLeader() || (!didCreate && rg.node.GroupLeader() == _EMPTY_)) {
|
||||
// Process if existing as an update.
|
||||
js.mu.RLock()
|
||||
client, subject, reply := ca.Client, ca.Subject, ca.Reply
|
||||
js.mu.RUnlock()
|
||||
var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
|
||||
resp.ConsumerInfo = o.info()
|
||||
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
|
||||
// Only send response if not recovering.
|
||||
if !js.isMetaRecovering() {
|
||||
if wasExisting && (o.isLeader() || (!didCreate && rg.node.GroupLeader() == _EMPTY_)) {
|
||||
// Process if existing as an update.
|
||||
js.mu.RLock()
|
||||
client, subject, reply := ca.Client, ca.Subject, ca.Reply
|
||||
js.mu.RUnlock()
|
||||
var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
|
||||
resp.ConsumerInfo = o.info()
|
||||
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4037,7 +4064,7 @@ func (js *jetStream) processLeaderChange(isLeader bool) {
|
||||
}
|
||||
|
||||
// If we have been signaled to check the streams, this is for a bug that left stream
|
||||
// assignments with no sync subject after and update and no way to sync/catchup outside of the RAFT layer.
|
||||
// assignments with no sync subject after an update and no way to sync/catchup outside of the RAFT layer.
|
||||
if isLeader && js.cluster.streamsCheck {
|
||||
cc := js.cluster
|
||||
for acc, asa := range cc.streams {
|
||||
|
||||
Reference in New Issue
Block a user