diff --git a/server/consumer.go b/server/consumer.go index aa9fdda3..70a0fb43 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -55,6 +55,8 @@ type CreateConsumerRequest struct { Config ConsumerConfig `json:"config"` } +// ConsumerAckMetric is a metric published when a user acknowledges a message, the +// number of these that will be published is dependant on SampleFrequency type ConsumerAckMetric struct { Schema string `json:"schema"` ID string `json:"id"` @@ -67,7 +69,9 @@ type ConsumerAckMetric struct { Deliveries uint64 `json:"deliveries"` } -type ConsumerDeliveryExceededNotification struct { +// ConsumerDeliveryExceededAdvisory is an advisory informing that a message hit +// its MaxDeliver threshold and so might be a candidate for DLQ handling +type ConsumerDeliveryExceededAdvisory struct { Schema string `json:"schema"` ID string `json:"id"` Time int64 `json:"timestamp"` @@ -297,7 +301,7 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) { // already under lock, mset.Name() would deadlock o.streamName = mset.config.Name o.ackEventT = JetStreamMetricConsumerAckPre + "." + o.streamName + "." + o.name - o.deliveryExcEventT = JetStreamEventConsumerMaxDeliveryExceedPre + "." + o.streamName + "." + o.name + o.deliveryExcEventT = JetStreamAdvisoryConsumerMaxDeliveryExceedPre + "." + o.streamName + "." + o.name store, err := mset.store.ConsumerStore(o.name, config) if err != nil { @@ -829,8 +833,8 @@ func (o *Consumer) incDeliveryCount(sseq uint64) uint64 { } func (o *Consumer) notifyDeliveryExceeded(sseq, dcount uint64) { - e := &ConsumerDeliveryExceededNotification{ - Schema: "io.nats.jetstream.notification.v1.max_deliver", + e := &ConsumerDeliveryExceededAdvisory{ + Schema: "io.nats.jetstream.advisory.v1.max_deliver", ID: nuid.Next(), Time: time.Now().UnixNano(), Stream: o.streamName, diff --git a/server/jetstream.go b/server/jetstream.go index 10335ccc..25696767 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -139,8 +139,8 @@ const ( // JetStreamMsgBySeqPre is the prefix for direct requests for a message by its stream sequence number. JetStreamMsgBySeqPre = "$JS.BYSEQ" - // JetStreamNotificationPrefix is a prefix for all JetStream notification - JetStreamNotificationPrefix = "$JS.EVENT.NOTIFICATION" + // JetStreamAdvisoryPrefix is a prefix for all JetStream advisories + JetStreamAdvisoryPrefix = "$JS.EVENT.ADVISORY" // JetStreamMetricPrefix is a prefix for all JetStream metrics JetStreamMetricPrefix = "$JS.EVENT.METRIC" @@ -148,8 +148,8 @@ const ( // JetStreamMetricConsumerAckPre is a metric containing ack latency JetStreamMetricConsumerAckPre = JetStreamMetricPrefix + ".CONSUMER_ACK" - // JetStreamEventConsumerMaxDeliveryExceedPre is a notification published when a message exceeds its delivery threshold - JetStreamEventConsumerMaxDeliveryExceedPre = JetStreamNotificationPrefix + ".MAX_DELIVERIES" + // JetStreamAdvisoryConsumerMaxDeliveryExceedPre is a notification published when a message exceeds its delivery threshold + JetStreamAdvisoryConsumerMaxDeliveryExceedPre = JetStreamAdvisoryPrefix + ".MAX_DELIVERIES" ) // This is for internal accounting for JetStream for this server.