From a43a69a4039e6b3abf9403b513d4d993dcfeef7d Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 22 Jan 2021 11:04:06 -0800 Subject: [PATCH] Fix for interest only, broken test Signed-off-by: Derek Collison --- server/consumer.go | 9 +++- test/jetstream_cluster_test.go | 86 ++++++++++++++++++++++++++++++++-- 2 files changed, 90 insertions(+), 5 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index d3ecd75a..1667cbb0 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2296,7 +2296,7 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error { } o.closed = true - if dflag && advisory { + if dflag && advisory && o.isLeader() { o.sendDeleteAdvisoryLocked() } @@ -2338,7 +2338,7 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error { sysc.closeConnection(ClientClosed) } - if delivery != "" { + if delivery != _EMPTY_ { a.sl.ClearNotification(delivery, o.inch) } @@ -2348,9 +2348,14 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error { mset.mu.Unlock() // We need to optionally remove all messages since we are interest based retention. + // We will do this consistently on all replicas. Note that if in clustered mode the + // non-leader consumers will need to restore state first. if dflag && rp == InterestPolicy { var seqs []uint64 o.mu.Lock() + if !o.isLeader() { + o.readStoredState() + } for seq := range o.pending { seqs = append(seqs, seq) } diff --git a/test/jetstream_cluster_test.go b/test/jetstream_cluster_test.go index 5450e70e..736a6c61 100644 --- a/test/jetstream_cluster_test.go +++ b/test/jetstream_cluster_test.go @@ -759,6 +759,7 @@ func TestJetStreamClusterStreamPublishWithActiveConsumers(t *testing.T) { t.Fatalf("Did not get expected msg, expected %q, got %q", payload, m.Data) } } + ci, err := sub.ConsumerInfo() if err != nil { t.Fatalf("Unexpected error getting consumer info: %v", err) @@ -782,19 +783,20 @@ func TestJetStreamClusterStreamPublishWithActiveConsumers(t *testing.T) { // Now send more.. // Send 10 more messages. - for i := 10; i <= 20; i++ { + for i := 11; i <= 20; i++ { payload := []byte(fmt.Sprintf("MSG-%d", i)) if _, err = js.Publish("foo", payload); err != nil { t.Fatalf("Unexpected publish error: %v", err) } } + checkSubsPending(t, sub, 10) // Sanity check for duplicate deliveries.. if nmsgs, _, _ := sub.Pending(); nmsgs > 10 { t.Fatalf("Expected only %d responses, got %d more", 10, nmsgs) } - for i := 10; i <= 20; i++ { + for i := 11; i <= 20; i++ { m, err := sub.NextMsg(time.Second) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -1746,6 +1748,84 @@ func TestJetStreamClusterStreamLimits(t *testing.T) { } +func TestJetStreamClusterStreamInterestOnlyPolicy(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + // Client based API + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "foo", + Replicas: 3, + Retention: nats.InterestPolicy, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + toSend := 10 + + // With no interest these should be no-ops. + for i := 0; i < toSend; i++ { + if _, err := js.Publish("foo", []byte("JSC-OK")); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } + + si, err := js.StreamInfo("foo") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.State.Msgs != 0 { + t.Fatalf("Expected no messages with no interest, got %d", si.State.Msgs) + } + + // Now create a consumer. + sub, err := js.SubscribeSync("foo", nats.Durable("dlc")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + for i := 0; i < toSend; i++ { + if _, err := js.Publish("foo", []byte("JSC-OK")); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } + checkSubsPending(t, sub, toSend) + + si, err = js.StreamInfo("foo") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.State.Msgs != uint64(toSend) { + t.Fatalf("Expected %d messages with interest, got %d", toSend, si.State.Msgs) + } + if si.State.FirstSeq != uint64(toSend+1) { + t.Fatalf("Expected first sequence of %d, got %d", toSend+1, si.State.FirstSeq) + } + + // Now delete the consumer. + sub.Unsubscribe() + if err := js.DeleteConsumer("foo", "dlc"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Wait for the messages to be purged. + checkFor(t, 5*time.Second, 20*time.Millisecond, func() error { + si, err := js.StreamInfo("foo") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.State.Msgs == 0 { + return nil + } + return fmt.Errorf("Wanted 0 messages, got %d", si.State.Msgs) + }) +} + func TestJetStreamClusterStreamPerf(t *testing.T) { // Comment out to run, holding place for now. skip(t) @@ -1905,7 +1985,7 @@ func jsClientConnect(t *testing.T, s *server.Server) (*nats.Conn, nats.JetStream func checkSubsPending(t *testing.T, sub *nats.Subscription, numExpected int) { t.Helper() - checkFor(t, 200*time.Millisecond, 10*time.Millisecond, func() error { + checkFor(t, 500*time.Millisecond, 10*time.Millisecond, func() error { if nmsgs, _, err := sub.Pending(); err != nil || nmsgs != numExpected { return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, numExpected) }