Allow "nats" utility to display internal MQTT streams

MQTT streams are special in that we do not set subjects in the config
since they capture all subjects. Otherwise, we would have been forced
to create a stream on say "MQTT.>" but then all publishes would have
to be prefixed with "MQTT." in order for them to be captured.

However, if one uses the "nats" tool to inspect those streams, the tool
would fail with:

```
server response is not a valid "io.nats.jetstream.api.v1.stream_info_response" message:
(root): Must validate one and only one schema (oneOf)
config: subjects is required
config: Must validate all the schemas (allOf)
```

To solve that, if we detect that user asks for the MQTT streams, we
artificially set the returned config's subject to ">".

Alternatively, we may want to not return those streams at all, although
there may be value to see the info for mqtt streams/consumers.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2020-11-30 17:43:50 -07:00
parent ac4890acba
commit 718c995914
2 changed files with 49 additions and 1 deletions

View File

@@ -952,7 +952,14 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: mset.Config()}
config := mset.Config()
// MQTT streams are created without subject, but "nats" tooling would then
// fail to display them since it uses validation and expect the config's
// Subjects to not be empty.
if strings.HasPrefix(name, mqttStreamNamePrefix) && len(config.Subjects) == 0 {
config.Subjects = []string{">"}
}
resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: config}
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(resp))
}

View File

@@ -3745,7 +3745,48 @@ func TestMQTTConfigReload(t *testing.T) {
testMQTTCheckPubMsgNoAck(t, c, r, "foo", mqttPubQos1, []byte("msg2"))
}
func TestMQTTStreamInfoReturnsNonEmptySubject(t *testing.T) {
o := testMQTTDefaultOptions()
s := testMQTTRunServer(t, o)
defer s.Shutdown()
cisub := &mqttConnInfo{clientID: "sub", cleanSess: false}
c, r := testMQTTConnect(t, cisub, o.MQTT.Host, o.MQTT.Port)
defer c.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
nc := natsConnect(t, s.ClientURL())
defer nc.Close()
// Check that we can query all MQTT streams. MQTT streams are
// created without subject filter, however, if we return them like this,
// the 'nats' utility will fail to display them due to some xml validation.
for _, sname := range []string{
mqttStreamName,
mqttSessionsStreamName,
mqttRetainedMsgsStreamName,
} {
t.Run(sname, func(t *testing.T) {
resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, sname), nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var bResp JSApiStreamInfoResponse
if err = json.Unmarshal(resp.Data, &bResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(bResp.Config.Subjects) == 0 {
t.Fatalf("No subject returned, which will cause nats tooling to fail: %+v", bResp.Config)
}
})
}
}
//////////////////////////////////////////////////////////////////////////
//
// Benchmarks
//
//////////////////////////////////////////////////////////////////////////
const (
mqttPubSubj = "a"