From 86a64fbc46b6ce3da1a23b736b691b0319cdf8cb Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Tue, 9 Mar 2021 09:46:28 -0800 Subject: [PATCH] Updates to JS consumer errors Signed-off-by: Waldemar Quevedo --- server/consumer.go | 16 ++++++++-------- server/filestore_test.go | 2 +- server/store.go | 2 +- server/stream.go | 8 ++++---- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 3d56bdab..05cd6991 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -226,10 +226,10 @@ type consumer struct { } const ( - // JsAckWaitDefault is the default AckWait, only applicable on explicit ack policy observables. + // JsAckWaitDefault is the default AckWait, only applicable on explicit ack policy consumers. JsAckWaitDefault = 30 * time.Second // JsDeleteWaitTimeDefault is the default amount of time we will wait for non-durable - // observables to be in an inactive state before deleting them. + // consumers to be in an inactive state before deleting them. JsDeleteWaitTimeDefault = 5 * time.Second // JsFlowControlMaxPending specifies default pending bytes during flow control that can be // outstanding. @@ -424,7 +424,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri if len(mset.consumers) > 0 { if config.FilterSubject == _EMPTY_ { mset.mu.Unlock() - return nil, fmt.Errorf("multiple non-filtered observables not allowed on workqueue stream") + return nil, fmt.Errorf("multiple non-filtered consumers not allowed on workqueue stream") } else if !mset.partitionUnique(config.FilterSubject) { // We have a partition but it is not unique amongst the others. mset.mu.Unlock() @@ -514,7 +514,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri if err != nil { mset.mu.Unlock() o.deleteWithoutAdvisory() - return nil, fmt.Errorf("error creating store for observable: %v", err) + return nil, fmt.Errorf("error creating store for consumer: %v", err) } o.store = store @@ -554,7 +554,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri return eo, nil } - // Set up the ack subscription for this observable. Will use wildcard for all acks. + // Set up the ack subscription for this consumer. Will use wildcard for all acks. // We will remember the template to generate replies with sequence numbers and use // that to scanf them back in. mn := mset.cfg.Name @@ -2360,7 +2360,7 @@ func ackReplyInfo(subject string) (sseq, dseq, dc uint64) { return sseq, dseq, dc } -// NextSeq returns the next delivered sequence number for this observable. +// NextSeq returns the next delivered sequence number for this consumer. func (o *consumer) nextSeq() uint64 { o.mu.Lock() dseq := o.dseq @@ -2447,7 +2447,7 @@ func (o *consumer) isPullMode() bool { return o.cfg.DeliverSubject == _EMPTY_ } -// Name returns the name of this observable. +// Name returns the name of this consumer. func (o *consumer) String() string { o.mu.RLock() n := o.name @@ -2670,7 +2670,7 @@ func (mset *stream) validSubject(partitionSubject string) bool { } // SetInActiveDeleteThreshold sets the delete threshold for how long to wait -// before deleting an inactive ephemeral observable. +// before deleting an inactive ephemeral consumer. func (o *consumer) setInActiveDeleteThreshold(dthresh time.Duration) error { o.mu.Lock() defer o.mu.Unlock() diff --git a/server/filestore_test.go b/server/filestore_test.go index 43498c1c..62370aa2 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -1302,7 +1302,7 @@ func TestFileStoreMeta(t *testing.T) { t.Fatalf("Checksums do not match, got %q vs %q", mychecksum, checksum) } - // Now create an observable. Same deal for them. + // Now create a consumer. Same deal for them. oconfig := ConsumerConfig{ DeliverSubject: "d", FilterSubject: "foo", diff --git a/server/store.go b/server/store.go index 33059c0d..0ecae7e8 100644 --- a/server/store.go +++ b/server/store.go @@ -92,7 +92,7 @@ const ( // LimitsPolicy (default) means that messages are retained until any given limit is reached. // This could be one of MaxMsgs, MaxBytes, or MaxAge. LimitsPolicy RetentionPolicy = iota - // InterestPolicy specifies that when all known observables have acknowledged a message it can be removed. + // InterestPolicy specifies that when all known consumers have acknowledged a message it can be removed. InterestPolicy // WorkQueuePolicy specifies that when the first worker or subscriber acknowledges the message it can be removed. WorkQueuePolicy diff --git a/server/stream.go b/server/stream.go index 2db0c62d..2192d02b 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2473,8 +2473,8 @@ func (mset *stream) internalLoop() { didDeliver, _ := c.processInboundClientMsg(msg) c.pa.szb = nil - // Check to see if this is a delivery for an observable and - // we failed to deliver the message. If so alert the observable. + // Check to see if this is a delivery for a consumer and + // we failed to deliver the message. If so alert the consumer. if pm.o != nil && pm.seq > 0 && !didDeliver { pm.o.didNotDeliver(pm.seq) } @@ -2635,7 +2635,7 @@ func (mset *stream) getConsumers() []*consumer { return obs } -// NumConsumers reports on number of active observables for this stream. +// NumConsumers reports on number of active consumers for this stream. func (mset *stream) numConsumers() int { mset.mu.Lock() defer mset.mu.Unlock() @@ -2675,7 +2675,7 @@ func (mset *stream) state() StreamState { return store.State() } -// Determines if the new proposed partition is unique amongst all observables. +// Determines if the new proposed partition is unique amongst all consumers. // Lock should be held. func (mset *stream) partitionUnique(partition string) bool { for _, o := range mset.consumers {