Merge branch 'main' into dev

This commit is contained in:
Derek Collison
2023-03-30 21:09:45 -07:00
3 changed files with 287 additions and 69 deletions

View File

@@ -2567,16 +2567,8 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
// Check for any preAcks in case we are interest based.
mset.mu.Lock()
seq := lseq + 1 - mset.clfs
var shouldAck bool
if len(mset.preAcks) > 0 {
if _, shouldAck = mset.preAcks[seq]; shouldAck {
delete(mset.preAcks, seq)
}
}
mset.clearAllPreAcks(seq)
mset.mu.Unlock()
if shouldAck {
mset.ackMsg(nil, seq)
}
continue
}
@@ -2590,7 +2582,9 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
// Messages to be skipped have no subject or timestamp or msg or hdr.
if subject == _EMPTY_ && ts == 0 && len(msg) == 0 && len(hdr) == 0 {
// Skip and update our lseq.
mset.setLastSeq(mset.store.SkipMsg())
last := mset.store.SkipMsg()
mset.setLastSeq(last)
mset.clearAllPreAcks(last)
continue
}
@@ -7200,6 +7194,7 @@ func (mset *stream) processSnapshotDeletes(snap *streamSnapshot) {
mset.store.Compact(snap.FirstSeq)
mset.store.FastState(&state)
mset.setLastSeq(state.LastSeq)
mset.clearAllPreAcksBelowFloor(state.FirstSeq)
}
// Range the deleted and delete if applicable.
for _, dseq := range snap.Deleted {
@@ -7614,12 +7609,10 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) {
ddloaded := mset.ddloaded
tierName := mset.tier
if len(mset.preAcks) > 0 {
if _, shouldSkip := mset.preAcks[seq]; shouldSkip {
delete(mset.preAcks, seq)
// Mark this to be skipped
subj, ts = _EMPTY_, 0
}
if mset.hasAllPreAcks(seq) {
mset.clearAllPreAcks(seq)
// Mark this to be skipped
subj, ts = _EMPTY_, 0
}
mset.mu.Unlock()

View File

@@ -6846,15 +6846,15 @@ func TestNoRaceJetStreamClusterF3Setup(t *testing.T) {
}
}
// We test an interest based stream that has a cluster with a node with asymmetric paths from
// the stream leader and the consumer leader such that the consumer leader path is fast and
replicated acks arrive sooner than the actual message. This path was considered, but also
// categorized as very rare and was expensive as it tried to forward a new stream msg delete
// proposal to the original stream leader. It now will deal with the issue locally and not
// slow down the ingest rate to the stream's publishers.
func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamSetup(t *testing.T) {
// Uncomment to run. Do not want as part of Travis tests atm.
skip(t)
// Unbalanced stretch cluster.
// S2 (stream leader) will have a slow path to S1 (via proxy) and S3 (consumer leader) will have a fast path.
//
// Route Ports
// "S1": 14622
// "S2": 15622
// "S3": 16622
func createStretchUnbalancedCluster(t testing.TB) (c *cluster, np *netProxy) {
t.Helper()
tmpl := `
listen: 127.0.0.1:-1
@@ -6871,16 +6871,8 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamSetup(t *testing.T
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
`
// Route Ports
// "S1": 14622,
// "S2": 15622,
// "S3": 16622,
// S2 (stream leader) will have a slow path to S1 (via proxy) and S3 (consumer leader) will have a fast path.
// Do these in order, S1, S2 (proxy) then S3.
c := &cluster{t: t, servers: make([]*Server, 3), opts: make([]*Options, 3), name: "F3"}
c = &cluster{t: t, servers: make([]*Server, 3), opts: make([]*Options, 3), name: "F3"}
// S1
conf := fmt.Sprintf(tmpl, "S1", t.TempDir(), 14622, "route://127.0.0.1:15622, route://127.0.0.1:16622")
@@ -6888,7 +6880,7 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamSetup(t *testing.T
// S2
// Create the proxy first. Connect this to S1. Make it slow, e.g. 5ms RTT.
np := createNetProxy(1*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:14622", true)
np = createNetProxy(1*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:14622", true)
routes := fmt.Sprintf("%s, route://127.0.0.1:16622", np.routeURL())
conf = fmt.Sprintf(tmpl, "S2", t.TempDir(), 15622, routes)
c.servers[1], c.opts[1] = RunServerWithConfig(createConfFile(t, []byte(conf)))
@@ -6899,6 +6891,21 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamSetup(t *testing.T
c.checkClusterFormed()
c.waitOnClusterReady()
return c, np
}
// We test an interest based stream that has a cluster with a node with asymmetric paths from
// the stream leader and the consumer leader such that the consumer leader path is fast and
replicated acks arrive sooner than the actual message. This path was considered, but also
// categorized as very rare and was expensive as it tried to forward a new stream msg delete
// proposal to the original stream leader. It now will deal with the issue locally and not
// slow down the ingest rate to the stream's publishers.
func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamSetup(t *testing.T) {
// Uncomment to run. Do not want as part of Travis tests atm.
skip(t)
c, np := createStretchUnbalancedCluster(t)
defer c.shutdown()
defer np.stop()
@@ -6935,7 +6942,7 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamSetup(t *testing.T
c.waitOnConsumerLeader(globalAccountName, "EVENTS", "C")
if s := c.consumerLeader(globalAccountName, "EVENTS", "C"); s != cl {
s.JetStreamStepdownConsumer(globalAccountName, "EVENTS", "C")
return fmt.Errorf("Server %s is not consumer leader yet", sl)
return fmt.Errorf("Server %s is not consumer leader yet", cl)
}
return nil
})
@@ -7438,3 +7445,133 @@ func TestNoRaceFileStoreNumPending(t *testing.T) {
}
}
}
// TestNoRaceJetStreamClusterUnbalancedInterestMultipleConsumers exercises an
// interest-based R3 stream in a stretch cluster where replicated consumer acks
// can reach a server before the corresponding messages (preAcks). With two
// consumers — a fast push consumer ("C") and a slower pull consumer ("D") —
// it verifies messages are not removed until all consumers have acked, and
// that no preAcks entries are left behind afterwards.
func TestNoRaceJetStreamClusterUnbalancedInterestMultipleConsumers(t *testing.T) {
	c, np := createStretchUnbalancedCluster(t)
	defer c.shutdown()
	defer np.stop()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Now create the stream.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "EVENTS",
		Subjects:  []string{"EV.>"},
		Replicas:  3,
		Retention: nats.InterestPolicy,
	})
	require_NoError(t, err)

	// Make sure its leader is on S2.
	sl := c.servers[1]
	checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
		c.waitOnStreamLeader(globalAccountName, "EVENTS")
		if s := c.streamLeader(globalAccountName, "EVENTS"); s != sl {
			s.JetStreamStepdownStream(globalAccountName, "EVENTS")
			return fmt.Errorf("Server %s is not stream leader yet", sl)
		}
		return nil
	})

	// Create a fast ack consumer.
	_, err = js.Subscribe("EV.NEW", func(m *nats.Msg) {
		m.Ack()
	}, nats.Durable("C"), nats.ManualAck())
	require_NoError(t, err)

	// Make sure the consumer leader is on S3.
	cl := c.servers[2]
	checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
		c.waitOnConsumerLeader(globalAccountName, "EVENTS", "C")
		if s := c.consumerLeader(globalAccountName, "EVENTS", "C"); s != cl {
			s.JetStreamStepdownConsumer(globalAccountName, "EVENTS", "C")
			return fmt.Errorf("Server %s is not consumer leader yet", cl)
		}
		return nil
	})

	// Connect a client directly to the stream leader.
	nc, js = jsClientConnect(t, sl)
	defer nc.Close()

	// Now create a pull subscriber.
	sub, err := js.PullSubscribe("EV.NEW", "D", nats.ManualAck())
	require_NoError(t, err)

	// Make sure this consumer leader is on S1.
	cl = c.servers[0]
	checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
		c.waitOnConsumerLeader(globalAccountName, "EVENTS", "D")
		if s := c.consumerLeader(globalAccountName, "EVENTS", "D"); s != cl {
			s.JetStreamStepdownConsumer(globalAccountName, "EVENTS", "D")
			return fmt.Errorf("Server %s is not consumer leader yet", cl)
		}
		return nil
	})

	numToSend := 1000
	for i := 0; i < numToSend; i++ {
		_, err := js.PublishAsync("EV.NEW", nil)
		require_NoError(t, err)
	}
	select {
	case <-js.PublishAsyncComplete():
	case <-time.After(20 * time.Second):
		t.Fatalf("Did not receive completion signal")
	}

	// Now make sure we can pull messages since we have not acked.
	// The bug is that the acks arrive on S1 faster than the messages but we want to
	// make sure we do not remove prematurely.
	msgs, err := sub.Fetch(100, nats.MaxWait(time.Second))
	require_NoError(t, err)
	require_True(t, len(msgs) == 100)
	for _, m := range msgs {
		m.AckSync()
	}

	ci, err := js.ConsumerInfo("EVENTS", "D")
	require_NoError(t, err)
	require_True(t, ci.NumPending == uint64(numToSend-100))
	require_True(t, ci.NumAckPending == 0)
	require_True(t, ci.Delivered.Stream == 100)
	require_True(t, ci.AckFloor.Stream == 100)

	// Check stream state on all servers.
	// Only the first 100 messages (acked by both consumers) should be gone.
	for _, s := range c.servers {
		mset, err := s.GlobalAccount().lookupStream("EVENTS")
		require_NoError(t, err)
		state := mset.state()
		require_True(t, state.Msgs == 900)
		require_True(t, state.FirstSeq == 101)
		require_True(t, state.LastSeq == 1000)
		require_True(t, state.Consumers == 2)
	}

	msgs, err = sub.Fetch(900, nats.MaxWait(time.Second))
	require_NoError(t, err)
	require_True(t, len(msgs) == 900)
	for _, m := range msgs {
		m.AckSync()
	}

	// Let acks propagate.
	time.Sleep(250 * time.Millisecond)

	// Check final stream state on all servers.
	// All messages acked by both consumers, so stream should be empty.
	for _, s := range c.servers {
		mset, err := s.GlobalAccount().lookupStream("EVENTS")
		require_NoError(t, err)
		state := mset.state()
		require_True(t, state.Msgs == 0)
		require_True(t, state.FirstSeq == 1001)
		require_True(t, state.LastSeq == 1000)
		require_True(t, state.Consumers == 2)
		// Now check preAcks — none should remain once everything is acked.
		mset.mu.RLock()
		numPreAcks := len(mset.preAcks)
		mset.mu.RUnlock()
		require_True(t, numPreAcks == 0)
	}
}

