Merge branch 'main' into dev

This commit is contained in:
Derek Collison
2022-11-14 08:40:58 -08:00
3 changed files with 66 additions and 11 deletions

View File

@@ -2417,22 +2417,19 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
o.mu.RUnlock()
return needAck
}
asflr, osseq = state.AckFloor.Stream, o.sseq
pending = state.Pending
// If loading state as here, the osseq is +1.
asflr, osseq, pending = state.AckFloor.Stream, state.Delivered.Stream+1, state.Pending
}
switch o.cfg.AckPolicy {
case AckNone, AckAll:
needAck = sseq > asflr
case AckExplicit:
if sseq > asflr {
// Generally this means we need an ack, but just double check pending acks.
needAck = true
if sseq < osseq {
if len(pending) == 0 {
needAck = false
} else {
_, needAck = pending[sseq]
}
if sseq >= osseq {
needAck = true
} else {
_, needAck = pending[sseq]
}
}
}

View File

@@ -26,7 +26,7 @@ var defaultFuzzServerOptions = Options{
}
func dummyFuzzClient() *client {
return &client{srv: New(&defaultFuzzServerOptions), msubs: -1, mpay: -1, mcl: MAX_CONTROL_LINE_SIZE}
return &client{srv: New(&defaultFuzzServerOptions), msubs: -1, mpay: MAX_PAYLOAD_SIZE, mcl: MAX_CONTROL_LINE_SIZE}
}
func FuzzClient(data []byte) int {

View File

@@ -21,6 +21,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math/rand"
"net"
"os"
"reflect"
@@ -1315,3 +1316,60 @@ func TestJetStreamClusterHAssetsEnforcement(t *testing.T) {
})
require_Error(t, err, exceededErrs...)
}
func TestJetStreamClusterInterestStreamConsumer(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R5S", 5)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Retention: nats.InterestPolicy,
Replicas: 3,
})
require_NoError(t, err)
var subs []*nats.Subscription
ns := 5
for i := 0; i < ns; i++ {
dn := fmt.Sprintf("d%d", i)
sub, err := js.PullSubscribe("foo", dn)
require_NoError(t, err)
subs = append(subs, sub)
}
// Send 10 msgs
n := 10
for i := 0; i < n; i++ {
sendStreamMsg(t, nc, "foo", "msg")
}
// Collect all the messages.
var msgs []*nats.Msg
for _, sub := range subs {
lmsgs := fetchMsgs(t, sub, n, time.Second)
if len(lmsgs) != n {
t.Fatalf("Did not receive all msgs: %d vs %d", len(lmsgs), n)
}
msgs = append(msgs, lmsgs...)
}
// Shuffle
rand.Shuffle(len(msgs), func(i, j int) { msgs[i], msgs[j] = msgs[j], msgs[i] })
for _, m := range msgs {
m.AckSync()
}
// Make sure replicated acks are processed.
time.Sleep(250 * time.Millisecond)
si, err := js.StreamInfo("TEST")
require_NoError(t, err)
if si.State.Msgs != 0 {
t.Fatalf("Should not have any messages left: %d of %d", si.State.Msgs, n)
}
}