diff --git a/server/norace_test.go b/server/norace_test.go index 7d785a77..4d6b8d52 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -1756,6 +1756,70 @@ func TestNoRaceJetStreamClusterSourcesMuxd(t *testing.T) { } +func TestNoRaceJetStreamClusterExtendedStreamPurgeStall(t *testing.T) { + t.Skip("fails always") + + cerr := func(t *testing.T, err error) { + t.Helper() + if err != nil { + t.Fatalf("unexepected err: %s", err) + } + } + + s := RunBasicJetStreamServer() + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + si, err := js.AddStream(&nats.StreamConfig{ + Name: "KV", + Subjects: []string{"kv.>"}, + Storage: nats.FileStorage, + }) + cerr(t, err) + + // 100kb messages spread over 1000 subjects + body := make([]byte, 100*1024) + for i := 0; i < 100000; i++ { + err := nc.Publish(fmt.Sprintf("kv.%d", i%1000), body) + cerr(t, err) + } + si, err = js.StreamInfo("KV") + cerr(t, err) + if si == nil || si.Config.Name != "KV" { + t.Fatalf("StreamInfo is not correct %+v", si) + } + + checkFor(t, 5*time.Second, 200*time.Millisecond, func() error { + si, err = js.StreamInfo("KV") + if err != nil { + return err + } + + if si.State.Msgs == 100000 { + return nil + } + + return fmt.Errorf("waiting for more") + }) + + jp, _ := json.Marshal(&JSApiStreamPurgeRequest{Subject: "kv.20"}) + start := time.Now() + res, err := nc.Request(fmt.Sprintf(JSApiStreamPurgeT, "KV"), jp, time.Minute) + elapsed := time.Since(start) + cerr(t, err) + pres := JSApiStreamPurgeResponse{} + err = json.Unmarshal(res.Data, &pres) + cerr(t, err) + if !pres.Success { + t.Fatalf("purge failed: %#v", pres) + } + if elapsed > time.Second { + t.Fatalf("Purge took %s", elapsed) + } +} + func TestNoRaceJetStreamClusterMirrorExpirationAndMissingSequences(t *testing.T) { c := createJetStreamClusterExplicit(t, "MMS", 9) defer c.shutdown()