Fix case where request for next msg times out

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-01-05 12:34:18 -05:00
parent 05ca8f4346
commit 5efdf63625
2 changed files with 135 additions and 14 deletions

View File

@@ -300,7 +300,7 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error)
return nil, fmt.Errorf("observable replacement durable config not the same")
}
// Once we are here we have a replacement push-based durable.
eo.updateDelivery(o.config.Delivery)
eo.updateDeliverySubject(o.config.Delivery)
return eo, nil
}
// Set up the ack subscription for this observable. Will use wildcard for all acks.
@@ -401,6 +401,7 @@ func (o *Observable) hasDeliveryInterest(localInterest bool) bool {
return false
}
// This processes and update to the local interest for a delivery subject.
func (o *Observable) updateDeliveryInterest(localInterest bool) {
interest := o.hasDeliveryInterest(localInterest)
@@ -435,13 +436,15 @@ func (o *Observable) Config() ObservableConfig {
return o.config
}
func (o *Observable) updateDelivery(newDelivery string) {
// This is a config change for the delivery subject for a
// push based observable.
func (o *Observable) updateDeliverySubject(newDelivery string) {
// Update the config and the dsubj
o.mu.Lock()
defer o.mu.Unlock()
mset := o.mset
if mset == nil {
if mset == nil || o.isPullMode() {
return
}
@@ -457,6 +460,7 @@ func (o *Observable) updateDelivery(newDelivery string) {
o.acc.sl.RegisterNotification(newDelivery, o.inch)
}
// Check that configs are equal but allow delivery subjects to be different.
func configsEqualSansDelivery(a, b ObservableConfig) bool {
// These were copied in so can set Delivery here.
a.Delivery, b.Delivery = _EMPTY_, _EMPTY_
@@ -629,6 +633,7 @@ func (o *Observable) updateStore() {
}
}
// shouldSample lets us know if we are sampling metrics on acks.
func (o *Observable) shouldSample() bool {
if o.sfreq <= 0 {
return false
@@ -677,9 +682,12 @@ func (o *Observable) ackMsg(sseq, dseq, dcount uint64) {
case AckExplicit:
o.sampleAck(sseq, dseq, dcount)
delete(o.pending, sseq)
if dseq == o.adflr+1 {
o.adflr = dseq
o.asflr = sseq
// Observables sequence numbers can skip during redlivery since
// they always increment. So if we do not have any pending treat
// as all scenario below. Otherwise check that we filled in a gap.
// TODO(dlc) - check this.
if len(o.pending) == 0 || dseq == o.adflr+1 {
o.adflr, o.asflr = dseq, sseq
}
delete(o.rdc, sseq)
case AckAll:
@@ -689,8 +697,7 @@ func (o *Observable) ackMsg(sseq, dseq, dcount uint64) {
return
}
sagap = sseq - o.asflr
o.adflr = dseq
o.asflr = sseq
o.adflr, o.asflr = dseq, sseq
// FIXME(dlc) - delete rdc entries?
case AckNone:
// FIXME(dlc) - This is error but do we care?
@@ -1057,7 +1064,7 @@ func (o *Observable) deliverCurrentMsg(subj string, msg []byte, seq uint64) bool
// Deliver a msg to the observable.
// Lock should be held and o.mset validated to be non-nil.
func (o *Observable) deliverMsg(dsubj, subj string, msg []byte, seq, dcount uint64) {
o.mset.sendq <- &jsPubMsg{dsubj, subj, o.ackReply(seq, o.dseq, dcount), msg, o, o.dseq}
o.mset.sendq <- &jsPubMsg{dsubj, subj, o.ackReply(seq, o.dseq, dcount), msg, o, seq}
if o.config.AckPolicy == AckNone {
o.adflr = o.dseq
o.asflr = seq
@@ -1084,16 +1091,30 @@ func (o *Observable) trackPending(seq uint64) {
// Depending on our state, we will process the failure.
func (o *Observable) didNotDeliver(seq uint64) {
o.mu.Lock()
if o.mset == nil {
mset := o.mset
if mset == nil {
o.mu.Unlock()
return
}
if o.config.Delivery != _EMPTY_ {
shouldSignal := false
if o.isPushMode() {
o.active = false
} else if o.pending != nil {
// push mode and we have pending.
if _, ok := o.pending[seq]; ok {
// We found this messsage on pending, we need
// to queue it up for immediate redelivery since
// we know it was not delivered.
if !o.onRedeliverQueue(seq) {
o.rdq = append(o.rdq, seq)
shouldSignal = true
}
}
}
// FIXME(dlc) - Other scenarios. Pull mode, etc.
o.mu.Unlock()
if shouldSignal {
mset.signalObservers()
}
}
// This checks if we already have this sequence queued for redelivery.
@@ -1404,7 +1425,7 @@ func (o *Observable) SetInActiveDeleteThreshold(dthresh time.Duration) error {
}
// RequestNextMsgSubject returns the subject to request the next message when in pull or worker mode.
// Returns empty otherwise.1
// Returns empty otherwise.
func (o *Observable) RequestNextMsgSubject() string {
return o.nextMsgSubj
}