diff --git a/.github/workflows/MQTT test.yaml b/.github/workflows/MQTT test.yaml new file mode 100644 index 00000000..8f268fef --- /dev/null +++ b/.github/workflows/MQTT test.yaml @@ -0,0 +1,37 @@ +name: MQTT Compliance +on: [push, pull_request] + +jobs: + test: + strategy: + matrix: + go: ["1.21"] + env: + GOPATH: /home/runner/work/nats-server + GO111MODULE: "on" + + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + with: + path: src/github.com/nats-io/nats-server + + - name: Setup Go + uses: actions/setup-go@v4 + with: + go-version: ${{matrix.go}} + + - name: Install deps + shell: bash --noprofile --norc -x -eo pipefail {0} + run: | + wget https://github.com/hivemq/mqtt-cli/releases/download/v4.20.0/mqtt-cli-4.20.0.deb + sudo apt install ./mqtt-cli-4.20.0.deb + + - name: Run tests + shell: bash --noprofile --norc -x -eo pipefail {0} + run: | + set -e + cd src/github.com/nats-io/nats-server/server + go test -v -vet=off --run=TestMQTTCLICompliance + set +e diff --git a/server/mqtt.go b/server/mqtt.go index e3333339..c0d17f26 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -3915,6 +3915,14 @@ func (c *client) mqttEnqueuePubResponse(packetType byte, pi uint16, trace bool) proto := [4]byte{packetType, 0x2, 0, 0} proto[2] = byte(pi >> 8) proto[3] = byte(pi) + + // Bits 3,2,1 and 0 of the fixed header in the PUBREL Control Packet are + // reserved and MUST be set to 0,0,1 and 0 respectively. The Server MUST treat + // any other value as malformed and close the Network Connection [MQTT-3.6.1-1]. + if packetType == mqttPacketPubRel { + proto[0] |= 0x2 + } + c.mu.Lock() c.enqueueProto(proto[:4]) c.mu.Unlock() diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 5712be0e..a6f4fa7c 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -27,7 +27,9 @@ import ( "math/rand" "net" "os" + "os/exec" "reflect" + "strconv" "strings" "sync" "testing" @@ -6997,6 +6999,41 @@ func TestMQTTJetStreamRepublishAndQoS0Subscribers(t *testing.T) { testMQTTExpectNothing(t, r) } +func TestMQTTCLICompliance(t *testing.T) { + mqttPath := os.Getenv("MQTT_CLI") + if mqttPath == "" { + if p, err := exec.LookPath("mqtt"); err == nil { + mqttPath = p + } + } + if mqttPath == "" { + t.Skip(`"mqtt" command is not found in $PATH nor $MQTT_CLI. See https://hivemq.github.io/mqtt-cli/docs/installation/#debian-package for installation instructions`) + } + + conf := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + server_name: mqtt + jetstream { + store_dir = %q + } + mqtt { + listen: 127.0.0.1:-1 + } + `, t.TempDir()))) + s, o := RunServerWithConfig(conf) + defer testMQTTShutdownServer(s) + + cmd := exec.Command(mqttPath, "test", "-V", "3", "-p", strconv.Itoa(o.MQTT.Port)) + + output, err := cmd.CombinedOutput() + t.Log(string(output)) + if err != nil { + if exitError, ok := err.(*exec.ExitError); ok { + t.Fatalf("mqtt cli exited with error: %v", exitError) + } + } +} + ////////////////////////////////////////////////////////////////////////// // // Benchmarks