mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Updates to JS consumer errors
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
This commit is contained in:
@@ -226,10 +226,10 @@ type consumer struct {
|
||||
}
|
||||
|
||||
const (
|
||||
// JsAckWaitDefault is the default AckWait, only applicable on explicit ack policy observables.
|
||||
// JsAckWaitDefault is the default AckWait, only applicable on explicit ack policy consumers.
|
||||
JsAckWaitDefault = 30 * time.Second
|
||||
// JsDeleteWaitTimeDefault is the default amount of time we will wait for non-durable
|
||||
// observables to be in an inactive state before deleting them.
|
||||
// consumers to be in an inactive state before deleting them.
|
||||
JsDeleteWaitTimeDefault = 5 * time.Second
|
||||
// JsFlowControlMaxPending specifies default pending bytes during flow control that can be
|
||||
// outstanding.
|
||||
@@ -424,7 +424,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
|
||||
if len(mset.consumers) > 0 {
|
||||
if config.FilterSubject == _EMPTY_ {
|
||||
mset.mu.Unlock()
|
||||
return nil, fmt.Errorf("multiple non-filtered observables not allowed on workqueue stream")
|
||||
return nil, fmt.Errorf("multiple non-filtered consumers not allowed on workqueue stream")
|
||||
} else if !mset.partitionUnique(config.FilterSubject) {
|
||||
// We have a partition but it is not unique amongst the others.
|
||||
mset.mu.Unlock()
|
||||
@@ -514,7 +514,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
|
||||
if err != nil {
|
||||
mset.mu.Unlock()
|
||||
o.deleteWithoutAdvisory()
|
||||
return nil, fmt.Errorf("error creating store for observable: %v", err)
|
||||
return nil, fmt.Errorf("error creating store for consumer: %v", err)
|
||||
}
|
||||
o.store = store
|
||||
|
||||
@@ -554,7 +554,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
|
||||
return eo, nil
|
||||
}
|
||||
|
||||
// Set up the ack subscription for this observable. Will use wildcard for all acks.
|
||||
// Set up the ack subscription for this consumer. Will use wildcard for all acks.
|
||||
// We will remember the template to generate replies with sequence numbers and use
|
||||
// that to scanf them back in.
|
||||
mn := mset.cfg.Name
|
||||
@@ -2360,7 +2360,7 @@ func ackReplyInfo(subject string) (sseq, dseq, dc uint64) {
|
||||
return sseq, dseq, dc
|
||||
}
|
||||
|
||||
// NextSeq returns the next delivered sequence number for this observable.
|
||||
// NextSeq returns the next delivered sequence number for this consumer.
|
||||
func (o *consumer) nextSeq() uint64 {
|
||||
o.mu.Lock()
|
||||
dseq := o.dseq
|
||||
@@ -2447,7 +2447,7 @@ func (o *consumer) isPullMode() bool {
|
||||
return o.cfg.DeliverSubject == _EMPTY_
|
||||
}
|
||||
|
||||
// Name returns the name of this observable.
|
||||
// Name returns the name of this consumer.
|
||||
func (o *consumer) String() string {
|
||||
o.mu.RLock()
|
||||
n := o.name
|
||||
@@ -2670,7 +2670,7 @@ func (mset *stream) validSubject(partitionSubject string) bool {
|
||||
}
|
||||
|
||||
// SetInActiveDeleteThreshold sets the delete threshold for how long to wait
|
||||
// before deleting an inactive ephemeral observable.
|
||||
// before deleting an inactive ephemeral consumer.
|
||||
func (o *consumer) setInActiveDeleteThreshold(dthresh time.Duration) error {
|
||||
o.mu.Lock()
|
||||
defer o.mu.Unlock()
|
||||
|
||||
@@ -1302,7 +1302,7 @@ func TestFileStoreMeta(t *testing.T) {
|
||||
t.Fatalf("Checksums do not match, got %q vs %q", mychecksum, checksum)
|
||||
}
|
||||
|
||||
// Now create an observable. Same deal for them.
|
||||
// Now create a consumer. Same deal for them.
|
||||
oconfig := ConsumerConfig{
|
||||
DeliverSubject: "d",
|
||||
FilterSubject: "foo",
|
||||
|
||||
@@ -92,7 +92,7 @@ const (
|
||||
// LimitsPolicy (default) means that messages are retained until any given limit is reached.
|
||||
// This could be one of MaxMsgs, MaxBytes, or MaxAge.
|
||||
LimitsPolicy RetentionPolicy = iota
|
||||
// InterestPolicy specifies that when all known observables have acknowledged a message it can be removed.
|
||||
// InterestPolicy specifies that when all known consumers have acknowledged a message it can be removed.
|
||||
InterestPolicy
|
||||
// WorkQueuePolicy specifies that when the first worker or subscriber acknowledges the message it can be removed.
|
||||
WorkQueuePolicy
|
||||
|
||||
@@ -2473,8 +2473,8 @@ func (mset *stream) internalLoop() {
|
||||
didDeliver, _ := c.processInboundClientMsg(msg)
|
||||
c.pa.szb = nil
|
||||
|
||||
// Check to see if this is a delivery for an observable and
|
||||
// we failed to deliver the message. If so alert the observable.
|
||||
// Check to see if this is a delivery for a consumer and
|
||||
// we failed to deliver the message. If so alert the consumer.
|
||||
if pm.o != nil && pm.seq > 0 && !didDeliver {
|
||||
pm.o.didNotDeliver(pm.seq)
|
||||
}
|
||||
@@ -2635,7 +2635,7 @@ func (mset *stream) getConsumers() []*consumer {
|
||||
return obs
|
||||
}
|
||||
|
||||
// NumConsumers reports on number of active observables for this stream.
|
||||
// NumConsumers reports on number of active consumers for this stream.
|
||||
func (mset *stream) numConsumers() int {
|
||||
mset.mu.Lock()
|
||||
defer mset.mu.Unlock()
|
||||
@@ -2675,7 +2675,7 @@ func (mset *stream) state() StreamState {
|
||||
return store.State()
|
||||
}
|
||||
|
||||
// Determines if the new proposed partition is unique amongst all observables.
|
||||
// Determines if the new proposed partition is unique amongst all consumers.
|
||||
// Lock should be held.
|
||||
func (mset *stream) partitionUnique(partition string) bool {
|
||||
for _, o := range mset.consumers {
|
||||
|
||||
Reference in New Issue
Block a user