From 966d9d56f49fe12bfec65d3689959564b46cb815 Mon Sep 17 00:00:00 2001 From: Leander Kohler Date: Mon, 25 Apr 2022 16:13:32 +0200 Subject: [PATCH] Add JSConsumerDeliveryNakAdvisory The advisory `JSAdvisoryConsumerMsgNakPre` will be triggered when a message is naked --- server/consumer.go | 25 +++++++++++++++++++++++++ server/jetstream_api.go | 3 +++ server/jetstream_events.go | 15 +++++++++++++++ 3 files changed, 43 insertions(+) diff --git a/server/consumer.go b/server/consumer.go index 89b11482..7d0e6c12 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -257,6 +257,7 @@ type consumer struct { inch chan bool sfreq int32 ackEventT string + nakEventT string deliveryExcEventT string created time.Time ldt time.Time @@ -660,6 +661,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri // already under lock, mset.Name() would deadlock o.stream = mset.cfg.Name o.ackEventT = JSMetricConsumerAckPre + "." + o.stream + "." + o.name + o.nakEventT = JSAdvisoryConsumerMsgNakPre + "." + o.stream + "." + o.name o.deliveryExcEventT = JSAdvisoryConsumerMaxDeliveryExceedPre + "." + o.stream + "." + o.name if !isValidName(o.name) { @@ -1768,6 +1770,29 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) { return } } + + // Deliver an advisory + e := JSConsumerDeliveryNakAdvisory{ + TypedEvent: TypedEvent{ + Type: JSConsumerDeliveryNakAdvisoryType, + ID: nuid.Next(), + Time: time.Now().UTC(), + }, + Stream: o.stream, + Consumer: o.name, + ConsumerSeq: dseq, + StreamSeq: sseq, + Deliveries: dc, + Domain: o.srv.getOpts().JetStreamDomain, + } + + j, err := json.Marshal(e) + if err != nil { + return + } + + o.sendAdvisory(o.nakEventT, j) + // Check to see if we have delays attached. if len(nak) > len(AckNak) { arg := bytes.TrimSpace(nak[len(AckNak):]) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 65842b8d..ac2438c5 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -204,6 +204,9 @@ const ( // JSAdvisoryConsumerMaxDeliveryExceedPre is a notification published when a message exceeds its delivery threshold. JSAdvisoryConsumerMaxDeliveryExceedPre = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES" + // JSAdvisoryConsumerMsgNakPre is a notification published when a message has been naked + JSAdvisoryConsumerMsgNakPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_NAKED" + // JSAdvisoryConsumerMsgTerminatedPre is a notification published when a message has been terminated. JSAdvisoryConsumerMsgTerminatedPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED" diff --git a/server/jetstream_events.go b/server/jetstream_events.go index 6497b84d..2e1c7313 100644 --- a/server/jetstream_events.go +++ b/server/jetstream_events.go @@ -110,6 +110,21 @@ type JSConsumerDeliveryExceededAdvisory struct { // JSConsumerDeliveryExceededAdvisoryType is the schema type for JSConsumerDeliveryExceededAdvisory const JSConsumerDeliveryExceededAdvisoryType = "io.nats.jetstream.advisory.v1.max_deliver" +// JSConsumerDeliveryNakAdvisory is an advisory informing that a message was +// naked by the consumer +type JSConsumerDeliveryNakAdvisory struct { + TypedEvent + Stream string `json:"stream"` + Consumer string `json:"consumer"` + ConsumerSeq uint64 `json:"consumer_seq"` + StreamSeq uint64 `json:"stream_seq"` + Deliveries uint64 `json:"deliveries"` + Domain string `json:"domain,omitempty"` +} + +// JSConsumerDeliveryNakAdvisoryType is the schema type for JSConsumerDeliveryNakAdvisory +const JSConsumerDeliveryNakAdvisoryType = "io.nats.jetstream.advisory.v1.nak" + // JSConsumerDeliveryTerminatedAdvisory is an advisory informing that a message was // terminated by the consumer, so might be a candidate for DLQ handling type JSConsumerDeliveryTerminatedAdvisory struct {