Merge branch 'main' into dev

Derek Collison
2023-03-20 21:35:15 -07:00
4 changed files with 215 additions and 12 deletions

View File

@@ -2185,6 +2185,15 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
case <-t.C:
doSnapshot()
// Check if we have preAcks left over if we have become the leader.
if isLeader {
mset.mu.Lock()
if mset.preAcks != nil {
mset.preAcks = nil
}
mset.mu.Unlock()
}
case <-uch:
// keep stream assignment current
sa = mset.streamAssignment()

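The hunk above clears leftover preAcks once the node has become the stream leader. A minimal standalone sketch of that pattern, using simplified stand-in types rather than the server's actual stream struct:

```go
package main

import (
	"fmt"
	"sync"
)

// simplifiedStream stands in for the server's stream type; only the fields
// relevant to the preAcks bookkeeping are modeled here.
type simplifiedStream struct {
	mu      sync.Mutex
	preAcks map[uint64]struct{}
}

// clearPreAcksIfLeader drops any pre-acknowledged sequences once this node
// is the leader, mirroring the periodic check in the monitor loop above.
func (s *simplifiedStream) clearPreAcksIfLeader(isLeader bool) {
	if !isLeader {
		return
	}
	s.mu.Lock()
	if s.preAcks != nil {
		s.preAcks = nil
	}
	s.mu.Unlock()
}

func main() {
	s := &simplifiedStream{preAcks: map[uint64]struct{}{7: {}}}
	s.clearPreAcksIfLeader(true)
	fmt.Println("preAcks cleared:", s.preAcks == nil) // true
}
```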
View File

@@ -6551,11 +6551,13 @@ func TestNoRaceJetStreamClusterF3Setup(t *testing.T) {
// Test params.
numSourceStreams := 20
numConsumersPerSource := 1
numPullersForAggregate := 50
numPullersPerConsumer := 50
numPublishers := 100
setHighStartSequence := false
simulateMaxRedeliveries := false
testTime := 60 * time.Minute // make sure to do --timeout=65m
maxBadPubTimes := uint32(20)
badPubThresh := 5 * time.Second
testTime := 5 * time.Minute // make sure to do --timeout=65m
t.Logf("Starting Test: Total Test Time %v", testTime)
@@ -6673,11 +6675,11 @@ func TestNoRaceJetStreamClusterF3Setup(t *testing.T) {
)
require_NoError(t, err)
t.Logf("Creating %d Pull Subscribers", numPullersForAggregate)
t.Logf("Creating %d x 2 Pull Subscribers", numPullersPerConsumer)
// Now create the pullers.
for _, subName := range []string{"C1", "C2"} {
for i := 0; i < numPullersForAggregate; i++ {
for i := 0; i < numPullersPerConsumer; i++ {
go func(subName string) {
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
@@ -6698,8 +6700,10 @@ func TestNoRaceJetStreamClusterF3Setup(t *testing.T) {
}
// Shuffle
rand.Shuffle(len(msgs), func(i, j int) { msgs[i], msgs[j] = msgs[j], msgs[i] })
// Wait for a random interval up to 100ms.
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
for _, m := range msgs {
// If we want to simulate max redeliveries being hit: skipping the ack just
// once will cause it due to the subscriber setup.
@@ -6751,6 +6755,9 @@ func TestNoRaceJetStreamClusterF3Setup(t *testing.T) {
t.Logf("Creating %d Publishers", numPublishers)
var numLimitsExceeded atomic.Uint32
errCh := make(chan error, 100)
for i := 0; i < numPublishers; i++ {
go func() {
nc, js := jsClientConnect(t, c.randomServer())
@@ -6769,8 +6776,13 @@ func TestNoRaceJetStreamClusterF3Setup(t *testing.T) {
return
}
elapsed := time.Since(start)
if elapsed > 5*time.Second {
if elapsed > badPubThresh {
t.Logf("Publish time took more than expected: %v", elapsed)
numLimitsExceeded.Add(1)
if ne := numLimitsExceeded.Load(); ne > maxBadPubTimes {
errCh <- fmt.Errorf("Too many exceeded times on publish: %d", ne)
return
}
}
updatePubStats(elapsed)
}
@@ -6823,5 +6835,169 @@ func TestNoRaceJetStreamClusterF3Setup(t *testing.T) {
}()
time.Sleep(testTime)
select {
case e := <-errCh:
t.Fatal(e)
case <-time.After(testTime):
t.Fatalf("Did not receive completion signal")
}
}
// We test an interest based stream in a cluster where one node has asymmetric paths from
// the stream leader and the consumer leader, such that the consumer leader path is fast and
// replicated acks arrive sooner than the actual message. This path was handled before, but it
// was categorized as very rare and was expensive since it forwarded a new stream msg delete
// proposal to the original stream leader. We now deal with the issue locally and do not
// slow down the ingest rate for the stream's publishers.
func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamSetup(t *testing.T) {
// Uncomment to run. Do not want as part of Travis tests atm.
skip(t)
tmpl := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
cluster {
name: "F3"
listen: 127.0.0.1:%d
routes = [%s]
}
accounts {
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
`
// Route Ports
// "S1": 14622,
// "S2": 15622,
// "S3": 16622,
// S2 (stream leader) will have a slow path to S1 (via proxy) and S3 (consumer leader) will have a fast path.
// Do these in order, S1, S2 (proxy) then S3.
c := &cluster{t: t, servers: make([]*Server, 3), opts: make([]*Options, 3), name: "F3"}
// S1
conf := fmt.Sprintf(tmpl, "S1", t.TempDir(), 14622, "route://127.0.0.1:15622, route://127.0.0.1:16622")
c.servers[0], c.opts[0] = RunServerWithConfig(createConfFile(t, []byte(conf)))
// S2
// Create the proxy first. Connect this to S1. Make it slow, e.g. ~1ms of added RTT.
np := createNetProxy(1*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:14622", true)
routes := fmt.Sprintf("%s, route://127.0.0.1:16622", np.routeURL())
conf = fmt.Sprintf(tmpl, "S2", t.TempDir(), 15622, routes)
c.servers[1], c.opts[1] = RunServerWithConfig(createConfFile(t, []byte(conf)))
// S3
conf = fmt.Sprintf(tmpl, "S3", t.TempDir(), 16622, "route://127.0.0.1:14622, route://127.0.0.1:15622")
c.servers[2], c.opts[2] = RunServerWithConfig(createConfFile(t, []byte(conf)))
c.checkClusterFormed()
c.waitOnClusterReady()
defer c.shutdown()
defer np.stop()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
// Now create the stream.
_, err := js.AddStream(&nats.StreamConfig{
Name: "EVENTS",
Subjects: []string{"EV.>"},
Replicas: 3,
Retention: nats.InterestPolicy,
})
require_NoError(t, err)
// Make sure its leader is on S2.
sl := c.servers[1]
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
c.waitOnStreamLeader(globalAccountName, "EVENTS")
if s := c.streamLeader(globalAccountName, "EVENTS"); s != sl {
s.JetStreamStepdownStream(globalAccountName, "EVENTS")
return fmt.Errorf("Server %s is not stream leader yet", sl)
}
return nil
})
// Now create the consumer.
_, err = js.PullSubscribe(_EMPTY_, "C", nats.BindStream("EVENTS"), nats.ManualAck())
require_NoError(t, err)
// Make sure the consumer leader is on S3.
cl := c.servers[2]
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
c.waitOnConsumerLeader(globalAccountName, "EVENTS", "C")
if s := c.consumerLeader(globalAccountName, "EVENTS", "C"); s != cl {
s.JetStreamStepdownConsumer(globalAccountName, "EVENTS", "C")
return fmt.Errorf("Server %s is not consumer leader yet", sl)
}
return nil
})
go func(js nats.JetStream) {
sub, err := js.PullSubscribe(_EMPTY_, "C", nats.BindStream("EVENTS"), nats.ManualAck())
require_NoError(t, err)
for {
msgs, err := sub.Fetch(100, nats.MaxWait(2*time.Second))
if err != nil && err != nats.ErrTimeout {
return
}
// Shuffle
rand.Shuffle(len(msgs), func(i, j int) { msgs[i], msgs[j] = msgs[j], msgs[i] })
for _, m := range msgs {
m.Ack()
}
}
}(js)
numPublishers := 25
pubThresh := 2 * time.Second
var maxExceeded atomic.Int64
errCh := make(chan error, numPublishers)
wg := sync.WaitGroup{}
msg := make([]byte, 2*1024) // 2k payload
rand.Read(msg)
// 25 publishers.
for i := 0; i < numPublishers; i++ {
wg.Add(1)
go func(iter int) {
defer wg.Done()
// Connect to a random server; the slow publishers will be the ones connected to the slow node.
// But if you connect them all there it will pass.
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
for i := 0; i < 1_000; i++ {
start := time.Now()
_, err := js.Publish("EV.PAID", msg)
if err != nil {
errCh <- fmt.Errorf("Publish error: %v", err)
return
}
if elapsed := time.Since(start); elapsed > pubThresh {
errCh <- fmt.Errorf("Publish time exceeded")
if int64(elapsed) > maxExceeded.Load() {
maxExceeded.Store(int64(elapsed))
}
return
}
}
}(i)
}
wg.Wait()
select {
case e := <-errCh:
t.Fatalf("%v: threshold is %v, maximum seen: %v", e, pubThresh, time.Duration(maxExceeded.Load()))
default:
}
}
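The doc comment above this test describes the case being exercised: on an interest-based stream, a replicated ack can reach a replica before the message it acknowledges. The stream.go hunks later in this commit handle that locally; the following is a minimal sketch of the idea under simplified, illustrative types (ackFirstStream, ack, and store are not the server's API) — record the sequence as a pre-ack when the ack arrives first, then skip storage when the message shows up.

```go
package main

import (
	"fmt"
	"sync"
)

// ackFirstStream is an illustrative stand-in for an interest-policy stream
// replica that may see an ack before the corresponding message.
type ackFirstStream struct {
	mu      sync.Mutex
	lastSeq uint64
	stored  map[uint64]string
	preAcks map[uint64]struct{}
}

// ack removes a stored message, or records a pre-ack when the message for
// this sequence has not arrived yet (seq is beyond what we have stored).
func (s *ackFirstStream) ack(seq uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.stored[seq]; ok {
		delete(s.stored, seq)
		return
	}
	if seq >= s.lastSeq {
		if s.preAcks == nil {
			s.preAcks = make(map[uint64]struct{})
		}
		s.preAcks[seq] = struct{}{}
	}
}

// store persists an incoming message unless it was already pre-acked,
// in which case the sequence is simply skipped.
func (s *ackFirstStream) store(seq uint64, msg string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, preAcked := s.preAcks[seq]; preAcked {
		delete(s.preAcks, seq)
	} else {
		s.stored[seq] = msg
	}
	if seq > s.lastSeq {
		s.lastSeq = seq
	}
}

func main() {
	s := &ackFirstStream{stored: make(map[uint64]string)}
	s.ack(1)            // ack arrives before the message
	s.store(1, "hello") // message arrives later and is skipped
	fmt.Printf("stored=%v preAcks=%v\n", s.stored, s.preAcks) // both empty
}
```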

View File

@@ -2092,7 +2092,7 @@ func (n *raft) runAsLeader() {
continue
}
n.sendAppendEntry(entries)
// We need to re-craete `entries` because there is a reference
// We need to re-create `entries` because there is a reference
// to it in the node's pae map.
entries = nil
}

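The comment in this hunk explains why entries is re-created: the slice is still referenced from the node's pae map after sendAppendEntry, so reusing its backing array for the next batch could overwrite retained entries. A small self-contained illustration of that aliasing hazard and the reallocation fix (hypothetical names, not the raft code itself):

```go
package main

import "fmt"

func main() {
	retained := make(map[int][]string) // stands in for the pae map that keeps a reference

	entries := make([]string, 0, 4)
	entries = append(entries, "e1", "e2")
	retained[1] = entries // the slice header (and backing array) is now referenced elsewhere

	// Wrong: truncating and reusing the same slice keeps the shared backing array,
	// so the next appends overwrite data the map still points at.
	entries = entries[:0]
	entries = append(entries, "x1", "x2")
	fmt.Println("after reuse:", retained[1]) // [x1 x2] - retained batch was clobbered

	// Right: drop the reference and let append allocate a fresh backing array,
	// which is what `entries = nil` achieves in the hunk above.
	retained[2] = []string{"e3", "e4"}
	entries = nil
	entries = append(entries, "y1")
	fmt.Println("after reallocation:", retained[2], entries) // [e3 e4] [y1]
}
```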
View File

@@ -248,6 +248,9 @@ type stream struct {
sigq *ipQueue[*cMsg]
csl *Sublist // Consumer Sublist
// For non limits policy streams when they process an ack before the actual msg.
preAcks map[uint64]struct{}
// TODO(dlc) - Hide everything below behind two pointers.
// Clustered mode.
sa *streamAssignment
@@ -3988,7 +3991,16 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
} else {
// Make sure to take into account any message assignments that we had to skip (clfs).
seq = lseq + 1 - clfs
err = store.StoreRawMsg(subject, hdr, msg, seq, ts)
// Check for preAcks and the need to skip vs store.
var shouldSkip bool
if _, shouldSkip = mset.preAcks[seq]; shouldSkip {
delete(mset.preAcks, seq)
}
if shouldSkip {
store.SkipMsg()
} else {
err = store.StoreRawMsg(subject, hdr, msg, seq, ts)
}
}
if err != nil {
@@ -4861,10 +4873,16 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) {
if shouldRemove {
if _, err := mset.store.RemoveMsg(seq); err == ErrStoreEOF {
// This should be rare but I have seen it.
// The ack reached us before the actual msg with AckNone and InterestPolicy.
if n := mset.raftNode(); n != nil {
md := streamMsgDelete{Seq: seq, NoErase: true, Stream: mset.cfg.Name}
n.ForwardProposal(encodeMsgDelete(&md))
// The ack reached us before the actual msg.
var state StreamState
mset.store.FastState(&state)
if seq >= state.LastSeq {
mset.mu.Lock()
if mset.preAcks == nil {
mset.preAcks = make(map[uint64]struct{})
}
mset.preAcks[seq] = struct{}{}
mset.mu.Unlock()
}
}
}