NAK for workers

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2019-10-11 18:04:01 -07:00
parent 0a92d8e87d
commit 4eee93dea0
2 changed files with 85 additions and 8 deletions

View File

@@ -52,9 +52,9 @@ type Observable struct {
mu sync.Mutex
name string
mset *MsgSet
pseq uint64
sseq uint64
dseq uint64
aseq uint64
dsubj string
reqSub *subscription
ackSub *subscription
@@ -214,20 +214,34 @@ func (o *Observable) processAck(_ *subscription, _ *client, subject, reply strin
if o.isPushMode() {
// Reset our observable to this sequence number.
o.resetToSeq(seq)
} else {
// Queue up this message for redelivery
o.queueForRedelivery(seq)
}
}
}
// queueForRedelivery will queue up a message for redelivery.
func (o *Observable) queueForRedelivery(seq uint64) {
o.mu.Lock()
o.redeliver = append(o.redeliver, seq)
mset := o.mset
o.mu.Unlock()
if mset != nil {
mset.signalObservers()
}
}
// Process an ack for a message.
func (o *Observable) ackMsg(seq uint64) {
o.mu.Lock()
switch o.config.AckPolicy {
case AckNone, AckAll:
o.pseq = seq
o.aseq = seq
case AckExplicit:
delete(o.pending, seq)
if seq == o.pseq+1 {
o.pseq++
if seq == o.aseq+1 {
o.aseq++
}
}
mset := o.mset
@@ -438,15 +452,14 @@ func (o *Observable) checkPending() {
return
}
aw := int64(o.config.AckWait)
for seq := o.pseq; seq < o.dseq; seq++ {
for seq := o.aseq; seq < o.dseq; seq++ {
if ts, ok := o.pending[seq]; ok {
if now-ts > aw {
// If we have waiting, go ahead and deliver here.
// FIXME(dlc) - Not sure this is correct.
o.redeliver = append(o.redeliver, seq)
shouldSignal = true
} else {
break
continue
}
}
}
@@ -511,7 +524,7 @@ func (o *Observable) selectStartingSeqNo() {
// Set delivery sequence to be the same to start.
o.dseq = o.sseq
// Set pending sequence to delivery - 1
o.pseq = o.dseq - 1
o.aseq = o.dseq - 1
}
// Test whether a config represents a durable subscriber.
@@ -532,6 +545,7 @@ func (o *Observable) Name() string {
return n
}
// For now size of 6 for randomly created names.
const randObservableNameLen = 6
func createObservableName() string {

View File

@@ -1018,3 +1018,66 @@ func TestJetStreamWorkQueueAckWaitRedelivery(t *testing.T) {
t.Fatalf("Expected no messages, got %d", stats.Msgs)
}
}
func TestJetStreamWorkQueueNakRedelivery(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
mname := "MY_WQ"
mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: mname, Retention: server.WorkQueuePolicy})
if err != nil {
t.Fatalf("Unexpected error adding message set: %v", err)
}
defer s.JetStreamDeleteMsgSet(mset)
nc := clientConnectToServer(t, s)
defer nc.Close()
// Now load up some messages.
toSend := 10
for i := 0; i < toSend; i++ {
resp, _ := nc.Request(mname, []byte("Hello World!"), 50*time.Millisecond)
expectOKResponse(t, resp)
}
stats := mset.Stats()
if stats.Msgs != uint64(toSend) {
t.Fatalf("Expected %d messages, got %d", toSend, stats.Msgs)
}
o, err := mset.AddObservable(&server.ObservableConfig{DeliverAll: true, AckPolicy: server.AckExplicit})
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
defer o.Delete()
reqNextMsgSubj := fmt.Sprintf("%s.%s.%s", server.JsReqPre, mname, o.Name())
getMsg := func(seqno int) *nats.Msg {
t.Helper()
m, err := nc.Request(reqNextMsgSubj, nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if seq := o.SeqFromReply(m.Reply); seq != uint64(seqno) {
t.Fatalf("Expected sequence of %d , got %d", seqno, seq)
}
return m
}
for i := 1; i <= 5; i++ {
m := getMsg(i)
// Ack the message here.
m.Respond(nil)
}
// Grab #6
m := getMsg(6)
// NAK this one.
m.Respond(server.AckNak)
// When we request again should be 6 again.
getMsg(6)
// Then we should get 7, 8, etc.
getMsg(7)
getMsg(8)
}