mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge branch 'main' into dev
This commit is contained in:
21
.github/workflows/stale-issues.yaml
vendored
Normal file
21
.github/workflows/stale-issues.yaml
vendored
Normal file
@@ -0,0 +1,21 @@
|
||||
name: stale-issues
|
||||
on:
|
||||
schedule:
|
||||
- cron: "30 1 * * *"
|
||||
|
||||
permissions:
|
||||
issues: write
|
||||
pull-requests: write
|
||||
|
||||
jobs:
|
||||
stale:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- uses: actions/stale@v8
|
||||
with:
|
||||
debug-only: true # Set until the behavior is tuned.
|
||||
days-before-stale: 56 # Mark stale after 8 weeks (56 days) of inactivity
|
||||
days-before-close: -1 # Disable auto-closing
|
||||
exempt-all-milestones: true # Any issue/PR within a milestone will be omitted
|
||||
#exempt-assigness: "foo,bar" # Exempt issues/PRs assigned to particular users
|
||||
@@ -41,7 +41,7 @@ var (
|
||||
|
||||
const (
|
||||
// VERSION is the current version for the server.
|
||||
VERSION = "2.10.0-beta.38"
|
||||
VERSION = "2.10.0-beta.39"
|
||||
|
||||
// PROTO is the currently supported protocol.
|
||||
// 0 was the original
|
||||
|
||||
@@ -1819,6 +1819,17 @@ func (jsa *jsAccount) checkAndSyncUsage(tierName string, storeType StorageType)
|
||||
}
|
||||
s := js.srv
|
||||
|
||||
// We need to collect the stream stores before we acquire the usage lock since in storeUpdates the
|
||||
// stream lock could be held if deletion are inline with storing a new message, e.g. via limits.
|
||||
var stores []StreamStore
|
||||
for _, mset := range jsa.streams {
|
||||
mset.mu.RLock()
|
||||
if mset.tier == tierName && mset.stype == storeType && mset.store != nil {
|
||||
stores = append(stores, mset.store)
|
||||
}
|
||||
mset.mu.RUnlock()
|
||||
}
|
||||
|
||||
// Now range and qualify, hold usage lock to prevent updates.
|
||||
jsa.usageMu.Lock()
|
||||
defer jsa.usageMu.Unlock()
|
||||
@@ -1828,15 +1839,12 @@ func (jsa *jsAccount) checkAndSyncUsage(tierName string, storeType StorageType)
|
||||
return
|
||||
}
|
||||
|
||||
// Collect current total for all stream stores that matched.
|
||||
var total int64
|
||||
var state StreamState
|
||||
for _, mset := range jsa.streams {
|
||||
mset.mu.RLock()
|
||||
if mset.tier == tierName && mset.stype == storeType {
|
||||
mset.store.FastState(&state)
|
||||
total += int64(state.Bytes)
|
||||
}
|
||||
mset.mu.RUnlock()
|
||||
for _, store := range stores {
|
||||
store.FastState(&state)
|
||||
total += int64(state.Bytes)
|
||||
}
|
||||
|
||||
var needClusterUpdate bool
|
||||
|
||||
@@ -456,6 +456,9 @@ func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) {
|
||||
}
|
||||
// Make sure to clear out the raft node if still present in the meta layer.
|
||||
if rg := sa.Group; rg != nil && rg.node != nil {
|
||||
if rg.node.State() != Closed {
|
||||
rg.node.Stop()
|
||||
}
|
||||
rg.node = nil
|
||||
}
|
||||
js.mu.Unlock()
|
||||
@@ -493,7 +496,7 @@ func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) {
|
||||
// For R1 it will make sure the stream is present on this server.
|
||||
func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
|
||||
js.mu.Lock()
|
||||
cc := js.cluster
|
||||
s, cc := js.srv, js.cluster
|
||||
if cc == nil {
|
||||
// Non-clustered mode
|
||||
js.mu.Unlock()
|
||||
@@ -523,7 +526,12 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
|
||||
if !mset.isCatchingUp() {
|
||||
return true
|
||||
}
|
||||
} else if node != nil && node != mset.raftNode() {
|
||||
s.Warnf("Detected stream cluster node skew '%s > %s'", acc.GetName(), streamName)
|
||||
node.Delete()
|
||||
mset.resetClusteredState(nil)
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -7590,6 +7598,10 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) {
|
||||
}
|
||||
}
|
||||
|
||||
// Do not let this go on forever.
|
||||
const maxRetries = 3
|
||||
var numRetries int
|
||||
|
||||
RETRY:
|
||||
// On retry, we need to release the semaphore we got. Call will be no-op
|
||||
// if releaseSem boolean has not been set to true on successfully getting
|
||||
@@ -7606,13 +7618,20 @@ RETRY:
|
||||
sub = nil
|
||||
}
|
||||
|
||||
// Block here if we have too many requests in flight.
|
||||
<-s.syncOutSem
|
||||
releaseSem = true
|
||||
if !s.isRunning() {
|
||||
return ErrServerNotRunning
|
||||
}
|
||||
|
||||
numRetries++
|
||||
if numRetries >= maxRetries {
|
||||
// Force a hard reset here.
|
||||
return errFirstSequenceMismatch
|
||||
}
|
||||
|
||||
// Block here if we have too many requests in flight.
|
||||
<-s.syncOutSem
|
||||
releaseSem = true
|
||||
|
||||
// We may have been blocked for a bit, so the reset need to ensure that we
|
||||
// consume the already fired timer.
|
||||
if !notActive.Stop() {
|
||||
|
||||
Reference in New Issue
Block a user