diff --git a/server/filestore.go b/server/filestore.go index 001a9443..21de257e 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -165,6 +165,7 @@ type fileStore struct { cfg FileStreamInfo fcfg FileStoreConfig prf keyGen + oldprf keyGen aek cipher.AEAD lmb *msgBlock blks []*msgBlock @@ -332,10 +333,10 @@ const ( ) func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) { - return newFileStoreWithCreated(fcfg, cfg, time.Now().UTC(), nil) + return newFileStoreWithCreated(fcfg, cfg, time.Now().UTC(), nil, nil) } -func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created time.Time, prf keyGen) (*fileStore, error) { +func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created time.Time, prf, oldprf keyGen) (*fileStore, error) { if cfg.Name == _EMPTY_ { return nil, fmt.Errorf("name required") } @@ -375,12 +376,13 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim dios <- struct{}{} fs := &fileStore{ - fcfg: fcfg, - psim: make(map[string]*psi), - bim: make(map[uint32]*msgBlock), - cfg: FileStreamInfo{Created: created, StreamConfig: cfg}, - prf: prf, - qch: make(chan struct{}), + fcfg: fcfg, + psim: make(map[string]*psi), + bim: make(map[uint32]*msgBlock), + cfg: FileStreamInfo{Created: created, StreamConfig: cfg}, + prf: prf, + oldprf: oldprf, + qch: make(chan struct{}), } // Set flush in place to AsyncFlush which by default is false. @@ -948,52 +950,67 @@ func (mb *msgBlock) convertCipher() error { if len(ekey) < minBlkKeySize { return errBadKeySize } - // Recover key encryption key. - rb, err := fs.prf([]byte(fmt.Sprintf("%s:%d", fs.cfg.Name, mb.index))) - if err != nil { - return err + type prfWithCipher struct { + keyGen + StoreCipher } - kek, err := genEncryptionKey(osc, rb) - if err != nil { - return err + var prfs []prfWithCipher + if fs.prf != nil { + prfs = append(prfs, prfWithCipher{fs.prf, sc}) + prfs = append(prfs, prfWithCipher{fs.prf, osc}) } - ns := kek.NonceSize() - seed, err := kek.Open(nil, ekey[:ns], ekey[ns:], nil) - if err != nil { - return err - } - nonce := ekey[:ns] - - bek, err := genBlockEncryptionKey(osc, seed, nonce) - if err != nil { - return err + if fs.oldprf != nil { + prfs = append(prfs, prfWithCipher{fs.oldprf, sc}) + prfs = append(prfs, prfWithCipher{fs.oldprf, osc}) } - buf, _ := mb.loadBlock(nil) - bek.XORKeyStream(buf, buf) - // Make sure we can parse with old cipher and key file. - if err = mb.indexCacheBuf(buf); err != nil { - return err - } - // Reset the cache since we just read everything in. - mb.cache = nil + for _, prf := range prfs { + // Recover key encryption key. + rb, err := prf.keyGen([]byte(fmt.Sprintf("%s:%d", fs.cfg.Name, mb.index))) + if err != nil { + continue + } + kek, err := genEncryptionKey(prf.StoreCipher, rb) + if err != nil { + continue + } + ns := kek.NonceSize() + seed, err := kek.Open(nil, ekey[:ns], ekey[ns:], nil) + if err != nil { + continue + } + nonce := ekey[:ns] + bek, err := genBlockEncryptionKey(prf.StoreCipher, seed, nonce) + if err != nil { + return err + } - // Generate new keys based on our - if err := fs.genEncryptionKeysForBlock(mb); err != nil { - // Put the old keyfile back. - keyFile := filepath.Join(mdir, fmt.Sprintf(keyScan, mb.index)) - os.WriteFile(keyFile, ekey, defaultFilePerms) - return err - } - mb.bek.XORKeyStream(buf, buf) - if err := os.WriteFile(mb.mfn, buf, defaultFilePerms); err != nil { - return err - } - // If we are here we want to delete other meta, e.g. idx, fss. - os.Remove(mb.ifn) - os.Remove(mb.sfn) + buf, _ := mb.loadBlock(nil) + bek.XORKeyStream(buf, buf) + // Make sure we can parse with old cipher and key file. + if err = mb.indexCacheBuf(buf); err != nil { + return err + } + // Reset the cache since we just read everything in. + mb.cache = nil - return nil + // Generate new keys. If we error for some reason then we will put + // the old keyfile back. + if err := fs.genEncryptionKeysForBlock(mb); err != nil { + keyFile := filepath.Join(mdir, fmt.Sprintf(keyScan, mb.index)) + os.WriteFile(keyFile, ekey, defaultFilePerms) + return err + } + mb.bek.XORKeyStream(buf, buf) + if err := os.WriteFile(mb.mfn, buf, defaultFilePerms); err != nil { + return err + } + // If we are here we want to delete other meta, e.g. idx, fss. + os.Remove(mb.ifn) + os.Remove(mb.sfn) + return nil + } + return fmt.Errorf("unable to recover keys") } // Convert a plaintext block to encrypted. diff --git a/server/filestore_test.go b/server/filestore_test.go index 8444ff35..de75caf8 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -820,7 +820,7 @@ func TestFileStoreCompact(t *testing.T) { fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), - prf, + prf, nil, ) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -865,7 +865,7 @@ func TestFileStoreCompact(t *testing.T) { fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), - prf, + prf, nil, ) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -1019,7 +1019,7 @@ func TestFileStoreStreamTruncate(t *testing.T) { fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage}, time.Now(), - prf, + prf, nil, ) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -1083,7 +1083,7 @@ func TestFileStoreStreamTruncate(t *testing.T) { fcfg, StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage}, time.Now(), - prf, + prf, nil, ) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -3489,7 +3489,7 @@ func TestFileStoreSparseCompaction(t *testing.T) { prf = nil } - fs, err = newFileStoreWithCreated(fcfg, cfg, time.Now(), prf) + fs, err = newFileStoreWithCreated(fcfg, cfg, time.Now(), prf, nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -3814,7 +3814,7 @@ func TestFileStoreCompactReclaimHeadSpace(t *testing.T) { fcfg, StreamConfig{Name: "TEST", Storage: FileStorage}, time.Now(), - prf, + prf, nil, ) require_NoError(t, err) defer fs.Stop() @@ -3844,7 +3844,7 @@ func TestFileStoreCompactReclaimHeadSpace(t *testing.T) { fcfg, StreamConfig{Name: "TEST", Storage: FileStorage}, time.Now(), - prf, + prf, nil, ) require_NoError(t, err) defer fs.Stop() @@ -4072,7 +4072,7 @@ func TestFileStoreShortIndexWriteBug(t *testing.T) { fcfg, StreamConfig{Name: "TEST", Storage: FileStorage, MaxAge: time.Second}, created, - prf, + prf, nil, ) require_NoError(t, err) defer fs.Stop() @@ -4108,7 +4108,7 @@ func TestFileStoreShortIndexWriteBug(t *testing.T) { fcfg, StreamConfig{Name: "TEST", Storage: FileStorage, MaxAge: time.Second}, created, - prf, + prf, nil, ) require_NoError(t, err) defer fs.Stop() @@ -4136,7 +4136,7 @@ func TestFileStoreDoubleCompactWithWriteInBetweenEncryptedBug(t *testing.T) { fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), - prf, + prf, nil, ) require_NoError(t, err) defer fs.Stop() @@ -4188,7 +4188,7 @@ func TestFileStoreEncryptedKeepIndexNeedBekResetBug(t *testing.T) { fcfg, StreamConfig{Name: "zzz", Storage: FileStorage, MaxAge: ttl}, time.Now(), - prf, + prf, nil, ) require_NoError(t, err) defer fs.Stop() @@ -4478,7 +4478,7 @@ func TestFileStoreEncrypted(t *testing.T) { fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), - prf, + prf, nil, ) require_NoError(t, err) defer fs.Stop() @@ -4505,7 +4505,7 @@ func TestFileStoreEncrypted(t *testing.T) { fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), - prf, + prf, nil, ) require_NoError(t, err) defer fs.Stop() @@ -5622,7 +5622,7 @@ func TestFileStoreRestoreEncryptedWithNoKeyFuncFails(t *testing.T) { fs, err := newFileStoreWithCreated( fcfg, scfg, time.Now(), - prf, + prf, nil, ) require_NoError(t, err) @@ -5638,7 +5638,7 @@ func TestFileStoreRestoreEncryptedWithNoKeyFuncFails(t *testing.T) { _, err = newFileStoreWithCreated( fcfg, scfg, time.Now(), - nil, + nil, nil, ) require_Error(t, err, errNoMainKey) } diff --git a/server/jetstream.go b/server/jetstream.go index 6c8963bd..d7e2b33f 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -219,8 +219,8 @@ type keyGen func(context []byte) ([]byte, error) // Return a key generation function or nil if encryption not enabled. // keyGen defined in filestore.go - keyGen func(iv, context []byte) []byte -func (s *Server) jsKeyGen(info string) keyGen { - if ek := s.getOpts().JetStreamKey; ek != _EMPTY_ { +func (s *Server) jsKeyGen(jsKey, info string) keyGen { + if ek := jsKey; ek != _EMPTY_ { return func(context []byte) ([]byte, error) { h := hmac.New(sha256.New, []byte(ek)) if _, err := h.Write([]byte(info)); err != nil { @@ -236,37 +236,65 @@ func (s *Server) jsKeyGen(info string) keyGen { } // Decode the encrypted metafile. -func (s *Server) decryptMeta(sc StoreCipher, ekey, buf []byte, acc, context string) ([]byte, error) { +func (s *Server) decryptMeta(sc StoreCipher, ekey, buf []byte, acc, context string) ([]byte, bool, error) { if len(ekey) < minMetaKeySize { - return nil, errBadKeySize + return nil, false, errBadKeySize } - prf := s.jsKeyGen(acc) - if prf == nil { - return nil, errNoEncryption + var osc StoreCipher + switch sc { + case AES: + osc = ChaCha + case ChaCha: + osc = AES } - rb, err := prf([]byte(context)) - if err != nil { - return nil, err + type prfWithCipher struct { + keyGen + StoreCipher + } + var prfs []prfWithCipher + if prf := s.jsKeyGen(s.getOpts().JetStreamKey, acc); prf == nil { + return nil, false, errNoEncryption + } else { + // First of all, try our current encryption keys with both + // store cipher algorithms. + prfs = append(prfs, prfWithCipher{prf, sc}) + prfs = append(prfs, prfWithCipher{prf, osc}) + } + if prf := s.jsKeyGen(s.getOpts().JetStreamOldKey, acc); prf != nil { + // Then, if we have an old encryption key, try with also with + // both store cipher algorithms. + prfs = append(prfs, prfWithCipher{prf, sc}) + prfs = append(prfs, prfWithCipher{prf, osc}) } - kek, err := genEncryptionKey(sc, rb) - if err != nil { - return nil, err + for i, prf := range prfs { + rb, err := prf.keyGen([]byte(context)) + if err != nil { + continue + } + kek, err := genEncryptionKey(prf.StoreCipher, rb) + if err != nil { + continue + } + ns := kek.NonceSize() + seed, err := kek.Open(nil, ekey[:ns], ekey[ns:], nil) + if err != nil { + continue + } + aek, err := genEncryptionKey(prf.StoreCipher, seed) + if err != nil { + continue + } + if aek.NonceSize() != kek.NonceSize() { + continue + } + plain, err := aek.Open(nil, buf[:ns], buf[ns:], nil) + if err != nil { + continue + } + return plain, i > 0, nil } - ns := kek.NonceSize() - seed, err := kek.Open(nil, ekey[:ns], ekey[ns:], nil) - if err != nil { - return nil, err - } - aek, err := genEncryptionKey(sc, seed) - if err != nil { - return nil, err - } - plain, err := aek.Open(nil, buf[:ns], buf[ns:], nil) - if err != nil { - return nil, err - } - return plain, nil + return nil, false, fmt.Errorf("unable to recover keys") } // Check to make sure directory has the jetstream directory. @@ -1216,7 +1244,6 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro } // Track if we are converting ciphers. - var osc StoreCipher var convertingCiphers bool // Check if we are encrypted. @@ -1229,21 +1256,11 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro continue } // Decode the buffer before proceeding. - nbuf, err := s.decryptMeta(sc, keyBuf, buf, a.Name, fi.Name()) + var nbuf []byte + nbuf, convertingCiphers, err = s.decryptMeta(sc, keyBuf, buf, a.Name, fi.Name()) if err != nil { - // See if we are changing ciphers. - switch sc { - case ChaCha: - nbuf, err = s.decryptMeta(AES, keyBuf, buf, a.Name, fi.Name()) - osc, convertingCiphers = AES, true - case AES: - nbuf, err = s.decryptMeta(ChaCha, keyBuf, buf, a.Name, fi.Name()) - osc, convertingCiphers = ChaCha, true - } - if err != nil { - s.Warnf(" Error decrypting our stream metafile: %v", err) - continue - } + s.Warnf(" Error decrypting our stream metafile: %v", err) + continue } buf = nbuf plaintext = false @@ -1297,7 +1314,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro if plaintext { s.Noticef(" Encrypting stream '%s > %s'", a.Name, cfg.StreamConfig.Name) } else if convertingCiphers { - s.Noticef(" Converting from %s to %s for stream '%s > %s'", osc, sc, a.Name, cfg.StreamConfig.Name) + s.Noticef(" Converting to %s for stream '%s > %s'", sc, a.Name, cfg.StreamConfig.Name) // Remove the key file to have system regenerate with the new cipher. os.Remove(keyFile) } @@ -1361,19 +1378,10 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro s.Debugf(" Consumer metafile is encrypted, reading encrypted keyfile") // Decode the buffer before proceeding. ctxName := e.mset.name() + tsep + ofi.Name() - nbuf, err := s.decryptMeta(sc, key, buf, a.Name, ctxName) + nbuf, _, err := s.decryptMeta(sc, key, buf, a.Name, ctxName) if err != nil { - // See if we are changing ciphers. - switch sc { - case ChaCha: - nbuf, err = s.decryptMeta(AES, key, buf, a.Name, ctxName) - case AES: - nbuf, err = s.decryptMeta(ChaCha, key, buf, a.Name, ctxName) - } - if err != nil { - s.Warnf(" Error decrypting our consumer metafile: %v", err) - continue - } + s.Warnf(" Error decrypting our consumer metafile: %v", err) + continue } buf = nbuf } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 910ba23f..61c2294e 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -754,7 +754,8 @@ func (js *jetStream) setupMetaGroup() error { FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMetaFSBlkSize, AsyncFlush: false}, StreamConfig{Name: defaultMetaGroupName, Storage: FileStorage}, time.Now().UTC(), - s.jsKeyGen(defaultMetaGroupName), + s.jsKeyGen(s.getOpts().JetStreamKey, defaultMetaGroupName), + s.jsKeyGen(s.getOpts().JetStreamOldKey, defaultMetaGroupName), ) if err != nil { s.Errorf("Error creating filestore: %v", err) @@ -1965,7 +1966,8 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute}, StreamConfig{Name: rg.Name, Storage: FileStorage}, time.Now().UTC(), - s.jsKeyGen(rg.Name), + s.jsKeyGen(s.getOpts().JetStreamKey, rg.Name), + s.jsKeyGen(s.getOpts().JetStreamOldKey, rg.Name), ) if err != nil { s.Errorf("Error creating filestore WAL: %v", err) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 17831152..baa20b12 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -21208,3 +21208,131 @@ func TestJetStreamLastSequenceBySubjectConcurrent(t *testing.T) { }) } } + +func TestJetStreamServerReencryption(t *testing.T) { + storeDir := t.TempDir() + + for i, algo := range []struct { + from string + to string + }{ + {"aes", "aes"}, + {"aes", "chacha"}, + {"chacha", "chacha"}, + {"chacha", "aes"}, + } { + t.Run(fmt.Sprintf("%s_to_%s", algo.from, algo.to), func(t *testing.T) { + streamName := fmt.Sprintf("TEST_%d", i) + subjectName := fmt.Sprintf("foo_%d", i) + expected := 30 + + checkStream := func(js nats.JetStreamContext) { + si, err := js.StreamInfo(streamName) + if err != nil { + t.Fatal(err) + } + + if si.State.Msgs != uint64(expected) { + t.Fatalf("Should be %d messages but got %d messages", expected, si.State.Msgs) + } + + sub, err := js.PullSubscribe(subjectName, "") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + c := 0 + for _, m := range fetchMsgs(t, sub, expected, 5*time.Second) { + m.AckSync() + c++ + } + if c != expected { + t.Fatalf("Should have read back %d messages but got %d messages", expected, c) + } + } + + // First off, we start up using the original encryption key and algorithm. + // We'll create a stream and populate it with some messages. + t.Run("setup", func(t *testing.T) { + conf := createConfFile(t, []byte(fmt.Sprintf(` + server_name: S22 + listen: 127.0.0.1:-1 + jetstream: { + key: %q, + cipher: %s, + store_dir: %q + } + `, "firstencryptionkey", algo.from, storeDir))) + + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + cfg := &nats.StreamConfig{ + Name: streamName, + Subjects: []string{subjectName}, + } + if _, err := js.AddStream(cfg); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + for i := 0; i < expected; i++ { + if _, err := js.Publish(subjectName, []byte("ENCRYPTED PAYLOAD!!")); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } + + checkStream(js) + }) + + // Next up, we will restart the server, this time with both the new key + // and algorithm and also the old key. At startup, the server will detect + // the change in encryption key and/or algorithm and re-encrypt the stream. + t.Run("reencrypt", func(t *testing.T) { + conf := createConfFile(t, []byte(fmt.Sprintf(` + server_name: S22 + listen: 127.0.0.1:-1 + jetstream: { + key: %q, + cipher: %s, + prev_key: %q, + store_dir: %q + } + `, "secondencryptionkey", algo.to, "firstencryptionkey", storeDir))) + + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + checkStream(js) + }) + + // Finally, we'll restart the server using only the new key and algorithm. + // At this point everything should have been re-encrypted, so we should still + // be able to access the stream. + t.Run("restart", func(t *testing.T) { + conf := createConfFile(t, []byte(fmt.Sprintf(` + server_name: S22 + listen: 127.0.0.1:-1 + jetstream: { + key: %q, + cipher: %s, + store_dir: %q + } + `, "secondencryptionkey", algo.to, storeDir))) + + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + checkStream(js) + }) + }) + } +} diff --git a/server/norace_test.go b/server/norace_test.go index e5e2fcaf..9a5ab69f 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -4433,7 +4433,7 @@ func TestNoRaceJetStreamFileStoreKeyFileCleanup(t *testing.T) { FileStoreConfig{StoreDir: storeDir, BlockSize: 1024 * 1024}, StreamConfig{Name: "TEST", Storage: FileStorage}, time.Now(), - prf) + prf, nil) require_NoError(t, err) defer fs.Stop() diff --git a/server/opts.go b/server/opts.go index f2baa5df..e6fb81ff 100644 --- a/server/opts.go +++ b/server/opts.go @@ -294,6 +294,7 @@ type Options struct { JetStreamDomain string `json:"-"` JetStreamExtHint string `json:"-"` JetStreamKey string `json:"-"` + JetStreamOldKey string `json:"-"` JetStreamCipher StoreCipher `json:"-"` JetStreamUniqueTag string JetStreamLimits JSLimitOpts @@ -2077,6 +2078,8 @@ func parseJetStream(v interface{}, opts *Options, errors *[]error, warnings *[]e doEnable = mv.(bool) case "key", "ek", "encryption_key": opts.JetStreamKey = mv.(string) + case "prev_key", "prev_ek", "prev_encryption_key": + opts.JetStreamOldKey = mv.(string) case "cipher": switch strings.ToLower(mv.(string)) { case "chacha", "chachapoly": diff --git a/server/stream.go b/server/stream.go index 2c85a9f4..93602998 100644 --- a/server/stream.go +++ b/server/stream.go @@ -3365,12 +3365,13 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error { mset.store = ms case FileStorage: s := mset.srv - prf := s.jsKeyGen(mset.acc.Name) + prf := s.jsKeyGen(s.getOpts().JetStreamKey, mset.acc.Name) if prf != nil { // We are encrypted here, fill in correct cipher selection. fsCfg.Cipher = s.getOpts().JetStreamCipher } - fs, err := newFileStoreWithCreated(*fsCfg, mset.cfg, mset.created, prf) + oldprf := s.jsKeyGen(s.getOpts().JetStreamOldKey, mset.acc.Name) + fs, err := newFileStoreWithCreated(*fsCfg, mset.cfg, mset.created, prf, oldprf) if err != nil { mset.mu.Unlock() return err