Support reload

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-06-15 07:58:09 -07:00
parent 9400733606
commit d24ae4723f
2 changed files with 26 additions and 4 deletions

View File

@@ -6003,20 +6003,20 @@ func TestMQTTSessionNotDeletedOnDeleteConsumerError(t *testing.T) {
// Test for auto-cleanup of consumers.
func TestMQTTConsumerInactiveThreshold(t *testing.T) {
conf := createConfFile(t, []byte(`
tmpl := `
listen: 127.0.0.1:-1
server_name: mqtt
jetstream: enabled
mqtt {
listen: 127.0.0.1:-1
consumer_inactive_threshold: "0.2s"
consumer_inactive_threshold: %q
}
# For access to system account.
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
`))
`
conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, "0.2s")))
defer removeFile(t, conf)
s, o := RunServerWithConfig(conf)
defer testMQTTShutdownServer(s)
@@ -6040,6 +6040,13 @@ func TestMQTTConsumerInactiveThreshold(t *testing.T) {
}
return nil
})
// Check reload.
// We will not redo existing consumers however.
reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmpl, "22s"))
if opts := s.getOpts(); opts.MQTT.ConsumerInactiveThreshold != 22*time.Second {
t.Fatalf("Expected reloaded value of %v but got %v", 22*time.Second, opts.MQTT.ConsumerInactiveThreshold)
}
}
//////////////////////////////////////////////////////////////////////////

View File

@@ -675,6 +675,15 @@ func (o *mqttConsumerMemoryStorageReload) Apply(s *Server) {
s.Noticef("Reloaded: MQTT consumer_memory_storage = %v", o.newValue)
}
type mqttInactiveThresholdReload struct {
noopOption
newValue time.Duration
}
func (o *mqttInactiveThresholdReload) Apply(s *Server) {
s.Noticef("Reloaded: MQTT consumer_inactive_threshold = %v", o.newValue)
}
// Compares options and disconnects clients that are no longer listed in pinned certs. Lock must not be held.
func (s *Server) recheckPinnedCerts(curOpts *Options, newOpts *Options) {
s.mu.Lock()
@@ -1208,12 +1217,17 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
diffOpts = append(diffOpts, &mqttStreamReplicasReload{newValue: newValue.(MQTTOpts).StreamReplicas})
diffOpts = append(diffOpts, &mqttConsumerReplicasReload{newValue: newValue.(MQTTOpts).ConsumerReplicas})
diffOpts = append(diffOpts, &mqttConsumerMemoryStorageReload{newValue: newValue.(MQTTOpts).ConsumerMemoryStorage})
diffOpts = append(diffOpts, &mqttInactiveThresholdReload{newValue: newValue.(MQTTOpts).ConsumerInactiveThreshold})
// Nil out/set to 0 the options that we allow to be reloaded so that
// we only fail reload if some that we don't support are changed.
tmpOld := oldValue.(MQTTOpts)
tmpNew := newValue.(MQTTOpts)
tmpOld.TLSConfig, tmpOld.AckWait, tmpOld.MaxAckPending, tmpOld.StreamReplicas, tmpOld.ConsumerReplicas, tmpOld.ConsumerMemoryStorage = nil, 0, 0, 0, 0, false
tmpOld.ConsumerInactiveThreshold = 0
tmpNew.TLSConfig, tmpNew.AckWait, tmpNew.MaxAckPending, tmpNew.StreamReplicas, tmpNew.ConsumerReplicas, tmpNew.ConsumerMemoryStorage = nil, 0, 0, 0, 0, false
tmpNew.ConsumerInactiveThreshold = 0
if !reflect.DeepEqual(tmpOld, tmpNew) {
// See TODO(ik) note below about printing old/new values.
return nil, fmt.Errorf("config reload not supported for %s: old=%v, new=%v",
@@ -1224,6 +1238,7 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
tmpNew.StreamReplicas = newValue.(MQTTOpts).StreamReplicas
tmpNew.ConsumerReplicas = newValue.(MQTTOpts).ConsumerReplicas
tmpNew.ConsumerMemoryStorage = newValue.(MQTTOpts).ConsumerMemoryStorage
tmpNew.ConsumerInactiveThreshold = newValue.(MQTTOpts).ConsumerInactiveThreshold
case "connecterrorreports":
diffOpts = append(diffOpts, &connectErrorReports{newValue: newValue.(int)})
case "reconnecterrorreports":