diff --git a/server/client.go b/server/client.go index bb563c46..bc52f4af 100644 --- a/server/client.go +++ b/server/client.go @@ -2744,7 +2744,7 @@ func (c *client) processUnsub(arg []byte) error { if unsub { c.unsubscribe(acc, sub, false, true) - if acc != nil && kind == CLIENT || kind == SYSTEM || kind == ACCOUNT { + if acc != nil && (kind == CLIENT || kind == SYSTEM || kind == ACCOUNT || kind == JETSTREAM) { srv.updateRouteSubscriptionMap(acc, sub, -1) if updateGWs { srv.gatewayUpdateSubInterest(acc.Name, sub, -1) diff --git a/server/const.go b/server/const.go index 5762d60b..75584754 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.2.0-RC.7.2" + VERSION = "2.2.0-RC.7.3" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/stream.go b/server/stream.go index 2192d02b..3618b0f4 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1007,9 +1007,11 @@ func (mset *stream) processMirrorMsgs() { }() // Grab stream quit channel. - mset.mu.RLock() + mset.mu.Lock() msgs, mch, qch := mset.mirror.msgs, mset.mirror.msgs.mch, mset.qch - mset.mu.RUnlock() + // Set the last seen as now so that we don't fail at the first check. + mset.mirror.last = time.Now() + mset.mu.Unlock() t := time.NewTicker(sourceHealthCheckInterval) defer t.Stop() @@ -1452,9 +1454,11 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) { } // Grab stream quit channel. - mset.mu.RLock() + mset.mu.Lock() msgs, mch, qch := si.msgs, si.msgs.mch, mset.qch - mset.mu.RUnlock() + // Set the last seen as now so that we don't fail at the first check. + si.last = time.Now() + mset.mu.Unlock() t := time.NewTicker(sourceHealthCheckInterval) defer t.Stop()