mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-15 10:40:41 -07:00
Merge pull request #1702 from nats-io/split
Remove conditional and broadcast for signalling consumers
This commit is contained in:
@@ -191,6 +191,7 @@ type Consumer struct {
|
||||
filterWC bool
|
||||
dtmr *time.Timer
|
||||
dthresh time.Duration
|
||||
mch chan struct{}
|
||||
qch chan struct{}
|
||||
inch chan bool
|
||||
sfreq int32
|
||||
@@ -384,6 +385,7 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) {
|
||||
dsubj: config.DeliverSubject,
|
||||
active: true,
|
||||
qch: make(chan struct{}),
|
||||
mch: make(chan struct{}, 1),
|
||||
sfreq: int32(sampleFreq),
|
||||
maxdc: uint64(config.MaxDeliver),
|
||||
created: time.Now().UTC(),
|
||||
@@ -510,7 +512,7 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) {
|
||||
// If push mode, register for notifications on interest.
|
||||
if o.isPushMode() {
|
||||
o.dthresh = JsDeleteWaitTimeDefault
|
||||
o.inch = make(chan bool, 4)
|
||||
o.inch = make(chan bool, 8)
|
||||
a.sl.RegisterNotification(config.DeliverSubject, o.inch)
|
||||
o.active = o.hasDeliveryInterest(<-o.inch)
|
||||
// Check if we are not durable that the delivery subject has interest.
|
||||
@@ -647,12 +649,16 @@ func (o *Consumer) updateDeliveryInterest(localInterest bool) {
|
||||
interest := o.hasDeliveryInterest(localInterest)
|
||||
|
||||
o.mu.Lock()
|
||||
defer o.mu.Unlock()
|
||||
|
||||
mset := o.mset
|
||||
if mset == nil || o.isPullMode() {
|
||||
o.mu.Unlock()
|
||||
return
|
||||
}
|
||||
shouldSignal := interest && !o.active
|
||||
|
||||
if interest && !o.active {
|
||||
o.signalNewMessages()
|
||||
}
|
||||
o.active = interest
|
||||
|
||||
// Stop and clear the delete timer always.
|
||||
@@ -663,11 +669,6 @@ func (o *Consumer) updateDeliveryInterest(localInterest bool) {
|
||||
if !o.isDurable() && !interest {
|
||||
o.dtmr = time.AfterFunc(o.dthresh, func() { o.Delete() })
|
||||
}
|
||||
o.mu.Unlock()
|
||||
|
||||
if shouldSignal {
|
||||
mset.signalConsumers()
|
||||
}
|
||||
}
|
||||
|
||||
// Config returns the consumer's configuration.
|
||||
@@ -780,29 +781,25 @@ func (o *Consumer) progressUpdate(seq uint64) {
|
||||
|
||||
// Process a NAK.
|
||||
func (o *Consumer) processNak(sseq, dseq uint64) {
|
||||
var mset *Stream
|
||||
o.mu.Lock()
|
||||
defer o.mu.Unlock()
|
||||
|
||||
// Check for out of range.
|
||||
if dseq <= o.adflr || dseq > o.dseq {
|
||||
o.mu.Unlock()
|
||||
return
|
||||
}
|
||||
// If we are explicit ack make sure this is still on our pending list.
|
||||
if len(o.pending) > 0 {
|
||||
if _, ok := o.pending[sseq]; !ok {
|
||||
o.mu.Unlock()
|
||||
return
|
||||
}
|
||||
}
|
||||
// If already queued up also ignore.
|
||||
if !o.onRedeliverQueue(sseq) {
|
||||
o.rdq = append(o.rdq, sseq)
|
||||
mset = o.mset
|
||||
}
|
||||
o.mu.Unlock()
|
||||
if mset != nil {
|
||||
mset.signalConsumers()
|
||||
}
|
||||
|
||||
o.signalNewMessages()
|
||||
}
|
||||
|
||||
// Process a TERM
|
||||
@@ -941,6 +938,15 @@ func (o *Consumer) Info() *ConsumerInfo {
|
||||
return info
|
||||
}
|
||||
|
||||
// Will signal us that new messages are available. Will break out of waiting.
|
||||
func (o *Consumer) signalNewMessages() {
|
||||
// Kick our new message channel
|
||||
select {
|
||||
case o.mch <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// shouldSample lets us know if we are sampling metrics on acks.
|
||||
func (o *Consumer) shouldSample() bool {
|
||||
switch {
|
||||
@@ -1238,7 +1244,7 @@ func (o *Consumer) processNextMsgReq(_ *subscription, c *client, _, reply string
|
||||
if o.replay {
|
||||
o.waiting.add(&wr)
|
||||
o.mu.Unlock()
|
||||
mset.signalConsumers()
|
||||
o.signalNewMessages()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1420,7 +1426,6 @@ func (o *Consumer) loopAndDeliverMsgs(s *Server, a *Account) {
|
||||
// Deliver all the msgs we have now, once done or on a condition, we wait for new ones.
|
||||
for {
|
||||
var (
|
||||
mset *Stream
|
||||
seq, dcnt uint64
|
||||
subj, dsubj string
|
||||
hdr []byte
|
||||
@@ -1436,7 +1441,6 @@ func (o *Consumer) loopAndDeliverMsgs(s *Server, a *Account) {
|
||||
o.mu.Unlock()
|
||||
return
|
||||
}
|
||||
mset = o.mset
|
||||
|
||||
// If we are in push mode and not active let's stop sending.
|
||||
if o.isPushMode() && !o.active {
|
||||
@@ -1510,9 +1514,17 @@ func (o *Consumer) loopAndDeliverMsgs(s *Server, a *Account) {
|
||||
if o.replay && o.sseq > lseq {
|
||||
o.replay = false
|
||||
}
|
||||
|
||||
// We will wait here for new messages to arrive.
|
||||
mch := o.mch
|
||||
qch := o.qch
|
||||
o.mu.Unlock()
|
||||
mset.waitForMsgs()
|
||||
|
||||
select {
|
||||
case <-qch:
|
||||
return
|
||||
case <-mch:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1635,7 +1647,6 @@ func (o *Consumer) didNotDeliver(seq uint64) {
|
||||
return
|
||||
}
|
||||
|
||||
shouldSignal := false
|
||||
if o.isPushMode() {
|
||||
o.active = false
|
||||
} else if o.pending != nil {
|
||||
@@ -1646,14 +1657,11 @@ func (o *Consumer) didNotDeliver(seq uint64) {
|
||||
// we know it was not delivered.
|
||||
if !o.onRedeliverQueue(seq) {
|
||||
o.rdq = append(o.rdq, seq)
|
||||
shouldSignal = true
|
||||
o.signalNewMessages()
|
||||
}
|
||||
}
|
||||
}
|
||||
o.mu.Unlock()
|
||||
if shouldSignal {
|
||||
mset.signalConsumers()
|
||||
}
|
||||
}
|
||||
|
||||
// This checks if we already have this sequence queued for redelivery.
|
||||
@@ -1683,15 +1691,15 @@ func (o *Consumer) removeFromRedeliverQueue(seq uint64) bool {
|
||||
// Checks the pending messages.
|
||||
func (o *Consumer) checkPending() {
|
||||
o.mu.Lock()
|
||||
defer o.mu.Unlock()
|
||||
|
||||
mset := o.mset
|
||||
if mset == nil {
|
||||
o.mu.Unlock()
|
||||
return
|
||||
}
|
||||
ttl := int64(o.config.AckWait)
|
||||
next := int64(o.ackWait(0))
|
||||
now := time.Now().UnixNano()
|
||||
shouldSignal := false
|
||||
|
||||
// Since we can update timestamps, we have to review all pending.
|
||||
// We may want to unlock here or warn if list is big.
|
||||
@@ -1701,7 +1709,7 @@ func (o *Consumer) checkPending() {
|
||||
if elapsed >= ttl {
|
||||
if !o.onRedeliverQueue(seq) {
|
||||
expired = append(expired, seq)
|
||||
shouldSignal = true
|
||||
o.signalNewMessages()
|
||||
}
|
||||
} else if ttl-elapsed < next {
|
||||
// Update when we should fire next.
|
||||
@@ -1729,11 +1737,6 @@ func (o *Consumer) checkPending() {
|
||||
o.ptmr.Stop()
|
||||
o.ptmr = nil
|
||||
}
|
||||
o.mu.Unlock()
|
||||
|
||||
if shouldSignal {
|
||||
mset.signalConsumers()
|
||||
}
|
||||
}
|
||||
|
||||
// SeqFromReply will extract a sequence number from a reply subject.
|
||||
@@ -1965,6 +1968,10 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error {
|
||||
stopAndClearTimer(&o.dtmr)
|
||||
delivery := o.config.DeliverSubject
|
||||
o.waiting = nil
|
||||
// Break us out of the readLoop.
|
||||
if doSignal {
|
||||
o.signalNewMessages()
|
||||
}
|
||||
o.mu.Unlock()
|
||||
|
||||
if delivery != "" {
|
||||
@@ -1972,13 +1979,6 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error {
|
||||
}
|
||||
|
||||
mset.mu.Lock()
|
||||
// Break us out of the readLoop.
|
||||
// TODO(dlc) - Should not be bad for small amounts of observables, maybe
|
||||
// even into thousands. Above that should check what this might do
|
||||
// performance wise.
|
||||
if doSignal {
|
||||
mset.sg.Broadcast()
|
||||
}
|
||||
mset.unsubscribe(ackSub)
|
||||
mset.unsubscribe(reqSub)
|
||||
mset.deleteConsumer(o)
|
||||
|
||||
@@ -71,8 +71,6 @@ type StreamInfo struct {
|
||||
// for a Stream we will direct link from the client to this Stream structure.
|
||||
type Stream struct {
|
||||
mu sync.RWMutex
|
||||
sg *sync.Cond
|
||||
sgw int
|
||||
jsa *jsAccount
|
||||
client *client
|
||||
sid int
|
||||
@@ -157,7 +155,6 @@ func (a *Account) AddStreamWithStore(config *StreamConfig, fsConfig *FileStoreCo
|
||||
// Setup the internal client.
|
||||
c := s.createInternalJetStreamClient()
|
||||
mset := &Stream{jsa: jsa, config: cfg, client: c, consumers: make(map[string]*Consumer)}
|
||||
mset.sg = sync.NewCond(&mset.mu)
|
||||
|
||||
jsa.streams[cfg.Name] = mset
|
||||
storeDir := path.Join(jsa.storeDir, streamsDir, cfg.Name)
|
||||
@@ -1000,7 +997,6 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj
|
||||
}
|
||||
|
||||
if err == nil && seq > 0 && numConsumers > 0 {
|
||||
var needSignal bool
|
||||
var _obs [4]*Consumer
|
||||
obs := _obs[:0]
|
||||
|
||||
@@ -1013,25 +1009,12 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj
|
||||
for _, o := range obs {
|
||||
o.incStreamPending(seq, subject)
|
||||
if !o.deliverCurrentMsg(subject, hdr, msg, seq, ts) {
|
||||
needSignal = true
|
||||
o.signalNewMessages()
|
||||
}
|
||||
}
|
||||
|
||||
if needSignal {
|
||||
mset.signalConsumers()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Will signal all waiting consumers.
|
||||
func (mset *Stream) signalConsumers() {
|
||||
mset.mu.Lock()
|
||||
if mset.sgw > 0 {
|
||||
mset.sg.Broadcast()
|
||||
}
|
||||
mset.mu.Unlock()
|
||||
}
|
||||
|
||||
// Internal message for use by jetstream subsystem.
|
||||
type jsPubMsg struct {
|
||||
subj string
|
||||
@@ -1156,9 +1139,7 @@ func (mset *Stream) stop(delete bool) error {
|
||||
o.stop(delete, false, delete)
|
||||
}
|
||||
|
||||
// Make sure we release all consumers here at once.
|
||||
mset.mu.Lock()
|
||||
mset.sg.Broadcast()
|
||||
|
||||
// Send stream delete advisory after the consumers.
|
||||
if delete {
|
||||
@@ -1270,22 +1251,6 @@ func (mset *Stream) State() StreamState {
|
||||
return mset.store.State()
|
||||
}
|
||||
|
||||
// waitForMsgs will have the stream wait for the arrival of new messages.
|
||||
func (mset *Stream) waitForMsgs() {
|
||||
mset.mu.Lock()
|
||||
|
||||
if mset.client == nil {
|
||||
mset.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
mset.sgw++
|
||||
mset.sg.Wait()
|
||||
mset.sgw--
|
||||
|
||||
mset.mu.Unlock()
|
||||
}
|
||||
|
||||
// Determines if the new proposed partition is unique amongst all observables.
|
||||
// Lock should be held.
|
||||
func (mset *Stream) partitionUnique(partition string) bool {
|
||||
|
||||
@@ -2665,6 +2665,7 @@ func TestJetStreamAckReplyStreamPendingWithAcks(t *testing.T) {
|
||||
|
||||
sub, _ := nc.SubscribeSync(dsubj)
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
checkFor(t, 500*time.Millisecond, 10*time.Millisecond, func() error {
|
||||
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != toSend {
|
||||
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, toSend)
|
||||
|
||||
Reference in New Issue
Block a user