From 027f2e42c80aa28f3b81a9634b241676455b7703 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Fri, 17 Mar 2023 15:09:50 -0700
Subject: [PATCH 1/3] Remove snapshot of cores and maxprocs

Signed-off-by: Derek Collison
---
 main.go           |  2 --
 server/events.go  |  3 ++-
 server/monitor.go | 17 ++---------------
 3 files changed, 4 insertions(+), 18 deletions(-)

diff --git a/main.go b/main.go
index 53652c6e..767719fc 100644
--- a/main.go
+++ b/main.go
@@ -134,8 +134,6 @@ func main() {
 		s.Warnf("Failed to set GOMAXPROCS: %v", err)
 	} else {
 		defer undo()
-		// Reset these from the snapshots from init for monitor.go
-		server.SnapshotMonitorInfo()
 	}
 
 	s.WaitForShutdown()
diff --git a/server/events.go b/server/events.go
index 755cd0e6..0ff6a257 100644
--- a/server/events.go
+++ b/server/events.go
@@ -22,6 +22,7 @@ import (
 	"fmt"
 	"math/rand"
 	"net/http"
+	"runtime"
 	"strconv"
 	"strings"
 	"sync"
@@ -603,7 +604,7 @@ func (s *Server) updateServerUsage(v *ServerStats) {
 	defer s.mu.Lock()
 	var vss int64
 	pse.ProcUsage(&v.CPU, &v.Mem, &vss)
-	v.Cores = numCores
+	v.Cores = runtime.NumCPU()
 }
 
 // Generate a route stat for our statz update.
diff --git a/server/monitor.go b/server/monitor.go
index 6c2ead7c..e5895497 100644
--- a/server/monitor.go
+++ b/server/monitor.go
@@ -36,19 +36,6 @@ import (
 	"github.com/nats-io/nats-server/v2/server/pse"
 )
 
-// Snapshot this
-var numCores int
-var maxProcs int
-
-func SnapshotMonitorInfo() {
-	numCores = runtime.NumCPU()
-	maxProcs = runtime.GOMAXPROCS(0)
-}
-
-func init() {
-	SnapshotMonitorInfo()
-}
-
 // Connz represents detailed information on current client connections.
 type Connz struct {
 	ID string `json:"server_id"`
@@ -1528,8 +1515,8 @@ func (s *Server) createVarz(pcpu float64, rss int64) *Varz {
 		},
 		Start:                 s.start,
 		MaxSubs:               opts.MaxSubs,
-		Cores:                 numCores,
-		MaxProcs:              maxProcs,
+		Cores:                 runtime.NumCPU(),
+		MaxProcs:              runtime.GOMAXPROCS(0),
 		Tags:                  opts.Tags,
 		TrustedOperatorsJwt:   opts.operatorJWT,
 		TrustedOperatorsClaim: opts.TrustedOperators,

From 5a16f984276e9159b860e15c6699fab44d8af162 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Sun, 19 Mar 2023 10:21:36 -0700
Subject: [PATCH 2/3] Fixed an off-by-one bug that under certain circumstances
 could cause large consumer replica states. This could lead to instability in
 the system. The bug would manifest in replicated consumers when certain
 messages were acked out of order and the pending list never went to zero.

Signed-off-by: Derek Collison
---
 server/filestore.go                |  12 +-
 server/filestore_test.go           |   4 +-
 server/jetstream_cluster.go        |  31 +++
 server/jetstream_cluster_3_test.go | 119 +++++++++++-
 server/jetstream_helpers_test.go   |  60 ++++--
 server/norace_test.go              | 298 +++++++++++++++++++++++++++++
 server/test_test.go                |  15 +-
 7 files changed, 504 insertions(+), 35 deletions(-)

diff --git a/server/filestore.go b/server/filestore.go
index fb4eb5cd..3b910e03 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -6685,21 +6685,15 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error {
 		delete(o.state.Pending, sseq)
 		dseq = p.Sequence // Use the original.
 	}
-	// Now remove from redelivered.
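A note on Patch 1 above: Varz and the server usage stats now query the runtime on every request instead of reading values snapshotted in init(). That snapshot was taken before main() applied its GOMAXPROCS tuning, so monitoring could report a stale value. A minimal standalone sketch of the staleness problem (names are illustrative, not from the server):

    package main

    import (
        "fmt"
        "runtime"
    )

    // Captured at package init, i.e. before main() can apply any tuning.
    var maxProcsSnapshot = runtime.GOMAXPROCS(0)

    func main() {
        // Stands in for the container-aware GOMAXPROCS adjustment main() performs.
        runtime.GOMAXPROCS(2)

        fmt.Println("snapshot:", maxProcsSnapshot)  // stale pre-tuning value
        fmt.Println("live:", runtime.GOMAXPROCS(0)) // 2, what Varz should now report
    }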
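The UpdateAcks hunk that continues below is the heart of the off-by-one fix. When an ack lands exactly one above the current ack floor, the floor must walk forward past every already-acked sequence and stop just below the first one still pending, scanning up to and including Delivered.Stream. The old exclusive bound, together with the first-ack guard, could stop the walk early, leaving the floor low and pending entries stuck above zero. A simplified model of the corrected walk, using stand-in types rather than the actual store state (the AckFloor.Stream handling in particular is an approximation):

    // pendingInfo stands in for the per-message state in o.state.Pending;
    // ConsumerSeq mirrors the stored Sequence field.
    type pendingInfo struct{ ConsumerSeq uint64 }

    func advanceAckFloor(floorC, floorS *uint64, deliveredC, deliveredS uint64,
        pending map[uint64]*pendingInfo, dseq, sseq uint64) {
        if len(pending) == 0 {
            // Nothing outstanding: the floor snaps to delivered.
            *floorC, *floorS = deliveredC, deliveredS
            return
        }
        if dseq != *floorC+1 {
            // Ack landed above a gap; the floor cannot move yet.
            return
        }
        *floorC, *floorS = dseq, sseq
        if deliveredC > dseq {
            // The fix: scan up to AND including deliveredS (was ss < deliveredS).
            for ss := sseq + 1; ss <= deliveredS; ss++ {
                if p, ok := pending[ss]; ok {
                    // Stop just below the first still-pending sequence.
                    if p.ConsumerSeq > 0 {
                        *floorC = p.ConsumerSeq - 1
                    }
                    *floorS = ss - 1
                    return
                }
            }
        }
    }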
- if len(o.state.Redelivered) > 0 { - delete(o.state.Redelivered, sseq) - } - if len(o.state.Pending) == 0 { o.state.AckFloor.Consumer = o.state.Delivered.Consumer o.state.AckFloor.Stream = o.state.Delivered.Stream } else if dseq == o.state.AckFloor.Consumer+1 { - first := o.state.AckFloor.Consumer == 0 o.state.AckFloor.Consumer = dseq o.state.AckFloor.Stream = sseq - if !first && o.state.Delivered.Consumer > dseq { - for ss := sseq + 1; ss < o.state.Delivered.Stream; ss++ { + if o.state.Delivered.Consumer > dseq { + for ss := sseq + 1; ss <= o.state.Delivered.Stream; ss++ { if p, ok := o.state.Pending[ss]; ok { if p.Sequence > 0 { o.state.AckFloor.Consumer = p.Sequence - 1 @@ -6710,6 +6704,8 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error { } } } + // We do these regardless. + delete(o.state.Redelivered, sseq) o.kickFlusher() return nil diff --git a/server/filestore_test.go b/server/filestore_test.go index 92ccfd2f..29e3a705 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -2776,8 +2776,8 @@ func TestFileStoreConsumerDeliveredAndAckUpdates(t *testing.T) { } } - testAck(1, 100, 1, 100) - testAck(3, 130, 1, 100) + testAck(1, 100, 1, 109) + testAck(3, 130, 1, 109) testAck(2, 110, 3, 149) // We do not track explicit state on previous stream floors, so we take last known -1 testAck(5, 165, 3, 149) testAck(4, 150, 5, 165) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 357ca029..e18ebe71 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -281,6 +281,37 @@ func (s *Server) JetStreamStepdownStream(account, stream string) error { return nil } +func (s *Server) JetStreamStepdownConsumer(account, stream, consumer string) error { + js, cc := s.getJetStreamCluster() + if js == nil { + return NewJSNotEnabledError() + } + if cc == nil { + return NewJSClusterNotActiveError() + } + // Grab account + acc, err := s.LookupAccount(account) + if err != nil { + return err + } + // Grab stream + mset, err := acc.lookupStream(stream) + if err != nil { + return err + } + + o := mset.lookupConsumer(consumer) + if o == nil { + return NewJSConsumerNotFoundError() + } + + if node := o.raftNode(); node != nil && node.Leader() { + node.StepDown() + } + + return nil +} + func (s *Server) JetStreamSnapshotStream(account, stream string) error { js, cc := s.getJetStreamCluster() if js == nil { diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 8c94ef47..26e43aae 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -1,4 +1,4 @@ -// Copyright 2022 The NATS Authors +// Copyright 2022-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
 // You may obtain a copy of the License at
@@ -3112,3 +3112,120 @@ func TestJetStreamClusterInterestBasedStreamAndConsumerSnapshots(t *testing.T) {
 	require_NoError(t, err)
 	require_True(t, si.State.Msgs == 0)
 }
+
+func TestJetStreamClusterConsumerFollowerStoreStateAckFloorBug(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Replicas: 3,
+		Subjects: []string{"foo"},
+	})
+	require_NoError(t, err)
+
+	sub, err := js.PullSubscribe(_EMPTY_, "C", nats.BindStream("TEST"), nats.ManualAck())
+	require_NoError(t, err)
+
+	num := 100
+	for i := 0; i < num; i++ {
+		sendStreamMsg(t, nc, "foo", "data")
+	}
+
+	// This one prevents the state for pending from reaching 0 and resetting, which would not show the bug.
+	sendStreamMsg(t, nc, "foo", "data")
+
+	// Ack all but one, out of order, and make sure all consumers have the same stored state.
+	msgs, err := sub.Fetch(num, nats.MaxWait(time.Second))
+	require_NoError(t, err)
+	require_True(t, len(msgs) == num)
+
+	_, err = sub.Fetch(1, nats.MaxWait(time.Second))
+	require_NoError(t, err)
+
+	rand.Shuffle(len(msgs), func(i, j int) { msgs[i], msgs[j] = msgs[j], msgs[i] })
+	for _, m := range msgs {
+		m.AckSync()
+	}
+
+	checkConsumerState := func(delivered, ackFloor nats.SequenceInfo, numAckPending int) {
+		expectedDelivered := uint64(num) + 1
+		if delivered.Stream != expectedDelivered || delivered.Consumer != expectedDelivered {
+			t.Fatalf("Wrong delivered, expected %d got %+v", expectedDelivered, delivered)
+		}
+		expectedAck := uint64(num)
+		if ackFloor.Stream != expectedAck || ackFloor.Consumer != expectedAck {
+			t.Fatalf("Wrong ackFloor, expected %d got %+v", expectedAck, ackFloor)
+		}
+		require_True(t, numAckPending == 1)
+	}
+
+	ci, err := js.ConsumerInfo("TEST", "C")
+	require_NoError(t, err)
+	checkConsumerState(ci.Delivered, ci.AckFloor, ci.NumAckPending)
+
+	// Check each consumer on each server for its store state and make sure it matches as well.
+	for _, s := range c.servers {
+		mset, err := s.GlobalAccount().lookupStream("TEST")
+		require_NoError(t, err)
+		require_NotNil(t, mset)
+		o := mset.lookupConsumer("C")
+		require_NotNil(t, o)
+
+		state, err := o.store.State()
+		require_NoError(t, err)
+
+		delivered := nats.SequenceInfo{Stream: state.Delivered.Stream, Consumer: state.Delivered.Consumer}
+		ackFloor := nats.SequenceInfo{Stream: state.AckFloor.Stream, Consumer: state.AckFloor.Consumer}
+		checkConsumerState(delivered, ackFloor, len(state.Pending))
+	}
+
+	// Now step down the consumer to move its leader, and check the state after the transition.
+	// Make the restarted server the eventual leader.
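Why checkConsumerState above can hard-code its expectations: 101 messages are published (num plus the guard message), all 101 are delivered, and only the final one is left unacked, so both delivered sequences sit at 101 and both ack floor sequences at 100 on every replica. With only trailing deliveries unacked, the relationship reduces to this illustrative helper:

    // The ack floor trails delivered by exactly the ack-pending count when
    // the unacked deliveries are all trailing ones.
    func expectedAckFloor(delivered, numAckPending uint64) uint64 {
        return delivered - numAckPending // here: 101 - 1 == 100
    }

The stepdown loop that rotates the consumer leader follows.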
+	seen := make(map[*Server]bool)
+	cl := c.consumerLeader(globalAccountName, "TEST", "C")
+	require_NotNil(t, cl)
+	seen[cl] = true
+
+	allSeen := func() bool {
+		for _, s := range c.servers {
+			if !seen[s] {
+				return false
+			}
+		}
+		return true
+	}
+
+	checkAllLeaders := func() {
+		t.Helper()
+		checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
+			c.waitOnConsumerLeader(globalAccountName, "TEST", "C")
+			if allSeen() {
+				return nil
+			}
+			cl := c.consumerLeader(globalAccountName, "TEST", "C")
+			seen[cl] = true
+			ci, err := js.ConsumerInfo("TEST", "C")
+			require_NoError(t, err)
+			checkConsumerState(ci.Delivered, ci.AckFloor, ci.NumAckPending)
+			cl.JetStreamStepdownConsumer(globalAccountName, "TEST", "C")
+			return fmt.Errorf("Not all servers have been consumer leader yet")
+		})
+	}
+
+	checkAllLeaders()
+
+	// Now restart all servers and check again.
+	c.stopAll()
+	c.restartAll()
+	c.waitOnLeader()
+
+	nc, js = jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	seen = make(map[*Server]bool)
+	checkAllLeaders()
+}
diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go
index 9b9a8230..a17038ca 100644
--- a/server/jetstream_helpers_test.go
+++ b/server/jetstream_helpers_test.go
@@ -1,4 +1,4 @@
-// Copyright 2020-2022 The NATS Authors
+// Copyright 2020-2023 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -43,6 +43,15 @@ func init() {
 	lostQuorumCheck = 4 * hbInterval
 }
 
+// Used to setup clusters of clusters for tests.
+type cluster struct {
+	servers  []*Server
+	opts     []*Options
+	name     string
+	t        testing.TB
+	nproxies []*netProxy
+}
+
 // Used to setup superclusters for tests.
 type supercluster struct {
 	t *testing.T
@@ -351,6 +360,9 @@ type gwProxy struct {
 	down int
 }
 
+// For use in normal clusters.
+type clusterProxy = gwProxy
+
 // Maps cluster names to proxy settings.
 type gwProxyMap map[string]*gwProxy
 
@@ -692,9 +704,19 @@ func createJetStreamCluster(t testing.TB, tmpl string, clusterName, snPre string
 
 type modifyCb func(serverName, clusterName, storeDir, conf string) string
 
-func createJetStreamClusterAndModHook(t testing.TB, tmpl string, clusterName, snPre string, numServers int, portStart int, waitOnReady bool, modify modifyCb) *cluster {
+func createJetStreamClusterAndModHook(t testing.TB, tmpl, cName, snPre string, numServers int, portStart int, waitOnReady bool, modify modifyCb) *cluster {
+	return createJetStreamClusterEx(t, tmpl, cName, snPre, numServers, portStart, waitOnReady, modify, nil)
+}
+
+func createJetStreamClusterWithNetProxy(t testing.TB, cName string, numServers int, cnp *clusterProxy) *cluster {
+	startPorts := []int{7_122, 9_122, 11_122, 15_122}
+	port := startPorts[rand.Intn(len(startPorts))]
+	return createJetStreamClusterEx(t, jsClusterTempl, cName, _EMPTY_, numServers, port, true, nil, cnp)
+}
+
+func createJetStreamClusterEx(t testing.TB, tmpl, cName, snPre string, numServers int, portStart int, wait bool, modify modifyCb, cnp *clusterProxy) *cluster {
 	t.Helper()
-	if clusterName == _EMPTY_ || numServers < 1 {
+	if cName == _EMPTY_ || numServers < 1 {
 		t.Fatalf("Bad params")
 	}
 
@@ -715,20 +737,32 @@ func createJetStreamClusterAndModHook(t testing.TB, tmpl string, clusterName, sn
 	// Build out the routes that will be shared with all configs.
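The helper changes above introduce clusterProxy (an alias for the existing gwProxy settings) so a test can push all route traffic through latency- and bandwidth-shaping netProxy instances. A hedged usage sketch, with a made-up test name and numbers; the shared route URLs are then built below, threading each one through a proxy when one is configured:

    func TestMyClusterWithSlowRoutes(t *testing.T) {
        // rtt is the injected round-trip time; up/down are bytes-per-second caps.
        np := &clusterProxy{
            rtt:  10 * time.Millisecond,
            up:   100 * 1024 * 1024,
            down: 100 * 1024 * 1024,
        }
        c := createJetStreamClusterWithNetProxy(t, "R3S", 3, np)
        // shutdown() now also stops the proxies (see the test_test.go hunk below).
        defer c.shutdown()
        // ... assertions against c ...
    }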
 	var routes []string
+	var nproxies []*netProxy
 	for cp := portStart; cp < portStart+numServers; cp++ {
-		routes = append(routes, fmt.Sprintf("nats-route://127.0.0.1:%d", cp))
+		routeURL := fmt.Sprintf("nats-route://127.0.0.1:%d", cp)
+		if cnp != nil {
+			np := createNetProxy(cnp.rtt, cnp.up, cnp.down, routeURL, false)
+			nproxies = append(nproxies, np)
+			routeURL = np.routeURL()
+		}
+		routes = append(routes, routeURL)
 	}
 	routeConfig := strings.Join(routes, ",")
 
 	// Go ahead and build configurations and start servers.
-	c := &cluster{servers: make([]*Server, 0, numServers), opts: make([]*Options, 0, numServers), name: clusterName}
+	c := &cluster{servers: make([]*Server, 0, numServers), opts: make([]*Options, 0, numServers), name: cName, nproxies: nproxies}
+
+	// Start any proxies.
+	for _, np := range nproxies {
+		np.start()
+	}
 
 	for cp := portStart; cp < portStart+numServers; cp++ {
 		storeDir := t.TempDir()
 		sn := fmt.Sprintf("%sS-%d", snPre, cp-portStart+1)
-		conf := fmt.Sprintf(tmpl, sn, storeDir, clusterName, cp, routeConfig)
+		conf := fmt.Sprintf(tmpl, sn, storeDir, cName, cp, routeConfig)
 		if modify != nil {
-			conf = modify(sn, clusterName, storeDir, conf)
+			conf = modify(sn, cName, storeDir, conf)
 		}
 		s, o := RunServerWithConfig(createConfFile(t, []byte(conf)))
 		c.servers = append(c.servers, s)
@@ -738,7 +772,7 @@ func createJetStreamClusterAndModHook(t testing.TB, tmpl string, clusterName, sn
 	// Wait til we are formed and have a leader.
 	c.checkClusterFormed()
-	if waitOnReady {
+	if wait {
 		c.waitOnClusterReady()
 	}
 
@@ -1621,18 +1655,14 @@ func (np *netProxy) loop(rtt time.Duration, tbw int, r, w net.Conn) {
 
 	rl := rate.NewLimiter(rate.Limit(tbw), rbl)
 
-	for fr := true; ; {
-		sr := time.Now()
+	for {
 		n, err := r.Read(buf[:])
 		if err != nil {
 			return
 		}
 		// RTT delays
-		if fr || time.Since(sr) > 250*time.Millisecond {
-			fr = false
-			if delay > 0 {
-				time.Sleep(delay)
-			}
+		if delay > 0 {
+			time.Sleep(delay)
 		}
 
 		if err := rl.WaitN(ctx, n); err != nil {
 			return
diff --git a/server/norace_test.go b/server/norace_test.go
index 787a6e5f..b0f4a4fc 100644
--- a/server/norace_test.go
+++ b/server/norace_test.go
@@ -26,6 +26,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"math"
 	"math/rand"
 	"net"
 	"net/http"
@@ -49,6 +50,7 @@ import (
 	"github.com/nats-io/jwt/v2"
 	"github.com/nats-io/nats.go"
 	"github.com/nats-io/nkeys"
+	"github.com/nats-io/nuid"
 )
 
 // IMPORTANT: Tests in this file are not executed when running with the -race flag.
@@ -6527,3 +6529,299 @@ func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) {
 		return fmt.Errorf("Still have missing: %+v", missing)
 	})
 }
+
+// This is to test a publish slowdown and general instability experienced in a setup similar to this.
+// We have feeder streams that are all sourced to an aggregate stream. All streams are interest retention.
+// We want to monitor the avg publish time for the sync publishers to the feeder streams, the ingest rate to
+// the aggregate stream, and general health of the consumers on the aggregate stream.
+// Target publish rate is ~2k/s with publish time being ~40-60ms but remaining stable.
+// We can also simulate max redeliveries that create interior deletes in streams.
+func TestNoRaceJetStreamClusterF3Setup(t *testing.T) {
+	// Uncomment to run. Needs to be on a pretty big machine. Do not want as part of Travis tests atm.
+	skip(t)
+
+	// These and the settings below achieve ~60ms pub time on avg and ~2k msgs per sec inbound to the aggregate stream.
+	// On my machine, though.
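Also worth pausing on the netProxy.loop hunk above: the old loop charged the RTT delay only on the first read or after a 250ms idle gap, which under-penalized steady traffic; now every read pays the configured delay, while throughput is still paced by the token bucket. A standalone sketch of the resulting copy loop, assuming golang.org/x/time/rate as the helper does (the real code also proxies the reverse direction and manages the connections):

    package proxy

    import (
        "context"
        "io"
        "time"

        "golang.org/x/time/rate"
    )

    // proxyCopy forwards bytes from r to w, adding a fixed per-read delay and
    // capping throughput at bps bytes per second with a token bucket.
    func proxyCopy(ctx context.Context, delay time.Duration, bps int, r io.Reader, w io.Writer) error {
        const bufSize = 32 * 1024
        rl := rate.NewLimiter(rate.Limit(bps), bufSize)
        buf := make([]byte, bufSize)
        for {
            n, err := r.Read(buf)
            if err != nil {
                return err
            }
            // Per-read RTT delay; the first-read/idle heuristic is gone.
            if delay > 0 {
                time.Sleep(delay)
            }
            // Pace to the configured bandwidth before writing.
            if err := rl.WaitN(ctx, n); err != nil {
                return err
            }
            if _, err := w.Write(buf[:n]); err != nil {
                return err
            }
        }
    }

The proxy settings the F3 test actually uses follow below.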
+	np := clusterProxy{
+		rtt:  2 * time.Millisecond,
+		up:   1 * 1024 * 1024 * 1024, // 1GB/s
+		down: 1 * 1024 * 1024 * 1024, // 1GB/s
+	}
+
+	// Test params.
+	numSourceStreams := 20
+	numConsumersPerSource := 1
+	numPullersForAggregate := 50
+	numPublishers := 100
+	setHighStartSequence := false
+	simulateMaxRedeliveries := false
+	testTime := 60 * time.Minute // make sure to do --timeout=65m
+
+	t.Logf("Starting Test: Total Test Time %v", testTime)
+
+	c := createJetStreamClusterWithNetProxy(t, "R3S", 3, &np)
+	defer c.shutdown()
+
+	// Do some quick sanity checking for latency stuff.
+	{
+		nc, js := jsClientConnect(t, c.randomServer())
+		defer nc.Close()
+
+		_, err := js.AddStream(&nats.StreamConfig{
+			Name:      "TEST",
+			Replicas:  3,
+			Subjects:  []string{"foo"},
+			Retention: nats.InterestPolicy,
+		})
+		require_NoError(t, err)
+		defer js.DeleteStream("TEST")
+
+		sl := c.streamLeader(globalAccountName, "TEST")
+		nc, js = jsClientConnect(t, sl)
+		defer nc.Close()
+		start := time.Now()
+		_, err = js.Publish("foo", []byte("hello"))
+		require_NoError(t, err)
+		// This is the best case, and with the client connection being close to free, it should take at least the rtt.
+		if elapsed := time.Since(start); elapsed < np.rtt {
+			t.Fatalf("Expected publish time to be > %v, got %v", np.rtt, elapsed)
+		}
+
+		nl := c.randomNonStreamLeader(globalAccountName, "TEST")
+		nc, js = jsClientConnect(t, nl)
+		defer nc.Close()
+		start = time.Now()
+		_, err = js.Publish("foo", []byte("hello"))
+		require_NoError(t, err)
+		// This is the worst case, meaning the message has to travel to the leader, then to the fastest replica, then back.
+		// So it should take ~3x rtt, so check for at least > 2x rtt.
+		if elapsed := time.Since(start); elapsed < 2*np.rtt {
+			t.Fatalf("Expected publish time to be > %v, got %v", 2*np.rtt, elapsed)
+		}
+	}
+
+	// Setup source streams.
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	t.Logf("Creating %d Source Streams", numSourceStreams)
+
+	var sources []string
+	wg := sync.WaitGroup{}
+	for i := 0; i < numSourceStreams; i++ {
+		sname := fmt.Sprintf("EVENT-%s", nuid.Next())
+		wg.Add(1)
+		go func(stream string) {
+			defer wg.Done()
+			t.Logf("  %q", stream)
+			sources = append(sources, stream)
+			subj := fmt.Sprintf("%s.>", stream)
+			_, err := js.AddStream(&nats.StreamConfig{
+				Name:      stream,
+				Subjects:  []string{subj},
+				Replicas:  3,
+				Retention: nats.InterestPolicy,
+			})
+			require_NoError(t, err)
+			for j := 0; j < numConsumersPerSource; j++ {
+				consumer := fmt.Sprintf("C%d", j)
+				_, err := js.Subscribe(_EMPTY_, func(msg *nats.Msg) {
+					msg.Ack()
+				}, nats.BindStream(stream), nats.Durable(consumer), nats.ManualAck())
+				require_NoError(t, err)
+			}
+		}(sname)
+	}
+	wg.Wait()
+
+	var streamSources []*nats.StreamSource
+	for _, src := range sources {
+		streamSources = append(streamSources, &nats.StreamSource{Name: src})
+
+	}
+
+	t.Log("Creating Aggregate Stream")
+
+	// Now create the aggregate stream.
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "EVENTS",
+		Replicas:  3,
+		Retention: nats.InterestPolicy,
+		Sources:   streamSources,
+	})
+	require_NoError(t, err)
+
+	// Set first sequence to a high number.
+	if setHighStartSequence {
+		require_NoError(t, js.PurgeStream("EVENTS", &nats.StreamPurgeRequest{Sequence: 32_000_001}))
+	}
+
+	// Now create 2 pull consumers.
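The latency sanity checks above encode a simple model: the proxies inject RTT only on cluster routes, so a publish handled by the stream leader needs one replication round trip (at least 1x rtt), while a publish through a non-leader adds a route hop each way, putting the model at roughly 3x rtt, asserted conservatively as more than 2x. As a small illustrative helper (not part of the test); the two durable pull consumers are then created below:

    // minPublishLatency is the lower bound the sanity checks assert for a
    // synchronous replicated publish, given the RTT injected on routes.
    func minPublishLatency(routeRTT time.Duration, viaLeader bool) time.Duration {
        if viaLeader {
            // The leader round-trips the proposal to its fastest replica.
            return routeRTT
        }
        // Hop to the leader, replication round trip, hop back: ~3x in the
        // model, checked conservatively as > 2x.
        return 2 * routeRTT
    }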
+	_, err = js.PullSubscribe(_EMPTY_, "C1",
+		nats.BindStream("EVENTS"),
+		nats.MaxDeliver(1),
+		nats.AckWait(10*time.Second),
+		nats.ManualAck(),
+	)
+	require_NoError(t, err)
+
+	_, err = js.PullSubscribe(_EMPTY_, "C2",
+		nats.BindStream("EVENTS"),
+		nats.MaxDeliver(1),
+		nats.AckWait(10*time.Second),
+		nats.ManualAck(),
+	)
+	require_NoError(t, err)
+
+	t.Logf("Creating %d Pull Subscribers", numPullersForAggregate)
+
+	// Now create the pullers.
+	for _, subName := range []string{"C1", "C2"} {
+		for i := 0; i < numPullersForAggregate; i++ {
+			go func(subName string) {
+				nc, js := jsClientConnect(t, c.randomServer())
+				defer nc.Close()
+
+				sub, err := js.PullSubscribe(_EMPTY_, subName,
+					nats.BindStream("EVENTS"),
+					nats.MaxDeliver(1),
+					nats.AckWait(10*time.Second),
+					nats.ManualAck(),
+				)
+				require_NoError(t, err)
+
+				for {
+					msgs, err := sub.Fetch(25, nats.MaxWait(2*time.Second))
+					if err != nil && err != nats.ErrTimeout {
+						t.Logf("Exiting pull subscriber %q: %v", subName, err)
+						return
+					}
+					// Shuffle
+					rand.Shuffle(len(msgs), func(i, j int) { msgs[i], msgs[j] = msgs[j], msgs[i] })
+					// Wait for a random interval up to 100ms.
+					time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
+					for _, m := range msgs {
+						// If we want to simulate max redeliveries being hit: skipping a
+						// single ack is enough, since the subscribers use MaxDeliver(1).
+						// 1 in 100_000 == 0.001%
+						if simulateMaxRedeliveries && rand.Intn(100_000) == 0 {
+							md, err := m.Metadata()
+							require_NoError(t, err)
+							t.Logf("** Skipping Ack: %d **", md.Sequence.Stream)
+						} else {
+							m.Ack()
+						}
+					}
+				}
+			}(subName)
+		}
+	}
+
+	// Now create feeder publishers.
+	eventTypes := []string{"PAYMENT", "SUBMISSION", "CANCEL"}
+
+	msg := make([]byte, 2*1024) // 2k payload
+	rand.Read(msg)
+
+	// For tracking pub times.
+	var pubs int
+	var totalPubTime time.Duration
+	var pmu sync.Mutex
+	last := time.Now()
+
+	updatePubStats := func(elapsed time.Duration) {
+		pmu.Lock()
+		defer pmu.Unlock()
+		// Reset every 5s
+		if time.Since(last) > 5*time.Second {
+			pubs = 0
+			totalPubTime = 0
+			last = time.Now()
+		}
+		pubs++
+		totalPubTime += elapsed
+	}
+	avgPubTime := func() time.Duration {
+		pmu.Lock()
+		np := pubs
+		tpt := totalPubTime
+		pmu.Unlock()
+		return tpt / time.Duration(np)
+	}
+
+	t.Logf("Creating %d Publishers", numPublishers)
+
+	for i := 0; i < numPublishers; i++ {
+		go func() {
+			nc, js := jsClientConnect(t, c.randomServer())
+			defer nc.Close()
+
+			for {
+				// Grab a random source stream.
+				stream := sources[rand.Intn(len(sources))]
+				// Grab random event type.
+				evt := eventTypes[rand.Intn(len(eventTypes))]
+				subj := fmt.Sprintf("%s.%s", stream, evt)
+				start := time.Now()
+				_, err := js.Publish(subj, msg)
+				if err != nil {
+					t.Logf("Exiting publisher: %v", err)
+					return
+				}
+				elapsed := time.Since(start)
+				if elapsed > 5*time.Second {
+					t.Logf("Publish took longer than expected: %v", elapsed)
+				}
+				updatePubStats(elapsed)
+			}
+		}()
+	}
+
+	t.Log("Creating Monitoring Routine - Data in ~10s")
+
+	// Create monitoring routine.
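The updatePubStats/avgPubTime pair above keeps a rolling ~5s window under a mutex. Note that avgPubTime divides by the sample count, which can be zero in the instant after a window reset; a defensive drop-in variant of that closure (a sketch, not the committed code) would guard it:

    avgPubTime := func() time.Duration {
        pmu.Lock()
        np, tpt := pubs, totalPubTime
        pmu.Unlock()
        if np == 0 {
            return 0 // avoid division by zero right after a reset
        }
        return tpt / time.Duration(np)
    }

The monitoring goroutine itself follows.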
+ go func() { + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + fseq, lseq := uint64(0), uint64(0) + for { + // Grab consumers + var minAckFloor uint64 = math.MaxUint64 + for _, consumer := range []string{"C1", "C2"} { + ci, err := js.ConsumerInfo("EVENTS", consumer) + if err != nil { + t.Logf("Exiting Monitor: %v", err) + return + } + if lseq > 0 { + t.Logf("%s:\n Delivered:\t%d\n AckFloor:\t%d\n AckPending:\t%d\n NumPending:\t%d", + consumer, ci.Delivered.Stream, ci.AckFloor.Stream, ci.NumAckPending, ci.NumPending) + } + if ci.AckFloor.Stream < minAckFloor { + minAckFloor = ci.AckFloor.Stream + } + } + // Now grab aggregate stream state. + si, err := js.StreamInfo("EVENTS") + if err != nil { + t.Logf("Exiting Monitor: %v", err) + return + } + state := si.State + if lseq != 0 { + t.Logf("Stream:\n Msgs: \t%d\n First:\t%d\n Last: \t%d\n Deletes:\t%d\n", + state.Msgs, state.FirstSeq, state.LastSeq, state.NumDeleted) + t.Logf("Publish Stats:\n Msgs/s:\t%0.2f\n Avg Pub:\t%v\n\n", float64(si.State.LastSeq-lseq)/5.0, avgPubTime()) + if si.State.FirstSeq < minAckFloor && si.State.FirstSeq == fseq { + t.Log("Stream first seq < minimum ack floor") + } + } + fseq, lseq = si.State.FirstSeq, si.State.LastSeq + time.Sleep(5 * time.Second) + } + + }() + + time.Sleep(testTime) +} diff --git a/server/test_test.go b/server/test_test.go index a853310f..64717f90 100644 --- a/server/test_test.go +++ b/server/test_test.go @@ -1,4 +1,4 @@ -// Copyright 2019-2021 The NATS Authors +// Copyright 2019-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -52,14 +52,6 @@ func RunRandClientPortServer() *Server { return RunServer(&opts) } -// Used to setup clusters of clusters for tests. -type cluster struct { - servers []*Server - opts []*Options - name string - t testing.TB -} - func require_True(t *testing.T, b bool) { t.Helper() if !b { @@ -276,6 +268,11 @@ func (c *cluster) shutdown() { if c == nil { return } + // Stop any proxies. + for _, np := range c.nproxies { + np.stop() + } + // Shutdown and cleanup servers. for i, s := range c.servers { sd := s.StoreDir() s.Shutdown() From 0c1301ec148cf878dacbb465cba7cddc822a8700 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 19 Mar 2023 10:52:52 -0700 Subject: [PATCH 3/3] Fix for data race Signed-off-by: Derek Collison --- server/raft.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/raft.go b/server/raft.go index a38efe9e..6afea35e 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3108,12 +3108,14 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) { n.trackResponse(ar) } else if ar.term > n.term { // False here and they have a higher term. + n.Lock() n.term = ar.term n.vote = noVote n.writeTermVote() n.warn("Detected another leader with higher term, will stepdown and reset") n.stepdown.push(noLeader) n.resetWAL() + n.Unlock() } else if ar.reply != _EMPTY_ { n.catchupFollower(ar) }
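Finally, on the Patch 3 data race: term, vote, the persisted term/vote record, and the WAL reset are all state shared with the raft run loop, so the higher-term branch in processAppendEntryResponse must mutate them under the node lock. The shape of the fix in simplified form (types and helper are illustrative; this variant also rechecks the term once inside the lock, since the caller's comparison ran unlocked):

    package raftsketch

    import "sync"

    const noVote = ""

    // node carries only the fields relevant to the race.
    type node struct {
        sync.Mutex
        term uint64
        vote string
    }

    // observeHigherTerm applies the same transitions as the patched branch,
    // holding the lock across all of them.
    func (n *node) observeHigherTerm(term uint64) {
        n.Lock()
        defer n.Unlock()
        if term <= n.term {
            return // another goroutine already caught us up
        }
        n.term = term
        n.vote = noVote
        // In the real code: writeTermVote(), stepdown.push(noLeader) and
        // resetWAL() also run here, still under the lock.
    }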