diff --git a/main.go b/main.go
index 53652c6e..767719fc 100644
--- a/main.go
+++ b/main.go
@@ -134,8 +134,6 @@ func main() {
 		s.Warnf("Failed to set GOMAXPROCS: %v", err)
 	} else {
 		defer undo()
-		// Reset these from the snapshots from init for monitor.go
-		server.SnapshotMonitorInfo()
 	}
 
 	s.WaitForShutdown()
diff --git a/server/events.go b/server/events.go
index 7c99f46b..939291a1 100644
--- a/server/events.go
+++ b/server/events.go
@@ -22,6 +22,7 @@ import (
 	"fmt"
 	"math/rand"
 	"net/http"
+	"runtime"
 	"strconv"
 	"strings"
 	"sync"
@@ -634,7 +635,7 @@ func (s *Server) updateServerUsage(v *ServerStats) {
 	defer s.mu.Lock()
 	var vss int64
 	pse.ProcUsage(&v.CPU, &v.Mem, &vss)
-	v.Cores = numCores
+	v.Cores = runtime.NumCPU()
 }
 
 // Generate a route stat for our statz update.
diff --git a/server/filestore.go b/server/filestore.go
index fb4eb5cd..3b910e03 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -6685,21 +6685,15 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error {
 		delete(o.state.Pending, sseq)
 		dseq = p.Sequence // Use the original.
 	}
-	// Now remove from redelivered.
-	if len(o.state.Redelivered) > 0 {
-		delete(o.state.Redelivered, sseq)
-	}
-
 	if len(o.state.Pending) == 0 {
 		o.state.AckFloor.Consumer = o.state.Delivered.Consumer
 		o.state.AckFloor.Stream = o.state.Delivered.Stream
 	} else if dseq == o.state.AckFloor.Consumer+1 {
-		first := o.state.AckFloor.Consumer == 0
 		o.state.AckFloor.Consumer = dseq
 		o.state.AckFloor.Stream = sseq
-		if !first && o.state.Delivered.Consumer > dseq {
-			for ss := sseq + 1; ss < o.state.Delivered.Stream; ss++ {
+		if o.state.Delivered.Consumer > dseq {
+			for ss := sseq + 1; ss <= o.state.Delivered.Stream; ss++ {
 				if p, ok := o.state.Pending[ss]; ok {
 					if p.Sequence > 0 {
 						o.state.AckFloor.Consumer = p.Sequence - 1
@@ -6710,6 +6704,8 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error {
 			}
 		}
 	}
+	// We do this regardless.
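+	// Clearing redelivered now happens after the ack floor math above and
+	// without the old len() guard: delete on a missing key is a no-op, and
+	// we want the entry gone no matter which branch above ran.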
+	delete(o.state.Redelivered, sseq)
 
 	o.kickFlusher()
 	return nil
diff --git a/server/filestore_test.go b/server/filestore_test.go
index 92ccfd2f..29e3a705 100644
--- a/server/filestore_test.go
+++ b/server/filestore_test.go
@@ -2776,8 +2776,8 @@ func TestFileStoreConsumerDeliveredAndAckUpdates(t *testing.T) {
 		}
 	}
 
-	testAck(1, 100, 1, 100)
-	testAck(3, 130, 1, 100)
+	testAck(1, 100, 1, 109)
+	testAck(3, 130, 1, 109)
 	testAck(2, 110, 3, 149) // We do not track explicit state on previous stream floors, so we take last known -1
 	testAck(5, 165, 3, 149)
 	testAck(4, 150, 5, 165)
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 357ca029..e18ebe71 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -281,6 +281,37 @@ func (s *Server) JetStreamStepdownStream(account, stream string) error {
 	return nil
 }
 
+func (s *Server) JetStreamStepdownConsumer(account, stream, consumer string) error {
+	js, cc := s.getJetStreamCluster()
+	if js == nil {
+		return NewJSNotEnabledError()
+	}
+	if cc == nil {
+		return NewJSClusterNotActiveError()
+	}
+	// Grab account.
+	acc, err := s.LookupAccount(account)
+	if err != nil {
+		return err
+	}
+	// Grab stream.
+	mset, err := acc.lookupStream(stream)
+	if err != nil {
+		return err
+	}
+
+	o := mset.lookupConsumer(consumer)
+	if o == nil {
+		return NewJSConsumerNotFoundError()
+	}
+
+	if node := o.raftNode(); node != nil && node.Leader() {
+		node.StepDown()
+	}
+
+	return nil
+}
+
 func (s *Server) JetStreamSnapshotStream(account, stream string) error {
 	js, cc := s.getJetStreamCluster()
 	if js == nil {
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index 8c94ef47..26e43aae 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -1,4 +1,4 @@
-// Copyright 2022 The NATS Authors
+// Copyright 2022-2023 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -3112,3 +3112,120 @@ func TestJetStreamClusterInterestBasedStreamAndConsumerSnapshots(t *testing.T) {
 	require_NoError(t, err)
 	require_True(t, si.State.Msgs == 0)
 }
+
+func TestJetStreamClusterConsumerFollowerStoreStateAckFloorBug(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Replicas: 3,
+		Subjects: []string{"foo"},
+	})
+	require_NoError(t, err)
+
+	sub, err := js.PullSubscribe(_EMPTY_, "C", nats.BindStream("TEST"), nats.ManualAck())
+	require_NoError(t, err)
+
+	num := 100
+	for i := 0; i < num; i++ {
+		sendStreamMsg(t, nc, "foo", "data")
+	}
+
+	// This one prevents the pending state from reaching 0 and resetting, which would hide the bug.
+	sendStreamMsg(t, nc, "foo", "data")
+
+	// Ack all but one, out of order, and make sure all consumers have the same stored state.
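+	// The out-of-order acks are the point: they force the ack floor to be
+	// recomputed across gaps, which is the filestore logic changed above.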
+	msgs, err := sub.Fetch(num, nats.MaxWait(time.Second))
+	require_NoError(t, err)
+	require_True(t, len(msgs) == num)
+
+	_, err = sub.Fetch(1, nats.MaxWait(time.Second))
+	require_NoError(t, err)
+
+	rand.Shuffle(len(msgs), func(i, j int) { msgs[i], msgs[j] = msgs[j], msgs[i] })
+	for _, m := range msgs {
+		m.AckSync()
+	}
+
+	checkConsumerState := func(delivered, ackFloor nats.SequenceInfo, numAckPending int) {
+		expectedDelivered := uint64(num) + 1
+		if delivered.Stream != expectedDelivered || delivered.Consumer != expectedDelivered {
+			t.Fatalf("Wrong delivered, expected %d got %+v", expectedDelivered, delivered)
+		}
+		expectedAck := uint64(num)
+		if ackFloor.Stream != expectedAck || ackFloor.Consumer != expectedAck {
+			t.Fatalf("Wrong ackFloor, expected %d got %+v", expectedAck, ackFloor)
+		}
+		require_True(t, numAckPending == 1)
+	}
+
+	ci, err := js.ConsumerInfo("TEST", "C")
+	require_NoError(t, err)
+	checkConsumerState(ci.Delivered, ci.AckFloor, ci.NumAckPending)
+
+	// Check each consumer on each server for its store state and make sure it matches as well.
+	for _, s := range c.servers {
+		mset, err := s.GlobalAccount().lookupStream("TEST")
+		require_NoError(t, err)
+		require_NotNil(t, mset)
+		o := mset.lookupConsumer("C")
+		require_NotNil(t, o)
+
+		state, err := o.store.State()
+		require_NoError(t, err)
+
+		delivered := nats.SequenceInfo{Stream: state.Delivered.Stream, Consumer: state.Delivered.Consumer}
+		ackFloor := nats.SequenceInfo{Stream: state.AckFloor.Stream, Consumer: state.AckFloor.Consumer}
+		checkConsumerState(delivered, ackFloor, len(state.Pending))
+	}
+
+	// Now step down the consumer leader and move leadership around, checking
+	// the state after each transition, until every server has been leader.
+	seen := make(map[*Server]bool)
+	cl := c.consumerLeader(globalAccountName, "TEST", "C")
+	require_NotNil(t, cl)
+	seen[cl] = true
+
+	allSeen := func() bool {
+		for _, s := range c.servers {
+			if !seen[s] {
+				return false
+			}
+		}
+		return true
+	}
+
+	checkAllLeaders := func() {
+		t.Helper()
+		checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
+			c.waitOnConsumerLeader(globalAccountName, "TEST", "C")
+			if allSeen() {
+				return nil
+			}
+			cl := c.consumerLeader(globalAccountName, "TEST", "C")
+			seen[cl] = true
+			ci, err := js.ConsumerInfo("TEST", "C")
+			require_NoError(t, err)
+			checkConsumerState(ci.Delivered, ci.AckFloor, ci.NumAckPending)
+			cl.JetStreamStepdownConsumer(globalAccountName, "TEST", "C")
+			return fmt.Errorf("Not all servers have been consumer leader yet")
+		})
+	}
+
+	checkAllLeaders()
+
+	// Now restart all servers and check again.
+	c.stopAll()
+	c.restartAll()
+	c.waitOnLeader()
+
+	nc, js = jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	seen = make(map[*Server]bool)
+	checkAllLeaders()
+}
diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go
index 9b9a8230..a17038ca 100644
--- a/server/jetstream_helpers_test.go
+++ b/server/jetstream_helpers_test.go
@@ -1,4 +1,4 @@
-// Copyright 2020-2022 The NATS Authors
+// Copyright 2020-2023 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -43,6 +43,15 @@ func init() {
 	lostQuorumCheck = 4 * hbInterval
}
 
+// Used to set up clusters of servers for tests.
+type cluster struct {
+	servers  []*Server
+	opts     []*Options
+	name     string
+	t        testing.TB
+	nproxies []*netProxy
+}
+
 // Used to setup superclusters for tests.
 type supercluster struct {
 	t *testing.T
@@ -351,6 +360,9 @@ type gwProxy struct {
 	down int
 }
 
+// For use in normal clusters.
+type clusterProxy = gwProxy
+
 // Maps cluster names to proxy settings.
 type gwProxyMap map[string]*gwProxy
 
@@ -692,9 +704,19 @@ func createJetStreamCluster(t testing.TB, tmpl string, clusterName, snPre string
 
 type modifyCb func(serverName, clusterName, storeDir, conf string) string
 
-func createJetStreamClusterAndModHook(t testing.TB, tmpl string, clusterName, snPre string, numServers int, portStart int, waitOnReady bool, modify modifyCb) *cluster {
+func createJetStreamClusterAndModHook(t testing.TB, tmpl, cName, snPre string, numServers int, portStart int, waitOnReady bool, modify modifyCb) *cluster {
+	return createJetStreamClusterEx(t, tmpl, cName, snPre, numServers, portStart, waitOnReady, modify, nil)
+}
+
+func createJetStreamClusterWithNetProxy(t testing.TB, cName string, numServers int, cnp *clusterProxy) *cluster {
+	startPorts := []int{7_122, 9_122, 11_122, 15_122}
+	port := startPorts[rand.Intn(len(startPorts))]
+	return createJetStreamClusterEx(t, jsClusterTempl, cName, _EMPTY_, numServers, port, true, nil, cnp)
+}
+
+func createJetStreamClusterEx(t testing.TB, tmpl, cName, snPre string, numServers int, portStart int, wait bool, modify modifyCb, cnp *clusterProxy) *cluster {
 	t.Helper()
-	if clusterName == _EMPTY_ || numServers < 1 {
+	if cName == _EMPTY_ || numServers < 1 {
 		t.Fatalf("Bad params")
 	}
@@ -715,20 +737,32 @@ func createJetStreamClusterAndModHook(t testing.TB, tmpl string, clusterName, sn
 	// Build out the routes that will be shared with all configs.
 	var routes []string
+	var nproxies []*netProxy
 	for cp := portStart; cp < portStart+numServers; cp++ {
-		routes = append(routes, fmt.Sprintf("nats-route://127.0.0.1:%d", cp))
+		routeURL := fmt.Sprintf("nats-route://127.0.0.1:%d", cp)
+		if cnp != nil {
+			np := createNetProxy(cnp.rtt, cnp.up, cnp.down, routeURL, false)
+			nproxies = append(nproxies, np)
+			routeURL = np.routeURL()
+		}
+		routes = append(routes, routeURL)
 	}
 	routeConfig := strings.Join(routes, ",")
 
 	// Go ahead and build configurations and start servers.
-	c := &cluster{servers: make([]*Server, 0, numServers), opts: make([]*Options, 0, numServers), name: clusterName}
+	c := &cluster{servers: make([]*Server, 0, numServers), opts: make([]*Options, 0, numServers), name: cName, nproxies: nproxies}
+
+	// Start any proxies.
+	for _, np := range nproxies {
+		np.start()
+	}
 
 	for cp := portStart; cp < portStart+numServers; cp++ {
 		storeDir := t.TempDir()
 		sn := fmt.Sprintf("%sS-%d", snPre, cp-portStart+1)
-		conf := fmt.Sprintf(tmpl, sn, storeDir, clusterName, cp, routeConfig)
+		conf := fmt.Sprintf(tmpl, sn, storeDir, cName, cp, routeConfig)
 		if modify != nil {
-			conf = modify(sn, clusterName, storeDir, conf)
+			conf = modify(sn, cName, storeDir, conf)
 		}
 		s, o := RunServerWithConfig(createConfFile(t, []byte(conf)))
 		c.servers = append(c.servers, s)
@@ -738,7 +772,7 @@ func createJetStreamClusterAndModHook(t testing.TB, tmpl string, clusterName, sn
 	// Wait til we are formed and have a leader.
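+	// Note: when a proxy config was supplied, the routes above point at the
+	// proxies, so cluster formation itself runs through them.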
 	c.checkClusterFormed()
-	if waitOnReady {
+	if wait {
 		c.waitOnClusterReady()
 	}
@@ -1621,18 +1655,14 @@ func (np *netProxy) loop(rtt time.Duration, tbw int, r, w net.Conn) {
 	rl := rate.NewLimiter(rate.Limit(tbw), rbl)
 
-	for fr := true; ; {
-		sr := time.Now()
+	for {
 		n, err := r.Read(buf[:])
 		if err != nil {
 			return
 		}
 		// RTT delays
-		if fr || time.Since(sr) > 250*time.Millisecond {
-			fr = false
-			if delay > 0 {
-				time.Sleep(delay)
-			}
+		if delay > 0 {
+			time.Sleep(delay)
 		}
 		if err := rl.WaitN(ctx, n); err != nil {
 			return
diff --git a/server/monitor.go b/server/monitor.go
index 5722fa54..5d81deb5 100644
--- a/server/monitor.go
+++ b/server/monitor.go
@@ -38,19 +38,6 @@ import (
 	"github.com/nats-io/nats-server/v2/server/pse"
 )
 
-// Snapshot this
-var numCores int
-var maxProcs int
-
-func SnapshotMonitorInfo() {
-	numCores = runtime.NumCPU()
-	maxProcs = runtime.GOMAXPROCS(0)
-}
-
-func init() {
-	SnapshotMonitorInfo()
-}
-
 // Connz represents detailed information on current client connections.
 type Connz struct {
 	ID string `json:"server_id"`
@@ -1530,8 +1517,8 @@ func (s *Server) createVarz(pcpu float64, rss int64) *Varz {
 		},
 		Start:    s.start,
 		MaxSubs:  opts.MaxSubs,
-		Cores:    numCores,
-		MaxProcs: maxProcs,
+		Cores:    runtime.NumCPU(),
+		MaxProcs: runtime.GOMAXPROCS(0),
 		Tags:     opts.Tags,
 		TrustedOperatorsJwt:   opts.operatorJWT,
 		TrustedOperatorsClaim: opts.TrustedOperators,
diff --git a/server/norace_test.go b/server/norace_test.go
index 787a6e5f..b0f4a4fc 100644
--- a/server/norace_test.go
+++ b/server/norace_test.go
@@ -26,6 +26,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"math"
 	"math/rand"
 	"net"
 	"net/http"
@@ -49,6 +50,7 @@ import (
 	"github.com/nats-io/jwt/v2"
 	"github.com/nats-io/nats.go"
 	"github.com/nats-io/nkeys"
+	"github.com/nats-io/nuid"
 )
 
 // IMPORTANT: Tests in this file are not executed when running with the -race flag.
@@ -6527,3 +6529,299 @@ func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) {
 		return fmt.Errorf("Still have missing: %+v", missing)
 	})
 }
+
+// This tests a publish slowdown and general instability experienced in a setup similar to this one.
+// We have feeder streams that are all sourced to an aggregate stream. All streams are interest retention.
+// We want to monitor the avg publish time for the sync publishers to the feeder streams, the ingest rate to
+// the aggregate stream, and general health of the consumers on the aggregate stream.
+// Target publish rate is ~2k/s with publish time being ~40-60ms but remaining stable.
+// We can also simulate max redeliveries that create interior deletes in streams.
+func TestNoRaceJetStreamClusterF3Setup(t *testing.T) {
+	// Comment out the skip to run. Needs to be on a pretty big machine. Do not want as part of Travis tests atm.
+	skip(t)
+
+	// These and the settings below achieve ~60ms pub time on avg and ~2k msgs per sec inbound to the aggregate stream.
+	// On my machine, though.
+	np := clusterProxy{
+		rtt:  2 * time.Millisecond,
+		up:   1 * 1024 * 1024 * 1024, // 1gbit
+		down: 1 * 1024 * 1024 * 1024, // 1gbit
+	}
+
+	// Test params.
+	numSourceStreams := 20
+	numConsumersPerSource := 1
+	numPullersForAggregate := 50
+	numPublishers := 100
+	setHighStartSequence := false
+	simulateMaxRedeliveries := false
+	testTime := 60 * time.Minute // make sure to run with --timeout=65m
+
+	t.Logf("Starting Test: Total Test Time %v", testTime)
+
+	c := createJetStreamClusterWithNetProxy(t, "R3S", 3, &np)
+	defer c.shutdown()
+
+	// Do some quick sanity checking for latency stuff.
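+	// The proxy rtt is per route hop, so the checks below expect roughly one
+	// rtt when publishing at the stream leader and ~3x rtt (checked
+	// conservatively as > 2x) when entering through a non-leader.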
+	{
+		nc, js := jsClientConnect(t, c.randomServer())
+		defer nc.Close()
+
+		_, err := js.AddStream(&nats.StreamConfig{
+			Name:      "TEST",
+			Replicas:  3,
+			Subjects:  []string{"foo"},
+			Retention: nats.InterestPolicy,
+		})
+		require_NoError(t, err)
+		defer js.DeleteStream("TEST")
+
+		sl := c.streamLeader(globalAccountName, "TEST")
+		nc, js = jsClientConnect(t, sl)
+		defer nc.Close()
+		start := time.Now()
+		_, err = js.Publish("foo", []byte("hello"))
+		require_NoError(t, err)
+		// This is the best case, and with the client connection being close to free, this should be at least the rtt.
+		if elapsed := time.Since(start); elapsed < np.rtt {
+			t.Fatalf("Expected publish time to be > %v, got %v", np.rtt, elapsed)
+		}
+
+		nl := c.randomNonStreamLeader(globalAccountName, "TEST")
+		nc, js = jsClientConnect(t, nl)
+		defer nc.Close()
+		start = time.Now()
+		_, err = js.Publish("foo", []byte("hello"))
+		require_NoError(t, err)
+		// This is the worst case, meaning the message has to travel to the leader, then to the fastest replica, then back.
+		// So it should be ~3x rtt; we check for at least 2x rtt.
+		if elapsed := time.Since(start); elapsed < 2*np.rtt {
+			t.Fatalf("Expected publish time to be > %v, got %v", 2*np.rtt, elapsed)
+		}
+	}
+
+	// Set up the source streams.
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	t.Logf("Creating %d Source Streams", numSourceStreams)
+
+	var sources []string
+	wg := sync.WaitGroup{}
+	for i := 0; i < numSourceStreams; i++ {
+		sname := fmt.Sprintf("EVENT-%s", nuid.Next())
+		// Track the name here, not in the goroutine, to avoid a data race on sources.
+		sources = append(sources, sname)
+		wg.Add(1)
+		go func(stream string) {
+			defer wg.Done()
+			t.Logf("  %q", stream)
+			subj := fmt.Sprintf("%s.>", stream)
+			_, err := js.AddStream(&nats.StreamConfig{
+				Name:      stream,
+				Subjects:  []string{subj},
+				Replicas:  3,
+				Retention: nats.InterestPolicy,
+			})
+			require_NoError(t, err)
+			for j := 0; j < numConsumersPerSource; j++ {
+				consumer := fmt.Sprintf("C%d", j)
+				_, err := js.Subscribe(_EMPTY_, func(msg *nats.Msg) {
+					msg.Ack()
+				}, nats.BindStream(stream), nats.Durable(consumer), nats.ManualAck())
+				require_NoError(t, err)
+			}
+		}(sname)
+	}
+	wg.Wait()
+
+	var streamSources []*nats.StreamSource
+	for _, src := range sources {
+		streamSources = append(streamSources, &nats.StreamSource{Name: src})
+	}
+
+	t.Log("Creating Aggregate Stream")
+
+	// Now create the aggregate stream.
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "EVENTS",
+		Replicas:  3,
+		Retention: nats.InterestPolicy,
+		Sources:   streamSources,
+	})
+	require_NoError(t, err)
+
+	// Set the first sequence to a high number.
+	if setHighStartSequence {
+		require_NoError(t, js.PurgeStream("EVENTS", &nats.StreamPurgeRequest{Sequence: 32_000_001}))
+	}
+
+	// Now create 2 pull consumers.
+	_, err = js.PullSubscribe(_EMPTY_, "C1",
+		nats.BindStream("EVENTS"),
+		nats.MaxDeliver(1),
+		nats.AckWait(10*time.Second),
+		nats.ManualAck(),
+	)
+	require_NoError(t, err)
+
+	_, err = js.PullSubscribe(_EMPTY_, "C2",
+		nats.BindStream("EVENTS"),
+		nats.MaxDeliver(1),
+		nats.AckWait(10*time.Second),
+		nats.ManualAck(),
+	)
+	require_NoError(t, err)
+
+	t.Logf("Creating %d Pull Subscribers", numPullersForAggregate)
+
+	// Now create the pullers.
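+	// Each durable is shared by numPullersForAggregate fetchers, so batches land
+	// on competing pullers. With MaxDeliver(1), any ack we deliberately skip can
+	// never be redelivered, which is what produces the interior deletes on the
+	// interest-based aggregate stream.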
+	for _, subName := range []string{"C1", "C2"} {
+		for i := 0; i < numPullersForAggregate; i++ {
+			go func(subName string) {
+				nc, js := jsClientConnect(t, c.randomServer())
+				defer nc.Close()
+
+				sub, err := js.PullSubscribe(_EMPTY_, subName,
+					nats.BindStream("EVENTS"),
+					nats.MaxDeliver(1),
+					nats.AckWait(10*time.Second),
+					nats.ManualAck(),
+				)
+				require_NoError(t, err)
+
+				for {
+					msgs, err := sub.Fetch(25, nats.MaxWait(2*time.Second))
+					if err != nil && err != nats.ErrTimeout {
+						t.Logf("Exiting pull subscriber %q: %v", subName, err)
+						return
+					}
+					// Shuffle.
+					rand.Shuffle(len(msgs), func(i, j int) { msgs[i], msgs[j] = msgs[j], msgs[i] })
+					// Wait for a random interval up to 100ms.
+					time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
+					for _, m := range msgs {
+						// To simulate max redeliveries being hit, skip the ack; with this
+						// subscriber setup, missing the ack once is enough to cause it.
+						// 1 in 100_000 == 0.001%
+						if simulateMaxRedeliveries && rand.Intn(100_000) == 0 {
+							md, err := m.Metadata()
+							require_NoError(t, err)
+							t.Logf("** Skipping Ack: %d **", md.Sequence.Stream)
+						} else {
+							m.Ack()
+						}
+					}
+				}
+			}(subName)
+		}
+	}
+
+	// Now create the feeder publishers.
+	eventTypes := []string{"PAYMENT", "SUBMISSION", "CANCEL"}
+
+	msg := make([]byte, 2*1024) // 2k payload
+	rand.Read(msg)
+
+	// For tracking pub times.
+	var pubs int
+	var totalPubTime time.Duration
+	var pmu sync.Mutex
+	last := time.Now()
+
+	updatePubStats := func(elapsed time.Duration) {
+		pmu.Lock()
+		defer pmu.Unlock()
+		// Reset every 5s.
+		if time.Since(last) > 5*time.Second {
+			pubs = 0
+			totalPubTime = 0
+			last = time.Now()
+		}
+		pubs++
+		totalPubTime += elapsed
+	}
+	avgPubTime := func() time.Duration {
+		pmu.Lock()
+		np := pubs
+		tpt := totalPubTime
+		pmu.Unlock()
+		// Guard against division by zero when nothing has been published yet.
+		if np == 0 {
+			return 0
+		}
+		return tpt / time.Duration(np)
+	}
+
+	t.Logf("Creating %d Publishers", numPublishers)
+
+	for i := 0; i < numPublishers; i++ {
+		go func() {
+			nc, js := jsClientConnect(t, c.randomServer())
+			defer nc.Close()
+
+			for {
+				// Grab a random source stream.
+				stream := sources[rand.Intn(len(sources))]
+				// Grab a random event type.
+				evt := eventTypes[rand.Intn(len(eventTypes))]
+				subj := fmt.Sprintf("%s.%s", stream, evt)
+				start := time.Now()
+				_, err := js.Publish(subj, msg)
+				if err != nil {
+					t.Logf("Exiting publisher: %v", err)
+					return
+				}
+				elapsed := time.Since(start)
+				if elapsed > 5*time.Second {
+					t.Logf("Publish took longer than expected: %v", elapsed)
+				}
+				updatePubStats(elapsed)
+			}
+		}()
+	}
+
+	t.Log("Creating Monitoring Routine - Data in ~10s")
+
+	// Create the monitoring routine.
+	go func() {
+		nc, js := jsClientConnect(t, c.randomServer())
+		defer nc.Close()
+
+		fseq, lseq := uint64(0), uint64(0)
+		for {
+			// Grab the consumers.
+			var minAckFloor uint64 = math.MaxUint64
+			for _, consumer := range []string{"C1", "C2"} {
+				ci, err := js.ConsumerInfo("EVENTS", consumer)
+				if err != nil {
+					t.Logf("Exiting Monitor: %v", err)
+					return
+				}
+				if lseq > 0 {
+					t.Logf("%s:\n  Delivered:\t%d\n  AckFloor:\t%d\n  AckPending:\t%d\n  NumPending:\t%d",
+						consumer, ci.Delivered.Stream, ci.AckFloor.Stream, ci.NumAckPending, ci.NumPending)
+				}
+				if ci.AckFloor.Stream < minAckFloor {
+					minAckFloor = ci.AckFloor.Stream
+				}
+			}
+			// Now grab the aggregate stream state.
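+			// With interest retention the stream's first sequence should chase the
+			// minimum ack floor; if it sits below the floor without moving between
+			// intervals, cleanup is lagging behind the consumers, flagged below.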
+			si, err := js.StreamInfo("EVENTS")
+			if err != nil {
+				t.Logf("Exiting Monitor: %v", err)
+				return
+			}
+			state := si.State
+			if lseq != 0 {
+				t.Logf("Stream:\n  Msgs: \t%d\n  First:\t%d\n  Last: \t%d\n  Deletes:\t%d\n",
+					state.Msgs, state.FirstSeq, state.LastSeq, state.NumDeleted)
+				t.Logf("Publish Stats:\n  Msgs/s:\t%0.2f\n  Avg Pub:\t%v\n\n", float64(si.State.LastSeq-lseq)/5.0, avgPubTime())
+				if si.State.FirstSeq < minAckFloor && si.State.FirstSeq == fseq {
+					t.Log("Stream first seq < minimum ack floor")
+				}
+			}
+			fseq, lseq = si.State.FirstSeq, si.State.LastSeq
+			time.Sleep(5 * time.Second)
+		}
+	}()
+
+	time.Sleep(testTime)
+}
diff --git a/server/raft.go b/server/raft.go
index a38efe9e..6afea35e 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -3108,12 +3108,14 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) {
 		n.trackResponse(ar)
 	} else if ar.term > n.term {
 		// False here and they have a higher term.
+		n.Lock()
 		n.term = ar.term
 		n.vote = noVote
 		n.writeTermVote()
 		n.warn("Detected another leader with higher term, will stepdown and reset")
 		n.stepdown.push(noLeader)
 		n.resetWAL()
+		n.Unlock()
 	} else if ar.reply != _EMPTY_ {
 		n.catchupFollower(ar)
 	}
diff --git a/server/test_test.go b/server/test_test.go
index a853310f..64717f90 100644
--- a/server/test_test.go
+++ b/server/test_test.go
@@ -1,4 +1,4 @@
-// Copyright 2019-2021 The NATS Authors
+// Copyright 2019-2023 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -52,14 +52,6 @@ func RunRandClientPortServer() *Server {
 	return RunServer(&opts)
 }
 
-// Used to setup clusters of clusters for tests.
-type cluster struct {
-	servers []*Server
-	opts    []*Options
-	name    string
-	t       testing.TB
-}
-
 func require_True(t *testing.T, b bool) {
 	t.Helper()
 	if !b {
@@ -276,6 +268,11 @@ func (c *cluster) shutdown() {
 	if c == nil {
 		return
 	}
+	// Stop any proxies.
+	for _, np := range c.nproxies {
+		np.stop()
+	}
+	// Shutdown and cleanup servers.
 	for i, s := range c.servers {
 		sd := s.StoreDir()
 		s.Shutdown()