Nak as restart semantics on push based observable

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2019-10-14 19:38:21 -07:00
parent 98bd8234b6
commit b8c958ed07
2 changed files with 91 additions and 4 deletions

View File

@@ -280,7 +280,7 @@ func (o *Observable) processAck(_ *subscription, _ *client, subject, reply strin
case bytes.Equal(msg, AckNak):
if o.isPushMode() && o.config.AckPolicy != AckExplicit {
// Reset our observable to this sequence number.
o.resetToSeq(sseq, dseq)
o.resetToSeq(sseq+1, dseq+1)
} else {
// Queue up this message for redelivery
o.queueForRedelivery(sseq)

View File

@@ -865,7 +865,7 @@ func TestJetStreamBasicPushNak(t *testing.T) {
}
// What we should expect since we drained the sub above to have the replayed messages again from the nak sequence.
expected = expected - nakSeq + 1
expected = expected - nakSeq
checkFor(t, 250*time.Millisecond, 10*time.Millisecond, func() error {
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != expected {
@@ -880,7 +880,7 @@ func TestJetStreamBasicPushNak(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
seq := o.SeqFromReply(m.Reply)
if seq != uint64(i+nakSeq) {
if seq != uint64(i+nakSeq+1) {
t.Fatalf("Expected sequence of %d , got %d", (i + nakSeq), seq)
}
}
@@ -1559,7 +1559,6 @@ func TestJetStreamDurablePartitionedObservableReconnect(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fmt.Printf("m.Reply is %q\n", m.Reply)
rsseq, roseq, redelivered := o.ReplyInfo(m.Reply)
if roseq != uint64(seq) {
t.Fatalf("Expected observable sequence of %d , got %d", seq, roseq)
@@ -1633,3 +1632,91 @@ func TestJetStreamDurablePartitionedObservableReconnect(t *testing.T) {
nextSeq++
}
}
func TestJetStreamObservableSeqGaps(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: "FOO"})
if err != nil {
t.Fatalf("Unexpected error adding message set: %v", err)
}
defer s.JetStreamDeleteMsgSet(mset)
nc := clientConnectToServer(t, s)
defer nc.Close()
// Now load up some messages.
toSend := 1000
for i := 0; i < toSend; i++ {
nc.Publish("FOO", []byte("Hello World!"))
}
nc.Flush()
stats := mset.Stats()
if stats.Msgs != uint64(toSend) {
t.Fatalf("Expected %d messages, got %d", toSend, stats.Msgs)
}
sub, _ := nc.SubscribeSync(nats.NewInbox())
defer sub.Unsubscribe()
maxMsgs := 100
sub.SetPendingLimits(maxMsgs, -1)
nc.Flush()
o, err := mset.AddObservable(&server.ObservableConfig{Delivery: sub.Subject, DeliverAll: true, AckPolicy: server.AckAll})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer o.Delete()
// This should quickly overrun us, but wait until we have 100.
checkFor(t, 250*time.Millisecond, 10*time.Millisecond, func() error {
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != maxMsgs {
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, maxMsgs)
}
return nil
})
// First should give us sc
if _, err := sub.NextMsg(time.Second); err != nats.ErrSlowConsumer {
t.Fatalf("Expected slow consumer error")
}
// Now drain until we find a gap.
nextStart := 0
var lastMsg *nats.Msg
for i := 1; i <= toSend; i++ {
nextStart = i
m, err := sub.NextMsg(100 * time.Millisecond)
if err != nil {
break
}
if seq := o.SeqFromReply(m.Reply); seq != uint64(i) {
break
}
lastMsg = m
}
if nextStart != maxMsgs+1 || lastMsg == nil {
t.Fatalf("Expected lastSeen to be %d, got %d", maxMsgs+1, nextStart)
}
// This is what we would expect users to do. Kill the subscription and re-establish it, send a NAK for last reply.
subj := sub.Subject
sub.Unsubscribe()
sub, _ = nc.SubscribeSync(subj)
defer sub.Unsubscribe()
lastMsg.Respond(server.AckNak)
for i := nextStart; i <= toSend; i++ {
m, err := sub.NextMsg(100 * time.Millisecond)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if seq := o.SeqFromReply(m.Reply); seq != uint64(i) {
t.Fatalf("Expected sequence %d, got %d", i, seq)
}
}
}