mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Fix for interest only, broken test
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -2296,7 +2296,7 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error {
|
||||
}
|
||||
o.closed = true
|
||||
|
||||
if dflag && advisory {
|
||||
if dflag && advisory && o.isLeader() {
|
||||
o.sendDeleteAdvisoryLocked()
|
||||
}
|
||||
|
||||
@@ -2338,7 +2338,7 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error {
|
||||
sysc.closeConnection(ClientClosed)
|
||||
}
|
||||
|
||||
if delivery != "" {
|
||||
if delivery != _EMPTY_ {
|
||||
a.sl.ClearNotification(delivery, o.inch)
|
||||
}
|
||||
|
||||
@@ -2348,9 +2348,14 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error {
|
||||
mset.mu.Unlock()
|
||||
|
||||
// We need to optionally remove all messages since we are interest based retention.
|
||||
// We will do this consistently on all replicas. Note that if in clustered mode the
|
||||
// non-leader consumers will need to restore state first.
|
||||
if dflag && rp == InterestPolicy {
|
||||
var seqs []uint64
|
||||
o.mu.Lock()
|
||||
if !o.isLeader() {
|
||||
o.readStoredState()
|
||||
}
|
||||
for seq := range o.pending {
|
||||
seqs = append(seqs, seq)
|
||||
}
|
||||
|
||||
@@ -759,6 +759,7 @@ func TestJetStreamClusterStreamPublishWithActiveConsumers(t *testing.T) {
|
||||
t.Fatalf("Did not get expected msg, expected %q, got %q", payload, m.Data)
|
||||
}
|
||||
}
|
||||
|
||||
ci, err := sub.ConsumerInfo()
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error getting consumer info: %v", err)
|
||||
@@ -782,19 +783,20 @@ func TestJetStreamClusterStreamPublishWithActiveConsumers(t *testing.T) {
|
||||
|
||||
// Now send more..
|
||||
// Send 10 more messages.
|
||||
for i := 10; i <= 20; i++ {
|
||||
for i := 11; i <= 20; i++ {
|
||||
payload := []byte(fmt.Sprintf("MSG-%d", i))
|
||||
if _, err = js.Publish("foo", payload); err != nil {
|
||||
t.Fatalf("Unexpected publish error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
checkSubsPending(t, sub, 10)
|
||||
// Sanity check for duplicate deliveries..
|
||||
if nmsgs, _, _ := sub.Pending(); nmsgs > 10 {
|
||||
t.Fatalf("Expected only %d responses, got %d more", 10, nmsgs)
|
||||
}
|
||||
|
||||
for i := 10; i <= 20; i++ {
|
||||
for i := 11; i <= 20; i++ {
|
||||
m, err := sub.NextMsg(time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
@@ -1746,6 +1748,84 @@ func TestJetStreamClusterStreamLimits(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func TestJetStreamClusterStreamInterestOnlyPolicy(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "R3S", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
// Client based API
|
||||
s := c.randomServer()
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "foo",
|
||||
Replicas: 3,
|
||||
Retention: nats.InterestPolicy,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
toSend := 10
|
||||
|
||||
// With no interest these should be no-ops.
|
||||
for i := 0; i < toSend; i++ {
|
||||
if _, err := js.Publish("foo", []byte("JSC-OK")); err != nil {
|
||||
t.Fatalf("Unexpected publish error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
si, err := js.StreamInfo("foo")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if si.State.Msgs != 0 {
|
||||
t.Fatalf("Expected no messages with no interest, got %d", si.State.Msgs)
|
||||
}
|
||||
|
||||
// Now create a consumer.
|
||||
sub, err := js.SubscribeSync("foo", nats.Durable("dlc"))
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
for i := 0; i < toSend; i++ {
|
||||
if _, err := js.Publish("foo", []byte("JSC-OK")); err != nil {
|
||||
t.Fatalf("Unexpected publish error: %v", err)
|
||||
}
|
||||
}
|
||||
checkSubsPending(t, sub, toSend)
|
||||
|
||||
si, err = js.StreamInfo("foo")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if si.State.Msgs != uint64(toSend) {
|
||||
t.Fatalf("Expected %d messages with interest, got %d", toSend, si.State.Msgs)
|
||||
}
|
||||
if si.State.FirstSeq != uint64(toSend+1) {
|
||||
t.Fatalf("Expected first sequence of %d, got %d", toSend+1, si.State.FirstSeq)
|
||||
}
|
||||
|
||||
// Now delete the consumer.
|
||||
sub.Unsubscribe()
|
||||
if err := js.DeleteConsumer("foo", "dlc"); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Wait for the messages to be purged.
|
||||
checkFor(t, 5*time.Second, 20*time.Millisecond, func() error {
|
||||
si, err := js.StreamInfo("foo")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if si.State.Msgs == 0 {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Wanted 0 messages, got %d", si.State.Msgs)
|
||||
})
|
||||
}
|
||||
|
||||
func TestJetStreamClusterStreamPerf(t *testing.T) {
|
||||
// Comment out to run, holding place for now.
|
||||
skip(t)
|
||||
@@ -1905,7 +1985,7 @@ func jsClientConnect(t *testing.T, s *server.Server) (*nats.Conn, nats.JetStream
|
||||
|
||||
func checkSubsPending(t *testing.T, sub *nats.Subscription, numExpected int) {
|
||||
t.Helper()
|
||||
checkFor(t, 200*time.Millisecond, 10*time.Millisecond, func() error {
|
||||
checkFor(t, 500*time.Millisecond, 10*time.Millisecond, func() error {
|
||||
if nmsgs, _, err := sub.Pending(); err != nil || nmsgs != numExpected {
|
||||
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, numExpected)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user