diff --git a/server/jetstream.go b/server/jetstream.go
index a225834d..b45dced7 100644
--- a/server/jetstream.go
+++ b/server/jetstream.go
@@ -1215,22 +1215,23 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
 
 			// Check if we are encrypted.
 			keyFile := filepath.Join(mdir, JetStreamMetaFileKey)
-			if key, err := os.ReadFile(keyFile); err == nil {
+			keyBuf, err := os.ReadFile(keyFile)
+			if err == nil {
 				s.Debugf(" Stream metafile is encrypted, reading encrypted keyfile")
-				if len(key) < minMetaKeySize {
-					s.Warnf(" Bad stream encryption key length of %d", len(key))
+				if len(keyBuf) < minMetaKeySize {
+					s.Warnf(" Bad stream encryption key length of %d", len(keyBuf))
 					continue
 				}
 				// Decode the buffer before proceeding.
-				nbuf, err := s.decryptMeta(sc, key, buf, a.Name, fi.Name())
+				nbuf, err := s.decryptMeta(sc, keyBuf, buf, a.Name, fi.Name())
 				if err != nil {
 					// See if we are changing ciphers.
 					switch sc {
 					case ChaCha:
-						nbuf, err = s.decryptMeta(AES, key, buf, a.Name, fi.Name())
+						nbuf, err = s.decryptMeta(AES, keyBuf, buf, a.Name, fi.Name())
 						osc, convertingCiphers = AES, true
 					case AES:
-						nbuf, err = s.decryptMeta(ChaCha, key, buf, a.Name, fi.Name())
+						nbuf, err = s.decryptMeta(ChaCha, keyBuf, buf, a.Name, fi.Name())
 						osc, convertingCiphers = ChaCha, true
 					}
 					if err != nil {
@@ -1240,9 +1241,6 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
 				}
 				buf = nbuf
 				plaintext = false
-
-				// Remove the key file to have system regenerate with the new cipher.
-				os.Remove(keyFile)
 			}
 
 			var cfg FileStreamInfo
@@ -1294,6 +1292,8 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
 					s.Noticef(" Encrypting stream '%s > %s'", a.Name, cfg.StreamConfig.Name)
 				} else if convertingCiphers {
 					s.Noticef(" Converting from %s to %s for stream '%s > %s'", osc, sc, a.Name, cfg.StreamConfig.Name)
+					// Remove the key file to have system regenerate with the new cipher.
+					os.Remove(keyFile)
 				}
 			}
 
@@ -1301,6 +1301,13 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
 			mset, err := a.addStream(&cfg.StreamConfig)
 			if err != nil {
 				s.Warnf(" Error recreating stream %q: %v", cfg.Name, err)
+				// If we removed a keyfile from above make sure to put it back.
+				if convertingCiphers {
+					err := os.WriteFile(keyFile, keyBuf, defaultFilePerms)
+					if err != nil {
+						s.Warnf(" Error replacing meta keyfile for stream %q: %v", cfg.Name, err)
+					}
+				}
 				continue
 			}
 			if !cfg.Created.IsZero() {
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index b29217bf..58d87fa8 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -2808,12 +2808,12 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 					panic(err.Error())
 				}
 				// Ignore if we are recovering and we have already processed.
-				if isRecovering {
-					if mset.state().FirstSeq <= sp.LastSeq {
-						// Make sure all messages from the purge are gone.
-						mset.store.Compact(sp.LastSeq + 1)
+				if isRecovering && (sp.Request == nil || sp.Request.Sequence == 0) {
+					if sp.Request == nil {
+						sp.Request = &JSApiStreamPurgeRequest{Sequence: sp.LastSeq}
+					} else {
+						sp.Request.Sequence = sp.LastSeq
 					}
-					continue
 				}
 
 				s := js.server()
@@ -3402,6 +3402,7 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
 	s, rg := js.srv, sa.Group
 	alreadyRunning := rg.node != nil
 	storage := sa.Config.Storage
+	restore := sa.Restore
 	js.mu.RUnlock()
 
 	// Process the raft group and make sure it's running if needed.
@@ -3410,11 +3411,13 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
 	// If we are restoring, create the stream if we are R>1 and not the preferred who handles the
 	// receipt of the snapshot itself.
 	shouldCreate := true
-	if sa.Restore != nil {
+	if restore != nil {
 		if len(rg.Peers) == 1 || rg.node != nil && rg.node.ID() == rg.Preferred {
 			shouldCreate = false
 		} else {
+			js.mu.Lock()
 			sa.Restore = nil
+			js.mu.Unlock()
 		}
 	}
 
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index d5109ce8..78853cfc 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -4280,3 +4280,103 @@ func TestJetStreamClusterLeafnodePlusDaisyChainSetup(t *testing.T) {
 	// Each cluster hop that has the export/import mapping will add another T message copy.
 	checkSubsPending(t, tsub, num*4)
 }
+
+// https://github.com/nats-io/nats-server/pull/4197
+func TestJetStreamClusterPurgeExReplayAfterRestart(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "P3F", 3)
+	defer c.shutdown()
+
+	// Client based API
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"TEST.>"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	sendStreamMsg(t, nc, "TEST.0", "OK")
+	sendStreamMsg(t, nc, "TEST.1", "OK")
+	sendStreamMsg(t, nc, "TEST.2", "OK")
+
+	runTest := func(f func(js nats.JetStreamManager)) *nats.StreamInfo {
+		nc, js := jsClientConnect(t, c.randomServer())
+		defer nc.Close()
+
+		// install snapshot, then execute interior func, ensuring the purge will be recovered later
+		fsl := c.streamLeader(globalAccountName, "TEST")
+		fsl.JetStreamSnapshotStream(globalAccountName, "TEST")
+
+		f(js)
+		time.Sleep(250 * time.Millisecond)
+
+		fsl.Shutdown()
+		fsl.WaitForShutdown()
+		fsl = c.restartServer(fsl)
+		c.waitOnServerCurrent(fsl)
+
+		nc, js = jsClientConnect(t, c.randomServer())
+		defer nc.Close()
+
+		c.waitOnStreamLeader(globalAccountName, "TEST")
+		sl := c.streamLeader(globalAccountName, "TEST")
+
+		// keep stepping down so the stream leader matches the initial leader
+		// we need to check if it restored from the snapshot properly
+		for sl != fsl {
+			_, err := nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second)
+			require_NoError(t, err)
+			c.waitOnStreamLeader(globalAccountName, "TEST")
+			sl = c.streamLeader(globalAccountName, "TEST")
+		}
+
+		si, err := js.StreamInfo("TEST")
+		require_NoError(t, err)
+		return si
+	}
+	si := runTest(func(js nats.JetStreamManager) {
+		err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Subject: "TEST.0"})
+		require_NoError(t, err)
+	})
+	if si.State.Msgs != 2 {
+		t.Fatalf("Expected 2 msgs after restart, got %d", si.State.Msgs)
+	}
+	if si.State.FirstSeq != 2 || si.State.LastSeq != 3 {
+		t.Fatalf("Expected FirstSeq=2, LastSeq=3 after restart, got FirstSeq=%d, LastSeq=%d",
+			si.State.FirstSeq, si.State.LastSeq)
+	}
+
+	si = runTest(func(js nats.JetStreamManager) {
+		err = js.PurgeStream("TEST")
+		require_NoError(t, err)
+		// Send 2 more messages.
+		sendStreamMsg(t, nc, "TEST.1", "OK")
+		sendStreamMsg(t, nc, "TEST.2", "OK")
+	})
+	if si.State.Msgs != 2 {
+		t.Fatalf("Expected 2 msgs after restart, got %d", si.State.Msgs)
+	}
+	if si.State.FirstSeq != 4 || si.State.LastSeq != 5 {
+		t.Fatalf("Expected FirstSeq=4, LastSeq=5 after restart, got FirstSeq=%d, LastSeq=%d",
+			si.State.FirstSeq, si.State.LastSeq)
+	}
+
+	// Now test a keep
+	si = runTest(func(js nats.JetStreamManager) {
+		err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Keep: 1})
+		require_NoError(t, err)
+		// Send 3 more messages.
+		sendStreamMsg(t, nc, "TEST.1", "OK")
+		sendStreamMsg(t, nc, "TEST.2", "OK")
+		sendStreamMsg(t, nc, "TEST.3", "OK")
+	})
+	if si.State.Msgs != 4 {
+		t.Fatalf("Expected 4 msgs after restart, got %d", si.State.Msgs)
+	}
+	if si.State.FirstSeq != 5 || si.State.LastSeq != 8 {
+		t.Fatalf("Expected FirstSeq=5, LastSeq=8 after restart, got FirstSeq=%d, LastSeq=%d",
+			si.State.FirstSeq, si.State.LastSeq)
+	}
+}
diff --git a/server/raft.go b/server/raft.go
index fbfa6865..5d387328 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -1816,9 +1816,11 @@ func (n *raft) runAsFollower() {
 			} else if n.isCatchingUp() {
 				n.debug("Not switching to candidate, catching up")
 				// Check to see if our catchup has stalled.
+				n.Lock()
 				if n.catchupStalled() {
 					n.cancelCatchup()
 				}
+				n.Unlock()
 			} else {
 				n.switchToCandidate()
 				return
diff --git a/server/stream.go b/server/stream.go
index 7d4cb8a7..c59ae45e 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -4493,7 +4493,9 @@ func (mset *stream) resetAndWaitOnConsumers() {
 			}
 			node.Delete()
 		}
-		o.monitorWg.Wait()
+		if o.isMonitorRunning() {
+			o.monitorWg.Wait()
+		}
 	}
 }
 