From b8d1ac94758379947d3a538d7168ef45d5da774d Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 1 Aug 2023 17:34:14 -0700 Subject: [PATCH 1/4] Allow long form resolver config to be of type MEM Signed-off-by: Derek Collison --- server/opts.go | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/server/opts.go b/server/opts.go index 0699f77b..ca45d585 100644 --- a/server/opts.go +++ b/server/opts.go @@ -1186,17 +1186,22 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error *errors = append(*errors, &configErr{tk, err.Error()}) return } - if dir == "" { - *errors = append(*errors, &configErr{tk, "dir has no value and needs to point to a directory"}) - return - } - if info, _ := os.Stat(dir); info != nil && (!info.IsDir() || info.Mode().Perm()&(1<<(uint(7))) == 0) { - *errors = append(*errors, &configErr{tk, "dir needs to point to an accessible directory"}) - return + + checkDir := func() { + if dir == _EMPTY_ { + *errors = append(*errors, &configErr{tk, "dir has no value and needs to point to a directory"}) + return + } + if info, _ := os.Stat(dir); info != nil && (!info.IsDir() || info.Mode().Perm()&(1<<(uint(7))) == 0) { + *errors = append(*errors, &configErr{tk, "dir needs to point to an accessible directory"}) + return + } } + var res AccountResolver switch strings.ToUpper(dirType) { case "CACHE": + checkDir() if sync != 0 { *errors = append(*errors, &configErr{tk, "CACHE does not accept sync"}) } @@ -1208,6 +1213,7 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error } res, err = NewCacheDirAccResolver(dir, limit, ttl, opts...) case "FULL": + checkDir() if ttl != 0 { *errors = append(*errors, &configErr{tk, "FULL does not accept ttl"}) } @@ -1223,6 +1229,8 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error } } res, err = NewDirAccResolver(dir, limit, sync, delete, opts...) 
+ case "MEM", "MEMORY": + res = &MemAccResolver{} } if err != nil { *errors = append(*errors, &configErr{tk, err.Error()}) From 97827be97d3ff629c992084d5f778148d5d24776 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 1 Aug 2023 17:36:40 -0700 Subject: [PATCH 2/4] Call up to upper layers on Stop() for accounting purposes Signed-off-by: Derek Collison --- server/filestore.go | 11 +++++++++-- server/memstore.go | 3 ++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index c95b1126..7b167aef 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -5421,13 +5421,13 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { var purged, bytes uint64 - // We have to delete interior messages. fs.mu.Lock() + // Same as purge all. if lseq := fs.state.LastSeq; seq > lseq { fs.mu.Unlock() return fs.purge(seq) } - + // We have to delete interior messages. smb := fs.selectMsgBlock(seq) if smb == nil { fs.mu.Unlock() @@ -6345,6 +6345,9 @@ func (fs *fileStore) Stop() error { fs.cancelSyncTimer() fs.cancelAgeChk() + // We should update the upper usage layer on a stop. + cb, bytes := fs.scb, int64(fs.state.Bytes) + var _cfs [256]ConsumerStore cfs := append(_cfs[:0], fs.cfs...) fs.cfs = nil @@ -6354,6 +6357,10 @@ func (fs *fileStore) Stop() error { o.Stop() } + if bytes > 0 && cb != nil { + cb(0, -bytes, 0, _EMPTY_) + } + return nil } diff --git a/server/memstore.go b/server/memstore.go index 0b71e906..b35e3d29 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -1144,11 +1144,12 @@ func memStoreMsgSize(subj string, hdr, msg []byte) uint64 { // Delete is same as Stop for memory store. func (ms *memStore) Delete() error { - ms.Purge() return ms.Stop() } func (ms *memStore) Stop() error { + // These can't come back, so stop is same as Delete. 
+ ms.Purge() ms.mu.Lock() if ms.ageChk != nil { ms.ageChk.Stop() From 5c8db8950636efb34beebfd944d900ca0024bf77 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 1 Aug 2023 17:37:28 -0700 Subject: [PATCH 3/4] Make sure we do not drift on accounting. Three issues were found and resolved. 1. Purge replays after recovery could execute full purge. 2. Callback was registered without lock, which could lead to skew. 3. Cluster reset could stop stream store and recreate it, which could lead to double accounting. Signed-off-by: Derek Collison --- server/jetstream_api.go | 4 +- server/jetstream_cluster.go | 13 +- server/jetstream_cluster_3_test.go | 221 ++++++++++++++++++++++++++++- server/stream.go | 4 +- 4 files changed, 232 insertions(+), 10 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 18f26686..7013a2cc 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1,4 +1,4 @@ -// Copyright 2020-2022 The NATS Authors +// Copyright 2020-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -2656,7 +2656,7 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac } // Request to have the meta leader stepdown. -// These will only be received the meta leaders, so less checking needed. +// These will only be received by the meta leader, so less checking needed. 
func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { if c == nil || !s.JetStreamEnabled() { return diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 1c94322c..9be5bdf1 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2808,12 +2808,15 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco } panic(err.Error()) } - // Ignore if we are recovering and we have already processed. - if isRecovering && (sp.Request == nil || sp.Request.Sequence == 0) { + // If no explicit request, fill in with leader stamped last sequence to protect ourselves on replay during server start. + if sp.Request == nil || sp.Request.Sequence == 0 { + purgeSeq := sp.LastSeq + 1 if sp.Request == nil { - sp.Request = &JSApiStreamPurgeRequest{Sequence: sp.LastSeq} - } else { - sp.Request.Sequence = sp.LastSeq + sp.Request = &JSApiStreamPurgeRequest{Sequence: purgeSeq} + } else if sp.Request.Keep == 0 { + sp.Request.Sequence = purgeSeq + } else if isRecovering { + continue } } diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 4f4fdcf8..159cf269 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -31,6 +31,7 @@ import ( "testing" "time" + "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" ) @@ -399,7 +400,7 @@ func TestJetStreamClusterNegativeReplicas(t *testing.T) { }) require_NoError(t, err) - // Check upadte now. + // Check update now. 
_, err = js.UpdateStream(&nats.StreamConfig{ Name: name, Replicas: -11, @@ -4566,3 +4567,221 @@ func TestJetStreamClusterBadEncryptKey(t *testing.T) { t.Fatalf("Expected server not to start") } } + +func TestJetStreamAccountUsageDrifts(t *testing.T) { + tmpl := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + leaf { + listen: 127.0.0.1:-1 + } + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + ` + opFrag := ` + operator: %s + system_account: %s + resolver: { type: MEM } + resolver_preload = { + %s : %s + %s : %s + } + ` + + _, syspub := createKey(t) + sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub) + + accKp, aExpPub := createKey(t) + accClaim := jwt.NewAccountClaims(aExpPub) + accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{ + DiskStorage: -1, Consumer: 1, Streams: 1} + accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{ + DiskStorage: -1, Consumer: 1, Streams: 1} + accJwt := encodeClaim(t, accClaim, aExpPub) + accCreds := newUser(t, accKp) + + template := tmpl + fmt.Sprintf(opFrag, ojwt, syspub, syspub, sysJwt, aExpPub, accJwt) + c := createJetStreamClusterWithTemplate(t, template, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer(), nats.UserCredentials(accCreds)) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST1", + Subjects: []string{"foo"}, + MaxBytes: 1 * 1024 * 1024 * 1024, + MaxMsgs: 1000, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST2", + Subjects: []string{"bar"}, + }) + require_NoError(t, err) + + // These expected store values can come directly from stream info's state bytes. + // We will *= 3 for R3 + checkAccount := func(r1u, r3u uint64) { + t.Helper() + r3u *= 3 + + // Remote usage updates can be delayed, so wait for a bit for values we want. 
+ checkFor(t, 5*time.Second, 250*time.Millisecond, func() error { + info, err := js.AccountInfo() + require_NoError(t, err) + require_True(t, len(info.Tiers) >= 2) + // These can move. + if u := info.Tiers["R1"].Store; u != r1u { + return fmt.Errorf("Expected R1 to be %v, got %v", friendlyBytes(r1u), friendlyBytes(u)) + } + if u := info.Tiers["R3"].Store; u != r3u { + return fmt.Errorf("Expected R3 to be %v, got %v", friendlyBytes(r3u), friendlyBytes(u)) + } + return nil + }) + } + + checkAccount(0, 0) + + // Now add in some R3 data. + msg := bytes.Repeat([]byte("Z"), 32*1024) // 32k + smallMsg := bytes.Repeat([]byte("Z"), 4*1024) // 4k + + for i := 0; i < 1000; i++ { + js.Publish("foo", msg) + } + sir3, err := js.StreamInfo("TEST1") + require_NoError(t, err) + + checkAccount(0, sir3.State.Bytes) + + // Now add in some R1 data. + for i := 0; i < 100; i++ { + js.Publish("bar", msg) + } + + sir1, err := js.StreamInfo("TEST2") + require_NoError(t, err) + + checkAccount(sir1.State.Bytes, sir3.State.Bytes) + + // We will now test a bunch of scenarios to see that we are doing accounting correctly. + + // Since our R3 has a limit of 1000 msgs, let's add in more msgs and drop older ones. + for i := 0; i < 100; i++ { + js.Publish("foo", smallMsg) + } + sir3, err = js.StreamInfo("TEST1") + require_NoError(t, err) + + checkAccount(sir1.State.Bytes, sir3.State.Bytes) + + // Move our R3 stream leader and make sure accounting is correct. + _, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST1"), nil, time.Second) + require_NoError(t, err) + + checkAccount(sir1.State.Bytes, sir3.State.Bytes) + + // Now scale down. + _, err = js.UpdateStream(&nats.StreamConfig{ + Name: "TEST1", + Subjects: []string{"foo"}, + MaxBytes: 1 * 1024 * 1024 * 1024, + MaxMsgs: 1000, + Replicas: 1, + }) + require_NoError(t, err) + + checkAccount(sir1.State.Bytes+sir3.State.Bytes, 0) + + // Add in more msgs which will replace the older and bigger ones. 
+ for i := 0; i < 100; i++ { + js.Publish("foo", smallMsg) + } + sir3, err = js.StreamInfo("TEST1") + require_NoError(t, err) + + // Now scale back up. + _, err = js.UpdateStream(&nats.StreamConfig{ + Name: "TEST1", + Subjects: []string{"foo"}, + MaxBytes: 1 * 1024 * 1024 * 1024, + MaxMsgs: 1000, + Replicas: 3, + }) + require_NoError(t, err) + + checkAccount(sir1.State.Bytes, sir3.State.Bytes) + + // Test Purge. + err = js.PurgeStream("TEST1") + require_NoError(t, err) + + checkAccount(sir1.State.Bytes, 0) + + for i := 0; i < 1000; i++ { + js.Publish("foo", smallMsg) + } + sir3, err = js.StreamInfo("TEST1") + require_NoError(t, err) + + checkAccount(sir1.State.Bytes, sir3.State.Bytes) + + requestLeaderStepDown := func() { + ml := c.leader() + checkFor(t, 5*time.Second, 250*time.Millisecond, func() error { + if cml := c.leader(); cml == ml { + nc.Request(JSApiLeaderStepDown, nil, time.Second) + return fmt.Errorf("Metaleader has not moved yet") + } + return nil + }) + } + + // Test meta leader stepdowns. + for i := 0; i < len(c.servers); i++ { + requestLeaderStepDown() + checkAccount(sir1.State.Bytes, sir3.State.Bytes) + } + + // Now test cluster reset operations where we internally reset the NRG and optionally the stream too. + nl := c.randomNonStreamLeader(aExpPub, "TEST1") + acc, err := nl.LookupAccount(aExpPub) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST1") + require_NoError(t, err) + // NRG only + mset.resetClusteredState(nil) + checkAccount(sir1.State.Bytes, sir3.State.Bytes) + // Now NRG and Stream state itself. + mset.resetClusteredState(errFirstSequenceMismatch) + checkAccount(sir1.State.Bytes, sir3.State.Bytes) + + // Now test server restart + for _, s := range c.servers { + s.Shutdown() + s.WaitForShutdown() + s = c.restartServer(s) + + // Wait on healthz and leader etc. 
+ checkFor(t, 10*time.Second, 200*time.Millisecond, func() error { + if hs := s.healthz(nil); hs.Error != _EMPTY_ { + return errors.New(hs.Error) + } + return nil + }) + c.waitOnLeader() + c.waitOnStreamLeader(aExpPub, "TEST1") + c.waitOnStreamLeader(aExpPub, "TEST2") + + // Now check account again. + checkAccount(sir1.State.Bytes, sir3.State.Bytes) + } +} diff --git a/server/stream.go b/server/stream.go index b343fb9c..c9280648 100644 --- a/server/stream.go +++ b/server/stream.go @@ -3364,9 +3364,9 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error { // Register our server. fs.registerServer(s) } - mset.mu.Unlock() - + // This will fire the callback but we do not require the lock since md will be 0 here. mset.store.RegisterStorageUpdates(mset.storeUpdates) + mset.mu.Unlock() return nil } From 787b0d922f0d996b1e9baea41e0af1eed247f967 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 1 Aug 2023 20:28:37 -0700 Subject: [PATCH 4/4] Do not hold onto no interest subjects from a client in the unlocked cache. If sending lots of different subjects all with no interest performance could be affected. Signed-off-by: Derek Collison --- server/client.go | 18 ++++++++++-------- server/sublist.go | 2 +- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/server/client.go b/server/client.go index 4886a18e..b6f73442 100644 --- a/server/client.go +++ b/server/client.go @@ -3670,14 +3670,16 @@ func (c *client) processInboundClientMsg(msg []byte) (bool, bool) { // Go back to the sublist data structure. if !ok { r = acc.sl.Match(string(c.pa.subject)) - c.in.results[string(c.pa.subject)] = r - // Prune the results cache. Keeps us from unbounded growth. Random delete. - if len(c.in.results) > maxResultCacheSize { - n := 0 - for subject := range c.in.results { - delete(c.in.results, subject) - if n++; n > pruneSize { - break + if len(r.psubs)+len(r.qsubs) > 0 { + c.in.results[string(c.pa.subject)] = r + // Prune the results cache. 
Keeps us from unbounded growth. Random delete. + if len(c.in.results) > maxResultCacheSize { + n := 0 + for subject := range c.in.results { + delete(c.in.results, subject) + if n++; n > pruneSize { + break + } } } } diff --git a/server/sublist.go b/server/sublist.go index 48375b6b..d7ffac0c 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -48,7 +48,7 @@ const ( // cacheMax is used to bound limit the frontend cache slCacheMax = 1024 // If we run a sweeper we will drain to this count. - slCacheSweep = 512 + slCacheSweep = 256 // plistMin is our lower bounds to create a fast plist for Match. plistMin = 256 )