mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
Add test for MQTT retained message migration
Signed-off-by: Neil Twigg <neil@nats.io>
This commit is contained in:
@@ -2330,6 +2330,7 @@ func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *
|
||||
break
|
||||
}
|
||||
log.Warnf(" Unable to load retained message with sequence %d: %s", smsg.Sequence, err)
|
||||
errors++
|
||||
return
|
||||
}
|
||||
// Unmarshal the message so that we can obtain the subject name.
|
||||
|
||||
@@ -2993,6 +2993,78 @@ func TestMQTTRetainedMsgNetworkUpdates(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMQTTRetainedMsgMigration(t *testing.T) {
|
||||
o := testMQTTDefaultOptions()
|
||||
s := testMQTTRunServer(t, o)
|
||||
defer testMQTTShutdownServer(s)
|
||||
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
// Create the retained messages stream to listen on the old subject first.
|
||||
// The server will correct this when the migration takes place.
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: mqttRetainedMsgsStreamName,
|
||||
Subjects: []string{`$MQTT.rmsgs`},
|
||||
Storage: nats.FileStorage,
|
||||
Retention: nats.LimitsPolicy,
|
||||
Replicas: 1,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
// Publish some retained messages on the old "$MQTT.rmsgs" subject.
|
||||
for i := 0; i < 100; i++ {
|
||||
msg := fmt.Sprintf(
|
||||
`{"origin":"b5IQZNtG","subject":"test%d","topic":"test%d","msg":"YmFy","flags":1}`, i, i,
|
||||
)
|
||||
_, err := js.Publish(`$MQTT.rmsgs`, []byte(msg))
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
// Check that the old subject looks right.
|
||||
si, err := js.StreamInfo(mqttRetainedMsgsStreamName, &nats.StreamInfoRequest{
|
||||
SubjectsFilter: `$MQTT.>`,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
if si.State.NumSubjects != 1 {
|
||||
t.Fatalf("expected 1 subject, got %d", si.State.NumSubjects)
|
||||
}
|
||||
if n := si.State.Subjects[`$MQTT.rmsgs`]; n != 100 {
|
||||
t.Fatalf("expected to find 100 messages on the original subject but found %d", n)
|
||||
}
|
||||
|
||||
// Create an MQTT client, this will cause a migration to take place.
|
||||
mc, rc := testMQTTConnect(t, &mqttConnInfo{clientID: "sub", cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
|
||||
defer mc.Close()
|
||||
testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false)
|
||||
|
||||
// Now look at the stream, there should be 100 messages on the new
|
||||
// divided subjects and none on the old undivided subject.
|
||||
si, err = js.StreamInfo(mqttRetainedMsgsStreamName, &nats.StreamInfoRequest{
|
||||
SubjectsFilter: `$MQTT.>`,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
if si.State.NumSubjects != 100 {
|
||||
t.Fatalf("expected 100 subjects, got %d", si.State.NumSubjects)
|
||||
}
|
||||
if n := si.State.Subjects[`$MQTT.rmsgs`]; n > 0 {
|
||||
t.Fatalf("expected to find no messages on the original subject but found %d", n)
|
||||
}
|
||||
|
||||
// Check that the message counts look right. There should be one
|
||||
// retained message per key.
|
||||
for i := 0; i < 100; i++ {
|
||||
expected := fmt.Sprintf(`$MQTT.rmsgs.test%d`, i)
|
||||
n, ok := si.State.Subjects[expected]
|
||||
if !ok {
|
||||
t.Fatalf("expected to find %q but didn't", expected)
|
||||
}
|
||||
if n != 1 {
|
||||
t.Fatalf("expected %q to have 1 message but had %d", expected, n)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMQTTClusterReplicasCount(t *testing.T) {
|
||||
for _, test := range []struct {
|
||||
size int
|
||||
|
||||
Reference in New Issue
Block a user