From 7e00a975b1a6a6e5ca984dee9f50943e70e8ba63 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 15 Oct 2019 17:41:12 -0700 Subject: [PATCH] Add purge and interest retention tests Signed-off-by: Derek Collison --- server/memstore.go | 15 +++- server/msgset.go | 78 ++++++++++++++++-- server/observable.go | 124 ++++++++++++++--------------- server/store.go | 1 + test/jetstream_test.go | 174 +++++++++++++++++++++++++++++++++++++++-- 5 files changed, 313 insertions(+), 79 deletions(-) diff --git a/server/memstore.go b/server/memstore.go index e5a2a976..15230ee7 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -157,9 +157,22 @@ func (ms *memStore) expireMsgs() { } } +// Purge will remove all messages from this store. +// Will return the number of purged messages. +func (ms *memStore) Purge() uint64 { + ms.mu.Lock() + defer ms.mu.Unlock() + purged := uint64(len(ms.msgs)) + ms.stats.FirstSeq = ms.stats.LastSeq + 1 + ms.stats.Bytes = 0 + ms.stats.Msgs = 0 + ms.msgs = make(map[uint64]*storedMsg) + return purged +} + func (ms *memStore) deleteFirstMsgOrPanic() { if !ms.deleteFirstMsg() { - panic("jetstream memstore has inconsistent state, can't find firstSeq msg") + panic("jetstream memstore has inconsistent state, can't find first seq msg") } } diff --git a/server/msgset.go b/server/msgset.go index e54d70e3..4abfedbd 100644 --- a/server/msgset.go +++ b/server/msgset.go @@ -42,7 +42,7 @@ const ( // StreamPolicy (default) means that messages are retained until any possible given limit is reached. // This could be any one of MaxMsgs, MaxBytes, or MaxAge. StreamPolicy RetentionPolicy = iota - // InterestPolicy specifies that when all known subscribers have acknowledged a message it can be removed. + // InterestPolicy specifies that when all known observables have acknowledged a message it can be removed. InterestPolicy // WorkQueuePolicy specifies that when the first worker or subscriber acknowledges the message it can be removed. WorkQueuePolicy @@ -143,8 +143,9 @@ func (mset *MsgSet) Delete() error { return mset.delete() } -func (mset *MsgSet) Purge() error { - return fmt.Errorf("NO IMPL") +// Purge will remove all messages from the message set and underlying store. +func (mset *MsgSet) Purge() uint64 { + return mset.store.Purge() } // Will create internal subscriptions for the msgSet. @@ -397,7 +398,72 @@ func (mset *MsgSet) partitionUnique(partition string) bool { return true } -// ackMsg is called into from an observable when we have a WorkQueue retention policy. -func (mset *MsgSet) ackMsg(seq uint64) { - mset.store.RemoveMsg(seq) +// ackMsg is called into from an observable when we have a WorkQueue or Interest retention policy. +func (mset *MsgSet) ackMsg(obs *Observable, seq uint64) { + switch mset.config.Retention { + case StreamPolicy: + return + case WorkQueuePolicy: + mset.store.RemoveMsg(seq) + case InterestPolicy: + var needAck bool + mset.mu.Lock() + for _, o := range mset.obs { + if o != obs && o.needAck(seq) { + needAck = true + break + } + } + mset.mu.Unlock() + if !needAck { + mset.store.RemoveMsg(seq) + } + } +} + +// Checks to see if there is registered interest in the delivery subject. +// Note that since we require delivery to be a literal this is just like +// a publish match. +func (mset *MsgSet) noInterest(delivery string) bool { + var c *client + var acc *Account + + mset.mu.Lock() + if mset.client != nil { + c = mset.client + acc = c.acc + } + mset.mu.Unlock() + if acc == nil { + return true + } + r := acc.sl.Match(delivery) + interest := len(r.psubs)+len(r.qsubs) > 0 + + // Check for service imports here. + if !interest && acc.imports.services != nil { + acc.mu.RLock() + si := acc.imports.services[delivery] + invalid := si != nil && si.invalid + acc.mu.RUnlock() + if si != nil && !invalid && si.acc != nil && si.acc.sl != nil { + rr := si.acc.sl.Match(si.to) + interest = len(rr.psubs)+len(rr.qsubs) > 0 + } + } + // Process GWs here. This is not going to exact since it could be that the GW does not + // know yet, but that is ok for here. + if !interest && (c != nil && c.srv != nil && c.srv.gateway.enabled) { + gw := c.srv.gateway + gw.RLock() + for _, gwc := range gw.outo { + psi, qr := gwc.gatewayInterest(acc.Name, delivery) + if psi || qr != nil { + interest = true + break + } + } + gw.RUnlock() + } + return !interest } diff --git a/server/observable.go b/server/observable.go index db7fda3b..5cd9bac8 100644 --- a/server/observable.go +++ b/server/observable.go @@ -67,8 +67,8 @@ type Observable struct { name string sseq uint64 dseq uint64 - aflr uint64 - soff uint64 + adflr uint64 + asflr uint64 dsubj string reqSub *subscription ackSub *subscription @@ -129,6 +129,9 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error) if !mset.validPartition(config.Partition) { return nil, fmt.Errorf("observable partition not a valid subset of the interest subjects") } + if config.AckPolicy == AckAll { + return nil, fmt.Errorf("observable with partition can not have an ack policy of ack all") + } } // Check on start position conflicts. @@ -257,8 +260,8 @@ func (o *Observable) updateDelivery(newDelivery string) { o.dsubj = newDelivery o.config.Delivery = newDelivery // FIXME(dlc) - check partitions, think we need offset. - o.dseq = o.aflr - o.sseq = o.aflr + o.dseq = o.adflr + o.sseq = o.asflr o.mu.Unlock() o.checkActive() @@ -302,7 +305,7 @@ func (o *Observable) processNak(sseq, dseq uint64) { var mset *MsgSet o.mu.Lock() // Check for out of range. - if dseq <= o.aflr || dseq > o.dseq { + if dseq <= o.adflr || dseq > o.dseq { o.mu.Unlock() return } @@ -326,26 +329,63 @@ func (o *Observable) processNak(sseq, dseq uint64) { // Process an ack for a message. func (o *Observable) ackMsg(sseq, dseq uint64) { + var sagap uint64 o.mu.Lock() switch o.config.AckPolicy { - case AckNone, AckAll: - o.aflr = dseq - // FIXME(dlc) - delete rdc entries? case AckExplicit: delete(o.pending, sseq) - if dseq == o.aflr+1 { - o.aflr++ + if dseq == o.adflr+1 { + o.adflr = dseq + o.asflr = sseq } delete(o.rdc, sseq) + case AckAll: + // no-op + if dseq <= o.adflr || sseq <= o.asflr { + o.mu.Unlock() + return + } + sagap = sseq - o.asflr + o.adflr = dseq + o.asflr = sseq + // FIXME(dlc) - delete rdc entries? + case AckNone: + // FIXME(dlc) - This is error but do we care? + o.mu.Unlock() + return } mset := o.mset o.mu.Unlock() - if mset != nil && mset.config.Retention == WorkQueuePolicy { - mset.ackMsg(sseq) + // Let the owning message set know if we are interest or workqueue retention based. + if mset != nil && mset.config.Retention != StreamPolicy { + if sagap > 1 { + // FIXME(dlc) - This is very inefficient, will need to fix. + for seq := sseq; seq > sseq-sagap; seq-- { + mset.ackMsg(o, seq) + } + } else { + mset.ackMsg(o, sseq) + } } } +// Check if we need an ack for this store seq. +func (o *Observable) needAck(sseq uint64) bool { + var na bool + o.mu.Lock() + switch o.config.AckPolicy { + case AckNone, AckAll: + na = sseq > o.asflr + case AckExplicit: + if sseq > o.asflr && len(o.pending) > 0 { + _, na = o.pending[sseq] + } + } + o.mu.Unlock() + return na +} + // Default is 1 if msg is nil. func batchSizeFromMsg(msg []byte) int { bs := 1 @@ -494,7 +534,10 @@ func (o *Observable) ackReply(sseq, dseq, dcount uint64) string { // Lock should be held and o.mset validated to be non-nil. func (o *Observable) deliverMsg(dsubj, subj string, msg []byte, seq, dcount uint64) { o.mset.sendq <- &jsPubMsg{dsubj, subj, o.ackReply(seq, o.dseq, dcount), msg, o, o.dseq} - if o.config.AckPolicy == AckExplicit { + if o.config.AckPolicy == AckNone { + o.adflr = o.dseq + o.asflr = seq + } else if o.config.AckPolicy == AckExplicit { o.trackPending(seq) } o.dseq++ @@ -687,9 +730,10 @@ func (o *Observable) selectStartingSeqNo() { } // Always set delivery sequence to 1. o.dseq = 1 - o.soff = o.sseq - 1 - // Set ack floor to delivery - 1 - o.aflr = o.dseq - 1 + // Set ack delivery floor to delivery-1 + o.adflr = o.dseq - 1 + // Set ack store floor to store-1 + o.asflr = o.sseq - 1 } // Test whether a config represents a durable subscriber. @@ -783,54 +827,6 @@ func (o *Observable) Delete() error { return nil } -// Checks to see if there is registered interest in the delivery subject. -// Note that since we require delivery to be a literal this is just like -// a publish match. -func (mset *MsgSet) noInterest(delivery string) bool { - var c *client - var acc *Account - - mset.mu.Lock() - if mset.client != nil { - c = mset.client - acc = c.acc - } - mset.mu.Unlock() - if acc == nil { - return true - } - r := acc.sl.Match(delivery) - interest := len(r.psubs)+len(r.qsubs) > 0 - - // Check for service imports here. - if !interest && acc.imports.services != nil { - acc.mu.RLock() - si := acc.imports.services[delivery] - invalid := si != nil && si.invalid - acc.mu.RUnlock() - if si != nil && !invalid && si.acc != nil && si.acc.sl != nil { - rr := si.acc.sl.Match(si.to) - interest = len(rr.psubs)+len(rr.qsubs) > 0 - } - } - // Process GWs here. This is not going to exact since it could be that the GW does not - // know, that is ok for here. - // TODO(@@IK) to check. - if !interest && (c != nil && c.srv != nil && c.srv.gateway.enabled) { - gw := c.srv.gateway - gw.RLock() - for _, gwc := range gw.outo { - psi, qr := gwc.gatewayInterest(acc.Name, delivery) - if psi || qr != nil { - interest = true - break - } - } - gw.RUnlock() - } - return !interest -} - // Check that we do not form a cycle by delivering to a delivery subject // that is part of the interest group. func (mset *MsgSet) deliveryFormsCycle(deliverySubject string) bool { diff --git a/server/store.go b/server/store.go index 44285c3d..4ff995cb 100644 --- a/server/store.go +++ b/server/store.go @@ -32,6 +32,7 @@ type MsgSetStore interface { StoreMsg(subj string, msg []byte) (uint64, error) Lookup(seq uint64) (subj string, msg []byte, ts int64, err error) RemoveMsg(seq uint64) bool + Purge() uint64 GetSeqFromTime(t time.Time) uint64 Stats() MsgSetStats } diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 17b63147..f53694ae 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -304,13 +304,23 @@ func TestJetStreamCreateObservable(t *testing.T) { t.Fatalf("Expected an error on unsubscribed delivery subject") } - // This should work.. nc := clientConnectToServer(t, s) defer nc.Close() sub, _ := nc.SubscribeSync(delivery) defer sub.Unsubscribe() nc.Flush() + // Partitions can not be AckAll. + if _, err := mset.AddObservable(&server.ObservableConfig{ + Delivery: delivery, + DeliverAll: true, + Partition: "foo", + AckPolicy: server.AckAll, + }); err == nil { + t.Fatalf("Expected an error on partitioned observable with ack policy of all") + } + + // This should work.. o, err := mset.AddObservable(&server.ObservableConfig{Delivery: delivery}) if err != nil { t.Fatalf("Expected no error with registered interest, got %v", err) @@ -793,7 +803,7 @@ func TestJetStreamWorkQueueRequestBatch(t *testing.T) { }) } -func TestJetStreamWorkQueueMsgSet(t *testing.T) { +func TestJetStreamWorkQueueRetentionMsgSet(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() @@ -1636,17 +1646,12 @@ func TestJetStreamCanNotNakAckd(t *testing.T) { // Fake these for now. ackReplyT := "$JS.A.DC.WQ.1.%d.%d" - nak := func(seq int) { + checkBadNak := func(seq int) { t.Helper() if err := nc.Publish(fmt.Sprintf(ackReplyT, seq, seq), server.AckNak); err != nil { t.Fatalf("Error sending nak: %v", err) } nc.Flush() - } - - checkBadNak := func(seq int) { - t.Helper() - nak(seq) if _, err := nc.Request(reqNextMsgSubj, nil, 10*time.Millisecond); err != nats.ErrTimeout { t.Fatalf("Did not expect new delivery on nak of %d", seq) } @@ -1661,3 +1666,156 @@ func TestJetStreamCanNotNakAckd(t *testing.T) { // Now check we can not nak something we do not have. checkBadNak(22) } + +func TestJetStreamMsgSetPurge(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: "DC"}) + if err != nil { + t.Fatalf("Unexpected error adding message set: %v", err) + } + defer s.JetStreamDeleteMsgSet(mset) + + nc := clientConnectToServer(t, s) + defer nc.Close() + + // Send 100 msgs + for i := 0; i < 100; i++ { + nc.Publish("DC", []byte("OK!")) + } + nc.Flush() + if stats := mset.Stats(); stats.Msgs != 100 { + t.Fatalf("Expected %d messages, got %d", 100, stats.Msgs) + } + mset.Purge() + if stats := mset.Stats(); stats.Msgs != 0 { + t.Fatalf("Expected %d messages, got %d", 0, stats.Msgs) + } +} + +func TestJetStreamInterestRetentionMsgSet(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: "DC", Retention: server.InterestPolicy}) + if err != nil { + t.Fatalf("Unexpected error adding message set: %v", err) + } + defer s.JetStreamDeleteMsgSet(mset) + + nc := clientConnectToServer(t, s) + defer nc.Close() + + // Send 100 msgs + totalMsgs := 10 + + for i := 0; i < totalMsgs; i++ { + nc.Publish("DC", []byte("OK!")) + } + nc.Flush() + + checkNumMsgs := func(numExpected int) { + t.Helper() + if stats := mset.Stats(); stats.Msgs != uint64(numExpected) { + t.Fatalf("Expected %d messages, got %d", numExpected, stats.Msgs) + } + } + + checkNumMsgs(totalMsgs) + + syncSub := func() *nats.Subscription { + sub, _ := nc.SubscribeSync(nats.NewInbox()) + nc.Flush() + return sub + } + + // Now create three observables. + // 1. AckExplicit + // 2. AckAll + // 3. AckNone + + sub1 := syncSub() + mset.AddObservable(&server.ObservableConfig{Delivery: sub1.Subject, DeliverAll: true, AckPolicy: server.AckExplicit}) + + sub2 := syncSub() + mset.AddObservable(&server.ObservableConfig{Delivery: sub2.Subject, DeliverAll: true, AckPolicy: server.AckAll}) + + sub3 := syncSub() + mset.AddObservable(&server.ObservableConfig{Delivery: sub3.Subject, DeliverAll: true, AckPolicy: server.AckNone}) + + // Wait for all messsages to be pending for each sub. + for _, sub := range []*nats.Subscription{sub1, sub2, sub3} { + checkFor(t, 250*time.Millisecond, 10*time.Millisecond, func() error { + if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != totalMsgs { + return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, totalMsgs) + } + return nil + }) + } + + getAndAck := func(sub *nats.Subscription) { + t.Helper() + if m, err := sub.NextMsg(time.Second); err != nil { + t.Fatalf("Unexpected error: %v", err) + } else { + m.Respond(nil) + } + nc.Flush() + } + + // Ack evens for the explicit ack sub. + var odds []*nats.Msg + for i := 1; i <= totalMsgs; i++ { + if m, err := sub1.NextMsg(time.Second); err != nil { + t.Fatalf("Unexpected error: %v", err) + } else if i%2 == 0 { + m.Respond(nil) // Ack evens. + } else { + odds = append(odds, m) + } + } + nc.Flush() + + checkNumMsgs(totalMsgs) + + // Now ack first for AckAll sub2 + getAndAck(sub2) + + // We should be at the same number since we acked 1, explicit acked 2 + checkNumMsgs(totalMsgs) + + // Now ack second for AckAll sub2 + getAndAck(sub2) + + // We should now have 1 removed. + checkNumMsgs(totalMsgs - 1) + + // Now ack third for AckAll sub2 + getAndAck(sub2) + + // We should still only have 1 removed. + checkNumMsgs(totalMsgs - 1) + + // Now ack odds from explicit. + for _, m := range odds { + m.Respond(nil) // Ack + } + nc.Flush() + + // we should have 1, 2, 3 acks now. + checkNumMsgs(totalMsgs - 3) + + // Now ack last ackall message. This should clear all of them. + for i := 4; i <= totalMsgs; i++ { + if m, err := sub2.NextMsg(time.Second); err != nil { + t.Fatalf("Unexpected error: %v", err) + } else if i == totalMsgs { + m.Respond(nil) + } + } + nc.Flush() + + // Should be zero now. + checkNumMsgs(0) +}