diff --git a/server/const.go b/server/const.go index 3eec8e59..afbd00da 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.2.1.RC2" + VERSION = "2.2.1.RC3" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/consumer.go b/server/consumer.go index 9b661802..7103c455 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -732,10 +732,8 @@ func (o *consumer) setLeader(isLeader bool) { o.acc.sl.RegisterNotification(o.cfg.DeliverSubject, o.inch) if o.active = <-o.inch; !o.active { // Check gateways in case they are enabled. - if o.active = s.hasGatewayInterest(o.acc.Name, o.cfg.DeliverSubject); o.active { - // There is no local interest but there is GW interest, we - // will watch for interest disappearing. - // TODO: may need to revisit... + if s.gateway.enabled { + o.active = s.hasGatewayInterest(o.acc.Name, o.cfg.DeliverSubject) stopAndClearTimer(&o.gwdtmr) o.gwdtmr = time.AfterFunc(time.Second, func() { o.watchGWinterest() }) } @@ -2064,7 +2062,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { } // We will wait here for new messages to arrive. - mch, outq, odsubj := o.mch, o.outq, o.cfg.DeliverSubject + mch, outq, odsubj, dseq := o.mch, o.outq, o.cfg.DeliverSubject, o.dseq-1 o.mu.Unlock() select { @@ -2078,7 +2076,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { // Messages are waiting. case <-hbc: if o.isActive() { - hdr := []byte("NATS/1.0 100 Idle Heartbeat\r\n\r\n") + hdr := []byte(fmt.Sprintf("NATS/1.0 100 Idle Heartbeat\r\n%s: %d\r\n", JSLastDeliveredSeq, dseq)) outq.send(&jsPubMsg{odsubj, _EMPTY_, _EMPTY_, hdr, nil, nil, 0, nil}) } // Reset our idle heartbeat timer. diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 13c435a7..40ac0c59 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -4922,6 +4922,9 @@ func TestJetStreamSuperClusterDirectConsumersBrokenGateways(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } + // Wait for direct consumer to get registered and detect interest across GW. + time.Sleep(time.Second) + // Send 100 msgs over 100ms in separate Go routine. msg, toSend, done := []byte("Hello"), 100, make(chan bool) go func() { @@ -4935,15 +4938,19 @@ func TestJetStreamSuperClusterDirectConsumersBrokenGateways(t *testing.T) { done <- true }() + breakGW := func() { + s.gateway.Lock() + gw := s.gateway.out["C2"] + s.gateway.Unlock() + if gw != nil { + gw.closeConnection(ClientClosed) + } + } + // Wait til about half way through. time.Sleep(20 * time.Millisecond) // Now break GW connection. - s.gateway.Lock() - gw := s.gateway.out["C2"] - s.gateway.Unlock() - if gw != nil { - gw.closeConnection(ClientClosed) - } + breakGW() // Wait for GW to reform. for _, c := range sc.clusters { @@ -4958,11 +4965,33 @@ func TestJetStreamSuperClusterDirectConsumersBrokenGateways(t *testing.T) { t.Fatalf("Did not complete sending first batch of messages") } - // Now send 100 more. + // Make sure we can deal with data loss at the end. + checkFor(t, 10*time.Second, 250*time.Millisecond, func() error { + si, err := js.StreamInfo("S") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.State.Msgs != 100 { + return fmt.Errorf("Expected to have %d messages, got %d", 100, si.State.Msgs) + } + return nil + }) + + // Now send 100 more. Will aos break here in the middle. for i := 0; i < toSend; i++ { if _, err = js.Publish("TEST", msg); err != nil { t.Fatalf("Unexpected publish error: %v", err) } + if i == 50 { + breakGW() + } + } + + // Wait for GW to reform. + for _, c := range sc.clusters { + for _, s := range c.servers { + waitForOutboundGateways(t, s, 1, 2*time.Second) + } } si, err := js.StreamInfo("TEST") @@ -4973,10 +5002,10 @@ func TestJetStreamSuperClusterDirectConsumersBrokenGateways(t *testing.T) { t.Fatalf("Expected to have %d messages, got %d", 200, si.State.Msgs) } - checkFor(t, 20*time.Second, 250*time.Millisecond, func() error { + checkFor(t, 10*time.Second, 250*time.Millisecond, func() error { si, err := js.StreamInfo("S") if err != nil { - t.Fatalf("Unexpected error: %v", err) + return fmt.Errorf("Unexpected error: %v", err) } if si.State.Msgs != 200 { return fmt.Errorf("Expected to have %d messages, got %d", 200, si.State.Msgs) diff --git a/server/stream.go b/server/stream.go index e1cb2eb6..82835d15 100644 --- a/server/stream.go +++ b/server/stream.go @@ -196,6 +196,7 @@ const ( JSExpectedLastSeq = "Nats-Expected-Last-Sequence" JSExpectedLastMsgId = "Nats-Expected-Last-Msg-Id" JSStreamSource = "Nats-Stream-Source" + JSLastDeliveredSeq = "Nats-Last-Delivered" ) // Dedupe entry @@ -1045,7 +1046,9 @@ func (mset *stream) processMirrorMsgs() { return case <-mch: for im := mset.pending(msgs); im != nil; im = im.next { - mset.processInboundMirrorMsg(im) + if !mset.processInboundMirrorMsg(im) { + break + } } case <-t.C: mset.mu.RLock() @@ -1061,56 +1064,63 @@ func (mset *stream) processMirrorMsgs() { // Checks that the message is from our current direct consumer. We can not depend on sub comparison // since cross account imports break. func (si *sourceInfo) isCurrentSub(reply string) bool { - return !(si.cname != _EMPTY_ && strings.HasPrefix(reply, jsAckPre) && si.cname != tokenAt(reply, 4)) + return si.cname != _EMPTY_ && strings.HasPrefix(reply, jsAckPre) && si.cname == tokenAt(reply, 4) } // processInboundMirrorMsg handles processing messages bound for a stream. -func (mset *stream) processInboundMirrorMsg(m *inMsg) { +func (mset *stream) processInboundMirrorMsg(m *inMsg) bool { mset.mu.Lock() if mset.mirror == nil { mset.mu.Unlock() - return + return false } if !mset.isLeader() { mset.mu.Unlock() mset.cancelMirrorConsumer() - return + return false } - mset.mirror.last = time.Now() node := mset.node + // Check for heartbeats and flow control messages. + if m.isControlMsg() && mset.mirror.cname != _EMPTY_ { + mset.mirror.last = time.Now() + // Flow controls have reply subjects. + if m.rply != _EMPTY_ { + mset.handleFlowControl(m) + } else { + // For idle heartbeats make sure we did not miss anything. + if ldseq := parseInt64(getHeader(JSLastDeliveredSeq, m.hdr)); ldseq > 0 && uint64(ldseq) != mset.mirror.dseq { + mset.retryMirrorConsumer() + } + } + mset.mu.Unlock() + return false + } + // Ignore from old subscriptions. // The reason we can not just compare subs is that on cross account imports they will not match. if !mset.mirror.isCurrentSub(m.rply) { mset.mu.Unlock() - return + return false } - // Check for heartbeats and flow control messages. - if m.isControlMsg() { - // Flow controls have reply subjects. - if m.rply != _EMPTY_ { - mset.handleFlowControl(m) - } - mset.mu.Unlock() - return - } - - sseq, _, dc, ts, pending := replyInfo(m.rply) + mset.mirror.last = time.Now() + sseq, dseq, dc, ts, pending := replyInfo(m.rply) if dc > 1 { mset.mu.Unlock() - return + return false } // Mirror info tracking. - olag := mset.mirror.lag + olag, odseq := mset.mirror.lag, mset.mirror.dseq if pending == 0 { mset.mirror.lag = 0 } else { mset.mirror.lag = pending - 1 } + mset.mirror.dseq = dseq mset.mu.Unlock() s := mset.srv @@ -1126,9 +1136,13 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) { if sseq <= mset.lastSeq() { mset.mu.Lock() mset.mirror.lag = olag + mset.mirror.dseq = odseq mset.mu.Unlock() - return + return false } else { + mset.mu.Lock() + mset.mirror.dseq = odseq + mset.mu.Unlock() mset.retryMirrorConsumer() } } else { @@ -1139,6 +1153,7 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) { s.DisableJetStream() } } + return err == nil } func (mset *stream) setMirrorErr(err *ApiError) { @@ -1200,19 +1215,6 @@ func (mset *stream) setupMirrorConsumer() error { mset.mirror = &sourceInfo{name: mset.cfg.Mirror.Name, msgs: &inbound{mch: make(chan struct{}, 1)}} } - // Process inbound mirror messages from the wire. - sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) { - hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy. - mset.queueInbound(mset.mirror.msgs, subject, reply, hdr, msg) - }) - if err != nil { - mset.mirror = nil - return err - } - - mset.mirror.sub = sub - mset.mirror.last = time.Now() - if !mset.mirror.grr { mset.mirror.grr = true mset.srv.startGoRoutine(func() { mset.processMirrorMsgs() }) @@ -1226,7 +1228,7 @@ func (mset *stream) setupMirrorConsumer() error { req := &CreateConsumerRequest{ Stream: mset.cfg.Mirror.Name, Config: ConsumerConfig{ - DeliverSubject: string(sub.subject), + DeliverSubject: deliverSubject, DeliverPolicy: DeliverByStartSequence, OptStartSeq: state.LastSeq, AckPolicy: AckNone, @@ -1288,6 +1290,20 @@ func (mset *stream) setupMirrorConsumer() error { if mset.mirror != nil { mset.mirror.cname = ccr.ConsumerInfo.Name } + + // Process inbound mirror messages from the wire. + sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) { + hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy. + mset.queueInbound(mset.mirror.msgs, subject, reply, hdr, msg) + }) + if err != nil { + mset.mirror.err = jsError(err) + mset.mirror.sub = nil + } else { + mset.mirror.err = nil + mset.mirror.sub = sub + mset.mirror.last = time.Now() + } mset.mu.Unlock() } mset.setMirrorErr(ccr.Error) @@ -1333,7 +1349,6 @@ func (mset *stream) retrySourceConsumerAtSeq(sname string, seq uint64) { if si == nil { return } - mset.unsubscribe(si.sub) mset.setSourceConsumer(sname, seq) } @@ -1373,17 +1388,6 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { deliverSubject = syncSubject("$JS.S") } - sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) { - hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy. - mset.queueInbound(si.msgs, subject, reply, hdr, msg) - }) - if err != nil { - si.err = jsError(err) - si.sub = nil - return - } - - si.sub = sub if !si.grr { si.grr = true mset.srv.startGoRoutine(func() { mset.processSourceMsgs(si) }) @@ -1455,6 +1459,19 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { } else { // Capture consumer name. si.cname = ccr.ConsumerInfo.Name + // Now create sub to receive messages. + sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) { + hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy. + mset.queueInbound(si.msgs, subject, reply, hdr, msg) + }) + if err != nil { + si.err = jsError(err) + si.sub = nil + } else { + si.err = nil + si.sub = sub + si.last = time.Now() + } } } mset.mu.Unlock() @@ -1495,7 +1512,9 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) { return case <-mch: for im := mset.pending(msgs); im != nil; im = im.next { - mset.processInboundSourceMsg(si, im) + if !mset.processInboundSourceMsg(si, im) { + break + } } case <-t.C: mset.mu.RLock() @@ -1530,39 +1549,45 @@ func (mset *stream) handleFlowControl(m *inMsg) { } // processInboundSourceMsg handles processing other stream messages bound for this stream. -func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) { +func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool { mset.mu.Lock() if !mset.isLeader() { mset.mu.Unlock() mset.cancelSourceConsumer(si.name) - return + return false } - si.last = time.Now() node := mset.node + // Check for heartbeats and flow control messages. + if m.isControlMsg() && si.cname != _EMPTY_ { + si.last = time.Now() + // Flow controls have reply subjects. + if m.rply != _EMPTY_ { + mset.handleFlowControl(m) + } else { + // For idle heartbeats make sure we did not miss anything. + if ldseq := parseInt64(getHeader(JSLastDeliveredSeq, m.hdr)); ldseq > 0 && uint64(ldseq) != si.dseq { + mset.retrySourceConsumerAtSeq(si.name, si.sseq+1) + } + } + mset.mu.Unlock() + return false + } + // Ignore from old subscriptions. if !si.isCurrentSub(m.rply) { mset.mu.Unlock() - return - } - - // Check for heartbeats and flow control messages. - if m.isControlMsg() { - // Flow controls have reply subjects. - if m.rply != _EMPTY_ { - mset.handleFlowControl(m) - } - mset.mu.Unlock() - return + return false } + si.last = time.Now() sseq, dseq, dc, _, pending := replyInfo(m.rply) if dc > 1 { mset.mu.Unlock() - return + return false } // Tracking is done here. @@ -1570,16 +1595,16 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) { si.dseq++ si.sseq = sseq } else { - cname := tokenAt(m.rply, 4) - // Check to see if we know this is from an old consumer. - if dseq > si.dseq && si.cname == cname { - mset.retrySourceConsumerAtSeq(si.name, si.sseq+1) - } else if dseq > si.dseq { - si.cname = cname - si.dseq, si.sseq = dseq, sseq + if dseq > si.dseq { + if si.cname == _EMPTY_ { + si.cname = tokenAt(m.rply, 4) + si.dseq, si.sseq = dseq, sseq + } else { + mset.retrySourceConsumerAtSeq(si.name, si.sseq+1) + } } mset.mu.Unlock() - return + return false } if pending == 0 { @@ -1619,6 +1644,8 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) { s.DisableJetStream() } } + + return true } func streamAndSeq(subject string) (string, uint64) {