From d5ae96f54d6b0fc2c5cf4357e316f8c1f1939e35 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 3 Jun 2023 11:09:42 -0700 Subject: [PATCH 1/4] When a server was killed on restart before an encrypted stream was recovered the keyfile was removed and could cause the stream to not be recoverable. We only needed to delete the key file when converting ciphers and right before we add the stream itself. Signed-off-by: Derek Collison --- server/jetstream.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server/jetstream.go b/server/jetstream.go index 0c0381ce..3ca32c3a 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1234,9 +1234,6 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro } buf = nbuf plaintext = false - - // Remove the key file to have system regenerate with the new cipher. - os.Remove(keyFile) } var cfg FileStreamInfo @@ -1288,6 +1285,8 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro s.Noticef(" Encrypting stream '%s > %s'", a.Name, cfg.StreamConfig.Name) } else if convertingCiphers { s.Noticef(" Converting from %s to %s for stream '%s > %s'", osc, sc, a.Name, cfg.StreamConfig.Name) + // Remove the key file to have system regenerate with the new cipher. + os.Remove(keyFile) } } From 4c1b93d0232096c709861f5a1018cbf3744547d3 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 3 Jun 2023 11:15:06 -0700 Subject: [PATCH 2/4] Make sure to put the keyfile back if we did not recover the stream. Signed-off-by: Derek Collison --- server/jetstream.go | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/server/jetstream.go b/server/jetstream.go index 3ca32c3a..8841f4ed 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1209,22 +1209,23 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro // Check if we are encrypted. keyFile := filepath.Join(mdir, JetStreamMetaFileKey) - if key, err := os.ReadFile(keyFile); err == nil { + keyBuf, err := os.ReadFile(keyFile) + if err == nil { s.Debugf(" Stream metafile is encrypted, reading encrypted keyfile") - if len(key) < minMetaKeySize { - s.Warnf(" Bad stream encryption key length of %d", len(key)) + if len(keyBuf) < minMetaKeySize { + s.Warnf(" Bad stream encryption key length of %d", len(keyBuf)) continue } // Decode the buffer before proceeding. - nbuf, err := s.decryptMeta(sc, key, buf, a.Name, fi.Name()) + nbuf, err := s.decryptMeta(sc, keyBuf, buf, a.Name, fi.Name()) if err != nil { // See if we are changing ciphers. switch sc { case ChaCha: - nbuf, err = s.decryptMeta(AES, key, buf, a.Name, fi.Name()) + nbuf, err = s.decryptMeta(AES, keyBuf, buf, a.Name, fi.Name()) osc, convertingCiphers = AES, true case AES: - nbuf, err = s.decryptMeta(ChaCha, key, buf, a.Name, fi.Name()) + nbuf, err = s.decryptMeta(ChaCha, keyBuf, buf, a.Name, fi.Name()) osc, convertingCiphers = ChaCha, true } if err != nil { @@ -1294,6 +1295,13 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro mset, err := a.addStream(&cfg.StreamConfig) if err != nil { s.Warnf(" Error recreating stream %q: %v", cfg.Name, err) + // If we removed a keyfile from above make sure to put it back. + if convertingCiphers { + err := os.WriteFile(keyFile, keyBuf, defaultFilePerms) + if err != nil { + s.Warnf(" Error replacing meta keyfile for stream %q: %v", cfg.Name, err) + } + } continue } if !cfg.Created.IsZero() { From 238282d97468171b0b8e4d69f0535bbeda36359f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 3 Jun 2023 13:58:15 -0700 Subject: [PATCH 3/4] Fix some data races detected in internal testing Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 5 ++++- server/raft.go | 2 ++ server/stream.go | 4 +++- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 035f8658..13aad33b 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3400,6 +3400,7 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme s, rg := js.srv, sa.Group alreadyRunning := rg.node != nil storage := sa.Config.Storage + restore := sa.Restore js.mu.RUnlock() // Process the raft group and make sure it's running if needed. @@ -3408,11 +3409,13 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme // If we are restoring, create the stream if we are R>1 and not the preferred who handles the // receipt of the snapshot itself. shouldCreate := true - if sa.Restore != nil { + if restore != nil { if len(rg.Peers) == 1 || rg.node != nil && rg.node.ID() == rg.Preferred { shouldCreate = false } else { + js.mu.Lock() sa.Restore = nil + js.mu.Unlock() } } diff --git a/server/raft.go b/server/raft.go index 1d6b2d62..3e1088d9 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1816,9 +1816,11 @@ func (n *raft) runAsFollower() { } else if n.isCatchingUp() { n.debug("Not switching to candidate, catching up") // Check to see if our catchup has stalled. + n.Lock() if n.catchupStalled() { n.cancelCatchup() } + n.Unlock() } else { n.switchToCandidate() return diff --git a/server/stream.go b/server/stream.go index 9a3355d7..f841fcaa 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4490,7 +4490,9 @@ func (mset *stream) resetAndWaitOnConsumers() { } node.Delete() } - o.monitorWg.Wait() + if o.isMonitorRunning() { + o.monitorWg.Wait() + } } } From dee532495d58cd3d2bf5685e6a12318eaad5a7f3 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 3 Jun 2023 17:49:45 -0700 Subject: [PATCH 4/4] Make sure to process extended purge operations correctly when being replayed on a restart. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 10 +-- server/jetstream_cluster_3_test.go | 100 +++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 5 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 13aad33b..7ebec4ba 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2808,12 +2808,12 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco panic(err.Error()) } // Ignore if we are recovering and we have already processed. - if isRecovering { - if mset.state().FirstSeq <= sp.LastSeq { - // Make sure all messages from the purge are gone. - mset.store.Compact(sp.LastSeq + 1) + if isRecovering && (sp.Request == nil || sp.Request.Sequence == 0) { + if sp.Request == nil { + sp.Request = &JSApiStreamPurgeRequest{Sequence: sp.LastSeq} + } else { + sp.Request.Sequence = sp.LastSeq } - continue } s := js.server() diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index dfe75bb1..df109c33 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -4280,3 +4280,103 @@ func TestJetStreamClusterLeafnodePlusDaisyChainSetup(t *testing.T) { // Each cluster hop that has the export/import mapping will add another T message copy. checkSubsPending(t, tsub, num*4) } + +// https://github.com/nats-io/nats-server/pull/4197 +func TestJetStreamClusterPurgeExReplayAfterRestart(t *testing.T) { + c := createJetStreamClusterExplicit(t, "P3F", 3) + defer c.shutdown() + + // Client based API + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"TEST.>"}, + Replicas: 3, + }) + require_NoError(t, err) + + sendStreamMsg(t, nc, "TEST.0", "OK") + sendStreamMsg(t, nc, "TEST.1", "OK") + sendStreamMsg(t, nc, "TEST.2", "OK") + + runTest := func(f func(js nats.JetStreamManager)) *nats.StreamInfo { + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // install snapshot, then execute interior func, ensuring the purge will be recovered later + fsl := c.streamLeader(globalAccountName, "TEST") + fsl.JetStreamSnapshotStream(globalAccountName, "TEST") + + f(js) + time.Sleep(250 * time.Millisecond) + + fsl.Shutdown() + fsl.WaitForShutdown() + fsl = c.restartServer(fsl) + c.waitOnServerCurrent(fsl) + + nc, js = jsClientConnect(t, c.randomServer()) + defer nc.Close() + + c.waitOnStreamLeader(globalAccountName, "TEST") + sl := c.streamLeader(globalAccountName, "TEST") + + // keep stepping down so the stream leader matches the initial leader + // we need to check if it restored from the snapshot properly + for sl != fsl { + _, err := nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second) + require_NoError(t, err) + c.waitOnStreamLeader(globalAccountName, "TEST") + sl = c.streamLeader(globalAccountName, "TEST") + } + + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + return si + } + si := runTest(func(js nats.JetStreamManager) { + err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Subject: "TEST.0"}) + require_NoError(t, err) + }) + if si.State.Msgs != 2 { + t.Fatalf("Expected 2 msgs after restart, got %d", si.State.Msgs) + } + if si.State.FirstSeq != 2 || si.State.LastSeq != 3 { + t.Fatalf("Expected FirstSeq=2, LastSeq=3 after restart, got FirstSeq=%d, LastSeq=%d", + si.State.FirstSeq, si.State.LastSeq) + } + + si = runTest(func(js nats.JetStreamManager) { + err = js.PurgeStream("TEST") + require_NoError(t, err) + // Send 2 more messages. + sendStreamMsg(t, nc, "TEST.1", "OK") + sendStreamMsg(t, nc, "TEST.2", "OK") + }) + if si.State.Msgs != 2 { + t.Fatalf("Expected 2 msgs after restart, got %d", si.State.Msgs) + } + if si.State.FirstSeq != 4 || si.State.LastSeq != 5 { + t.Fatalf("Expected FirstSeq=4, LastSeq=5 after restart, got FirstSeq=%d, LastSeq=%d", + si.State.FirstSeq, si.State.LastSeq) + } + + // Now test a keep + si = runTest(func(js nats.JetStreamManager) { + err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Keep: 1}) + require_NoError(t, err) + // Send 3 more messages. + sendStreamMsg(t, nc, "TEST.1", "OK") + sendStreamMsg(t, nc, "TEST.2", "OK") + sendStreamMsg(t, nc, "TEST.3", "OK") + }) + if si.State.Msgs != 4 { + t.Fatalf("Expected 4 msgs after restart, got %d", si.State.Msgs) + } + if si.State.FirstSeq != 5 || si.State.LastSeq != 8 { + t.Fatalf("Expected FirstSeq=5, LastSeq=8 after restart, got FirstSeq=%d, LastSeq=%d", + si.State.FirstSeq, si.State.LastSeq) + } +}