Fixed consumer info num pending bug.

Under load we could have a message committed to the underlying store when a consumer was being created and then it increase num pending again when the stream signals the consumers.
This fix just remembers the last seq of the state when we calculate sgap and test before adding in the stream code.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-01-12 20:03:26 -08:00
parent 43eff407b8
commit 103f710479
4 changed files with 68 additions and 22 deletions

View File

@@ -207,6 +207,7 @@ type consumer struct {
adflr uint64
asflr uint64
sgap uint64
lsgap uint64
dsubj string
qgroup string
lss *lastSeqSkipList
@@ -425,15 +426,8 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
if config.OptStartTime != nil {
return nil, NewJSConsumerInvalidPolicyError(badStart("last per subject", "time"))
}
badConfig := config.FilterSubject == _EMPTY_
if !badConfig {
subjects, ext := mset.allSubjects()
if len(subjects) == 1 && !ext && subjects[0] == config.FilterSubject && subjectIsLiteral(subjects[0]) {
badConfig = true
}
}
if badConfig {
return nil, NewJSConsumerInvalidPolicyError(notSet("deliver last per subject", "filter subject"))
if config.FilterSubject == _EMPTY_ {
return nil, NewJSConsumerInvalidPolicyError(notSet("last per subject", "filter subject"))
}
case DeliverNew:
if config.OptStartSeq > 0 {
@@ -444,17 +438,17 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}
case DeliverByStartSequence:
if config.OptStartSeq == 0 {
return nil, NewJSConsumerInvalidPolicyError(notSet("deliver by start sequence", "start sequence"))
return nil, NewJSConsumerInvalidPolicyError(notSet("by start sequence", "start sequence"))
}
if config.OptStartTime != nil {
return nil, NewJSConsumerInvalidPolicyError(badStart("deliver by start sequence", "time"))
return nil, NewJSConsumerInvalidPolicyError(badStart("by start sequence", "time"))
}
case DeliverByStartTime:
if config.OptStartTime == nil {
return nil, NewJSConsumerInvalidPolicyError(notSet("deliver by start time", "start time"))
return nil, NewJSConsumerInvalidPolicyError(notSet("by start time", "start time"))
}
if config.OptStartSeq != 0 {
return nil, NewJSConsumerInvalidPolicyError(badStart("deliver by start time", "start sequence"))
return nil, NewJSConsumerInvalidPolicyError(badStart("by start time", "start sequence"))
}
}
@@ -3422,6 +3416,7 @@ func (o *consumer) setInitialPendingAndStart() {
mset.store.FastState(&state)
if state.Msgs > 0 {
o.sgap = state.Msgs - (o.sseq - state.FirstSeq)
o.lsgap = state.LastSeq
}
} else {
// Here we are filtered.
@@ -3429,8 +3424,10 @@ func (o *consumer) setInitialPendingAndStart() {
ss := mset.store.FilteredState(o.lss.resume+1, o.cfg.FilterSubject)
o.sseq = o.lss.seqs[0]
o.sgap = ss.Msgs + uint64(len(o.lss.seqs))
o.lsgap = ss.Last
} else if ss := mset.store.FilteredState(o.sseq, o.cfg.FilterSubject); ss.Msgs > 0 {
o.sgap = ss.Msgs
o.lsgap = ss.Last
// See if we should update our starting sequence.
if dp == DeliverLast || dp == DeliverLastPerSubject {
o.sseq = ss.Last

View File

@@ -4496,14 +4496,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
// Also short circuit if DeliverLastPerSubject is set with no FilterSubject.
if cfg.DeliverPolicy == DeliverLastPerSubject {
badConfig := cfg.FilterSubject == _EMPTY_
if !badConfig {
subjects := sa.Config.Subjects
if len(subjects) == 1 && subjects[0] == cfg.FilterSubject && subjectIsLiteral(subjects[0]) {
badConfig = true
}
}
if badConfig {
if cfg.FilterSubject == _EMPTY_ {
resp.Error = NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver last per subject, but FilterSubject is not set"))
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return

View File

@@ -9899,6 +9899,62 @@ func TestJetStreamClusterBalancedPlacement(t *testing.T) {
require_Error(t, err, NewJSInsufficientResourcesError(), NewJSStorageResourcesExceededError())
}
func TestJetStreamClusterConsumerPendingBug(t *testing.T) {
c := createJetStreamClusterExplicit(t, "JSC", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
nc2, js2 := jsClientConnect(t, c.randomServer())
defer nc2.Close()
_, err := js.AddStream(&nats.StreamConfig{Name: "foo", Replicas: 3})
require_NoError(t, err)
startCh, doneCh := make(chan bool), make(chan error)
go func() {
<-startCh
_, err := js2.AddConsumer("foo", &nats.ConsumerConfig{
Durable: "dlc",
FilterSubject: "foo",
DeliverSubject: "x",
})
doneCh <- err
}()
n := 10_000
for i := 0; i < n; i++ {
nc.Publish("foo", []byte("ok"))
if i == 222 {
startCh <- true
}
}
// Wait for them to all be there.
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("foo")
require_NoError(t, err)
if si.State.Msgs != uint64(n) {
return fmt.Errorf("Not received all messages")
}
return nil
})
select {
case err := <-doneCh:
if err != nil {
t.Fatalf("Error creating consumer: %v", err)
}
ci, err := js.ConsumerInfo("foo", "dlc")
require_NoError(t, err)
if ci.NumPending != uint64(n) {
t.Fatalf("Expected NumPending to be %d, got %d", n, ci.NumPending)
}
case <-time.After(5 * time.Second):
t.Fatalf("Timed out?")
}
}
func TestJetStreamClusterPullPerf(t *testing.T) {
skip(t)

View File

@@ -3070,7 +3070,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
for _, o := range mset.consumers {
o.mu.Lock()
if o.isLeader() {
if o.isFilteredMatch(subject) {
if seq > o.lsgap && o.isFilteredMatch(subject) {
o.sgap++
}
o.signalNewMessages()