Added consumer_replicas (similar to stream_replicas)

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2022-05-17 18:06:12 -06:00
parent 5d3b1743e3
commit 1ddc5bd9f6
4 changed files with 212 additions and 3 deletions

View File

@@ -5557,6 +5557,8 @@ func TestMQTTStreamReplicasOverride(t *testing.T) {
defer cl.shutdown()
connectAndCheck := func(restarted bool) {
t.Helper()
o := cl.opts[0]
mc, r := testMQTTConnectRetry(t, &mqttConnInfo{clientID: "test", cleanSess: false}, o.MQTT.Host, o.MQTT.Port, 5)
defer mc.Close()
@@ -5662,9 +5664,11 @@ func TestMQTTStreamReplicasInsufficientResources(t *testing.T) {
defer cl.shutdown()
l := &captureErrorLogger{errCh: make(chan string, 10)}
for _, s := range cl.servers {
s.SetLogger(l, false, false)
}
o := cl.opts[1]
cl.servers[1].SetLogger(l, false, false)
_, _, err := testMQTTConnectRetryWithError(t, &mqttConnInfo{clientID: "mqtt", cleanSess: false}, o.MQTT.Host, o.MQTT.Port, 0)
if err == nil {
t.Fatal("Expected to fail, did not")
@@ -5680,6 +5684,182 @@ func TestMQTTStreamReplicasInsufficientResources(t *testing.T) {
}
}
func TestMQTTConsumerReplicasValidate(t *testing.T) {
o := testMQTTDefaultOptions()
for _, test := range []struct {
name string
sr int
cr int
err bool
}{
{"stream replicas neg", -1, 3, false},
{"stream replicas 0", 0, 3, false},
{"consumer replicas neg", 0, -1, false},
{"consumer replicas 0", -1, 0, false},
{"consumer replicas too high", 1, 2, true},
} {
t.Run(test.name, func(t *testing.T) {
o.MQTT.StreamReplicas = test.sr
o.MQTT.ConsumerReplicas = test.cr
err := validateMQTTOptions(o)
if test.err {
if err == nil {
t.Fatal("Expected error, did not get one")
}
if !strings.Contains(err.Error(), "cannot be higher") {
t.Fatalf("Unexpected error: %v", err)
}
// OK
return
} else if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
})
}
}
func TestMQTTConsumerReplicasOverride(t *testing.T) {
conf := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
mqtt {
listen: 127.0.0.1:-1
stream_replicas: 5
consumer_replicas: 1
}
# For access to system account.
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
`
cl := createJetStreamClusterWithTemplate(t, conf, "MQTT", 5)
defer cl.shutdown()
connectAndCheck := func(subject string, restarted bool) {
t.Helper()
o := cl.opts[0]
mc, r := testMQTTConnect(t, &mqttConnInfo{clientID: "test", cleanSess: false}, o.MQTT.Host, o.MQTT.Port)
defer mc.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, restarted)
testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1})
nc, js := jsClientConnect(t, cl.servers[2])
defer nc.Close()
for ci := range js.ConsumersInfo(mqttStreamName) {
if ci.Config.FilterSubject == mqttStreamSubjectPrefix+"foo" {
if len(ci.Cluster.Replicas) != 0 {
t.Fatalf("Expected consumer to be R1, got: %+v", ci.Cluster)
}
} else {
if len(ci.Cluster.Replicas) != 1 {
t.Fatalf("Expected consumer to be R2, got: %+v", ci.Cluster)
}
}
}
}
connectAndCheck("foo", false)
cl.stopAll()
for _, o := range cl.opts {
o.MQTT.ConsumerReplicas = 2
}
cl.restartAllSamePorts()
cl.waitOnStreamLeader(globalAccountName, mqttStreamName)
cl.waitOnStreamLeader(globalAccountName, mqttRetainedMsgsStreamName)
cl.waitOnStreamLeader(globalAccountName, mqttSessStreamName)
connectAndCheck("bar", true)
}
func TestMQTTConsumerReplicasReload(t *testing.T) {
tmpl := `
jetstream: enabled
server_name: mqtt
mqtt {
port: -1
consumer_replicas: %v
}
`
conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, 3)))
defer removeFile(t, conf)
s, o := RunServerWithConfig(conf)
defer testMQTTShutdownServer(s)
l := &captureErrorLogger{errCh: make(chan string, 10)}
s.SetLogger(l, false, false)
c, r := testMQTTConnect(t, &mqttConnInfo{clientID: "sub", cleanSess: false}, o.MQTT.Host, o.MQTT.Port)
defer c.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
testMQTTSub(t, 1, c, r, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{mqttSubAckFailure})
select {
case e := <-l.errCh:
if !strings.Contains(e, NewJSStreamReplicasNotSupportedError().Description) {
t.Fatalf("Expected error regarding replicas, got %v", e)
}
case <-time.After(2 * time.Second):
t.Fatalf("Did not get the error regarding replicas count")
}
reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmpl, 1))
testMQTTSub(t, 1, c, r, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1})
}
func TestMQTTConsumerReplicasExceedsParentStream(t *testing.T) {
conf := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
mqtt {
listen: 127.0.0.1:-1
consumer_replicas: 4
}
# For access to system account.
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
`
cl := createJetStreamClusterWithTemplate(t, conf, "MQTT", 3)
defer cl.shutdown()
l := &captureErrorLogger{errCh: make(chan string, 10)}
for _, s := range cl.servers {
s.SetLogger(l, false, false)
}
o := cl.opts[0]
mc, r := testMQTTConnect(t, &mqttConnInfo{clientID: "test", cleanSess: false}, o.MQTT.Host, o.MQTT.Port)
defer mc.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{mqttSubAckFailure})
select {
case e := <-l.errCh:
if !strings.Contains(e, NewJSConsumerReplicasExceedsStreamError().Description) {
t.Fatalf("Expected error regarding replicas exceeded parent, got %v", e)
}
case <-time.After(2 * time.Second):
t.Fatalf("Did not get the error regarding replicas count")
}
}
//////////////////////////////////////////////////////////////////////////
//
// Benchmarks