mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
[IMPROVED] MQTT stream per session replaced with single stream
With the availability of a "max message per subject" for a given stream, it is possible to replace individual streams that were created per session with a single stream that gets all sessions as a single message per subject, which subject is composed of the session client ID hash. The first time the new stream is created for a given account, all existing MQTT session streams will be transferred to the new mux'ed MQTT session stream. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
272
server/mqtt.go
272
server/mqtt.go
@@ -113,6 +113,10 @@ const (
|
||||
mqttRetainedMsgsStreamName = mqttStreamNamePrefix + "rmsgs"
|
||||
mqttRetainedMsgsStreamSubject = "$MQTT.rmsgs"
|
||||
|
||||
// Stream name for MQTT sessions on a given account
|
||||
mqttSessStreamName = mqttStreamNamePrefix + "sess"
|
||||
mqttSessStreamSubjectPrefix = "$MQTT.sess."
|
||||
|
||||
// Stream name prefix for MQTT sessions on a given account
|
||||
mqttSessionsStreamNamePrefix = mqttStreamNamePrefix + "sess_"
|
||||
|
||||
@@ -148,6 +152,7 @@ const (
|
||||
mqttJSAMsgLoad = "ML"
|
||||
mqttJSASessPersist = "SP"
|
||||
mqttJSARetainedMsgDel = "RD"
|
||||
mqttJSAStreamNames = "SN"
|
||||
|
||||
// Name of the header key added to NATS message to carry mqtt PUBLISH information
|
||||
mqttNatsHeader = "Nmqtt-Pub"
|
||||
@@ -159,6 +164,9 @@ const (
|
||||
|
||||
// This is how frequently the timer to cleanup the sessions flappers map is firing.
|
||||
mqttSessFlappingCleanupInterval = 5 * time.Second
|
||||
|
||||
// Default retry delay if transfer of old session streams to new one fails
|
||||
mqttDefaultTransferRetry = 5 * time.Second
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -217,6 +225,20 @@ type mqttAccountSessionManager struct {
|
||||
replicas int
|
||||
rrmLastSeq uint64 // Restore retained messages expected last sequence
|
||||
rrmDoneCh chan struct{} // To notify the caller that all retained messages have been loaded
|
||||
sp sessPersist // Used for cluster-wide processing of session records being persisted
|
||||
domainTk string // Domain (with trailing "."), or possibly empty. This is added to session subject.
|
||||
}
|
||||
|
||||
type sessPersist struct {
|
||||
mu sync.Mutex
|
||||
ch chan struct{}
|
||||
head *sessPersistRecord
|
||||
tail *sessPersistRecord
|
||||
}
|
||||
|
||||
type sessPersistRecord struct {
|
||||
seq uint64
|
||||
next *sessPersistRecord
|
||||
}
|
||||
|
||||
type mqttJSA struct {
|
||||
@@ -257,6 +279,7 @@ type mqttSession struct {
|
||||
maxp uint16
|
||||
tmaxack int
|
||||
clean bool
|
||||
domainTk string
|
||||
}
|
||||
|
||||
type mqttPersistedSession struct {
|
||||
@@ -778,6 +801,7 @@ func (s *Server) mqttHandleClosedClient(c *client) {
|
||||
sess.mu.Lock()
|
||||
sess.c = nil
|
||||
doClean := sess.clean
|
||||
seq := sess.seq
|
||||
sess.mu.Unlock()
|
||||
// If it was a clean session, then we remove from the account manager,
|
||||
// and we will call clear() outside of any lock.
|
||||
@@ -790,7 +814,7 @@ func (s *Server) mqttHandleClosedClient(c *client) {
|
||||
|
||||
// This needs to be done outside of any lock.
|
||||
if doClean {
|
||||
sess.clear(true)
|
||||
sess.clear(true, seq)
|
||||
}
|
||||
|
||||
// Now handle the "will". This function will be a no-op if there is no "will" to send.
|
||||
@@ -893,6 +917,14 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
|
||||
nuid: nuid.New(),
|
||||
quitCh: quitCh,
|
||||
},
|
||||
sp: sessPersist{
|
||||
ch: make(chan struct{}, 1),
|
||||
},
|
||||
}
|
||||
// We need to include the domain in the subject prefix used to store sessions in the $MQTT_sess stream.
|
||||
// If no domain is present, use "_" so that the token always exists.
|
||||
if d := s.getOpts().JetStreamDomain; d != _EMPTY_ {
|
||||
as.domainTk = string(getHash(d)) + "."
|
||||
}
|
||||
|
||||
var subs []*subscription
|
||||
@@ -954,8 +986,29 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
|
||||
as.sendJSAPIrequests(s, c, accName, closeCh)
|
||||
})
|
||||
|
||||
// Create the stream for the messages.
|
||||
// Start the go routine that will handle network updates regarding sessions
|
||||
s.startGoRoutine(func() {
|
||||
defer s.grWG.Done()
|
||||
as.sessPersistProcessing(closeCh)
|
||||
})
|
||||
|
||||
// Create the stream for the sessions.
|
||||
cfg := &StreamConfig{
|
||||
Name: mqttSessStreamName,
|
||||
Subjects: []string{mqttSessStreamSubjectPrefix + as.domainTk + ">"},
|
||||
Storage: FileStorage,
|
||||
Retention: LimitsPolicy,
|
||||
Replicas: as.replicas,
|
||||
MaxMsgsPer: 1,
|
||||
}
|
||||
if _, err := jsa.createStream(cfg); err == nil {
|
||||
as.transferUniqueSessStreamsToMuxed(s)
|
||||
} else if isErrorOtherThan(err, JSStreamNameExistErr) {
|
||||
return nil, fmt.Errorf("create sessions stream for account %q: %v", acc.GetName(), err)
|
||||
}
|
||||
|
||||
// Create the stream for the messages.
|
||||
cfg = &StreamConfig{
|
||||
Name: mqttStreamName,
|
||||
Subjects: []string{mqttStreamSubjectPrefix + ">"},
|
||||
Storage: FileStorage,
|
||||
@@ -1080,7 +1133,12 @@ func (jsa *mqttJSA) newRequestEx(kind, subject string, hdr int, msg []byte, time
|
||||
// Either we use nuid.Next() which uses a global lock, or our own nuid object, but
|
||||
// then it needs to be "write" protected. This approach will reduce across account
|
||||
// contention since we won't use the global nuid's lock.
|
||||
reply := jsa.rplyr + kind + "." + jsa.nuid.Next()
|
||||
var sb strings.Builder
|
||||
sb.WriteString(jsa.rplyr)
|
||||
sb.WriteString(kind)
|
||||
sb.WriteByte('.')
|
||||
sb.WriteString(jsa.nuid.Next())
|
||||
reply := sb.String()
|
||||
jsa.mu.Unlock()
|
||||
|
||||
ch := make(chan interface{}, 1)
|
||||
@@ -1170,6 +1228,20 @@ func (jsa *mqttJSA) deleteStream(name string) (bool, error) {
|
||||
return sdr.Success, sdr.ToError()
|
||||
}
|
||||
|
||||
func (jsa *mqttJSA) loadLastMsgFor(streamName string, subject string) (*StoredMsg, error) {
|
||||
mreq := &JSApiMsgGetRequest{LastFor: subject}
|
||||
req, err := json.Marshal(mreq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lmri, err := jsa.newRequest(mqttJSAMsgLoad, fmt.Sprintf(JSApiMsgGetT, streamName), 0, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lmr := lmri.(*JSApiMsgGetResponse)
|
||||
return lmr.Message, lmr.ToError()
|
||||
}
|
||||
|
||||
func (jsa *mqttJSA) loadMsg(streamName string, seq uint64) (*StoredMsg, error) {
|
||||
mreq := &JSApiMsgGetRequest{Seq: seq}
|
||||
req, err := json.Marshal(mreq)
|
||||
@@ -1275,6 +1347,12 @@ func (as *mqttAccountSessionManager) processJSAPIReplies(_ *subscription, pc *cl
|
||||
resp.Error = NewJSInvalidJSONError()
|
||||
}
|
||||
ch <- resp
|
||||
case mqttJSAStreamNames:
|
||||
var resp = &JSApiStreamNamesResponse{}
|
||||
if err := json.Unmarshal(msg, resp); err != nil {
|
||||
resp.Error = NewJSInvalidJSONError()
|
||||
}
|
||||
ch <- resp
|
||||
default:
|
||||
pc.Warnf("Unknown reply code %q", token)
|
||||
}
|
||||
@@ -1350,7 +1428,34 @@ func (as *mqttAccountSessionManager) processSessionPersist(_ *subscription, pc *
|
||||
if err := par.Error; err != nil {
|
||||
return
|
||||
}
|
||||
cIDHash := strings.TrimPrefix(par.Stream, mqttSessionsStreamNamePrefix)
|
||||
// We would need to lookup the message that that is a request/reply
|
||||
// that we can do in place here. So move that to a long-running routine
|
||||
// that will process the session persist record.
|
||||
as.mu.RLock()
|
||||
sp := &as.sp
|
||||
as.mu.RUnlock()
|
||||
|
||||
spr := &sessPersistRecord{seq: par.Sequence}
|
||||
sp.mu.Lock()
|
||||
if sp.tail != nil {
|
||||
sp.tail.next = spr
|
||||
} else {
|
||||
sp.head = spr
|
||||
select {
|
||||
case sp.ch <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
sp.tail = spr
|
||||
sp.mu.Unlock()
|
||||
}
|
||||
|
||||
func (as *mqttAccountSessionManager) processSessPersistRecord(spr *sessPersistRecord) {
|
||||
smsg, err := as.jsa.loadMsg(mqttSessStreamName, spr.seq)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
cIDHash := strings.TrimPrefix(smsg.Subject, mqttSessStreamSubjectPrefix+as.domainTk)
|
||||
|
||||
as.mu.Lock()
|
||||
defer as.mu.Unlock()
|
||||
@@ -1360,7 +1465,10 @@ func (as *mqttAccountSessionManager) processSessionPersist(_ *subscription, pc *
|
||||
}
|
||||
// If our current session's stream sequence is higher, it means that this
|
||||
// update is stale, so we don't do anything here.
|
||||
if par.Sequence < sess.seq {
|
||||
sess.mu.Lock()
|
||||
ignore := spr.seq < sess.seq
|
||||
sess.mu.Unlock()
|
||||
if ignore {
|
||||
return
|
||||
}
|
||||
as.removeSession(sess, false)
|
||||
@@ -1378,6 +1486,32 @@ func (as *mqttAccountSessionManager) processSessionPersist(_ *subscription, pc *
|
||||
sess.mu.Unlock()
|
||||
}
|
||||
|
||||
func (as *mqttAccountSessionManager) sessPersistProcessing(closeCh chan struct{}) {
|
||||
as.mu.RLock()
|
||||
sp := &as.sp
|
||||
quitCh := as.jsa.quitCh
|
||||
as.mu.RUnlock()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-sp.ch:
|
||||
sp.mu.Lock()
|
||||
l := sp.head
|
||||
sp.head, sp.tail = nil, nil
|
||||
sp.mu.Unlock()
|
||||
|
||||
for spr := l; spr != nil; spr = l.next {
|
||||
l = spr
|
||||
as.processSessPersistRecord(spr)
|
||||
}
|
||||
case <-closeCh:
|
||||
return
|
||||
case <-quitCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Adds this client ID to the flappers map, and if needed start the timer
|
||||
// for map cleanup.
|
||||
//
|
||||
@@ -1855,69 +1989,33 @@ func (as *mqttAccountSessionManager) getRetainedPublishMsgs(subject string, rms
|
||||
// Runs from the client's readLoop.
|
||||
// Lock not held on entry, but session is in the locked map.
|
||||
func (as *mqttAccountSessionManager) createOrRestoreSession(clientID string, opts *Options) (*mqttSession, bool, error) {
|
||||
// Add the JS domain (possibly empty) to the client ID, which will make
|
||||
// session stream/filter subject be unique per domain. So if an application
|
||||
// with the same client ID moves to the other domain, then there won't be
|
||||
// conflict of session message in one domain updating the session's stream
|
||||
// in others.
|
||||
hash := string(getHash(opts.JetStreamDomain + clientID))
|
||||
sname := mqttSessionsStreamNamePrefix + hash
|
||||
cfg := &StreamConfig{
|
||||
Name: sname,
|
||||
Subjects: []string{sname},
|
||||
Storage: FileStorage,
|
||||
Retention: LimitsPolicy,
|
||||
MaxMsgs: 1,
|
||||
Replicas: as.replicas,
|
||||
}
|
||||
jsa := &as.jsa
|
||||
formatError := func(errTxt string, err error) (*mqttSession, bool, error) {
|
||||
accName := jsa.c.acc.GetName()
|
||||
return nil, false, fmt.Errorf("%s for account %q, session %q: %v", errTxt, accName, clientID, err)
|
||||
}
|
||||
CREATE_STREAM:
|
||||
// Send a request to create the stream for this session.
|
||||
si, err := jsa.createStream(cfg)
|
||||
|
||||
hash := string(getHash(clientID))
|
||||
subject := mqttSessStreamSubjectPrefix + as.domainTk + hash
|
||||
smsg, err := jsa.loadLastMsgFor(mqttSessStreamName, subject)
|
||||
if err != nil {
|
||||
// Check for insufficient resources. If that is the case, and if possible, try
|
||||
// again with a lower replicas value.
|
||||
if cfg.Replicas > 1 && IsNatsErr(err, JSInsufficientResourcesErr) {
|
||||
cfg.Replicas--
|
||||
goto CREATE_STREAM
|
||||
if isErrorOtherThan(err, JSNoMessageFoundErr) {
|
||||
return formatError("loading session record", err)
|
||||
}
|
||||
// If there is an error and not simply "already used" (which means that the
|
||||
// stream already exists) then we fail.
|
||||
if isErrorOtherThan(err, JSStreamNameExistErr) {
|
||||
return formatError("create session stream", err)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
// Since we have returned if error is not "stream already exist", then
|
||||
// it means that the stream already exists and so we now need to recover
|
||||
// the existing record.
|
||||
si, err = jsa.lookupStream(sname)
|
||||
if err != nil {
|
||||
return formatError("lookup session stream", err)
|
||||
}
|
||||
}
|
||||
// The stream is supposed to have at most 1 record, if it is empty, it means
|
||||
// that we just created it.
|
||||
if si.State.Msgs == 0 {
|
||||
// Message not found, so reate the session...
|
||||
// Create a session and indicate that this session did not exist.
|
||||
sess := mqttSessionCreate(jsa, clientID, hash, 0, opts)
|
||||
sess.domainTk = as.domainTk
|
||||
return sess, false, nil
|
||||
}
|
||||
// We need to recover the existing record now.
|
||||
smsg, err := jsa.loadMsg(sname, si.State.LastSeq)
|
||||
if err != nil {
|
||||
return formatError("loading session record", err)
|
||||
}
|
||||
ps := &mqttPersistedSession{}
|
||||
if err := json.Unmarshal(smsg.Data, ps); err != nil {
|
||||
return formatError(fmt.Sprintf("unmarshal of session record at sequence %v", smsg.Sequence), err)
|
||||
}
|
||||
// Restore this session (even if we don't own it), the caller will do the right thing.
|
||||
sess := mqttSessionCreate(jsa, clientID, hash, smsg.Sequence, opts)
|
||||
sess.domainTk = as.domainTk
|
||||
sess.clean = ps.Clean
|
||||
sess.subs = ps.Subs
|
||||
sess.cons = ps.Cons
|
||||
@@ -1947,6 +2045,66 @@ func (as *mqttAccountSessionManager) notifyRetainedMsgDeleted(subject string, se
|
||||
}
|
||||
}
|
||||
|
||||
func (as *mqttAccountSessionManager) transferUniqueSessStreamsToMuxed(log *Server) {
|
||||
// Set retry to true, will be set to false on success.
|
||||
retry := true
|
||||
defer func() {
|
||||
if retry {
|
||||
next := mqttDefaultTransferRetry
|
||||
log.Warnf("Failed to transfer all MQTT session streams, will try again in %v", next)
|
||||
time.AfterFunc(next, func() { as.transferUniqueSessStreamsToMuxed(log) })
|
||||
}
|
||||
}()
|
||||
|
||||
jsa := &as.jsa
|
||||
sni, err := jsa.newRequestEx(mqttJSAStreamNames, JSApiStreams, 0, nil, 5*time.Second)
|
||||
if err != nil {
|
||||
log.Errorf("Unable to transfer MQTT session streams: %v", err)
|
||||
return
|
||||
}
|
||||
snames := sni.(*JSApiStreamNamesResponse)
|
||||
if snames.Error != nil {
|
||||
log.Errorf("Unable to transfer MQTT session streams: %v", snames.ToError())
|
||||
return
|
||||
}
|
||||
var oldMQTTSessStreams []string
|
||||
for _, sn := range snames.Streams {
|
||||
if strings.HasPrefix(sn, mqttSessionsStreamNamePrefix) {
|
||||
oldMQTTSessStreams = append(oldMQTTSessStreams, sn)
|
||||
}
|
||||
}
|
||||
ns := len(oldMQTTSessStreams)
|
||||
if ns == 0 {
|
||||
// Nothing to do
|
||||
retry = false
|
||||
return
|
||||
}
|
||||
log.Noticef("Transferring %v MQTT session streams...", ns)
|
||||
for _, sn := range oldMQTTSessStreams {
|
||||
log.Noticef(" Transferring stream %q to %q", sn, mqttSessStreamName)
|
||||
smsg, err := jsa.loadLastMsgFor(sn, sn)
|
||||
if err != nil {
|
||||
log.Errorf(" Unable to load session record: %v", err)
|
||||
return
|
||||
}
|
||||
ps := &mqttPersistedSession{}
|
||||
if err := json.Unmarshal(smsg.Data, ps); err != nil {
|
||||
log.Warnf(" Unable to unmarshal the content of this stream, may not be a legitimate MQTT session stream, skipping")
|
||||
continue
|
||||
}
|
||||
// Compute subject where the session is being stored
|
||||
subject := mqttSessStreamSubjectPrefix + as.domainTk + string(getHash(ps.ID))
|
||||
// Store record to MQTT session stream
|
||||
if _, err := jsa.storeMsgWithKind(mqttJSASessPersist, subject, 0, smsg.Data); err != nil {
|
||||
log.Errorf(" Unable to transfer the session record: %v", err)
|
||||
return
|
||||
}
|
||||
jsa.deleteStream(sn)
|
||||
}
|
||||
log.Noticef("Transfer of %v MQTT session streams done!", ns)
|
||||
retry = false
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// MQTT session related functions
|
||||
@@ -1978,13 +2136,13 @@ func (sess *mqttSession) save() error {
|
||||
}
|
||||
b, _ := json.Marshal(&ps)
|
||||
|
||||
sname := mqttSessionsStreamNamePrefix + sess.idHash
|
||||
subject := mqttSessStreamSubjectPrefix + sess.domainTk + sess.idHash
|
||||
seq := sess.seq
|
||||
sess.mu.Unlock()
|
||||
|
||||
bb := bytes.Buffer{}
|
||||
bb.WriteString(hdrLine)
|
||||
bb.WriteString(JSExpectedLastSeq)
|
||||
bb.WriteString(JSExpectedLastSubjSeq)
|
||||
bb.WriteString(":")
|
||||
bb.WriteString(strconv.FormatInt(int64(seq), 10))
|
||||
bb.WriteString(CR_LF)
|
||||
@@ -1992,7 +2150,7 @@ func (sess *mqttSession) save() error {
|
||||
hdr := bb.Len()
|
||||
bb.Write(b)
|
||||
|
||||
resp, err := sess.jsa.storeMsgWithKind(mqttJSASessPersist, sname, hdr, bb.Bytes())
|
||||
resp, err := sess.jsa.storeMsgWithKind(mqttJSASessPersist, subject, hdr, bb.Bytes())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -2007,13 +2165,13 @@ func (sess *mqttSession) save() error {
|
||||
//
|
||||
// Runs from the client's readLoop.
|
||||
// Lock not held on entry, but session is in the locked map.
|
||||
func (sess *mqttSession) clear(deleteStream bool) {
|
||||
func (sess *mqttSession) clear(deleteSess bool, seq uint64) {
|
||||
for sid, cc := range sess.cons {
|
||||
delete(sess.cons, sid)
|
||||
sess.deleteConsumer(cc)
|
||||
}
|
||||
if deleteStream {
|
||||
sess.jsa.deleteStream(mqttSessionsStreamNamePrefix + sess.idHash)
|
||||
if deleteSess {
|
||||
sess.jsa.deleteMsg(mqttSessStreamName, seq)
|
||||
}
|
||||
sess.mu.Lock()
|
||||
sess.subs, sess.pending, sess.cpending, sess.seq, sess.tmaxack = nil, nil, nil, 0, 0
|
||||
@@ -2476,7 +2634,7 @@ CHECK:
|
||||
// This Session lasts as long as the Network Connection. State data
|
||||
// associated with this Session MUST NOT be reused in any subsequent
|
||||
// Session.
|
||||
es.clear(false)
|
||||
es.clear(false, 0)
|
||||
} else {
|
||||
// Report to the client that the session was present
|
||||
sessp = true
|
||||
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
@@ -2700,7 +2701,7 @@ func TestMQTTCluster(t *testing.T) {
|
||||
clientID := nuid.Next()
|
||||
|
||||
o := cl.opts[0]
|
||||
mc, r := testMQTTConnect(t, &mqttConnInfo{clientID: clientID, cleanSess: false}, o.MQTT.Host, o.MQTT.Port)
|
||||
mc, r := testMQTTConnectRetry(t, &mqttConnInfo{clientID: clientID, cleanSess: false}, o.MQTT.Host, o.MQTT.Port, 5)
|
||||
defer mc.Close()
|
||||
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
|
||||
|
||||
@@ -2775,7 +2776,7 @@ func TestMQTTCluster(t *testing.T) {
|
||||
cl.stopAll()
|
||||
cl.restartAll()
|
||||
|
||||
streams := []string{mqttStreamName, mqttRetainedMsgsStreamName}
|
||||
streams := []string{mqttStreamName, mqttRetainedMsgsStreamName, mqttSessStreamName}
|
||||
for _, sn := range streams {
|
||||
cl.waitOnStreamLeader(globalAccountName, sn)
|
||||
}
|
||||
@@ -2794,7 +2795,7 @@ func TestMQTTClusterRetainedMsg(t *testing.T) {
|
||||
srv2Opts := cl.opts[1]
|
||||
|
||||
// Connect subscription on server 1.
|
||||
mc, rc := testMQTTConnect(t, &mqttConnInfo{clientID: "sub", cleanSess: false}, srv1Opts.MQTT.Host, srv1Opts.MQTT.Port)
|
||||
mc, rc := testMQTTConnectRetry(t, &mqttConnInfo{clientID: "sub", cleanSess: false}, srv1Opts.MQTT.Host, srv1Opts.MQTT.Port, 5)
|
||||
defer mc.Close()
|
||||
testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false)
|
||||
|
||||
@@ -3014,7 +3015,7 @@ func TestMQTTClusterReplicasCount(t *testing.T) {
|
||||
for _, sname := range []string{
|
||||
mqttStreamName,
|
||||
mqttRetainedMsgsStreamName,
|
||||
mqttSessionsStreamNamePrefix + string(getHash("sub")),
|
||||
mqttSessStreamName,
|
||||
} {
|
||||
t.Run(sname, func(t *testing.T) {
|
||||
si, err := js.StreamInfo(sname)
|
||||
@@ -3030,25 +3031,27 @@ func TestMQTTClusterReplicasCount(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMQTTClusterSessionReplicasAdjustment(t *testing.T) {
|
||||
func TestMQTTClusterCanCreateSessionWithOnServerDown(t *testing.T) {
|
||||
cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", 3)
|
||||
defer cl.shutdown()
|
||||
o := cl.opts[0]
|
||||
|
||||
mc, rc := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
|
||||
mc, rc := testMQTTConnectRetry(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port, 5)
|
||||
defer mc.Close()
|
||||
testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false)
|
||||
mc.Close()
|
||||
|
||||
// Shutdown one of the server.
|
||||
sd := cl.servers[1].StoreDir()
|
||||
defer os.RemoveAll(strings.TrimSuffix(sd, JetStreamStoreDir))
|
||||
cl.servers[1].Shutdown()
|
||||
|
||||
// Make sure there is a meta leader
|
||||
cl.waitOnPeerCount(2)
|
||||
cl.waitOnLeader()
|
||||
|
||||
// Now try to create a new session. With R(3) this would fail, but now server will
|
||||
// adjust it down to R(2).
|
||||
// Now try to create a new session. Since we use a single stream now for all sessions,
|
||||
// this should succeed.
|
||||
o = cl.opts[2]
|
||||
// We may still get failures because of some JS APIs may timeout while things
|
||||
// settle, so try again for a certain amount of times.
|
||||
@@ -3069,7 +3072,7 @@ func TestMQTTClusterPlacement(t *testing.T) {
|
||||
sc.waitOnLeader()
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
mc, rc := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, lnc.opts[i%3].MQTT.Host, lnc.opts[i%3].MQTT.Port)
|
||||
mc, rc := testMQTTConnectRetry(t, &mqttConnInfo{cleanSess: true}, lnc.opts[i%3].MQTT.Host, lnc.opts[i%3].MQTT.Port, 5)
|
||||
defer mc.Close()
|
||||
testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false)
|
||||
}
|
||||
@@ -3258,7 +3261,7 @@ func TestMQTTSessionMovingDomains(t *testing.T) {
|
||||
|
||||
connectSubAndDisconnect := func(host string, port int, present bool) {
|
||||
t.Helper()
|
||||
mc, rc := testMQTTConnect(t, &mqttConnInfo{clientID: "sub", cleanSess: false}, host, port)
|
||||
mc, rc := testMQTTConnectRetry(t, &mqttConnInfo{clientID: "sub", cleanSess: false}, host, port, 5)
|
||||
defer mc.Close()
|
||||
testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, present)
|
||||
testMQTTSub(t, 1, mc, rc, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1})
|
||||
@@ -4962,6 +4965,108 @@ func TestMQTTWebsocketNotSupported(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMQTTTransferSessionStreamsToMuxed(t *testing.T) {
|
||||
cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", 3)
|
||||
defer cl.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, cl.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
// Create 2 streams that start with "$MQTT_sess_" to check for transfer to new
|
||||
// mux'ed unique "$MQTT_sess" stream. One of this stream will not contain a
|
||||
// proper session record, and we will check that the stream does not get deleted.
|
||||
sessStreamName1 := mqttSessionsStreamNamePrefix + string(getHash("sub"))
|
||||
if _, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: sessStreamName1,
|
||||
Subjects: []string{sessStreamName1},
|
||||
Replicas: 3,
|
||||
MaxMsgs: 1,
|
||||
}); err != nil {
|
||||
t.Fatalf("Unable to add stream: %v", err)
|
||||
}
|
||||
// Then add the session record
|
||||
ps := mqttPersistedSession{
|
||||
ID: "sub",
|
||||
Subs: map[string]byte{"foo": 1},
|
||||
Cons: map[string]*ConsumerConfig{"foo": {
|
||||
Durable: "d6INCtp3_cK39H5WHEtOSU7sLy2oQv3",
|
||||
DeliverSubject: "$MQTT.sub.cK39H5WHEtOSU7sLy2oQrR",
|
||||
DeliverPolicy: DeliverNew,
|
||||
AckPolicy: AckExplicit,
|
||||
FilterSubject: "$MQTT.msgs.foo",
|
||||
MaxAckPending: 1024,
|
||||
}},
|
||||
}
|
||||
b, _ := json.Marshal(&ps)
|
||||
if _, err := js.Publish(sessStreamName1, b); err != nil {
|
||||
t.Fatalf("Error on publish: %v", err)
|
||||
}
|
||||
|
||||
// Create the stream that has "$MQTT_sess_" prefix, but that is not really a MQTT session stream
|
||||
sessStreamName2 := mqttSessionsStreamNamePrefix + "ivan"
|
||||
if _, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: sessStreamName2,
|
||||
Subjects: []string{sessStreamName2},
|
||||
Replicas: 3,
|
||||
MaxMsgs: 1,
|
||||
}); err != nil {
|
||||
t.Fatalf("Unable to add stream: %v", err)
|
||||
}
|
||||
if _, err := js.Publish(sessStreamName2, []byte("some content")); err != nil {
|
||||
t.Fatalf("Error on publish: %v", err)
|
||||
}
|
||||
|
||||
nc.Close()
|
||||
cl.stopAll()
|
||||
cl.restartAll()
|
||||
|
||||
cl.waitOnStreamLeader(globalAccountName, sessStreamName1)
|
||||
cl.waitOnStreamLeader(globalAccountName, sessStreamName2)
|
||||
|
||||
// Now create a real MQTT connection
|
||||
o := cl.opts[0]
|
||||
sc, sr := testMQTTConnectRetry(t, &mqttConnInfo{clientID: "sub"}, o.MQTT.Host, o.MQTT.Port, 10)
|
||||
defer sc.Close()
|
||||
testMQTTCheckConnAck(t, sr, mqttConnAckRCConnectionAccepted, true)
|
||||
|
||||
nc, js = jsClientConnect(t, cl.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
// Check that old session stream is gone, but the non session stream is still present.
|
||||
var gotIt = false
|
||||
for info := range js.StreamsInfo() {
|
||||
if strings.HasPrefix(info.Config.Name, mqttSessionsStreamNamePrefix) {
|
||||
if strings.HasSuffix(info.Config.Name, "_ivan") {
|
||||
gotIt = true
|
||||
} else {
|
||||
t.Fatalf("The stream %q should have been deleted", info.Config.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
if !gotIt {
|
||||
t.Fatalf("The stream %q should not have been deleted", mqttSessionsStreamNamePrefix+"ivan")
|
||||
}
|
||||
|
||||
// We want to check that the record was properly transferred.
|
||||
rmsg, err := js.GetMsg(mqttSessStreamName, 2)
|
||||
if err != nil {
|
||||
t.Fatalf("Unable to get session message: %v", err)
|
||||
}
|
||||
ps2 := &mqttPersistedSession{}
|
||||
if err := json.Unmarshal(rmsg.Data, ps2); err != nil {
|
||||
t.Fatalf("Error unpacking session record: %v", err)
|
||||
}
|
||||
if ps2.ID != "sub" {
|
||||
t.Fatalf("Unexpected session record, %+v vs %+v", ps2, ps)
|
||||
}
|
||||
if qos, ok := ps2.Subs["foo"]; !ok || qos != 1 {
|
||||
t.Fatalf("Unexpected session record, %+v vs %+v", ps2, ps)
|
||||
}
|
||||
if cons, ok := ps2.Cons["foo"]; !ok || !reflect.DeepEqual(cons, ps.Cons["foo"]) {
|
||||
t.Fatalf("Unexpected session record, %+v vs %+v", ps2, ps)
|
||||
}
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// Benchmarks
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"math/rand"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
@@ -200,6 +201,7 @@ func (c *cluster) shutdown() {
|
||||
os.Remove(cf)
|
||||
}
|
||||
if sd != _EMPTY_ {
|
||||
sd = strings.TrimSuffix(sd, JetStreamStoreDir)
|
||||
os.RemoveAll(sd)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user