mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
Merge pull request #2299 from ripienaar/slow_purge
test to show slow purges
This commit is contained in:
@@ -1756,6 +1756,70 @@ func TestNoRaceJetStreamClusterSourcesMuxd(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func TestNoRaceJetStreamClusterExtendedStreamPurgeStall(t *testing.T) {
|
||||
t.Skip("fails always")
|
||||
|
||||
cerr := func(t *testing.T, err error) {
|
||||
t.Helper()
|
||||
if err != nil {
|
||||
t.Fatalf("unexepected err: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
si, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "KV",
|
||||
Subjects: []string{"kv.>"},
|
||||
Storage: nats.FileStorage,
|
||||
})
|
||||
cerr(t, err)
|
||||
|
||||
// 100kb messages spread over 1000 subjects
|
||||
body := make([]byte, 100*1024)
|
||||
for i := 0; i < 100000; i++ {
|
||||
err := nc.Publish(fmt.Sprintf("kv.%d", i%1000), body)
|
||||
cerr(t, err)
|
||||
}
|
||||
si, err = js.StreamInfo("KV")
|
||||
cerr(t, err)
|
||||
if si == nil || si.Config.Name != "KV" {
|
||||
t.Fatalf("StreamInfo is not correct %+v", si)
|
||||
}
|
||||
|
||||
checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
|
||||
si, err = js.StreamInfo("KV")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if si.State.Msgs == 100000 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("waiting for more")
|
||||
})
|
||||
|
||||
jp, _ := json.Marshal(&JSApiStreamPurgeRequest{Subject: "kv.20"})
|
||||
start := time.Now()
|
||||
res, err := nc.Request(fmt.Sprintf(JSApiStreamPurgeT, "KV"), jp, time.Minute)
|
||||
elapsed := time.Since(start)
|
||||
cerr(t, err)
|
||||
pres := JSApiStreamPurgeResponse{}
|
||||
err = json.Unmarshal(res.Data, &pres)
|
||||
cerr(t, err)
|
||||
if !pres.Success {
|
||||
t.Fatalf("purge failed: %#v", pres)
|
||||
}
|
||||
if elapsed > time.Second {
|
||||
t.Fatalf("Purge took %s", elapsed)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNoRaceJetStreamClusterMirrorExpirationAndMissingSequences(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "MMS", 9)
|
||||
defer c.shutdown()
|
||||
|
||||
Reference in New Issue
Block a user