From c9bf329a99f3886da0ac996fe47d93f8f00dc1fd Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Mon, 21 Jun 2021 17:01:49 +0200 Subject: [PATCH 1/2] test to show slow purges Signed-off-by: R.I.Pienaar --- server/norace_test.go | 84 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/server/norace_test.go b/server/norace_test.go index 7d785a77..022d99af 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -17,6 +17,7 @@ package server import ( "bufio" + "context" "encoding/json" "fmt" "math/rand" @@ -1756,6 +1757,89 @@ func TestNoRaceJetStreamClusterSourcesMuxd(t *testing.T) { } +func TestNoRaceJetStreamClusterExtendedStreamPurgeStall(t *testing.T) { + t.Skip("fails always") + + cerr := func(t *testing.T, err error) { + t.Helper() + if err != nil { + t.Fatalf("unexepected err: %s", err) + } + } + + s := RunBasicJetStreamServer() + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + cfg := StreamConfig{ + Name: "KV", + Subjects: []string{"kv.>"}, + Storage: FileStorage, + } + req, err := json.Marshal(cfg) + cerr(t, err) + + _, err = nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) + cerr(t, err) + + si, err := js.StreamInfo("KV") + cerr(t, err) + + // 10kb messages spread over 1000 subjects + body := make([]byte, 101024) + for i := 0; i < 100000; i++ { + err := nc.Publish(fmt.Sprintf("kv.%d", i%1000), body) + cerr(t, err) + } + si, err = js.StreamInfo("KV") + cerr(t, err) + if si == nil || si.Config.Name != "KV" { + t.Fatalf("StreamInfo is not correct %+v", si) + } + + func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + ticker := time.NewTicker(200 * time.Millisecond) + ready := false + for { + select { + case <-ticker.C: + si, err = js.StreamInfo("KV") + cerr(t, err) + if si.State.Msgs == 100000 { + cancel() + ticker.Stop() + return + } + + case <-ctx.Done(): + ticker.Stop() + if !ready { + t.Fatalf("timeout waiting for messages") + } + } + } + }() + + jp, _ := json.Marshal(&JSApiStreamPurgeRequest{Subject: "kv.20"}) + start := time.Now() + res, err := nc.Request(fmt.Sprintf(JSApiStreamPurgeT, "KV"), jp, time.Minute) + elapsed := time.Since(start) + cerr(t, err) + pres := JSApiStreamPurgeResponse{} + err = json.Unmarshal(res.Data, &pres) + cerr(t, err) + if !pres.Success { + t.Fatalf("purge failed: %#v", pres) + } + if elapsed > time.Second { + t.Fatalf("Purge took %s", elapsed) + } +} + func TestNoRaceJetStreamClusterMirrorExpirationAndMissingSequences(t *testing.T) { c := createJetStreamClusterExplicit(t, "MMS", 9) defer c.shutdown() From c6b85fd101a4778f4ad6ea4b327b113b603e7b66 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Tue, 22 Jun 2021 08:46:32 +0200 Subject: [PATCH 2/2] update for review Signed-off-by: R.I.Pienaar --- server/norace_test.go | 52 +++++++++++++------------------------------ 1 file changed, 16 insertions(+), 36 deletions(-) diff --git a/server/norace_test.go b/server/norace_test.go index 022d99af..4d6b8d52 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -17,7 +17,6 @@ package server import ( "bufio" - "context" "encoding/json" "fmt" "math/rand" @@ -1773,22 +1772,15 @@ func TestNoRaceJetStreamClusterExtendedStreamPurgeStall(t *testing.T) { nc, js := jsClientConnect(t, s) defer nc.Close() - cfg := StreamConfig{ + si, err := js.AddStream(&nats.StreamConfig{ Name: "KV", Subjects: []string{"kv.>"}, - Storage: FileStorage, - } - req, err := json.Marshal(cfg) + Storage: nats.FileStorage, + }) cerr(t, err) - _, err = nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) - cerr(t, err) - - si, err := js.StreamInfo("KV") - cerr(t, err) - - // 10kb messages spread over 1000 subjects - body := make([]byte, 101024) + // 100kb messages spread over 1000 subjects + body := make([]byte, 100*1024) for i := 0; i < 100000; i++ { err := nc.Publish(fmt.Sprintf("kv.%d", i%1000), body) cerr(t, err) @@ -1799,30 +1791,18 @@ func TestNoRaceJetStreamClusterExtendedStreamPurgeStall(t *testing.T) { t.Fatalf("StreamInfo is not correct %+v", si) } - func() { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - ticker := time.NewTicker(200 * time.Millisecond) - ready := false - for { - select { - case <-ticker.C: - si, err = js.StreamInfo("KV") - cerr(t, err) - if si.State.Msgs == 100000 { - cancel() - ticker.Stop() - return - } - - case <-ctx.Done(): - ticker.Stop() - if !ready { - t.Fatalf("timeout waiting for messages") - } - } + checkFor(t, 5*time.Second, 200*time.Millisecond, func() error { + si, err = js.StreamInfo("KV") + if err != nil { + return err } - }() + + if si.State.Msgs == 100000 { + return nil + } + + return fmt.Errorf("waiting for more") + }) jp, _ := json.Marshal(&JSApiStreamPurgeRequest{Subject: "kv.20"}) start := time.Now()