[FIXED] Fix for a bug that would make normal streams use the wrong block size. (#4478)

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-09-02 14:14:50 -07:00
committed by GitHub
2 changed files with 92 additions and 4 deletions

View File

@@ -292,7 +292,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
}
// Default values.
if fcfg.BlockSize == 0 {
fcfg.BlockSize = dynBlkSize(cfg.Retention, cfg.MaxBytes, fcfg.Cipher)
fcfg.BlockSize = dynBlkSize(cfg.Retention, cfg.MaxBytes, prf != nil)
}
if fcfg.BlockSize > maxBlockSize {
return nil, fmt.Errorf("filestore max block size is %s", friendlyBytes(maxBlockSize))
@@ -462,7 +462,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
return nil
}
func dynBlkSize(retention RetentionPolicy, maxBytes int64, cipher StoreCipher) uint64 {
func dynBlkSize(retention RetentionPolicy, maxBytes int64, encrypted bool) uint64 {
if maxBytes > 0 {
blkSize := (maxBytes / 4) + 1 // (25% overhead)
// Round up to nearest 100
@@ -476,7 +476,7 @@ func dynBlkSize(retention RetentionPolicy, maxBytes int64, cipher StoreCipher) u
} else {
blkSize = defaultMediumBlockSize
}
if cipher != NoCipher && blkSize > maximumEncryptedBlockSize {
if encrypted && blkSize > maximumEncryptedBlockSize {
// Notes on this below.
blkSize = maximumEncryptedBlockSize
}
@@ -484,7 +484,7 @@ func dynBlkSize(retention RetentionPolicy, maxBytes int64, cipher StoreCipher) u
}
switch {
case cipher != NoCipher:
case encrypted:
// In the case of encrypted stores, large blocks can result in worsened perf
// since many writes on disk involve re-encrypting the entire block. For now,
// we will enforce a cap on the block size when encryption is enabled to avoid

View File

@@ -5296,3 +5296,91 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
cia.Created, cib.Created = now, now
checkConsumerInfo(cia, cib)
}
// Discovered that we are not properly setting certain default filestore blkSizes.
// This test spins up a 3-node cluster and verifies that every filestore created —
// the cluster metadata store, a normal stream, a KV stream, and the Raft WAL
// backing each of them (stream, KV, and consumer NRGs) — ends up with the
// expected default block size constant.
func TestJetStreamClusterCheckFileStoreBlkSizes(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Normal Stream
	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"*"},
		Replicas: 3,
	})
	require_NoError(t, err)

	// Durable consumer on the stream; its NRG gets its own filestore below.
	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:   "C3",
		AckPolicy: nats.AckExplicitPolicy,
	})
	require_NoError(t, err)

	// KV
	_, err = js.CreateKeyValue(&nats.KeyValueConfig{
		Bucket:   "TEST",
		Replicas: 3,
	})
	require_NoError(t, err)

	// Helper to read a filestore's configured block size under its lock.
	blkSize := func(fs *fileStore) uint64 {
		fs.mu.RLock()
		defer fs.mu.RUnlock()
		return fs.fcfg.BlockSize
	}

	// We will check now the following filestores.
	// meta
	// TEST stream and NRG
	// C3 NRG
	// KV_TEST stream and NRG
	for _, s := range c.servers {
		js, cc := s.getJetStreamCluster()
		// META
		js.mu.RLock()
		meta := cc.meta
		js.mu.RUnlock()
		require_True(t, meta != nil)
		fs := meta.(*raft).wal.(*fileStore)
		require_True(t, blkSize(fs) == defaultMetaFSBlkSize)

		// TEST STREAM
		mset, err := s.GlobalAccount().lookupStream("TEST")
		require_NoError(t, err)
		mset.mu.RLock()
		fs = mset.store.(*fileStore)
		mset.mu.RUnlock()
		require_True(t, blkSize(fs) == defaultLargeBlockSize)

		// KV STREAM
		// Now the KV which is different default size.
		kv, err := s.GlobalAccount().lookupStream("KV_TEST")
		require_NoError(t, err)
		kv.mu.RLock()
		fs = kv.store.(*fileStore)
		kv.mu.RUnlock()
		require_True(t, blkSize(fs) == defaultKVBlockSize)

		// Now check NRGs
		// TEST Stream
		n := mset.raftNode()
		require_True(t, n != nil)
		fs = n.(*raft).wal.(*fileStore)
		require_True(t, blkSize(fs) == defaultMediumBlockSize)

		// KV TEST Stream
		n = kv.raftNode()
		require_True(t, n != nil)
		fs = n.(*raft).wal.(*fileStore)
		require_True(t, blkSize(fs) == defaultMediumBlockSize)

		// Consumer
		o := mset.lookupConsumer("C3")
		require_True(t, o != nil)
		n = o.raftNode()
		require_True(t, n != nil)
		fs = n.(*raft).wal.(*fileStore)
		require_True(t, blkSize(fs) == defaultMediumBlockSize)
	}
}