From 5c8db8950636efb34beebfd944d900ca0024bf77 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Tue, 1 Aug 2023 17:37:28 -0700
Subject: [PATCH] Make sure we do not drift on accounting.

Three issues were found and resolved.

1. Purge replays after recovery could execute full purge.
2. Callback was registered without lock, which could lead to skew.
3. Cluster reset could stop stream store and recreate it, which could lead to double accounting.

Signed-off-by: Derek Collison
---
 server/jetstream_api.go            |   4 +-
 server/jetstream_cluster.go        |  13 +-
 server/jetstream_cluster_3_test.go | 221 ++++++++++++++++++++++++++++-
 server/stream.go                   |   4 +-
 4 files changed, 232 insertions(+), 10 deletions(-)

diff --git a/server/jetstream_api.go b/server/jetstream_api.go
index 18f26686..7013a2cc 100644
--- a/server/jetstream_api.go
+++ b/server/jetstream_api.go
@@ -1,4 +1,4 @@
-// Copyright 2020-2022 The NATS Authors
+// Copyright 2020-2023 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -2656,7 +2656,7 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac
 }
 
 // Request to have the meta leader stepdown.
-// These will only be received the meta leaders, so less checking needed.
+// These will only be received by the meta leader, so less checking needed.
 func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
 	if c == nil || !s.JetStreamEnabled() {
 		return
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 1c94322c..9be5bdf1 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -2808,12 +2808,15 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 					}
 					panic(err.Error())
 				}
-				// Ignore if we are recovering and we have already processed.
-				if isRecovering && (sp.Request == nil || sp.Request.Sequence == 0) {
+				// If no explicit request, fill in with leader stamped last sequence to protect ourselves on replay during server start.
+				if sp.Request == nil || sp.Request.Sequence == 0 {
+					purgeSeq := sp.LastSeq + 1
 					if sp.Request == nil {
-						sp.Request = &JSApiStreamPurgeRequest{Sequence: sp.LastSeq}
-					} else {
-						sp.Request.Sequence = sp.LastSeq
+						sp.Request = &JSApiStreamPurgeRequest{Sequence: purgeSeq}
+					} else if sp.Request.Keep == 0 {
+						sp.Request.Sequence = purgeSeq
+					} else if isRecovering {
+						continue
 					}
 				}
 
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index 4f4fdcf8..159cf269 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -31,6 +31,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/nats-io/jwt/v2"
 	"github.com/nats-io/nats.go"
 )
 
@@ -399,7 +400,7 @@ func TestJetStreamClusterNegativeReplicas(t *testing.T) {
 	})
 	require_NoError(t, err)
 
-	// Check upadte now.
+	// Check update now.
 	_, err = js.UpdateStream(&nats.StreamConfig{
 		Name:     name,
 		Replicas: -11,
@@ -4566,3 +4567,221 @@ func TestJetStreamClusterBadEncryptKey(t *testing.T) {
 		t.Fatalf("Expected server not to start")
 	}
 }
+
+func TestJetStreamAccountUsageDrifts(t *testing.T) {
+	tmpl := `
+	listen: 127.0.0.1:-1
+	server_name: %s
+	jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
+	leaf {
+		listen: 127.0.0.1:-1
+	}
+	cluster {
+		name: %s
+		listen: 127.0.0.1:%d
+		routes = [%s]
+	}
+	`
+	opFrag := `
+	operator: %s
+	system_account: %s
+	resolver: { type: MEM }
+	resolver_preload = {
+		%s : %s
+		%s : %s
+	}
+	`
+
+	_, syspub := createKey(t)
+	sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub)
+
+	accKp, aExpPub := createKey(t)
+	accClaim := jwt.NewAccountClaims(aExpPub)
+	accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{
+		DiskStorage: -1, Consumer: 1, Streams: 1}
+	accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{
+		DiskStorage: -1, Consumer: 1, Streams: 1}
+	accJwt := encodeClaim(t, accClaim, aExpPub)
+	accCreds := newUser(t, accKp)
+
+	template := tmpl + fmt.Sprintf(opFrag, ojwt, syspub, syspub, sysJwt, aExpPub, accJwt)
+	c := createJetStreamClusterWithTemplate(t, template, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer(), nats.UserCredentials(accCreds))
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST1",
+		Subjects: []string{"foo"},
+		MaxBytes: 1 * 1024 * 1024 * 1024,
+		MaxMsgs:  1000,
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	_, err = js.AddStream(&nats.StreamConfig{
+		Name:     "TEST2",
+		Subjects: []string{"bar"},
+	})
+	require_NoError(t, err)
+
+	// These expected store values can come directly from stream info's state bytes.
+	// We will *= 3 for R3.
+	checkAccount := func(r1u, r3u uint64) {
+		t.Helper()
+		r3u *= 3
+
+		// Remote usage updates can be delayed, so wait for a bit for values we want.
+		checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
+			info, err := js.AccountInfo()
+			require_NoError(t, err)
+			require_True(t, len(info.Tiers) >= 2)
+			// These can move.
+			if u := info.Tiers["R1"].Store; u != r1u {
+				return fmt.Errorf("Expected R1 to be %v, got %v", friendlyBytes(r1u), friendlyBytes(u))
+			}
+			if u := info.Tiers["R3"].Store; u != r3u {
+				return fmt.Errorf("Expected R3 to be %v, got %v", friendlyBytes(r3u), friendlyBytes(u))
+			}
+			return nil
+		})
+	}
+
+	checkAccount(0, 0)
+
+	// Now add in some R3 data.
+	msg := bytes.Repeat([]byte("Z"), 32*1024)     // 32k
+	smallMsg := bytes.Repeat([]byte("Z"), 4*1024) // 4k
+
+	for i := 0; i < 1000; i++ {
+		js.Publish("foo", msg)
+	}
+	sir3, err := js.StreamInfo("TEST1")
+	require_NoError(t, err)
+
+	checkAccount(0, sir3.State.Bytes)
+
+	// Now add in some R1 data.
+	for i := 0; i < 100; i++ {
+		js.Publish("bar", msg)
+	}
+
+	sir1, err := js.StreamInfo("TEST2")
+	require_NoError(t, err)
+
+	checkAccount(sir1.State.Bytes, sir3.State.Bytes)
+
+	// We will now test a bunch of scenarios to see that we are doing accounting correctly.
+
+	// Since our R3 has a limit of 1000 msgs, let's add in more msgs and drop older ones.
+	for i := 0; i < 100; i++ {
+		js.Publish("foo", smallMsg)
+	}
+	sir3, err = js.StreamInfo("TEST1")
+	require_NoError(t, err)
+
+	checkAccount(sir1.State.Bytes, sir3.State.Bytes)
+
+	// Move our R3 stream leader and make sure accounting is correct.
+	_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST1"), nil, time.Second)
+	require_NoError(t, err)
+
+	checkAccount(sir1.State.Bytes, sir3.State.Bytes)
+
+	// Now scale down.
+	_, err = js.UpdateStream(&nats.StreamConfig{
+		Name:     "TEST1",
+		Subjects: []string{"foo"},
+		MaxBytes: 1 * 1024 * 1024 * 1024,
+		MaxMsgs:  1000,
+		Replicas: 1,
+	})
+	require_NoError(t, err)
+
+	checkAccount(sir1.State.Bytes+sir3.State.Bytes, 0)
+
+	// Add in more msgs which will replace the older and bigger ones.
+	for i := 0; i < 100; i++ {
+		js.Publish("foo", smallMsg)
+	}
+	sir3, err = js.StreamInfo("TEST1")
+	require_NoError(t, err)
+
+	// Now scale back up.
+	_, err = js.UpdateStream(&nats.StreamConfig{
+		Name:     "TEST1",
+		Subjects: []string{"foo"},
+		MaxBytes: 1 * 1024 * 1024 * 1024,
+		MaxMsgs:  1000,
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	checkAccount(sir1.State.Bytes, sir3.State.Bytes)
+
+	// Test Purge.
+	err = js.PurgeStream("TEST1")
+	require_NoError(t, err)
+
+	checkAccount(sir1.State.Bytes, 0)
+
+	for i := 0; i < 1000; i++ {
+		js.Publish("foo", smallMsg)
+	}
+	sir3, err = js.StreamInfo("TEST1")
+	require_NoError(t, err)
+
+	checkAccount(sir1.State.Bytes, sir3.State.Bytes)
+
+	requestLeaderStepDown := func() {
+		ml := c.leader()
+		checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
+			if cml := c.leader(); cml == ml {
+				nc.Request(JSApiLeaderStepDown, nil, time.Second)
+				return fmt.Errorf("Metaleader has not moved yet")
+			}
+			return nil
+		})
+	}
+
+	// Test meta leader stepdowns.
+	for i := 0; i < len(c.servers); i++ {
+		requestLeaderStepDown()
+		checkAccount(sir1.State.Bytes, sir3.State.Bytes)
+	}
+
+	// Now test cluster reset operations where we internally reset the NRG and optionally the stream too.
+	nl := c.randomNonStreamLeader(aExpPub, "TEST1")
+	acc, err := nl.LookupAccount(aExpPub)
+	require_NoError(t, err)
+	mset, err := acc.lookupStream("TEST1")
+	require_NoError(t, err)
+	// NRG only
+	mset.resetClusteredState(nil)
+	checkAccount(sir1.State.Bytes, sir3.State.Bytes)
+	// Now NRG and Stream state itself.
+	mset.resetClusteredState(errFirstSequenceMismatch)
+	checkAccount(sir1.State.Bytes, sir3.State.Bytes)
+
+	// Now test server restart.
+	for _, s := range c.servers {
+		s.Shutdown()
+		s.WaitForShutdown()
+		s = c.restartServer(s)
+
+		// Wait on healthz and leader etc.
+		checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
+			if hs := s.healthz(nil); hs.Error != _EMPTY_ {
+				return errors.New(hs.Error)
+			}
+			return nil
+		})
+		c.waitOnLeader()
+		c.waitOnStreamLeader(aExpPub, "TEST1")
+		c.waitOnStreamLeader(aExpPub, "TEST2")
+
+		// Now check account again.
+		checkAccount(sir1.State.Bytes, sir3.State.Bytes)
+	}
+}
diff --git a/server/stream.go b/server/stream.go
index b343fb9c..c9280648 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -3364,9 +3364,9 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error {
 		// Register our server.
 		fs.registerServer(s)
 	}
-	mset.mu.Unlock()
-
+	// This will fire the callback but we do not require the lock since md will be 0 here.
 	mset.store.RegisterStorageUpdates(mset.storeUpdates)
+	mset.mu.Unlock()
 
 	return nil
 }
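
For reference, the reasoning behind issue 1 above can be seen in a minimal, standalone Go sketch. It is not nats-server code and all type and function names in it are hypothetical: the point is only that a purge entry replayed from the log after a restart must be bounded by the sequence the leader stamped at proposal time, otherwise the replay also deletes messages stored after the original purge and the accounted bytes drift.

package main

import "fmt"

// store is a toy message store that tracks accounted bytes, standing in
// for a stream's store only for the purpose of this illustration.
type store struct {
	msgs  map[uint64]uint64 // sequence -> size in bytes
	bytes uint64
	last  uint64
}

func newStore() *store { return &store{msgs: make(map[uint64]uint64)} }

func (s *store) publish(size uint64) {
	s.last++
	s.msgs[s.last] = size
	s.bytes += size
}

// purgeBelow removes every message with sequence < upTo, mirroring a purge
// request that carries an explicit sequence.
func (s *store) purgeBelow(upTo uint64) {
	for seq, sz := range s.msgs {
		if seq < upTo {
			delete(s.msgs, seq)
			s.bytes -= sz
		}
	}
}

// purgeAll mirrors a purge request with no sequence: everything goes.
func (s *store) purgeAll() { s.purgeBelow(s.last + 1) }

func main() {
	s := newStore()
	for i := 0; i < 3; i++ {
		s.publish(1024)
	}

	// The leader stamps the purge proposal with last sequence + 1.
	stamped := s.last + 1
	s.purgeAll()

	// A message that arrives after the purge was originally applied.
	s.publish(2048)

	// Replay of the purge entry during recovery. Bounded by the stamp it only
	// touches the range that was already purged; an unbounded s.purgeAll()
	// here would also wipe the 2048-byte message and skew the accounting.
	s.purgeBelow(stamped)
	fmt.Printf("accounted bytes after replay: %d (want 2048)\n", s.bytes)
}

With the stamped bound the replay is idempotent and the accounted bytes stay at 2048; replaying an unbounded purge instead would drop them to 0, which is the drift this patch guards against.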