Add JSConsumerDeliveryNakAdvisory

The advisory `JSAdvisoryConsumerMsgNakPre` will be triggered
when a message is naked
This commit is contained in:
Leander Kohler
2022-04-25 16:13:32 +02:00
parent 646b3850bf
commit 966d9d56f4
3 changed files with 43 additions and 0 deletions

View File

@@ -257,6 +257,7 @@ type consumer struct {
inch chan bool
sfreq int32
ackEventT string
nakEventT string
deliveryExcEventT string
created time.Time
ldt time.Time
@@ -660,6 +661,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// already under lock, mset.Name() would deadlock
o.stream = mset.cfg.Name
o.ackEventT = JSMetricConsumerAckPre + "." + o.stream + "." + o.name
o.nakEventT = JSAdvisoryConsumerMsgNakPre + "." + o.stream + "." + o.name
o.deliveryExcEventT = JSAdvisoryConsumerMaxDeliveryExceedPre + "." + o.stream + "." + o.name
if !isValidName(o.name) {
@@ -1768,6 +1770,29 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) {
return
}
}
// Deliver an advisory
e := JSConsumerDeliveryNakAdvisory{
TypedEvent: TypedEvent{
Type: JSConsumerDeliveryNakAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Stream: o.stream,
Consumer: o.name,
ConsumerSeq: dseq,
StreamSeq: sseq,
Deliveries: dc,
Domain: o.srv.getOpts().JetStreamDomain,
}
j, err := json.Marshal(e)
if err != nil {
return
}
o.sendAdvisory(o.nakEventT, j)
// Check to see if we have delays attached.
if len(nak) > len(AckNak) {
arg := bytes.TrimSpace(nak[len(AckNak):])

View File

@@ -204,6 +204,9 @@ const (
// JSAdvisoryConsumerMaxDeliveryExceedPre is a notification published when a message exceeds its delivery threshold.
JSAdvisoryConsumerMaxDeliveryExceedPre = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES"
// JSAdvisoryConsumerMsgNakPre is a notification published when a message has been naked
JSAdvisoryConsumerMsgNakPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_NAKED"
// JSAdvisoryConsumerMsgTerminatedPre is a notification published when a message has been terminated.
JSAdvisoryConsumerMsgTerminatedPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED"

View File

@@ -110,6 +110,21 @@ type JSConsumerDeliveryExceededAdvisory struct {
// JSConsumerDeliveryExceededAdvisoryType is the schema type for JSConsumerDeliveryExceededAdvisory
const JSConsumerDeliveryExceededAdvisoryType = "io.nats.jetstream.advisory.v1.max_deliver"
// JSConsumerDeliveryNakAdvisory is an advisory informing that a message was
// naked by the consumer
type JSConsumerDeliveryNakAdvisory struct {
TypedEvent
Stream string `json:"stream"`
Consumer string `json:"consumer"`
ConsumerSeq uint64 `json:"consumer_seq"`
StreamSeq uint64 `json:"stream_seq"`
Deliveries uint64 `json:"deliveries"`
Domain string `json:"domain,omitempty"`
}
// JSConsumerDeliveryNakAdvisoryType is the schema type for JSConsumerDeliveryNakAdvisory
const JSConsumerDeliveryNakAdvisoryType = "io.nats.jetstream.advisory.v1.nak"
// JSConsumerDeliveryTerminatedAdvisory is an advisory informing that a message was
// terminated by the consumer, so might be a candidate for DLQ handling
type JSConsumerDeliveryTerminatedAdvisory struct {