mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Fix race between stream stop and monitorStream (#3350)
* Fix race between stream stop and monitorStream. When monitorCluster stops the stream, monitorStream must also be stopped to avoid miscounting of the store size. In a test, the stop and reset of the store size happened first and was then followed by storing more messages via monitorStream. Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
@@ -1748,7 +1748,7 @@ func (jsa *jsAccount) updateUsage(tierName string, storeType StorageType, delta
|
||||
}
|
||||
}
|
||||
|
||||
const usageTick = 1500 * time.Millisecond
|
||||
var usageTick = 1500 * time.Millisecond
|
||||
|
||||
func (jsa *jsAccount) sendClusterUsageUpdateTimer() {
|
||||
jsa.usageMu.Lock()
|
||||
|
||||
@@ -1478,6 +1478,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, rm *recoveryRemovals) (b
|
||||
if isRecovering {
|
||||
js.setStreamAssignmentRecovering(sa)
|
||||
}
|
||||
|
||||
js.processUpdateStreamAssignment(sa)
|
||||
default:
|
||||
panic("JetStream Cluster Unknown meta entry op type")
|
||||
@@ -1647,7 +1648,9 @@ func currentPeerCount(ci *ClusterInfo, peerSet []string, leaderId string) (curre
|
||||
func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnapshot bool) {
|
||||
s, cc := js.server(), js.cluster
|
||||
defer s.grWG.Done()
|
||||
|
||||
if mset != nil {
|
||||
defer mset.monitorWg.Done()
|
||||
}
|
||||
js.mu.RLock()
|
||||
n := sa.Group.node
|
||||
meta := cc.meta
|
||||
@@ -2008,6 +2011,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
|
||||
// If we were successful, look up our stream now.
|
||||
if err == nil {
|
||||
if mset, err = acc.lookupStream(sa.Config.Name); mset != nil {
|
||||
mset.monitorWg.Add(1)
|
||||
defer mset.monitorWg.Done()
|
||||
mset.setStreamAssignment(sa)
|
||||
// Make sure to update our updateC which would have been nil.
|
||||
uch = mset.updateC()
|
||||
@@ -2733,7 +2738,11 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) {
|
||||
node.StepDown(sa.Group.Preferred)
|
||||
}
|
||||
node.ProposeRemovePeer(ourID)
|
||||
// shut down monitor by shutting down raft
|
||||
node.Delete()
|
||||
}
|
||||
// wait for monitor to be shut down
|
||||
mset.monitorWg.Wait()
|
||||
mset.stop(true, false)
|
||||
}
|
||||
}
|
||||
@@ -2763,6 +2772,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
|
||||
mset.setLeader(false)
|
||||
js.createRaftGroup(acc.GetName(), rg, storage)
|
||||
}
|
||||
mset.monitorWg.Add(1)
|
||||
// Start monitoring..
|
||||
s.startGoRoutine(func() { js.monitorStream(mset, sa, needsNode) })
|
||||
} else if numReplicas == 1 && alreadyRunning {
|
||||
@@ -2938,6 +2948,9 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
|
||||
// Start our monitoring routine.
|
||||
if rg.node != nil {
|
||||
if !alreadyRunning {
|
||||
if mset != nil {
|
||||
mset.monitorWg.Add(1)
|
||||
}
|
||||
s.startGoRoutine(func() { js.monitorStream(mset, sa, false) })
|
||||
}
|
||||
} else {
|
||||
@@ -3085,6 +3098,12 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember,
|
||||
// Go ahead and delete the stream if we have it and the account here.
|
||||
if acc, _ = s.LookupAccount(sa.Client.serviceAccount()); acc != nil {
|
||||
if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil {
|
||||
// shut down monitor by shutting down raft
|
||||
if n := mset.raftNode(); n != nil {
|
||||
n.Delete()
|
||||
}
|
||||
// wait for monitor to be shut down
|
||||
mset.monitorWg.Wait()
|
||||
err = mset.stop(true, wasLeader)
|
||||
stopped = true
|
||||
}
|
||||
|
||||
@@ -2713,6 +2713,12 @@ func TestJetStreamSuperClusterTagInducedMoveCancel(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestJetStreamSuperClusterMoveCancel(t *testing.T) {
|
||||
usageTickOld := usageTick
|
||||
usageTick = 250 * time.Millisecond
|
||||
defer func() {
|
||||
usageTick = usageTickOld
|
||||
}()
|
||||
|
||||
server := map[string]struct{}{}
|
||||
sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2,
|
||||
func(serverName, clusterName, storeDir, conf string) string {
|
||||
@@ -2821,6 +2827,10 @@ func TestJetStreamSuperClusterMoveCancel(t *testing.T) {
|
||||
require_NoError(t, err)
|
||||
defer ncsys.Close()
|
||||
|
||||
time.Sleep(2 * usageTick)
|
||||
aiBefore, err := js.AccountInfo()
|
||||
require_NoError(t, err)
|
||||
|
||||
for _, moveFromSrv := range streamPeerSrv {
|
||||
moveReq, err := json.Marshal(&JSApiMetaServerStreamMoveRequest{Server: moveFromSrv, Tags: []string{emptySrv}})
|
||||
require_NoError(t, err)
|
||||
@@ -2846,6 +2856,15 @@ func TestJetStreamSuperClusterMoveCancel(t *testing.T) {
|
||||
checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { return checkSrvInvariant(s, expectedPeers) })
|
||||
}
|
||||
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { return serverEmpty(emptySrv) })
|
||||
checkFor(t, 3*usageTick, 100*time.Millisecond, func() error {
|
||||
if aiAfter, err := js.AccountInfo(); err != nil {
|
||||
return err
|
||||
} else if aiAfter.Store != aiBefore.Store {
|
||||
return fmt.Errorf("store before %d and after %d don't match", aiBefore.Store, aiAfter.Store)
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -237,6 +237,8 @@ type stream struct {
|
||||
// Direct get subscription.
|
||||
directSub *subscription
|
||||
lastBySub *subscription
|
||||
|
||||
monitorWg sync.WaitGroup
|
||||
}
|
||||
|
||||
type sourceInfo struct {
|
||||
|
||||
Reference in New Issue
Block a user