Fixed and moved large purge test to no race

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-08-14 13:07:46 -07:00
parent 9d7123213a
commit 14572b080b
2 changed files with 172 additions and 159 deletions

View File

@@ -7484,165 +7484,6 @@ func TestJetStreamClusterSourceAndMirrorConsumersLeaderChange(t *testing.T) {
})
}
func TestJetStreamClusterExtendedStreamPurge(t *testing.T) {
for _, st := range []StorageType{FileStorage, MemoryStorage} {
t.Run(st.String(), func(t *testing.T) {
c := createJetStreamClusterExplicit(t, "JSC", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
cfg := StreamConfig{
Name: "KV",
Subjects: []string{"kv.>"},
Storage: st,
Replicas: 2,
MaxMsgsPer: 100,
}
req, err := json.Marshal(cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Do manually for now.
nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
si, err := js.StreamInfo("KV")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si == nil || si.Config.Name != "KV" {
t.Fatalf("StreamInfo is not correct %+v", si)
}
for i := 0; i < 1000; i++ {
js.PublishAsync("kv.foo", []byte("OK")) // 1 * i
js.PublishAsync("kv.bar", []byte("OK")) // 2 * i
js.PublishAsync("kv.baz", []byte("OK")) // 3 * i, so after first is 2700, last is 3000
}
for i := 0; i < 700; i++ {
js.PublishAsync(fmt.Sprintf("kv.%d", i+1), []byte("OK"))
}
checkFor(t, 5*time.Second, 50*time.Millisecond, func() error {
si, err := js.StreamInfo("KV")
if err != nil {
return err
}
if si.State.Msgs != 1000 {
return fmt.Errorf("Expected %d msgs, got %d", 300, si.State.Msgs)
}
return nil
})
shouldFail := func(preq *JSApiStreamPurgeRequest) {
req, _ := json.Marshal(preq)
resp, err := nc.Request(fmt.Sprintf(JSApiStreamPurgeT, "KV"), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var pResp JSApiStreamPurgeResponse
if err = json.Unmarshal(resp.Data, &pResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if pResp.Success || pResp.Error == nil {
t.Fatalf("Expected an error response but got none")
}
}
// Sequence and Keep should be mutually exclusive.
shouldFail(&JSApiStreamPurgeRequest{Sequence: 10, Keep: 10})
purge := func(preq *JSApiStreamPurgeRequest, newTotal uint64) {
t.Helper()
req, _ := json.Marshal(preq)
resp, err := nc.Request(fmt.Sprintf(JSApiStreamPurgeT, "KV"), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var pResp JSApiStreamPurgeResponse
if err = json.Unmarshal(resp.Data, &pResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !pResp.Success || pResp.Error != nil {
t.Fatalf("Got a bad response %+v", pResp)
}
si, err = js.StreamInfo("KV")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != newTotal {
t.Fatalf("Expected total after purge to be %d but got %d", newTotal, si.State.Msgs)
}
}
expectLeft := func(subject string, expected uint64) {
t.Helper()
ci, err := js.AddConsumer("KV", &nats.ConsumerConfig{Durable: "dlc", FilterSubject: subject, AckPolicy: nats.AckExplicitPolicy})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer js.DeleteConsumer("KV", "dlc")
if ci.NumPending != expected {
t.Fatalf("Expected %d remaining but got %d", expected, ci.NumPending)
}
}
purge(&JSApiStreamPurgeRequest{Subject: "kv.foo"}, 900)
expectLeft("kv.foo", 0)
purge(&JSApiStreamPurgeRequest{Subject: "kv.bar", Keep: 1}, 801)
expectLeft("kv.bar", 1)
purge(&JSApiStreamPurgeRequest{Subject: "kv.baz", Sequence: 2851}, 751)
expectLeft("kv.baz", 50)
purge(&JSApiStreamPurgeRequest{Subject: "kv.*"}, 0)
// RESET
js.DeleteStream("KV")
// Do manually for now.
nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
if _, err := js.StreamInfo("KV"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Put in 100.
for i := 0; i < 100; i++ {
js.Publish("kv.foo", []byte("OK"))
}
purge(&JSApiStreamPurgeRequest{Subject: "kv.foo", Keep: 10}, 10)
purge(&JSApiStreamPurgeRequest{Subject: "kv.foo", Keep: 10}, 10)
expectLeft("kv.foo", 10)
// RESET AGAIN
js.DeleteStream("KV")
// Do manually for now.
nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
if _, err := js.StreamInfo("KV"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Put in 100.
for i := 0; i < 100; i++ {
js.Publish("kv.foo", []byte("OK"))
}
purge(&JSApiStreamPurgeRequest{Keep: 10}, 10)
expectLeft(">", 10)
// RESET AGAIN
js.DeleteStream("KV")
// Do manually for now.
nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
if _, err := js.StreamInfo("KV"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Put in 100.
for i := 0; i < 100; i++ {
js.Publish("kv.foo", []byte("OK"))
}
purge(&JSApiStreamPurgeRequest{Sequence: 90}, 11) // Up to 90 so we keep that, hence the 11.
expectLeft(">", 11)
})
}
}
func TestPurgeBySequence(t *testing.T) {
for _, st := range []StorageType{FileStorage, MemoryStorage} {
t.Run(st.String(), func(t *testing.T) {

View File

@@ -2549,3 +2549,175 @@ func TestNoRaceJetStreamStalledMirrorsAfterExpire(t *testing.T) {
return nil
})
}
func TestNoRaceJetStreamClusterExtendedStreamPurge(t *testing.T) {
for _, st := range []StorageType{FileStorage, MemoryStorage} {
t.Run(st.String(), func(t *testing.T) {
c := createJetStreamClusterExplicit(t, "JSC", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
cfg := StreamConfig{
Name: "KV",
Subjects: []string{"kv.>"},
Storage: st,
Replicas: 2,
MaxMsgsPer: 100,
}
req, err := json.Marshal(cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Do manually for now.
nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
c.waitOnStreamLeader("$G", "KV")
si, err := js.StreamInfo("KV")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si == nil || si.Config.Name != "KV" {
t.Fatalf("StreamInfo is not correct %+v", si)
}
for i := 0; i < 1000; i++ {
js.PublishAsync("kv.foo", []byte("OK")) // 1 * i
js.PublishAsync("kv.bar", []byte("OK")) // 2 * i
js.PublishAsync("kv.baz", []byte("OK")) // 3 * i
}
// First is 2700, last is 3000
for i := 0; i < 700; i++ {
js.PublishAsync(fmt.Sprintf("kv.%d", i+1), []byte("OK"))
}
// Now first is 2700, last is 3700
select {
case <-js.PublishAsyncComplete():
case <-time.After(10 * time.Second):
t.Fatalf("Did not receive completion signal")
}
si, err = js.StreamInfo("KV")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != 1000 {
t.Fatalf("Expected %d msgs, got %d", 1000, si.State.Msgs)
}
shouldFail := func(preq *JSApiStreamPurgeRequest) {
req, _ := json.Marshal(preq)
resp, err := nc.Request(fmt.Sprintf(JSApiStreamPurgeT, "KV"), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var pResp JSApiStreamPurgeResponse
if err = json.Unmarshal(resp.Data, &pResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if pResp.Success || pResp.Error == nil {
t.Fatalf("Expected an error response but got none")
}
}
// Sequence and Keep should be mutually exclusive.
shouldFail(&JSApiStreamPurgeRequest{Sequence: 10, Keep: 10})
purge := func(preq *JSApiStreamPurgeRequest, newTotal uint64) {
t.Helper()
req, _ := json.Marshal(preq)
resp, err := nc.Request(fmt.Sprintf(JSApiStreamPurgeT, "KV"), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var pResp JSApiStreamPurgeResponse
if err = json.Unmarshal(resp.Data, &pResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !pResp.Success || pResp.Error != nil {
t.Fatalf("Got a bad response %+v", pResp)
}
si, err = js.StreamInfo("KV")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != newTotal {
t.Fatalf("Expected total after purge to be %d but got %d", newTotal, si.State.Msgs)
}
}
expectLeft := func(subject string, expected uint64) {
t.Helper()
ci, err := js.AddConsumer("KV", &nats.ConsumerConfig{Durable: "dlc", FilterSubject: subject, AckPolicy: nats.AckExplicitPolicy})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer js.DeleteConsumer("KV", "dlc")
if ci.NumPending != expected {
t.Fatalf("Expected %d remaining but got %d", expected, ci.NumPending)
}
}
purge(&JSApiStreamPurgeRequest{Subject: "kv.foo"}, 900)
expectLeft("kv.foo", 0)
purge(&JSApiStreamPurgeRequest{Subject: "kv.bar", Keep: 1}, 801)
expectLeft("kv.bar", 1)
purge(&JSApiStreamPurgeRequest{Subject: "kv.baz", Sequence: 2851}, 751)
expectLeft("kv.baz", 50)
purge(&JSApiStreamPurgeRequest{Subject: "kv.*"}, 0)
// RESET
js.DeleteStream("KV")
// Do manually for now.
nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
c.waitOnStreamLeader("$G", "KV")
if _, err := js.StreamInfo("KV"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Put in 100.
for i := 0; i < 100; i++ {
js.PublishAsync("kv.foo", []byte("OK"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(time.Second):
t.Fatalf("Did not receive completion signal")
}
purge(&JSApiStreamPurgeRequest{Subject: "kv.foo", Keep: 10}, 10)
purge(&JSApiStreamPurgeRequest{Subject: "kv.foo", Keep: 10}, 10)
expectLeft("kv.foo", 10)
// RESET AGAIN
js.DeleteStream("KV")
// Do manually for now.
nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
if _, err := js.StreamInfo("KV"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Put in 100.
for i := 0; i < 100; i++ {
js.Publish("kv.foo", []byte("OK"))
}
purge(&JSApiStreamPurgeRequest{Keep: 10}, 10)
expectLeft(">", 10)
// RESET AGAIN
js.DeleteStream("KV")
// Do manually for now.
nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
if _, err := js.StreamInfo("KV"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Put in 100.
for i := 0; i < 100; i++ {
js.Publish("kv.foo", []byte("OK"))
}
purge(&JSApiStreamPurgeRequest{Sequence: 90}, 11) // Up to 90 so we keep that, hence the 11.
expectLeft(">", 11)
})
}
}