Merge branch 'main' into dev

This commit is contained in:
Derek Collison
2023-08-24 16:20:46 -07:00
4 changed files with 152 additions and 11 deletions

View File

@@ -5265,3 +5265,122 @@ func TestJetStreamClusterDurableConsumerInactiveThresholdLeaderSwitch(t *testing
}
}
}
func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
Replicas: 3,
})
require_NoError(t, err)
// send 50 msgs
for i := 0; i < 50; i++ {
_, err := js.Publish("foo", []byte("ok"))
require_NoError(t, err)
}
// File based.
_, err = js.Subscribe("foo",
func(msg *nats.Msg) {},
nats.Durable("file"),
nats.ManualAck(),
nats.MaxDeliver(1),
nats.AckWait(time.Second),
nats.MaxAckPending(10),
)
require_NoError(t, err)
// Let first batch retry and expire.
time.Sleep(1200 * time.Millisecond)
cia, err := js.ConsumerInfo("TEST", "file")
require_NoError(t, err)
// Make sure followers will have exact same state.
_, err = nc.Request(fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "file"), nil, time.Second)
require_NoError(t, err)
c.waitOnConsumerLeader(globalAccountName, "TEST", "file")
cib, err := js.ConsumerInfo("TEST", "file")
require_NoError(t, err)
// Want to compare sans cluster details which we know will change due to leader change.
// Also last activity for delivered can be slightly off so nil out as well.
checkConsumerInfo := func(a, b *nats.ConsumerInfo) {
t.Helper()
a.Cluster, b.Cluster = nil, nil
a.Delivered.Last, b.Delivered.Last = nil, nil
if !reflect.DeepEqual(a, b) {
t.Fatalf("ConsumerInfo do not match\n\t%+v\n\t%+v", a, b)
}
}
checkConsumerInfo(cia, cib)
// Memory based.
_, err = js.Subscribe("foo",
func(msg *nats.Msg) {},
nats.Durable("mem"),
nats.ManualAck(),
nats.MaxDeliver(1),
nats.AckWait(time.Second),
nats.MaxAckPending(10),
nats.ConsumerMemoryStorage(),
)
require_NoError(t, err)
// Let first batch retry and expire.
time.Sleep(1200 * time.Millisecond)
cia, err = js.ConsumerInfo("TEST", "mem")
require_NoError(t, err)
// Make sure followers will have exact same state.
_, err = nc.Request(fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "mem"), nil, time.Second)
require_NoError(t, err)
c.waitOnConsumerLeader(globalAccountName, "TEST", "mem")
cib, err = js.ConsumerInfo("TEST", "mem")
require_NoError(t, err)
checkConsumerInfo(cia, cib)
// Now file based but R1 and server restart.
_, err = js.Subscribe("foo",
func(msg *nats.Msg) {},
nats.Durable("r1"),
nats.ManualAck(),
nats.MaxDeliver(1),
nats.AckWait(time.Second),
nats.MaxAckPending(10),
nats.ConsumerReplicas(1),
)
require_NoError(t, err)
// Let first batch retry and expire.
time.Sleep(1200 * time.Millisecond)
cia, err = js.ConsumerInfo("TEST", "r1")
require_NoError(t, err)
cl := c.consumerLeader(globalAccountName, "TEST", "r1")
cl.Shutdown()
cl.WaitForShutdown()
cl = c.restartServer(cl)
c.waitOnServerCurrent(cl)
cib, err = js.ConsumerInfo("TEST", "r1")
require_NoError(t, err)
// Created can skew a small bit due to server restart, this is expected.
now := time.Now()
cia.Created, cib.Created = now, now
checkConsumerInfo(cia, cib)
}