Reworked sources and mirrors on missed data.

Add last delivered sequence to consumer idle heartbeats.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-03-20 14:03:46 -07:00
parent ad55fe6b4e
commit ced35e5b8c
4 changed files with 141 additions and 87 deletions

View File

@@ -41,7 +41,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.2.1.RC2"
VERSION = "2.2.1.RC3"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -732,10 +732,8 @@ func (o *consumer) setLeader(isLeader bool) {
o.acc.sl.RegisterNotification(o.cfg.DeliverSubject, o.inch)
if o.active = <-o.inch; !o.active {
// Check gateways in case they are enabled.
if o.active = s.hasGatewayInterest(o.acc.Name, o.cfg.DeliverSubject); o.active {
// There is no local interest but there is GW interest, we
// will watch for interest disappearing.
// TODO: may need to revisit...
if s.gateway.enabled {
o.active = s.hasGatewayInterest(o.acc.Name, o.cfg.DeliverSubject)
stopAndClearTimer(&o.gwdtmr)
o.gwdtmr = time.AfterFunc(time.Second, func() { o.watchGWinterest() })
}
@@ -2064,7 +2062,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
}
// We will wait here for new messages to arrive.
mch, outq, odsubj := o.mch, o.outq, o.cfg.DeliverSubject
mch, outq, odsubj, dseq := o.mch, o.outq, o.cfg.DeliverSubject, o.dseq-1
o.mu.Unlock()
select {
@@ -2078,7 +2076,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
// Messages are waiting.
case <-hbc:
if o.isActive() {
hdr := []byte("NATS/1.0 100 Idle Heartbeat\r\n\r\n")
hdr := []byte(fmt.Sprintf("NATS/1.0 100 Idle Heartbeat\r\n%s: %d\r\n", JSLastDeliveredSeq, dseq))
outq.send(&jsPubMsg{odsubj, _EMPTY_, _EMPTY_, hdr, nil, nil, 0, nil})
}
// Reset our idle heartbeat timer.

View File

@@ -4922,6 +4922,9 @@ func TestJetStreamSuperClusterDirectConsumersBrokenGateways(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
// Wait for direct consumer to get registered and detect interest across GW.
time.Sleep(time.Second)
// Send 100 msgs over 100ms in separate Go routine.
msg, toSend, done := []byte("Hello"), 100, make(chan bool)
go func() {
@@ -4935,15 +4938,19 @@ func TestJetStreamSuperClusterDirectConsumersBrokenGateways(t *testing.T) {
done <- true
}()
breakGW := func() {
s.gateway.Lock()
gw := s.gateway.out["C2"]
s.gateway.Unlock()
if gw != nil {
gw.closeConnection(ClientClosed)
}
}
// Wait til about half way through.
time.Sleep(20 * time.Millisecond)
// Now break GW connection.
s.gateway.Lock()
gw := s.gateway.out["C2"]
s.gateway.Unlock()
if gw != nil {
gw.closeConnection(ClientClosed)
}
breakGW()
// Wait for GW to reform.
for _, c := range sc.clusters {
@@ -4958,11 +4965,33 @@ func TestJetStreamSuperClusterDirectConsumersBrokenGateways(t *testing.T) {
t.Fatalf("Did not complete sending first batch of messages")
}
// Now send 100 more.
// Make sure we can deal with data loss at the end.
checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
si, err := js.StreamInfo("S")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != 100 {
return fmt.Errorf("Expected to have %d messages, got %d", 100, si.State.Msgs)
}
return nil
})
// Now send 100 more. Will aos break here in the middle.
for i := 0; i < toSend; i++ {
if _, err = js.Publish("TEST", msg); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
if i == 50 {
breakGW()
}
}
// Wait for GW to reform.
for _, c := range sc.clusters {
for _, s := range c.servers {
waitForOutboundGateways(t, s, 1, 2*time.Second)
}
}
si, err := js.StreamInfo("TEST")
@@ -4973,10 +5002,10 @@ func TestJetStreamSuperClusterDirectConsumersBrokenGateways(t *testing.T) {
t.Fatalf("Expected to have %d messages, got %d", 200, si.State.Msgs)
}
checkFor(t, 20*time.Second, 250*time.Millisecond, func() error {
checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
si, err := js.StreamInfo("S")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
return fmt.Errorf("Unexpected error: %v", err)
}
if si.State.Msgs != 200 {
return fmt.Errorf("Expected to have %d messages, got %d", 200, si.State.Msgs)

View File

@@ -196,6 +196,7 @@ const (
JSExpectedLastSeq = "Nats-Expected-Last-Sequence"
JSExpectedLastMsgId = "Nats-Expected-Last-Msg-Id"
JSStreamSource = "Nats-Stream-Source"
JSLastDeliveredSeq = "Nats-Last-Delivered"
)
// Dedupe entry
@@ -1045,7 +1046,9 @@ func (mset *stream) processMirrorMsgs() {
return
case <-mch:
for im := mset.pending(msgs); im != nil; im = im.next {
mset.processInboundMirrorMsg(im)
if !mset.processInboundMirrorMsg(im) {
break
}
}
case <-t.C:
mset.mu.RLock()
@@ -1061,56 +1064,63 @@ func (mset *stream) processMirrorMsgs() {
// Checks that the message is from our current direct consumer. We can not depend on sub comparison
// since cross account imports break.
func (si *sourceInfo) isCurrentSub(reply string) bool {
return !(si.cname != _EMPTY_ && strings.HasPrefix(reply, jsAckPre) && si.cname != tokenAt(reply, 4))
return si.cname != _EMPTY_ && strings.HasPrefix(reply, jsAckPre) && si.cname == tokenAt(reply, 4)
}
// processInboundMirrorMsg handles processing messages bound for a stream.
func (mset *stream) processInboundMirrorMsg(m *inMsg) {
func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
mset.mu.Lock()
if mset.mirror == nil {
mset.mu.Unlock()
return
return false
}
if !mset.isLeader() {
mset.mu.Unlock()
mset.cancelMirrorConsumer()
return
return false
}
mset.mirror.last = time.Now()
node := mset.node
// Check for heartbeats and flow control messages.
if m.isControlMsg() && mset.mirror.cname != _EMPTY_ {
mset.mirror.last = time.Now()
// Flow controls have reply subjects.
if m.rply != _EMPTY_ {
mset.handleFlowControl(m)
} else {
// For idle heartbeats make sure we did not miss anything.
if ldseq := parseInt64(getHeader(JSLastDeliveredSeq, m.hdr)); ldseq > 0 && uint64(ldseq) != mset.mirror.dseq {
mset.retryMirrorConsumer()
}
}
mset.mu.Unlock()
return false
}
// Ignore from old subscriptions.
// The reason we can not just compare subs is that on cross account imports they will not match.
if !mset.mirror.isCurrentSub(m.rply) {
mset.mu.Unlock()
return
return false
}
// Check for heartbeats and flow control messages.
if m.isControlMsg() {
// Flow controls have reply subjects.
if m.rply != _EMPTY_ {
mset.handleFlowControl(m)
}
mset.mu.Unlock()
return
}
sseq, _, dc, ts, pending := replyInfo(m.rply)
mset.mirror.last = time.Now()
sseq, dseq, dc, ts, pending := replyInfo(m.rply)
if dc > 1 {
mset.mu.Unlock()
return
return false
}
// Mirror info tracking.
olag := mset.mirror.lag
olag, odseq := mset.mirror.lag, mset.mirror.dseq
if pending == 0 {
mset.mirror.lag = 0
} else {
mset.mirror.lag = pending - 1
}
mset.mirror.dseq = dseq
mset.mu.Unlock()
s := mset.srv
@@ -1126,9 +1136,13 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) {
if sseq <= mset.lastSeq() {
mset.mu.Lock()
mset.mirror.lag = olag
mset.mirror.dseq = odseq
mset.mu.Unlock()
return
return false
} else {
mset.mu.Lock()
mset.mirror.dseq = odseq
mset.mu.Unlock()
mset.retryMirrorConsumer()
}
} else {
@@ -1139,6 +1153,7 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) {
s.DisableJetStream()
}
}
return err == nil
}
func (mset *stream) setMirrorErr(err *ApiError) {
@@ -1200,19 +1215,6 @@ func (mset *stream) setupMirrorConsumer() error {
mset.mirror = &sourceInfo{name: mset.cfg.Mirror.Name, msgs: &inbound{mch: make(chan struct{}, 1)}}
}
// Process inbound mirror messages from the wire.
sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy.
mset.queueInbound(mset.mirror.msgs, subject, reply, hdr, msg)
})
if err != nil {
mset.mirror = nil
return err
}
mset.mirror.sub = sub
mset.mirror.last = time.Now()
if !mset.mirror.grr {
mset.mirror.grr = true
mset.srv.startGoRoutine(func() { mset.processMirrorMsgs() })
@@ -1226,7 +1228,7 @@ func (mset *stream) setupMirrorConsumer() error {
req := &CreateConsumerRequest{
Stream: mset.cfg.Mirror.Name,
Config: ConsumerConfig{
DeliverSubject: string(sub.subject),
DeliverSubject: deliverSubject,
DeliverPolicy: DeliverByStartSequence,
OptStartSeq: state.LastSeq,
AckPolicy: AckNone,
@@ -1288,6 +1290,20 @@ func (mset *stream) setupMirrorConsumer() error {
if mset.mirror != nil {
mset.mirror.cname = ccr.ConsumerInfo.Name
}
// Process inbound mirror messages from the wire.
sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy.
mset.queueInbound(mset.mirror.msgs, subject, reply, hdr, msg)
})
if err != nil {
mset.mirror.err = jsError(err)
mset.mirror.sub = nil
} else {
mset.mirror.err = nil
mset.mirror.sub = sub
mset.mirror.last = time.Now()
}
mset.mu.Unlock()
}
mset.setMirrorErr(ccr.Error)
@@ -1333,7 +1349,6 @@ func (mset *stream) retrySourceConsumerAtSeq(sname string, seq uint64) {
if si == nil {
return
}
mset.unsubscribe(si.sub)
mset.setSourceConsumer(sname, seq)
}
@@ -1373,17 +1388,6 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
deliverSubject = syncSubject("$JS.S")
}
sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy.
mset.queueInbound(si.msgs, subject, reply, hdr, msg)
})
if err != nil {
si.err = jsError(err)
si.sub = nil
return
}
si.sub = sub
if !si.grr {
si.grr = true
mset.srv.startGoRoutine(func() { mset.processSourceMsgs(si) })
@@ -1455,6 +1459,19 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
} else {
// Capture consumer name.
si.cname = ccr.ConsumerInfo.Name
// Now create sub to receive messages.
sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy.
mset.queueInbound(si.msgs, subject, reply, hdr, msg)
})
if err != nil {
si.err = jsError(err)
si.sub = nil
} else {
si.err = nil
si.sub = sub
si.last = time.Now()
}
}
}
mset.mu.Unlock()
@@ -1495,7 +1512,9 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) {
return
case <-mch:
for im := mset.pending(msgs); im != nil; im = im.next {
mset.processInboundSourceMsg(si, im)
if !mset.processInboundSourceMsg(si, im) {
break
}
}
case <-t.C:
mset.mu.RLock()
@@ -1530,39 +1549,45 @@ func (mset *stream) handleFlowControl(m *inMsg) {
}
// processInboundSourceMsg handles processing other stream messages bound for this stream.
func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) {
func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
mset.mu.Lock()
if !mset.isLeader() {
mset.mu.Unlock()
mset.cancelSourceConsumer(si.name)
return
return false
}
si.last = time.Now()
node := mset.node
// Check for heartbeats and flow control messages.
if m.isControlMsg() && si.cname != _EMPTY_ {
si.last = time.Now()
// Flow controls have reply subjects.
if m.rply != _EMPTY_ {
mset.handleFlowControl(m)
} else {
// For idle heartbeats make sure we did not miss anything.
if ldseq := parseInt64(getHeader(JSLastDeliveredSeq, m.hdr)); ldseq > 0 && uint64(ldseq) != si.dseq {
mset.retrySourceConsumerAtSeq(si.name, si.sseq+1)
}
}
mset.mu.Unlock()
return false
}
// Ignore from old subscriptions.
if !si.isCurrentSub(m.rply) {
mset.mu.Unlock()
return
}
// Check for heartbeats and flow control messages.
if m.isControlMsg() {
// Flow controls have reply subjects.
if m.rply != _EMPTY_ {
mset.handleFlowControl(m)
}
mset.mu.Unlock()
return
return false
}
si.last = time.Now()
sseq, dseq, dc, _, pending := replyInfo(m.rply)
if dc > 1 {
mset.mu.Unlock()
return
return false
}
// Tracking is done here.
@@ -1570,16 +1595,16 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) {
si.dseq++
si.sseq = sseq
} else {
cname := tokenAt(m.rply, 4)
// Check to see if we know this is from an old consumer.
if dseq > si.dseq && si.cname == cname {
mset.retrySourceConsumerAtSeq(si.name, si.sseq+1)
} else if dseq > si.dseq {
si.cname = cname
si.dseq, si.sseq = dseq, sseq
if dseq > si.dseq {
if si.cname == _EMPTY_ {
si.cname = tokenAt(m.rply, 4)
si.dseq, si.sseq = dseq, sseq
} else {
mset.retrySourceConsumerAtSeq(si.name, si.sseq+1)
}
}
mset.mu.Unlock()
return
return false
}
if pending == 0 {
@@ -1619,6 +1644,8 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) {
s.DisableJetStream()
}
}
return true
}
func streamAndSeq(subject string) (string, uint64) {