From 8729ff4015e361a6419176e5b72fb8c89bb0c45e Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 23 Feb 2023 10:48:20 +0000 Subject: [PATCH 1/5] Set code owners, tweak PR template --- .github/CODEOWNERS | 1 + .github/PULL_REQUEST_TEMPLATE.md | 2 -- 2 files changed, 1 insertion(+), 2 deletions(-) create mode 100644 .github/CODEOWNERS diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 00000000..f77be596 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1 @@ +* @nats-io/server diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 2dd0085a..c55a8777 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -13,5 +13,3 @@ Resolves # - - - - -/cc @nats-io/core From d347cb116ab31377e40625e1fe8258755fa63f29 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 23 Feb 2023 10:06:59 -0800 Subject: [PATCH 2/5] When becoming leader optionally send current snapshot to followers if caught up. This can help sync on restarts and improve ghost ephemerals. Also added more code to suppress respnses and API audits when we know we are recovering. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 132 +++++++++++++++++++----------------- server/norace_test.go | 113 ++++++++++++++++++++++++++++++ 2 files changed, 184 insertions(+), 61 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 153fab1c..99e98786 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -127,9 +127,10 @@ type streamAssignment struct { Reply string `json:"reply"` Restore *StreamState `json:"restore_state,omitempty"` // Internal - consumers map[string]*consumerAssignment - responded bool - err error + consumers map[string]*consumerAssignment + responded bool + recovering bool + err error } // consumerAssignment is what the meta controller uses to assign consumers to streams. @@ -144,9 +145,10 @@ type consumerAssignment struct { Reply string `json:"reply"` State *ConsumerState `json:"state,omitempty"` // Internal - responded bool - deleted bool - err error + responded bool + recovering bool + deleted bool + err error } // streamPurge is what the stream leader will replicate when purging a stream. @@ -987,7 +989,6 @@ func (js *jetStream) monitorCluster() { isLeader bool lastSnap []byte lastSnapTime time.Time - beenLeader bool ) // Highwayhash key for generating hashes. @@ -998,17 +999,21 @@ func (js *jetStream) monitorCluster() { js.setMetaRecovering() // Snapshotting function. - doSnapshot := func() { + doSnapshot := func() []byte { // Suppress during recovery. if js.isMetaRecovering() { - return + return nil } snap := js.metaSnapshot() if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) { if err := n.InstallSnapshot(snap); err == nil { lastSnap, lastSnapTime = hash[:], time.Now() + return snap + } else { + s.Warnf("Error snapshotting JetStream cluster state: %v", err) } } + return nil } ru := &recoveryUpdates{ @@ -1059,6 +1064,15 @@ func (js *jetStream) monitorCluster() { // FIXME(dlc) - Deal with errors. if didSnap, didRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil { _, nb := n.Applied(ce.Index) + // If we processed a snapshot and are recovering remove our pending state. + if didSnap && js.isMetaRecovering() { + ru = &recoveryUpdates{ + removeStreams: make(map[string]*streamAssignment), + removeConsumers: make(map[string]*consumerAssignment), + updateStreams: make(map[string]*streamAssignment), + updateConsumers: make(map[string]*consumerAssignment), + } + } if js.hasPeerEntries(ce.Entries) || didSnap || didRemoval { // Since we received one make sure we have our own since we do not store // our meta state outside of raft. @@ -1070,16 +1084,16 @@ func (js *jetStream) monitorCluster() { } aq.recycle(&ces) case isLeader = <-lch: - // We want to make sure we are updated on statsz so ping the extended cluster. + js.processLeaderChange(isLeader) + if isLeader { s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil) - } - js.processLeaderChange(isLeader) - if isLeader && !beenLeader { - beenLeader = true - if n.NeedSnapshot() { - if err := n.InstallSnapshot(js.metaSnapshot()); err != nil { - s.Warnf("Error snapshotting JetStream cluster state: %v", err) + // Install a snapshot as we become leader. We will also send to the cluster. + if snap := doSnapshot(); snap != nil { + // If we are caught up distribute our current state to followers. + if ne, _ := n.Size(); ne == 0 { + // Send our snapshot to others to make sure all in sync. + n.SendSnapshot(snap) } } js.checkClusterSize() @@ -1279,27 +1293,20 @@ func (js *jetStream) applyMetaSnapshot(buf []byte) error { } } } - isRecovering := js.metaRecovering js.mu.Unlock() // Do removals first. for _, sa := range saDel { - if isRecovering { - js.setStreamAssignmentRecovering(sa) - } + js.setStreamAssignmentRecovering(sa) js.processStreamRemoval(sa) } // Now do add for the streams. Also add in all consumers. for _, sa := range saAdd { - if isRecovering { - js.setStreamAssignmentRecovering(sa) - } + js.setStreamAssignmentRecovering(sa) js.processStreamAssignment(sa) // We can simply add the consumers. for _, ca := range sa.consumers { - if isRecovering { - js.setConsumerAssignmentRecovering(ca) - } + js.setConsumerAssignmentRecovering(ca) js.processConsumerAssignment(ca) } } @@ -1307,23 +1314,17 @@ func (js *jetStream) applyMetaSnapshot(buf []byte) error { // Perform updates on those in saChk. These were existing so make // sure to process any changes. for _, sa := range saChk { - if isRecovering { - js.setStreamAssignmentRecovering(sa) - } + js.setStreamAssignmentRecovering(sa) js.processUpdateStreamAssignment(sa) } // Now do the deltas for existing stream's consumers. for _, ca := range caDel { - if isRecovering { - js.setConsumerAssignmentRecovering(ca) - } + js.setConsumerAssignmentRecovering(ca) js.processConsumerRemoval(ca) } for _, ca := range caAdd { - if isRecovering { - js.setConsumerAssignmentRecovering(ca) - } + js.setConsumerAssignmentRecovering(ca) js.processConsumerAssignment(ca) } @@ -1335,6 +1336,7 @@ func (js *jetStream) setStreamAssignmentRecovering(sa *streamAssignment) { js.mu.Lock() defer js.mu.Unlock() sa.responded = true + sa.recovering = true sa.Restore = nil if sa.Group != nil { sa.Group.Preferred = _EMPTY_ @@ -1346,6 +1348,7 @@ func (js *jetStream) setConsumerAssignmentRecovering(ca *consumerAssignment) { js.mu.Lock() defer js.mu.Unlock() ca.responded = true + ca.recovering = true if ca.Group != nil { ca.Group.Preferred = _EMPTY_ } @@ -2977,6 +2980,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss storage, cfg := sa.Config.Storage, sa.Config hasResponded := sa.responded sa.responded = true + recovering := sa.recovering js.mu.Unlock() mset, err := acc.lookupStream(cfg.Name) @@ -3002,7 +3006,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss js.mu.Unlock() } // Call update. - if err = mset.update(cfg); err != nil { + if err = mset.updateWithAdvisory(cfg, !recovering); err != nil { s.Warnf("JetStream cluster error updating stream %q for account %q: %v", cfg.Name, acc.Name, err) } // Set the new stream assignment. @@ -3036,9 +3040,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss return } - mset.mu.RLock() - isLeader := mset.isLeader() - mset.mu.RUnlock() + isLeader := mset.IsLeader() // Check for missing syncSubject bug. if isLeader && osa != nil && osa.Sync == _EMPTY_ { @@ -3054,7 +3056,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss } // Check if we should bail. - if !isLeader || hasResponded { + if !isLeader || hasResponded || recovering { return } @@ -3114,19 +3116,21 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme if sa.Group.Name == osa.Group.Name && reflect.DeepEqual(sa.Group.Peers, osa.Group.Peers) { // Since this already exists we know it succeeded, just respond to this caller. js.mu.RLock() - client, subject, reply := sa.Client, sa.Subject, sa.Reply + client, subject, reply, recovering := sa.Client, sa.Subject, sa.Reply, sa.recovering js.mu.RUnlock() - var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}} - resp.StreamInfo = &StreamInfo{ - Created: mset.createdTime(), - State: mset.state(), - Config: mset.config(), - Cluster: js.clusterInfo(mset.raftGroup()), - Sources: mset.sourcesInfo(), - Mirror: mset.mirrorInfo(), + if !recovering { + var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}} + resp.StreamInfo = &StreamInfo{ + Created: mset.createdTime(), + State: mset.state(), + Config: mset.config(), + Cluster: js.clusterInfo(mset.raftGroup()), + Sources: mset.sourcesInfo(), + Mirror: mset.mirrorInfo(), + } + s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) } - s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) return } else { // We had a bug where we could have multiple assignments for the same @@ -3342,6 +3346,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember, if cc := js.cluster; cc != nil { isMetaLeader = cc.isLeader() } + recovering := sa.recovering js.mu.RUnlock() stopped := false @@ -3400,7 +3405,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember, } // Do not respond if the account does not exist any longer - if acc == nil { + if acc == nil || recovering { return } @@ -3653,7 +3658,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state // This consumer exists. // Only update if config is really different. cfg := o.config() - if !reflect.DeepEqual(&cfg, ca.Config) { + if isConfigUpdate = !reflect.DeepEqual(&cfg, ca.Config); isConfigUpdate { // Call into update, ignore consumer exists error here since this means an old deliver subject is bound // which can happen on restart etc. if err := o.updateConfig(ca.Config); err != nil && err != NewJSConsumerNameExistError() { @@ -3693,7 +3698,9 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state // If we look like we are scaling up, let's send our current state to the group. sendState = len(ca.Group.Peers) > len(oca.Group.Peers) && o.IsLeader() && n != nil // Signal that this is an update - isConfigUpdate = true + if ca.Reply != _EMPTY_ { + isConfigUpdate = true + } } js.mu.RUnlock() @@ -3794,13 +3801,15 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state // For existing consumer, only send response if not recovering. if wasExisting && !js.isMetaRecovering() { if o.IsLeader() || (!didCreate && needsLocalResponse) { - // Process if existing as an update. + // Process if existing as an update. Double check that this is not recovered. js.mu.RLock() - client, subject, reply := ca.Client, ca.Subject, ca.Reply + client, subject, reply, recovering := ca.Client, ca.Subject, ca.Reply, ca.recovering js.mu.RUnlock() - var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}} - resp.ConsumerInfo = o.info() - s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) + if !recovering { + var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}} + resp.ConsumerInfo = o.info() + s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) + } } } } @@ -3819,6 +3828,7 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb if cc := js.cluster; cc != nil { isMetaLeader = cc.isLeader() } + recovering := ca.recovering js.mu.RUnlock() stopped := false @@ -3856,8 +3866,8 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb } } - // Do not respond if the account does not exist any longer - if acc == nil { + // Do not respond if the account does not exist any longer or this is during recovery. + if acc == nil || recovering { return } diff --git a/server/norace_test.go b/server/norace_test.go index 29936184..8bbb719b 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -6408,3 +6408,116 @@ func TestNoRaceJetStreamConsumerCreateTimeNumPending(t *testing.T) { t.Fatalf("Consumer create took longer than expected, %v vs %v", elapsed, threshold) } } + +func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) { + c := createJetStreamClusterExplicit(t, "GHOST", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"events.>"}, + Replicas: 3, + }) + require_NoError(t, err) + + for i := 0; i < 10; i++ { + for j := 0; j < 10; j++ { + require_NoError(t, nc.Publish(fmt.Sprintf("events.%d.%d", i, j), []byte(`test`))) + } + } + + fetch := func(id int) { + subject := fmt.Sprintf("events.%d.*", id) + subscription, err := js.PullSubscribe(subject, + _EMPTY_, // ephemeral consumer + nats.DeliverAll(), + nats.ReplayInstant(), + nats.BindStream("TEST"), + nats.ConsumerReplicas(1), + nats.ConsumerMemoryStorage(), + ) + if err != nil { + return + } + defer subscription.Unsubscribe() + + info, err := subscription.ConsumerInfo() + if err != nil { + return + } + + subscription.Fetch(int(info.NumPending)) + } + + replay := func(ctx context.Context, id int) { + for { + select { + case <-ctx.Done(): + return + default: + fetch(id) + } + } + } + + ctx, cancel := context.WithCancel(context.Background()) + + go replay(ctx, 0) + go replay(ctx, 1) + go replay(ctx, 2) + go replay(ctx, 3) + go replay(ctx, 4) + go replay(ctx, 5) + go replay(ctx, 6) + go replay(ctx, 7) + go replay(ctx, 8) + go replay(ctx, 9) + + time.Sleep(5 * time.Second) + + for _, server := range c.servers { + server.Shutdown() + restarted := c.restartServer(server) + checkFor(t, time.Second, 200*time.Millisecond, func() error { + hs := restarted.healthz(&HealthzOptions{ + JSEnabled: true, + JSServerOnly: true, + }) + if hs.Error != _EMPTY_ { + return errors.New(hs.Error) + } + return nil + }) + c.waitOnStreamLeader(globalAccountName, "TEST") + time.Sleep(time.Second * 2) + go replay(ctx, 5) + go replay(ctx, 6) + go replay(ctx, 7) + go replay(ctx, 8) + go replay(ctx, 9) + } + + time.Sleep(5 * time.Second) + cancel() + + getMissing := func() []string { + m, err := nc.Request("$JS.API.CONSUMER.LIST.TEST", nil, time.Second*10) + require_NoError(t, err) + + var resp JSApiConsumerListResponse + err = json.Unmarshal(m.Data, &resp) + require_NoError(t, err) + return resp.Missing + } + + checkFor(t, 10*time.Second, 500*time.Millisecond, func() error { + missing := getMissing() + if len(missing) == 0 { + return nil + } + return fmt.Errorf("Still have missing: %+v", missing) + }) +} From 45859e64760cdb1fdae06acfd8dd5784025aeba1 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 23 Feb 2023 12:15:57 -0800 Subject: [PATCH 3/5] Make sure preferred peer for stepdown is healthy. Signed-off-by: Derek Collison --- server/jetstream_cluster_2_test.go | 4 +-- server/raft.go | 46 +++++++++++++++++++++--------- 2 files changed, 35 insertions(+), 15 deletions(-) diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 2b38c45f..bd530ca1 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -199,10 +199,10 @@ func TestJetStreamClusterMultiRestartBug(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - checkFor(t, 10*time.Second, 250*time.Millisecond, func() error { + checkFor(t, 20*time.Second, 250*time.Millisecond, func() error { si, _ := js2.StreamInfo("TEST") if si == nil || si.Cluster == nil { - t.Fatalf("Did not get stream info") + return fmt.Errorf("No stream info or cluster") } for _, pi := range si.Cluster.Replicas { if !pi.Current { diff --git a/server/raft.go b/server/raft.go index 97b2f00e..017f8947 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1281,7 +1281,6 @@ func (n *raft) StepDown(preferred ...string) error { n.debug("Being asked to stepdown") // See if we have up to date followers. - nowts := time.Now().UnixNano() maybeLeader := noLeader if len(preferred) > 0 { if preferred[0] != _EMPTY_ { @@ -1290,21 +1289,42 @@ func (n *raft) StepDown(preferred ...string) error { preferred = nil } } + // Can't pick ourselves. + if maybeLeader == n.id { + maybeLeader = noLeader + preferred = nil + } - for peer, ps := range n.peers { - // If not us and alive and caughtup. - if peer != n.id && (nowts-ps.ts) < int64(hbInterval*3) { - if maybeLeader != noLeader && maybeLeader != peer { - continue - } - if si, ok := n.s.nodeToInfo.Load(peer); !ok || si.(nodeInfo).offline { - continue - } - n.debug("Looking at %q which is %v behind", peer, time.Duration(nowts-ps.ts)) - maybeLeader = peer - break + nowts := time.Now().UnixNano() + + // If we have a preferred check it first. + if maybeLeader != noLeader { + var isHealthy bool + if ps, ok := n.peers[maybeLeader]; ok { + si, ok := n.s.nodeToInfo.Load(maybeLeader) + isHealthy = ok && !si.(nodeInfo).offline && (nowts-ps.ts) < int64(hbInterval*3) + } + if !isHealthy { + maybeLeader = noLeader } } + + // If we do not have a preferred at this point pick the first healthy one. + // Make sure not ourselves. + if maybeLeader == noLeader { + for peer, ps := range n.peers { + if peer == n.id { + continue + } + si, ok := n.s.nodeToInfo.Load(peer) + isHealthy := ok && !si.(nodeInfo).offline && (nowts-ps.ts) < int64(hbInterval*3) + if isHealthy { + maybeLeader = peer + break + } + } + } + stepdown := n.stepdown n.Unlock() From 02ba78454daac24352ae70d370cfa7705ecbbffc Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Thu, 9 Feb 2023 23:38:33 +0100 Subject: [PATCH 4/5] Fix new replicas late MaxAge expiry This commit fixes the issue when scaling Stream with MaxAge and some older messages stored. Until now, old messages were not properly expired on new replicas, because new replicas first expiry timer was set to MaxAge duration. This commit adds a check if received messages expiry happens before MaxAge, meaning they're messages older than the replica. https://github.com/nats-io/nats-server/issues/3848 Signed-off-by: Tomasz Pietrek --- server/filestore.go | 49 +++++++++------- server/jetstream_cluster_3_test.go | 90 ++++++++++++++++++++++++++++++ server/memstore.go | 44 +++++++++++---- 3 files changed, 153 insertions(+), 30 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 53cede18..f5af9eec 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -104,26 +104,27 @@ type psi struct { } type fileStore struct { - mu sync.RWMutex - state StreamState - ld *LostStreamData - scb StorageUpdateHandler - ageChk *time.Timer - syncTmr *time.Timer - cfg FileStreamInfo - fcfg FileStoreConfig - prf keyGen - aek cipher.AEAD - lmb *msgBlock - blks []*msgBlock - bim map[uint32]*msgBlock - psim map[string]*psi - hh hash.Hash64 - qch chan struct{} - cfs []ConsumerStore - sips int - closed bool - fip bool + mu sync.RWMutex + state StreamState + ld *LostStreamData + scb StorageUpdateHandler + ageChk *time.Timer + syncTmr *time.Timer + cfg FileStreamInfo + fcfg FileStoreConfig + prf keyGen + aek cipher.AEAD + lmb *msgBlock + blks []*msgBlock + bim map[uint32]*msgBlock + psim map[string]*psi + hh hash.Hash64 + qch chan struct{} + cfs []ConsumerStore + sips int + closed bool + fip bool + receivedAny bool } // Represents a message store block and its data. @@ -2100,6 +2101,14 @@ func (fs *fileStore) StoreRawMsg(subj string, hdr, msg []byte, seq uint64, ts in fs.mu.Lock() err := fs.storeRawMsg(subj, hdr, msg, seq, ts) cb := fs.scb + // Check if first message timestamp requires expiry + // sooner than initial replica expiry timer set to MaxAge when initializing. + if !fs.receivedAny && fs.cfg.MaxAge != 0 && ts > 0 { + fs.receivedAny = true + // don't block here by calling expireMsgs directly. + // Instead, set short timeout. + fs.resetAgeChk(int64(time.Millisecond * 50)) + } fs.mu.Unlock() if err == nil && cb != nil { diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 4646bc79..4b6ec601 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -2835,3 +2835,93 @@ func TestJetStreamClusterWALBuildupOnNoOpPull(t *testing.T) { t.Fatalf("got %d entries, expected less than %d entries", entries, max) } } + +// Found in https://github.com/nats-io/nats-server/issues/3848 +// When Max Age was specified and stream was scaled up, new replicas +// were expiring messages much later than the leader. +func TestJetStreamClusterStreamMaxAgeScaleUp(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + for _, test := range []struct { + name string + storage nats.StorageType + stream string + purge bool + }{ + {name: "file", storage: nats.FileStorage, stream: "A", purge: false}, + {name: "memory", storage: nats.MemoryStorage, stream: "B", purge: false}, + {name: "file with purge", storage: nats.FileStorage, stream: "C", purge: true}, + {name: "memory with purge", storage: nats.MemoryStorage, stream: "D", purge: true}, + } { + + t.Run(test.name, func(t *testing.T) { + ttl := time.Second * 5 + // Add stream with one replica and short MaxAge. + _, err := js.AddStream(&nats.StreamConfig{ + Name: test.stream, + Replicas: 1, + Subjects: []string{test.stream}, + MaxAge: ttl, + Storage: test.storage, + }) + require_NoError(t, err) + + // Add some messages. + for i := 0; i < 10; i++ { + sendStreamMsg(t, nc, test.stream, "HELLO") + } + // We need to also test if we properly set expiry + // if first sequence is not 1. + if test.purge { + err = js.PurgeStream(test.stream) + require_NoError(t, err) + // Add some messages. + for i := 0; i < 10; i++ { + sendStreamMsg(t, nc, test.stream, "HELLO") + } + } + // Mark the time when all messages were published. + start := time.Now() + + // Sleep for half of the MaxAge time. + time.Sleep(ttl / 2) + + // Scale up the Stream to 3 replicas. + _, err = js.UpdateStream(&nats.StreamConfig{ + Name: test.stream, + Replicas: 3, + Subjects: []string{test.stream}, + MaxAge: ttl, + Storage: test.storage, + }) + require_NoError(t, err) + + // All messages should still be there. + info, err := js.StreamInfo(test.stream) + require_NoError(t, err) + require_True(t, info.State.Msgs == 10) + + // Wait until MaxAge is reached. + time.Sleep(ttl - time.Since(start) + (10 * time.Millisecond)) + + // Check if all messages are expired. + info, err = js.StreamInfo(test.stream) + require_NoError(t, err) + require_True(t, info.State.Msgs == 0) + + // Now switch leader to one of replicas + _, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, test.stream), nil, time.Second) + require_NoError(t, err) + c.waitOnStreamLeader("$G", test.stream) + + // and make sure that it also expired all messages + info, err = js.StreamInfo(test.stream) + require_NoError(t, err) + require_True(t, info.State.Msgs == 0) + }) + } +} diff --git a/server/memstore.go b/server/memstore.go index 921c1df1..ebcd8085 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -23,15 +23,16 @@ import ( // TODO(dlc) - This is a fairly simplistic approach but should do for now. type memStore struct { - mu sync.RWMutex - cfg StreamConfig - state StreamState - msgs map[uint64]*StoreMsg - fss map[string]*SimpleState - maxp int64 - scb StorageUpdateHandler - ageChk *time.Timer - consumers int + mu sync.RWMutex + cfg StreamConfig + state StreamState + msgs map[uint64]*StoreMsg + fss map[string]*SimpleState + maxp int64 + scb StorageUpdateHandler + ageChk *time.Timer + consumers int + receivedAny bool } func newMemStore(cfg *StreamConfig) (*memStore, error) { @@ -190,7 +191,6 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int if ms.ageChk == nil && ms.cfg.MaxAge != 0 { ms.startAgeChk() } - return nil } @@ -199,6 +199,13 @@ func (ms *memStore) StoreRawMsg(subj string, hdr, msg []byte, seq uint64, ts int ms.mu.Lock() err := ms.storeRawMsg(subj, hdr, msg, seq, ts) cb := ms.scb + // Check if first message timestamp requires expiry + // sooner than initial replica expiry timer set to MaxAge when initializing. + if !ms.receivedAny && ms.cfg.MaxAge != 0 && ts > 0 { + ms.receivedAny = true + // Calculate duration when the next expireMsgs should be called. + ms.resetAgeChk(int64(time.Millisecond) * 50) + } ms.mu.Unlock() if err == nil && cb != nil { @@ -411,6 +418,23 @@ func (ms *memStore) startAgeChk() { } } +// Lock should be held. +func (ms *memStore) resetAgeChk(delta int64) { + if ms.cfg.MaxAge == 0 { + return + } + + fireIn := ms.cfg.MaxAge + if delta > 0 && time.Duration(delta) < fireIn { + fireIn = time.Duration(delta) + } + if ms.ageChk != nil { + ms.ageChk.Reset(fireIn) + } else { + ms.ageChk = time.AfterFunc(fireIn, ms.expireMsgs) + } +} + // Will expire msgs that are too old. func (ms *memStore) expireMsgs() { ms.mu.Lock() From ea2bfad8ea5ed3b96db8b31f903d820627973589 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 23 Feb 2023 22:19:37 -0800 Subject: [PATCH 5/5] Fixed bug where snapshot would not compact through applied. This mean a subsequent request for exactly applied would return that entry only not the full state snapshot. Fixed bug where we would not snapshot when we should. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 17 +++++------------ server/raft.go | 7 ++++--- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 99e98786..ed2e6d5f 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -999,21 +999,19 @@ func (js *jetStream) monitorCluster() { js.setMetaRecovering() // Snapshotting function. - doSnapshot := func() []byte { + doSnapshot := func() { // Suppress during recovery. if js.isMetaRecovering() { - return nil + return } snap := js.metaSnapshot() if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) { if err := n.InstallSnapshot(snap); err == nil { lastSnap, lastSnapTime = hash[:], time.Now() - return snap } else { s.Warnf("Error snapshotting JetStream cluster state: %v", err) } } - return nil } ru := &recoveryUpdates{ @@ -1088,16 +1086,11 @@ func (js *jetStream) monitorCluster() { if isLeader { s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil) - // Install a snapshot as we become leader. We will also send to the cluster. - if snap := doSnapshot(); snap != nil { - // If we are caught up distribute our current state to followers. - if ne, _ := n.Size(); ne == 0 { - // Send our snapshot to others to make sure all in sync. - n.SendSnapshot(snap) - } - } + // Optionally install a snapshot as we become leader. + doSnapshot() js.checkClusterSize() } + case <-t.C: doSnapshot() // Periodically check the cluster size. diff --git a/server/raft.go b/server/raft.go index 017f8947..eda9bd40 100644 --- a/server/raft.go +++ b/server/raft.go @@ -939,9 +939,9 @@ func (n *raft) InstallSnapshot(data []byte) error { var state StreamState n.wal.FastState(&state) - if state.FirstSeq >= n.applied { + if n.applied == 0 || len(data) == 0 { n.Unlock() - return nil + return errNoSnapAvailable } n.debug("Installing snapshot of %d bytes", len(data)) @@ -976,7 +976,7 @@ func (n *raft) InstallSnapshot(data []byte) error { // Remember our latest snapshot file. n.snapfile = sfile - if _, err := n.wal.Compact(snap.lastIndex); err != nil { + if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil { n.setWriteErrLocked(err) n.Unlock() return err @@ -1289,6 +1289,7 @@ func (n *raft) StepDown(preferred ...string) error { preferred = nil } } + // Can't pick ourselves. if maybeLeader == n.id { maybeLeader = noLeader