mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Fix for flapper and additional consumer perf test
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -2418,6 +2418,8 @@ func TestJetStreamAckReplyStreamPending(t *testing.T) {
|
||||
for i := 0; i < toSend; i++ {
|
||||
sendStreamMsg(t, nc, "foo.1", "Hello World!")
|
||||
}
|
||||
nc.Flush()
|
||||
|
||||
state := mset.State()
|
||||
if state.Msgs != uint64(toSend) {
|
||||
t.Fatalf("Expected %d messages, got %d", toSend, state.Msgs)
|
||||
@@ -2430,18 +2432,20 @@ func TestJetStreamAckReplyStreamPending(t *testing.T) {
|
||||
defer o.Delete()
|
||||
|
||||
expectPending := func(ep int) {
|
||||
t.Helper()
|
||||
// Now check consumer info.
|
||||
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
||||
if info, pep := o.Info(), ep+1; int(info.NumPending) != pep {
|
||||
return fmt.Errorf("Expected consumer info pending of %d, got %d", pep, info.NumPending)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
m, err := nc.Request(o.RequestNextMsgSubject(), nil, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
_, _, _, _, pending := o.ReplyInfo(m.Reply)
|
||||
if pending != uint64(ep) {
|
||||
t.Fatalf("Expected ack reply pending of %d, got %d", ep, pending)
|
||||
}
|
||||
// Now check consumer info.
|
||||
if info := o.Info(); int(info.NumPending) != ep {
|
||||
t.Fatalf("Expected consumer info pending of %d, got %d", ep, info.NumPending)
|
||||
t.Fatalf("Expected ack reply pending of %d, got %d - reply: %q", ep, pending, m.Reply)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2450,6 +2454,8 @@ func TestJetStreamAckReplyStreamPending(t *testing.T) {
|
||||
for i := 0; i < toSend; i++ {
|
||||
sendStreamMsg(t, nc, "foo.1", "Hello World!")
|
||||
}
|
||||
nc.Flush()
|
||||
|
||||
expectPending(toSend*2 - 2)
|
||||
// Purge and send a new one.
|
||||
mset.Purge()
|
||||
@@ -2471,6 +2477,8 @@ func TestJetStreamAckReplyStreamPending(t *testing.T) {
|
||||
for i := 0; i < toSend; i++ {
|
||||
sendStreamMsg(t, nc, "foo.1", "Hello World!")
|
||||
}
|
||||
nc.Flush()
|
||||
|
||||
// Wait for expiration to kick in.
|
||||
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
||||
if state := mset.State(); state.Msgs != 0 {
|
||||
@@ -2492,6 +2500,8 @@ func TestJetStreamAckReplyStreamPending(t *testing.T) {
|
||||
for i := 0; i < toSend; i++ {
|
||||
sendStreamMsg(t, nc, "foo.33", "Hello World!")
|
||||
}
|
||||
nc.Flush()
|
||||
|
||||
if info := o.Info(); info.NumPending != 0 {
|
||||
t.Fatalf("Expected no pending, got %d", info.NumPending)
|
||||
}
|
||||
@@ -2508,6 +2518,7 @@ func TestJetStreamAckReplyStreamPending(t *testing.T) {
|
||||
for i := 0; i < toSend; i++ {
|
||||
sendStreamMsg(t, nc, "foo.22", "Hello World!")
|
||||
}
|
||||
nc.Flush()
|
||||
expectPending(100)
|
||||
mset.Purge()
|
||||
sendStreamMsg(t, nc, "foo.22", "Hello World!")
|
||||
@@ -8553,6 +8564,68 @@ func TestJetStreamPubPerf(t *testing.T) {
|
||||
fmt.Printf("%.0f msgs/sec\n", float64(toSend)/tt.Seconds())
|
||||
}
|
||||
|
||||
func TestJetStreamConsumerPerf(t *testing.T) {
|
||||
// Comment out to run, holding place for now.
|
||||
t.SkipNow()
|
||||
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
if config := s.JetStreamConfig(); config != nil {
|
||||
defer os.RemoveAll(config.StoreDir)
|
||||
}
|
||||
|
||||
acc := s.GlobalAccount()
|
||||
|
||||
msetConfig := server.StreamConfig{
|
||||
Name: "sr22",
|
||||
Storage: server.MemoryStorage,
|
||||
Subjects: []string{"foo"},
|
||||
}
|
||||
|
||||
mset, err := acc.AddStream(&msetConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error adding stream: %v", err)
|
||||
}
|
||||
|
||||
nc := clientConnectToServer(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
payload := []byte("Hello World")
|
||||
|
||||
toStore := 2000000
|
||||
for i := 0; i < toStore; i++ {
|
||||
nc.Publish("foo", payload)
|
||||
}
|
||||
nc.Flush()
|
||||
|
||||
_, err = mset.AddConsumer(&server.ConsumerConfig{
|
||||
Durable: "d",
|
||||
DeliverSubject: "d",
|
||||
AckPolicy: server.AckNone,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating consumer: %v", err)
|
||||
}
|
||||
|
||||
var received int
|
||||
done := make(chan bool)
|
||||
|
||||
nc.Subscribe("d", func(m *nats.Msg) {
|
||||
received++
|
||||
if received >= toStore {
|
||||
done <- true
|
||||
}
|
||||
})
|
||||
start := time.Now()
|
||||
nc.Flush()
|
||||
|
||||
<-done
|
||||
tt := time.Since(start)
|
||||
fmt.Printf("time is %v\n", tt)
|
||||
fmt.Printf("%.0f msgs/sec\n", float64(toStore)/tt.Seconds())
|
||||
}
|
||||
|
||||
func TestJetStreamPubSubPerf(t *testing.T) {
|
||||
// Comment out to run, holding place for now.
|
||||
t.SkipNow()
|
||||
|
||||
Reference in New Issue
Block a user