From e31001a782754f3a92d0da548b724bbd79659c3e Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 11 Nov 2020 12:30:51 -0800 Subject: [PATCH] Remove conditional and broadcast for signalling consumers Signed-off-by: Derek Collison --- server/consumer.go | 80 +++++++++++++++++++++--------------------- server/stream.go | 37 +------------------ test/jetstream_test.go | 1 + 3 files changed, 42 insertions(+), 76 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index ca32c49a..2de058e2 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -191,6 +191,7 @@ type Consumer struct { filterWC bool dtmr *time.Timer dthresh time.Duration + mch chan struct{} qch chan struct{} inch chan bool sfreq int32 @@ -384,6 +385,7 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) { dsubj: config.DeliverSubject, active: true, qch: make(chan struct{}), + mch: make(chan struct{}, 1), sfreq: int32(sampleFreq), maxdc: uint64(config.MaxDeliver), created: time.Now().UTC(), @@ -510,7 +512,7 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) { // If push mode, register for notifications on interest. if o.isPushMode() { o.dthresh = JsDeleteWaitTimeDefault - o.inch = make(chan bool, 4) + o.inch = make(chan bool, 8) a.sl.RegisterNotification(config.DeliverSubject, o.inch) o.active = o.hasDeliveryInterest(<-o.inch) // Check if we are not durable that the delivery subject has interest. @@ -647,12 +649,16 @@ func (o *Consumer) updateDeliveryInterest(localInterest bool) { interest := o.hasDeliveryInterest(localInterest) o.mu.Lock() + defer o.mu.Unlock() + mset := o.mset if mset == nil || o.isPullMode() { - o.mu.Unlock() return } - shouldSignal := interest && !o.active + + if interest && !o.active { + o.signalNewMessages() + } o.active = interest // Stop and clear the delete timer always. @@ -663,11 +669,6 @@ func (o *Consumer) updateDeliveryInterest(localInterest bool) { if !o.isDurable() && !interest { o.dtmr = time.AfterFunc(o.dthresh, func() { o.Delete() }) } - o.mu.Unlock() - - if shouldSignal { - mset.signalConsumers() - } } // Config returns the consumer's configuration. @@ -780,29 +781,25 @@ func (o *Consumer) progressUpdate(seq uint64) { // Process a NAK. func (o *Consumer) processNak(sseq, dseq uint64) { - var mset *Stream o.mu.Lock() + defer o.mu.Unlock() + // Check for out of range. if dseq <= o.adflr || dseq > o.dseq { - o.mu.Unlock() return } // If we are explicit ack make sure this is still on our pending list. if len(o.pending) > 0 { if _, ok := o.pending[sseq]; !ok { - o.mu.Unlock() return } } // If already queued up also ignore. if !o.onRedeliverQueue(sseq) { o.rdq = append(o.rdq, sseq) - mset = o.mset - } - o.mu.Unlock() - if mset != nil { - mset.signalConsumers() } + + o.signalNewMessages() } // Process a TERM @@ -941,6 +938,15 @@ func (o *Consumer) Info() *ConsumerInfo { return info } +// Will signal us that new messages are available. Will break out of waiting. +func (o *Consumer) signalNewMessages() { + // Kick our new message channel + select { + case o.mch <- struct{}{}: + default: + } +} + // shouldSample lets us know if we are sampling metrics on acks. func (o *Consumer) shouldSample() bool { switch { @@ -1238,7 +1244,7 @@ func (o *Consumer) processNextMsgReq(_ *subscription, c *client, _, reply string if o.replay { o.waiting.add(&wr) o.mu.Unlock() - mset.signalConsumers() + o.signalNewMessages() return } @@ -1420,7 +1426,6 @@ func (o *Consumer) loopAndDeliverMsgs(s *Server, a *Account) { // Deliver all the msgs we have now, once done or on a condition, we wait for new ones. for { var ( - mset *Stream seq, dcnt uint64 subj, dsubj string hdr []byte @@ -1436,7 +1441,6 @@ func (o *Consumer) loopAndDeliverMsgs(s *Server, a *Account) { o.mu.Unlock() return } - mset = o.mset // If we are in push mode and not active let's stop sending. if o.isPushMode() && !o.active { @@ -1510,9 +1514,17 @@ func (o *Consumer) loopAndDeliverMsgs(s *Server, a *Account) { if o.replay && o.sseq > lseq { o.replay = false } + // We will wait here for new messages to arrive. + mch := o.mch + qch := o.qch o.mu.Unlock() - mset.waitForMsgs() + + select { + case <-qch: + return + case <-mch: + } } } @@ -1635,7 +1647,6 @@ func (o *Consumer) didNotDeliver(seq uint64) { return } - shouldSignal := false if o.isPushMode() { o.active = false } else if o.pending != nil { @@ -1646,14 +1657,11 @@ func (o *Consumer) didNotDeliver(seq uint64) { // we know it was not delivered. if !o.onRedeliverQueue(seq) { o.rdq = append(o.rdq, seq) - shouldSignal = true + o.signalNewMessages() } } } o.mu.Unlock() - if shouldSignal { - mset.signalConsumers() - } } // This checks if we already have this sequence queued for redelivery. @@ -1683,15 +1691,15 @@ func (o *Consumer) removeFromRedeliverQueue(seq uint64) bool { // Checks the pending messages. func (o *Consumer) checkPending() { o.mu.Lock() + defer o.mu.Unlock() + mset := o.mset if mset == nil { - o.mu.Unlock() return } ttl := int64(o.config.AckWait) next := int64(o.ackWait(0)) now := time.Now().UnixNano() - shouldSignal := false // Since we can update timestamps, we have to review all pending. // We may want to unlock here or warn if list is big. @@ -1701,7 +1709,7 @@ func (o *Consumer) checkPending() { if elapsed >= ttl { if !o.onRedeliverQueue(seq) { expired = append(expired, seq) - shouldSignal = true + o.signalNewMessages() } } else if ttl-elapsed < next { // Update when we should fire next. @@ -1729,11 +1737,6 @@ func (o *Consumer) checkPending() { o.ptmr.Stop() o.ptmr = nil } - o.mu.Unlock() - - if shouldSignal { - mset.signalConsumers() - } } // SeqFromReply will extract a sequence number from a reply subject. @@ -1965,6 +1968,10 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error { stopAndClearTimer(&o.dtmr) delivery := o.config.DeliverSubject o.waiting = nil + // Break us out of the readLoop. + if doSignal { + o.signalNewMessages() + } o.mu.Unlock() if delivery != "" { @@ -1972,13 +1979,6 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error { } mset.mu.Lock() - // Break us out of the readLoop. - // TODO(dlc) - Should not be bad for small amounts of observables, maybe - // even into thousands. Above that should check what this might do - // performance wise. - if doSignal { - mset.sg.Broadcast() - } mset.unsubscribe(ackSub) mset.unsubscribe(reqSub) mset.deleteConsumer(o) diff --git a/server/stream.go b/server/stream.go index 743345ea..a4aaa3ec 100644 --- a/server/stream.go +++ b/server/stream.go @@ -71,8 +71,6 @@ type StreamInfo struct { // for a Stream we will direct link from the client to this Stream structure. type Stream struct { mu sync.RWMutex - sg *sync.Cond - sgw int jsa *jsAccount client *client sid int @@ -157,7 +155,6 @@ func (a *Account) AddStreamWithStore(config *StreamConfig, fsConfig *FileStoreCo // Setup the internal client. c := s.createInternalJetStreamClient() mset := &Stream{jsa: jsa, config: cfg, client: c, consumers: make(map[string]*Consumer)} - mset.sg = sync.NewCond(&mset.mu) jsa.streams[cfg.Name] = mset storeDir := path.Join(jsa.storeDir, streamsDir, cfg.Name) @@ -1000,7 +997,6 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj } if err == nil && seq > 0 && numConsumers > 0 { - var needSignal bool var _obs [4]*Consumer obs := _obs[:0] @@ -1013,25 +1009,12 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj for _, o := range obs { o.incStreamPending(seq, subject) if !o.deliverCurrentMsg(subject, hdr, msg, seq, ts) { - needSignal = true + o.signalNewMessages() } } - - if needSignal { - mset.signalConsumers() - } } } -// Will signal all waiting consumers. -func (mset *Stream) signalConsumers() { - mset.mu.Lock() - if mset.sgw > 0 { - mset.sg.Broadcast() - } - mset.mu.Unlock() -} - // Internal message for use by jetstream subsystem. type jsPubMsg struct { subj string @@ -1156,9 +1139,7 @@ func (mset *Stream) stop(delete bool) error { o.stop(delete, false, delete) } - // Make sure we release all consumers here at once. mset.mu.Lock() - mset.sg.Broadcast() // Send stream delete advisory after the consumers. if delete { @@ -1270,22 +1251,6 @@ func (mset *Stream) State() StreamState { return mset.store.State() } -// waitForMsgs will have the stream wait for the arrival of new messages. -func (mset *Stream) waitForMsgs() { - mset.mu.Lock() - - if mset.client == nil { - mset.mu.Unlock() - return - } - - mset.sgw++ - mset.sg.Wait() - mset.sgw-- - - mset.mu.Unlock() -} - // Determines if the new proposed partition is unique amongst all observables. // Lock should be held. func (mset *Stream) partitionUnique(partition string) bool { diff --git a/test/jetstream_test.go b/test/jetstream_test.go index ee80539a..77be0edf 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -2665,6 +2665,7 @@ func TestJetStreamAckReplyStreamPendingWithAcks(t *testing.T) { sub, _ := nc.SubscribeSync(dsubj) defer sub.Unsubscribe() + checkFor(t, 500*time.Millisecond, 10*time.Millisecond, func() error { if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != toSend { return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, toSend)