diff --git a/.github/workflows/stale-issues.yaml b/.github/workflows/stale-issues.yaml new file mode 100644 index 00000000..1d518508 --- /dev/null +++ b/.github/workflows/stale-issues.yaml @@ -0,0 +1,21 @@ +name: stale-issues +on: + schedule: + - cron: "30 1 * * *" + +permissions: + issues: write + pull-requests: write + +jobs: + stale: + runs-on: ubuntu-latest + + steps: + - uses: actions/stale@v8 + with: + debug-only: true # Set until the behavior is tuned. + days-before-stale: 56 # Mark stale after 8 weeks (56 days) of inactivity + days-before-close: -1 # Disable auto-closing + exempt-all-milestones: true # Any issue/PR within a milestone will be omitted + #exempt-assignees: "foo,bar" # Exempt issues/PRs assigned to particular users diff --git a/server/const.go b/server/const.go index ff1eea5f..713506ce 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.10.0-beta.38" + VERSION = "2.10.0-beta.39" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/jetstream.go b/server/jetstream.go index e72d78a9..a225834d 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1819,6 +1819,17 @@ func (jsa *jsAccount) checkAndSyncUsage(tierName string, storeType StorageType) } s := js.srv + // We need to collect the stream stores before we acquire the usage lock since in storeUpdates the + // stream lock could be held if deletions are inline with storing a new message, e.g. via limits. + var stores []StreamStore + for _, mset := range jsa.streams { + mset.mu.RLock() + if mset.tier == tierName && mset.stype == storeType && mset.store != nil { + stores = append(stores, mset.store) + } + mset.mu.RUnlock() + } + // Now range and qualify, hold usage lock to prevent updates. 
jsa.usageMu.Lock() defer jsa.usageMu.Unlock() @@ -1828,15 +1839,12 @@ func (jsa *jsAccount) checkAndSyncUsage(tierName string, storeType StorageType) return } + // Collect current total for all stream stores that matched. var total int64 var state StreamState - for _, mset := range jsa.streams { - mset.mu.RLock() - if mset.tier == tierName && mset.stype == storeType { - mset.store.FastState(&state) - total += int64(state.Bytes) - } - mset.mu.RUnlock() + for _, store := range stores { + store.FastState(&state) + total += int64(state.Bytes) } var needClusterUpdate bool diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 60354874..0c6e0fd7 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -456,6 +456,9 @@ func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) { } // Make sure to clear out the raft node if still present in the meta layer. if rg := sa.Group; rg != nil && rg.node != nil { + if rg.node.State() != Closed { + rg.node.Stop() + } rg.node = nil } js.mu.Unlock() @@ -493,7 +496,7 @@ func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) { // For R1 it will make sure the stream is present on this server. func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool { js.mu.Lock() - cc := js.cluster + s, cc := js.srv, js.cluster if cc == nil { // Non-clustered mode js.mu.Unlock() @@ -523,7 +526,12 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool { if !mset.isCatchingUp() { return true } + } else if node != nil && node != mset.raftNode() { + s.Warnf("Detected stream cluster node skew '%s > %s'", acc.GetName(), streamName) + node.Delete() + mset.resetClusteredState(nil) } + return false } @@ -7590,6 +7598,10 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) { } } + // Do not let this go on forever. + const maxRetries = 3 + var numRetries int + RETRY: // On retry, we need to release the semaphore we got. 
Call will be no-op // if releaseSem boolean has not been set to true on successfully getting @@ -7606,13 +7618,20 @@ RETRY: sub = nil } - // Block here if we have too many requests in flight. - <-s.syncOutSem - releaseSem = true if !s.isRunning() { return ErrServerNotRunning } + numRetries++ + if numRetries >= maxRetries { + // Force a hard reset here. + return errFirstSequenceMismatch + } + + // Block here if we have too many requests in flight. + <-s.syncOutSem + releaseSem = true + // We may have been blocked for a bit, so the reset need to ensure that we // consume the already fired timer. if !notActive.Stop() {