From d24ae4723fedeeb167ebae908fd3db32673b43f3 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 15 Jun 2022 07:58:09 -0700 Subject: [PATCH] Support reload Signed-off-by: Derek Collison --- server/mqtt_test.go | 15 +++++++++++---- server/reload.go | 15 +++++++++++++++ 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/server/mqtt_test.go b/server/mqtt_test.go index aba6c558..3913844a 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -6003,20 +6003,20 @@ func TestMQTTSessionNotDeletedOnDeleteConsumerError(t *testing.T) { // Test for auto-cleanup of consumers. func TestMQTTConsumerInactiveThreshold(t *testing.T) { - conf := createConfFile(t, []byte(` + tmpl := ` listen: 127.0.0.1:-1 server_name: mqtt jetstream: enabled mqtt { listen: 127.0.0.1:-1 - consumer_inactive_threshold: "0.2s" + consumer_inactive_threshold: %q } # For access to system account. accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } } - `)) - + ` + conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, "0.2s"))) defer removeFile(t, conf) s, o := RunServerWithConfig(conf) defer testMQTTShutdownServer(s) @@ -6040,6 +6040,13 @@ func TestMQTTConsumerInactiveThreshold(t *testing.T) { } return nil }) + + // Check reload. + // We will not redo existing consumers however. + reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmpl, "22s")) + if opts := s.getOpts(); opts.MQTT.ConsumerInactiveThreshold != 22*time.Second { + t.Fatalf("Expected reloaded value of %v but got %v", 22*time.Second, opts.MQTT.ConsumerInactiveThreshold) + } } ////////////////////////////////////////////////////////////////////////// diff --git a/server/reload.go b/server/reload.go index bce1e3f7..38658d5e 100644 --- a/server/reload.go +++ b/server/reload.go @@ -675,6 +675,15 @@ func (o *mqttConsumerMemoryStorageReload) Apply(s *Server) { s.Noticef("Reloaded: MQTT consumer_memory_storage = %v", o.newValue) } +type mqttInactiveThresholdReload struct { + noopOption + newValue time.Duration +} + +func (o *mqttInactiveThresholdReload) Apply(s *Server) { + s.Noticef("Reloaded: MQTT consumer_inactive_threshold = %v", o.newValue) +} + // Compares options and disconnects clients that are no longer listed in pinned certs. Lock must not be held. func (s *Server) recheckPinnedCerts(curOpts *Options, newOpts *Options) { s.mu.Lock() @@ -1208,12 +1217,17 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { diffOpts = append(diffOpts, &mqttStreamReplicasReload{newValue: newValue.(MQTTOpts).StreamReplicas}) diffOpts = append(diffOpts, &mqttConsumerReplicasReload{newValue: newValue.(MQTTOpts).ConsumerReplicas}) diffOpts = append(diffOpts, &mqttConsumerMemoryStorageReload{newValue: newValue.(MQTTOpts).ConsumerMemoryStorage}) + diffOpts = append(diffOpts, &mqttInactiveThresholdReload{newValue: newValue.(MQTTOpts).ConsumerInactiveThreshold}) + // Nil out/set to 0 the options that we allow to be reloaded so that // we only fail reload if some that we don't support are changed. tmpOld := oldValue.(MQTTOpts) tmpNew := newValue.(MQTTOpts) tmpOld.TLSConfig, tmpOld.AckWait, tmpOld.MaxAckPending, tmpOld.StreamReplicas, tmpOld.ConsumerReplicas, tmpOld.ConsumerMemoryStorage = nil, 0, 0, 0, 0, false + tmpOld.ConsumerInactiveThreshold = 0 tmpNew.TLSConfig, tmpNew.AckWait, tmpNew.MaxAckPending, tmpNew.StreamReplicas, tmpNew.ConsumerReplicas, tmpNew.ConsumerMemoryStorage = nil, 0, 0, 0, 0, false + tmpNew.ConsumerInactiveThreshold = 0 + if !reflect.DeepEqual(tmpOld, tmpNew) { // See TODO(ik) note below about printing old/new values. return nil, fmt.Errorf("config reload not supported for %s: old=%v, new=%v", @@ -1224,6 +1238,7 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { tmpNew.StreamReplicas = newValue.(MQTTOpts).StreamReplicas tmpNew.ConsumerReplicas = newValue.(MQTTOpts).ConsumerReplicas tmpNew.ConsumerMemoryStorage = newValue.(MQTTOpts).ConsumerMemoryStorage + tmpNew.ConsumerInactiveThreshold = newValue.(MQTTOpts).ConsumerInactiveThreshold case "connecterrorreports": diffOpts = append(diffOpts, &connectErrorReports{newValue: newValue.(int)}) case "reconnecterrorreports":