From c6e37cf7afdda697d8f18b4297f10279159b70fb Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Wed, 10 Aug 2022 19:01:21 +0200 Subject: [PATCH] Fix race between stream stop and monitorStream (#3350) * Fix race between stream stop and monitorStream monitorCluster stops the stream, when doing so, monitorStream needs to be stopped to avoid miscounting of store size. In a test stop and reset of store size happened first and then was followed by storing more messages via monitorStream Signed-off-by: Matthias Hanel --- server/jetstream.go | 2 +- server/jetstream_cluster.go | 21 ++++++++++++++++++++- server/jetstream_super_cluster_test.go | 19 +++++++++++++++++++ server/stream.go | 2 ++ 4 files changed, 42 insertions(+), 2 deletions(-) diff --git a/server/jetstream.go b/server/jetstream.go index 977b47fa..24580346 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1748,7 +1748,7 @@ func (jsa *jsAccount) updateUsage(tierName string, storeType StorageType, delta } } -const usageTick = 1500 * time.Millisecond +var usageTick = 1500 * time.Millisecond func (jsa *jsAccount) sendClusterUsageUpdateTimer() { jsa.usageMu.Lock() diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index b9530142..3992618e 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1478,6 +1478,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, rm *recoveryRemovals) (b if isRecovering { js.setStreamAssignmentRecovering(sa) } + js.processUpdateStreamAssignment(sa) default: panic("JetStream Cluster Unknown meta entry op type") @@ -1647,7 +1648,9 @@ func currentPeerCount(ci *ClusterInfo, peerSet []string, leaderId string) (curre func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnapshot bool) { s, cc := js.server(), js.cluster defer s.grWG.Done() - + if mset != nil { + defer mset.monitorWg.Done() + } js.mu.RLock() n := sa.Group.node meta := cc.meta @@ -2008,6 +2011,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // If we were successful lookup up our stream now. if err == nil { if mset, err = acc.lookupStream(sa.Config.Name); mset != nil { + mset.monitorWg.Add(1) + defer mset.monitorWg.Done() mset.setStreamAssignment(sa) // Make sure to update our updateC which would have been nil. uch = mset.updateC() @@ -2733,7 +2738,11 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) { node.StepDown(sa.Group.Preferred) } node.ProposeRemovePeer(ourID) + // shut down monitor by shutting down raft + node.Delete() } + // wait for monitor to be shut down + mset.monitorWg.Wait() mset.stop(true, false) } } @@ -2763,6 +2772,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss mset.setLeader(false) js.createRaftGroup(acc.GetName(), rg, storage) } + mset.monitorWg.Add(1) // Start monitoring.. s.startGoRoutine(func() { js.monitorStream(mset, sa, needsNode) }) } else if numReplicas == 1 && alreadyRunning { @@ -2938,6 +2948,9 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme // Start our monitoring routine. if rg.node != nil { if !alreadyRunning { + if mset != nil { + mset.monitorWg.Add(1) + } s.startGoRoutine(func() { js.monitorStream(mset, sa, false) }) } } else { @@ -3085,6 +3098,12 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember, // Go ahead and delete the stream if we have it and the account here. if acc, _ = s.LookupAccount(sa.Client.serviceAccount()); acc != nil { if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil { + // shut down monitor by shutting down raft + if n := mset.raftNode(); n != nil { + n.Delete() + } + // wait for monitor to be shut down + mset.monitorWg.Wait() err = mset.stop(true, wasLeader) stopped = true } diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 70715e45..1e3fd644 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -2713,6 +2713,12 @@ func TestJetStreamSuperClusterTagInducedMoveCancel(t *testing.T) { } func TestJetStreamSuperClusterMoveCancel(t *testing.T) { + usageTickOld := usageTick + usageTick = 250 * time.Millisecond + defer func() { + usageTick = usageTickOld + }() + server := map[string]struct{}{} sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2, func(serverName, clusterName, storeDir, conf string) string { @@ -2821,6 +2827,10 @@ func TestJetStreamSuperClusterMoveCancel(t *testing.T) { require_NoError(t, err) defer ncsys.Close() + time.Sleep(2 * usageTick) + aiBefore, err := js.AccountInfo() + require_NoError(t, err) + for _, moveFromSrv := range streamPeerSrv { moveReq, err := json.Marshal(&JSApiMetaServerStreamMoveRequest{Server: moveFromSrv, Tags: []string{emptySrv}}) require_NoError(t, err) @@ -2846,6 +2856,15 @@ func TestJetStreamSuperClusterMoveCancel(t *testing.T) { checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { return checkSrvInvariant(s, expectedPeers) }) } checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { return serverEmpty(emptySrv) }) + checkFor(t, 3*usageTick, 100*time.Millisecond, func() error { + if aiAfter, err := js.AccountInfo(); err != nil { + return err + } else if aiAfter.Store != aiBefore.Store { + return fmt.Errorf("store before %d and after %d don't match", aiBefore.Store, aiAfter.Store) + } else { + return nil + } + }) } } diff --git a/server/stream.go b/server/stream.go index 159dbe84..af36aaa3 100644 --- a/server/stream.go +++ b/server/stream.go @@ -237,6 +237,8 @@ type stream struct { // Direct get subscription. directSub *subscription lastBySub *subscription + + monitorWg sync.WaitGroup } type sourceInfo struct {