Per-subject limits for MQTT retained messages

Signed-off-by: Neil Twigg <neil@nats.io>
This commit is contained in:
Neil Twigg
2023-05-30 15:50:55 +01:00
parent 7b74f4f0da
commit 74690388f5

View File

@@ -112,7 +112,7 @@ const (
// Stream name for MQTT retained messages on a given account
mqttRetainedMsgsStreamName = mqttStreamNamePrefix + "rmsgs"
mqttRetainedMsgsStreamSubject = "$MQTT.rmsgs"
mqttRetainedMsgsStreamSubject = "$MQTT.rmsgs."
// Stream name for MQTT sessions on a given account
mqttSessStreamName = mqttStreamNamePrefix + "sess"
@@ -1150,11 +1150,12 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
} else if si == nil {
// Create the stream for retained messages.
cfg := &StreamConfig{
Name: mqttRetainedMsgsStreamName,
Subjects: []string{mqttRetainedMsgsStreamSubject},
Storage: FileStorage,
Retention: LimitsPolicy,
Replicas: replicas,
Name: mqttRetainedMsgsStreamName,
Subjects: []string{mqttRetainedMsgsStreamSubject + as.domainTk + ">"},
Storage: FileStorage,
Retention: LimitsPolicy,
Replicas: replicas,
MaxMsgsPer: 1,
}
// We will need "si" outside of this block.
si, _, err = jsa.createStream(cfg)
@@ -1199,7 +1200,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
Stream: mqttRetainedMsgsStreamName,
Config: ConsumerConfig{
Durable: rmDurName,
FilterSubject: mqttRetainedMsgsStreamSubject,
FilterSubject: mqttRetainedMsgsStreamSubject + as.domainTk + ">",
DeliverSubject: rmsubj,
ReplayPolicy: ReplayInstant,
AckPolicy: AckNone,
@@ -3092,7 +3093,7 @@ func (c *client) mqttHandlePubRetain() {
Source: c.opts.Username,
}
rmBytes, _ := json.Marshal(rm)
smr, err := asm.jsa.storeMsg(mqttRetainedMsgsStreamSubject, -1, rmBytes)
smr, err := asm.jsa.storeMsg(mqttRetainedMsgsStreamSubject+asm.domainTk+key, -1, rmBytes)
if err == nil {
// Update the new sequence
rm.sseq = smr.Sequence