Merge pull request #2021 from nats-io/direct

Direct consumer updates.
This commit is contained in:
Derek Collison
2021-03-18 11:51:39 -07:00
committed by GitHub
3 changed files with 31 additions and 22 deletions

View File

@@ -237,7 +237,7 @@ const (
JsDeleteWaitTimeDefault = 5 * time.Second
// JsFlowControlMaxPending specifies default pending bytes during flow control that can be
// outstanding.
JsFlowControlMaxPending = 16 * 1024 * 1024
JsFlowControlMaxPending = 4 * 1024 * 1024
)
func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) {
@@ -544,13 +544,15 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
o.ackEventT = JSMetricConsumerAckPre + "." + o.stream + "." + o.name
o.deliveryExcEventT = JSAdvisoryConsumerMaxDeliveryExceedPre + "." + o.stream + "." + o.name
store, err := mset.store.ConsumerStore(o.name, config)
if err != nil {
mset.mu.Unlock()
o.deleteWithoutAdvisory()
return nil, fmt.Errorf("error creating store for consumer: %v", err)
if !config.Direct {
store, err := mset.store.ConsumerStore(o.name, config)
if err != nil {
mset.mu.Unlock()
o.deleteWithoutAdvisory()
return nil, fmt.Errorf("error creating store for consumer: %v", err)
}
o.store = store
}
o.store = store
if !isValidName(o.name) {
mset.mu.Unlock()
@@ -604,9 +606,14 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// Check in place here for interest. Will setup properly in setLeader.
r := o.acc.sl.Match(o.cfg.DeliverSubject)
if !o.hasDeliveryInterest(len(r.psubs)+len(r.qsubs) > 0) {
mset.mu.Unlock()
o.deleteWithoutAdvisory()
return nil, errNoInterest
// Directs can let the interest come to us eventually, but setup delete timer.
if config.Direct {
o.updateDeliveryInterest(false)
} else {
mset.mu.Unlock()
o.deleteWithoutAdvisory()
return nil, errNoInterest
}
}
}
}
@@ -1163,8 +1170,10 @@ func (o *consumer) updateDelivered(dseq, sseq, dc uint64, ts int64) {
n += binary.PutVarint(b[n:], ts)
o.node.Propose(b[:n])
}
// Update local state always.
o.store.UpdateDelivered(dseq, sseq, dc, ts)
if o.store != nil {
// Update local state always.
o.store.UpdateDelivered(dseq, sseq, dc, ts)
}
}
// Lock should be held.
@@ -1177,7 +1186,7 @@ func (o *consumer) updateAcks(dseq, sseq uint64) {
n += binary.PutUvarint(b[n:], dseq)
n += binary.PutUvarint(b[n:], sseq)
o.node.Propose(b[:n])
} else {
} else if o.store != nil {
o.store.UpdateAcks(dseq, sseq)
}
}
@@ -1291,7 +1300,7 @@ func (o *consumer) readStoreState() *ConsumerState {
// Sets our store state from another source. Used in clustered mode on snapshot restore.
func (o *consumer) setStoreState(state *ConsumerState) error {
if state == nil {
if state == nil || o.store == nil {
return nil
}
o.applyState(state)
@@ -1518,6 +1527,10 @@ func (o *consumer) needAck(sseq uint64) bool {
asflr, osseq = o.asflr, o.sseq
pending = o.pending
} else {
if o.store == nil {
o.mu.RUnlock()
return false
}
state, err := o.store.State()
if err != nil || state == nil {
o.mu.RUnlock()

View File

@@ -787,7 +787,10 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
// Set cache time to creation time to start.
ts := time.Now().UnixNano()
// Race detector wants these protected.
mb.mu.Lock()
mb.llts, mb.lwts = ts, ts
mb.mu.Unlock()
// Remember our last sequence number.
mb.first.seq = fs.state.LastSeq + 1

View File

@@ -4828,14 +4828,7 @@ func TestJetStreamClusterSuperClusterEphemeralCleanup(t *testing.T) {
// Check that TEST(n) has 1 consumer and that S(n) is created and has 1 message.
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo(test.streamName)
if err != nil {
return fmt.Errorf("Could not get stream info: %v", err)
}
if si.State.Consumers != 1 {
return fmt.Errorf("Expected %q stream to have 1 consumer, got %v", test.streamName, si.State.Consumers)
}
si, err = js2.StreamInfo(test.sourceName)
si, err := js2.StreamInfo(test.sourceName)
if err != nil {
return fmt.Errorf("Could not get stream info: %v", err)
}