mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Merge pull request #2382 from nats-io/interest-policy-bug
[FIXED] A stream with interest retention had a clustered consumer could cause server panic.
This commit is contained in:
@@ -2802,7 +2802,6 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
|
||||
// non-leader consumers will need to restore state first.
|
||||
if dflag && rp == InterestPolicy {
|
||||
stop := mset.lastSeq()
|
||||
|
||||
o.mu.Lock()
|
||||
if !o.isLeader() {
|
||||
o.readStoredState()
|
||||
@@ -2810,8 +2809,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
|
||||
start := o.asflr
|
||||
o.mu.Unlock()
|
||||
|
||||
rmseqs := make([]uint64, 0, stop-start+1)
|
||||
|
||||
var rmseqs []uint64
|
||||
mset.mu.RLock()
|
||||
for seq := start; seq <= stop; seq++ {
|
||||
if !mset.checkInterest(seq, o) {
|
||||
|
||||
@@ -12169,6 +12169,77 @@ func TestJetStreamConsumerBadNumPending(t *testing.T) {
|
||||
checkForNoPending(mon)
|
||||
}
|
||||
|
||||
// We had a report of a consumer delete crashing the server when in interest retention mode.
|
||||
// This I believe is only really possible in clustered mode, but we will force the issue here.
|
||||
func TestJetStreamConsumerCleanupWithRetentionPolicy(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
if config := s.JetStreamConfig(); config != nil {
|
||||
defer removeDir(t, config.StoreDir)
|
||||
}
|
||||
|
||||
// Client for API requests.
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "ORDERS",
|
||||
Subjects: []string{"orders.*"},
|
||||
Retention: nats.InterestPolicy,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
sub, err := js.SubscribeSync("orders.*")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
payload := []byte("Hello World")
|
||||
for i := 0; i < 10; i++ {
|
||||
subj := fmt.Sprintf("orders.%d", i+1)
|
||||
js.Publish(subj, payload)
|
||||
}
|
||||
|
||||
checkSubsPending(t, sub, 10)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
m, err := sub.NextMsg(time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
m.Ack()
|
||||
}
|
||||
|
||||
ci, err := sub.ConsumerInfo()
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error getting consumer info: %v", err)
|
||||
}
|
||||
|
||||
acc := s.GlobalAccount()
|
||||
mset, err := acc.lookupStream("ORDERS")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
o := mset.lookupConsumer(ci.Name)
|
||||
if o == nil {
|
||||
t.Fatalf("Error looking up consumer %q", ci.Name)
|
||||
}
|
||||
lseq := mset.lastSeq()
|
||||
o.mu.Lock()
|
||||
// Force boundary condition here.
|
||||
o.asflr = lseq + 2
|
||||
o.mu.Unlock()
|
||||
sub.Unsubscribe()
|
||||
|
||||
// Make sure server still available.
|
||||
if _, err := js.StreamInfo("ORDERS"); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
// Simple JetStream Benchmarks
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Reference in New Issue
Block a user