use "advisory" instead of "notification"

This commit is contained in:
R.I.Pienaar
2020-01-16 17:00:28 +01:00
committed by Derek Collison
parent 200ebbd47e
commit 24939d51db
2 changed files with 12 additions and 8 deletions

View File

@@ -55,6 +55,8 @@ type CreateConsumerRequest struct {
Config ConsumerConfig `json:"config"`
}
// ConsumerAckMetric is a metric published when a user acknowledges a message, the
// number of these that will be published is dependant on SampleFrequency
type ConsumerAckMetric struct {
Schema string `json:"schema"`
ID string `json:"id"`
@@ -67,7 +69,9 @@ type ConsumerAckMetric struct {
Deliveries uint64 `json:"deliveries"`
}
type ConsumerDeliveryExceededNotification struct {
// ConsumerDeliveryExceededAdvisory is an advisory informing that a message hit
// its MaxDeliver threshold and so might be a candidate for DLQ handling
type ConsumerDeliveryExceededAdvisory struct {
Schema string `json:"schema"`
ID string `json:"id"`
Time int64 `json:"timestamp"`
@@ -297,7 +301,7 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) {
// already under lock, mset.Name() would deadlock
o.streamName = mset.config.Name
o.ackEventT = JetStreamMetricConsumerAckPre + "." + o.streamName + "." + o.name
o.deliveryExcEventT = JetStreamEventConsumerMaxDeliveryExceedPre + "." + o.streamName + "." + o.name
o.deliveryExcEventT = JetStreamAdvisoryConsumerMaxDeliveryExceedPre + "." + o.streamName + "." + o.name
store, err := mset.store.ConsumerStore(o.name, config)
if err != nil {
@@ -829,8 +833,8 @@ func (o *Consumer) incDeliveryCount(sseq uint64) uint64 {
}
func (o *Consumer) notifyDeliveryExceeded(sseq, dcount uint64) {
e := &ConsumerDeliveryExceededNotification{
Schema: "io.nats.jetstream.notification.v1.max_deliver",
e := &ConsumerDeliveryExceededAdvisory{
Schema: "io.nats.jetstream.advisory.v1.max_deliver",
ID: nuid.Next(),
Time: time.Now().UnixNano(),
Stream: o.streamName,

View File

@@ -139,8 +139,8 @@ const (
// JetStreamMsgBySeqPre is the prefix for direct requests for a message by its stream sequence number.
JetStreamMsgBySeqPre = "$JS.BYSEQ"
// JetStreamNotificationPrefix is a prefix for all JetStream notification
JetStreamNotificationPrefix = "$JS.EVENT.NOTIFICATION"
// JetStreamAdvisoryPrefix is a prefix for all JetStream advisories
JetStreamAdvisoryPrefix = "$JS.EVENT.ADVISORY"
// JetStreamMetricPrefix is a prefix for all JetStream metrics
JetStreamMetricPrefix = "$JS.EVENT.METRIC"
@@ -148,8 +148,8 @@ const (
// JetStreamMetricConsumerAckPre is a metric containing ack latency
JetStreamMetricConsumerAckPre = JetStreamMetricPrefix + ".CONSUMER_ACK"
// JetStreamEventConsumerMaxDeliveryExceedPre is a notification published when a message exceeds its delivery threshold
JetStreamEventConsumerMaxDeliveryExceedPre = JetStreamNotificationPrefix + ".MAX_DELIVERIES"
// JetStreamAdvisoryConsumerMaxDeliveryExceedPre is a notification published when a message exceeds its delivery threshold
JetStreamAdvisoryConsumerMaxDeliveryExceedPre = JetStreamAdvisoryPrefix + ".MAX_DELIVERIES"
)
// This is for internal accounting for JetStream for this server.