View File

@@ -250,7 +250,8 @@ type stream struct {
csl *Sublist // Consumer Sublist
// For non limits policy streams when they process an ack before the actual msg.
preAcks map[uint64]struct{}
// Can happen in stretch clusters, multi-cloud, or during catchup for a restarted server.
preAcks map[uint64]map[*consumer]struct{}
// TODO(dlc) - Hide everything below behind two pointers.
// Clustered mode.
@@ -1781,6 +1782,7 @@ func (mset *stream) deleteMsg(seq uint64) (bool, error) {
return false, fmt.Errorf("invalid stream")
}
mset.mu.RUnlock()
return mset.store.RemoveMsg(seq)
}
@@ -3995,11 +3997,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Make sure to take into account any message assignments that we had to skip (clfs).
seq = lseq + 1 - clfs
// Check for preAcks and the need to skip vs store.
var shouldSkip bool
if _, shouldSkip = mset.preAcks[seq]; shouldSkip {
delete(mset.preAcks, seq)
}
if shouldSkip {
if mset.hasAllPreAcks(seq) {
mset.clearAllPreAcks(seq)
store.SkipMsg()
} else {
err = store.StoreRawMsg(subject, hdr, msg, seq, ts)
@@ -4842,18 +4841,118 @@ func (mset *stream) checkInterest(seq uint64, obs *consumer) bool {
defer pmsg.returnToPool()
sm, err := mset.store.LoadMsg(seq, &pmsg.StoreMsg)
if err != nil {
if err == ErrStoreEOF {
// Register this as a preAck.
mset.registerPreAck(obs, seq)
return true
}
return false
}
subj = sm.subj
}
for _, o := range mset.consumers {
if o != obs && o.needAck(seq, subj) {
// If this is us or we have a registered preAck for this consumer continue inspecting.
if o == obs || mset.hasPreAck(o, seq) {
continue
}
// Check if we need an ack.
if o.needAck(seq, subj) {
return true
}
}
return false
}
// hasPreAck reports whether consumer o has a pre-registered ack for seq,
// i.e. its ack arrived before the message itself was stored.
// Write lock should be held.
func (mset *stream) hasPreAck(o *consumer, seq uint64) bool {
	if o == nil || len(mset.preAcks) == 0 {
		return false
	}
	if acks, ok := mset.preAcks[seq]; ok {
		_, found := acks[o]
		return found
	}
	return false
}
// hasAllPreAcks reports whether every consumer on this stream has already
// pre-acked the given sequence.
// Write lock should be held.
func (mset *stream) hasAllPreAcks(seq uint64) bool {
	if len(mset.preAcks) == 0 {
		return false
	}
	acked := mset.preAcks[seq]
	return len(acked) >= len(mset.consumers)
}
// clearAllPreAcksLock removes all pre-acks for seq, acquiring the lock.
// Lock should not be held.
func (mset *stream) clearAllPreAcksLock(seq uint64) {
	mset.mu.Lock()
	mset.clearAllPreAcks(seq)
	mset.mu.Unlock()
}
// Clear out all pre-acks for this sequence.
// Write lock should be held.
func (mset *stream) clearAllPreAcks(seq uint64) {
	delete(mset.preAcks, seq)
}
// clearAllPreAcksBelowFloor removes pre-ack entries for all sequences
// strictly below floor, e.g. after a compact moved the first sequence up.
// Write lock should be held.
func (mset *stream) clearAllPreAcksBelowFloor(floor uint64) {
	for seq := range mset.preAcks {
		if seq >= floor {
			continue
		}
		delete(mset.preAcks, seq)
	}
}
// registerPreAckLock records an ack for a consumer that arrived before the
// actual message, acquiring the lock. Lock should not be held.
func (mset *stream) registerPreAckLock(o *consumer, seq uint64) {
	mset.mu.Lock()
	mset.registerPreAck(o, seq)
	mset.mu.Unlock()
}
// registerPreAck records an ack from consumer o for a sequence whose message
// has not been stored yet, lazily creating the tracking maps.
// No-op for a nil consumer. Write lock should be held.
func (mset *stream) registerPreAck(o *consumer, seq uint64) {
	if o == nil {
		return
	}
	if mset.preAcks == nil {
		mset.preAcks = make(map[uint64]map[*consumer]struct{})
	}
	acks, ok := mset.preAcks[seq]
	if !ok {
		acks = make(map[*consumer]struct{})
		mset.preAcks[seq] = acks
	}
	acks[o] = struct{}{}
}
// clearPreAckLock removes a pre-registered ack for a consumer, acquiring the
// lock. Lock should not be held.
func (mset *stream) clearPreAckLock(o *consumer, seq uint64) {
	mset.mu.Lock()
	mset.clearPreAck(o, seq)
	mset.mu.Unlock()
}
// clearPreAck removes any pre-registered ack from consumer o for seq and
// drops the sequence entry entirely once no consumers remain for it.
// No-op for a nil consumer. Write lock should be held.
func (mset *stream) clearPreAck(o *consumer, seq uint64) {
	if o == nil || len(mset.preAcks) == 0 {
		return
	}
	acks := mset.preAcks[seq]
	if len(acks) == 0 {
		return
	}
	delete(acks, o)
	if len(acks) == 0 {
		delete(mset.preAcks, seq)
	}
}
// ackMsg is called into from a consumer when we have a WorkQueue or Interest Retention Policy.
func (mset *stream) ackMsg(o *consumer, seq uint64) {
if seq == 0 || mset.cfg.Retention == LimitsPolicy {
@@ -4872,6 +4971,14 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) {
// Make sure this sequence is not below our first sequence.
if seq < state.FirstSeq {
mset.clearPreAck(o, seq)
mset.mu.Unlock()
return
}
// If this has arrived before we have processed the message itself.
if seq > state.LastSeq {
mset.registerPreAck(o, seq)
mset.mu.Unlock()
return
}
@@ -4885,40 +4992,21 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) {
case InterestPolicy:
shouldRemove = !mset.checkInterest(seq, o)
}
// Check for existing preAcks. These will override the concept of shouldRemove.
if len(mset.preAcks) > 0 {
if _, hasAck := mset.preAcks[seq]; hasAck {
delete(mset.preAcks, seq)
shouldRemove = true
}
}
// If we should remove but we know this is beyond our last we can add to preAcks here.
// The ack reached us before the actual msg.
ackBeforeMsg := shouldRemove && seq > state.LastSeq
if ackBeforeMsg {
if mset.preAcks == nil {
mset.preAcks = make(map[uint64]struct{})
}
mset.preAcks[seq] = struct{}{}
}
mset.mu.Unlock()
// If nothing else to do.
if !shouldRemove || ackBeforeMsg {
if !shouldRemove {
// Clear any pending preAcks for this consumer.
mset.clearPreAckLock(o, seq)
return
}
// If we are here we should attempt to remove.
if _, err := mset.store.RemoveMsg(seq); err == ErrStoreEOF {
// The ack reached us before the actual msg.
mset.mu.Lock()
if mset.preAcks == nil {
mset.preAcks = make(map[uint64]struct{})
}
mset.preAcks[seq] = struct{}{}
mset.mu.Unlock()
// This should not happen, but being pedantic.
mset.registerPreAckLock(o, seq)
} else {
mset.clearAllPreAcksLock(seq)
}
}