mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
@@ -12389,75 +12389,49 @@ func TestJetStreamDeliverLastPerSubject(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestJetStreamDeliverLastPerSubjectNumPending(t *testing.T) {
|
||||
for _, st := range []nats.StorageType{nats.FileStorage} { //}, nats.MemoryStorage} {
|
||||
t.Run(st.String(), func(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
if config := s.JetStreamConfig(); config != nil {
|
||||
defer removeDir(t, config.StoreDir)
|
||||
}
|
||||
if config := s.JetStreamConfig(); config != nil {
|
||||
defer removeDir(t, config.StoreDir)
|
||||
}
|
||||
|
||||
// Client for API requests.
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
// Client for API requests.
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
if _, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "KV",
|
||||
Subjects: []string{"KV.>"},
|
||||
Storage: st,
|
||||
MaxMsgsPerSubject: 5,
|
||||
Replicas: 1,
|
||||
}); err != nil {
|
||||
t.Fatalf("Error adding stream: %v", err)
|
||||
}
|
||||
if _, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "KV",
|
||||
Subjects: []string{"KV.>"},
|
||||
MaxMsgsPerSubject: 5,
|
||||
Replicas: 1,
|
||||
}); err != nil {
|
||||
t.Fatalf("Error adding stream: %v", err)
|
||||
}
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
msg := []byte(fmt.Sprintf("msg%d", i))
|
||||
js.PublishAsync("KV.foo", msg)
|
||||
js.PublishAsync("KV.bar", msg)
|
||||
js.PublishAsync("KV.baz", msg)
|
||||
js.PublishAsync("KV.bat", msg)
|
||||
}
|
||||
for i := 0; i < 5; i++ {
|
||||
msg := []byte(fmt.Sprintf("msg%d", i))
|
||||
js.Publish("KV.foo", msg)
|
||||
js.Publish("KV.bar", msg)
|
||||
js.Publish("KV.baz", msg)
|
||||
js.Publish("KV.bat", msg)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-js.PublishAsyncComplete():
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatalf("Did not receive completion signal")
|
||||
}
|
||||
// Delete some messages
|
||||
js.DeleteMsg("KV", 2)
|
||||
js.DeleteMsg("KV", 5)
|
||||
|
||||
// Delete some messages
|
||||
js.DeleteMsg("KV", 2)
|
||||
js.DeleteMsg("KV", 5)
|
||||
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
si, err := js.StreamInfo("KV")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if si.State.Msgs != 18 {
|
||||
t.Fatalf("Expected 18 messages, got %v", si.State.Msgs)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
inbox := nats.NewInbox()
|
||||
sub, _ := nc.SubscribeSync(inbox)
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
ci, err := js.AddConsumer("KV", &nats.ConsumerConfig{
|
||||
DeliverSubject: inbox,
|
||||
AckPolicy: nats.AckExplicitPolicy,
|
||||
DeliverPolicy: nats.DeliverLastPerSubjectPolicy,
|
||||
FilterSubject: "KV.>",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Error adding consumer: %v", err)
|
||||
}
|
||||
if ci.NumPending != 4 {
|
||||
t.Fatalf("Expected 4 pending msgs, got %v", ci.NumPending)
|
||||
}
|
||||
})
|
||||
ci, err := js.AddConsumer("KV", &nats.ConsumerConfig{
|
||||
DeliverSubject: nats.NewInbox(),
|
||||
AckPolicy: nats.AckExplicitPolicy,
|
||||
DeliverPolicy: nats.DeliverLastPerSubjectPolicy,
|
||||
FilterSubject: "KV.>",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Error adding consumer: %v", err)
|
||||
}
|
||||
if ci.NumPending != 4 {
|
||||
t.Fatalf("Expected 4 pending msgs, got %v", ci.NumPending)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user