From 34ae2bf4cb0f03a9681edd2076d927eac0acacb5 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 2 Sep 2023 13:56:34 -0700 Subject: [PATCH] Fix for a bug that would make normal streams use the wrong block size. Signed-off-by: Derek Collison --- server/filestore.go | 8 +-- server/jetstream_cluster_3_test.go | 88 ++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 4 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 3ca9c7da..34ef6ad8 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -292,7 +292,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim } // Default values. if fcfg.BlockSize == 0 { - fcfg.BlockSize = dynBlkSize(cfg.Retention, cfg.MaxBytes, fcfg.Cipher) + fcfg.BlockSize = dynBlkSize(cfg.Retention, cfg.MaxBytes, prf != nil) } if fcfg.BlockSize > maxBlockSize { return nil, fmt.Errorf("filestore max block size is %s", friendlyBytes(maxBlockSize)) @@ -462,7 +462,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error { return nil } -func dynBlkSize(retention RetentionPolicy, maxBytes int64, cipher StoreCipher) uint64 { +func dynBlkSize(retention RetentionPolicy, maxBytes int64, encrypted bool) uint64 { if maxBytes > 0 { blkSize := (maxBytes / 4) + 1 // (25% overhead) // Round up to nearest 100 @@ -476,7 +476,7 @@ func dynBlkSize(retention RetentionPolicy, maxBytes int64, cipher StoreCipher) u } else { blkSize = defaultMediumBlockSize } - if cipher != NoCipher && blkSize > maximumEncryptedBlockSize { + if encrypted && blkSize > maximumEncryptedBlockSize { // Notes on this below. blkSize = maximumEncryptedBlockSize } @@ -484,7 +484,7 @@ func dynBlkSize(retention RetentionPolicy, maxBytes int64, cipher StoreCipher) u } switch { - case cipher != NoCipher: + case encrypted: // In the case of encrypted stores, large blocks can result in worsened perf // since many writes on disk involve re-encrypting the entire block. For now, // we will enforce a cap on the block size when encryption is enabled to avoid diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 8cf77adf..7786f414 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -5296,3 +5296,91 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { cia.Created, cib.Created = now, now checkConsumerInfo(cia, cib) } + +// Discovered that we are not properly setting certain default filestore blkSizes. +func TestJetStreamClusterCheckFileStoreBlkSizes(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // Nowmal Stream + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"*"}, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "C3", + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + // KV + _, err = js.CreateKeyValue(&nats.KeyValueConfig{ + Bucket: "TEST", + Replicas: 3, + }) + require_NoError(t, err) + + blkSize := func(fs *fileStore) uint64 { + fs.mu.RLock() + defer fs.mu.RUnlock() + return fs.fcfg.BlockSize + } + + // We will check now the following filestores. + // meta + // TEST stream and NRG + // C3 NRG + // KV_TEST stream and NRG + for _, s := range c.servers { + js, cc := s.getJetStreamCluster() + // META + js.mu.RLock() + meta := cc.meta + js.mu.RUnlock() + require_True(t, meta != nil) + fs := meta.(*raft).wal.(*fileStore) + require_True(t, blkSize(fs) == defaultMetaFSBlkSize) + + // TEST STREAM + mset, err := s.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + mset.mu.RLock() + fs = mset.store.(*fileStore) + mset.mu.RUnlock() + require_True(t, blkSize(fs) == defaultLargeBlockSize) + + // KV STREAM + // Now the KV which is different default size. + kv, err := s.GlobalAccount().lookupStream("KV_TEST") + require_NoError(t, err) + kv.mu.RLock() + fs = kv.store.(*fileStore) + kv.mu.RUnlock() + require_True(t, blkSize(fs) == defaultKVBlockSize) + + // Now check NRGs + // TEST Stream + n := mset.raftNode() + require_True(t, n != nil) + fs = n.(*raft).wal.(*fileStore) + require_True(t, blkSize(fs) == defaultMediumBlockSize) + // KV TEST Stream + n = kv.raftNode() + require_True(t, n != nil) + fs = n.(*raft).wal.(*fileStore) + require_True(t, blkSize(fs) == defaultMediumBlockSize) + // Consumer + o := mset.lookupConsumer("C3") + require_True(t, o != nil) + n = o.raftNode() + require_True(t, n != nil) + fs = n.(*raft).wal.(*fileStore) + require_True(t, blkSize(fs) == defaultMediumBlockSize) + } +}