mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Merge branch 'VidScale-master' into fix_632
This commit is contained in:
124
server/client.go
124
server/client.go
@@ -915,9 +915,9 @@ func (c *client) msgHeader(mh []byte, sub *subscription) []byte {
|
||||
var needFlush = struct{}{}
|
||||
var routeSeen = struct{}{}
|
||||
|
||||
func (c *client) deliverMsg(sub *subscription, mh, msg []byte) {
|
||||
func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
|
||||
if sub.client == nil {
|
||||
return
|
||||
return false
|
||||
}
|
||||
client := sub.client
|
||||
client.mu.Lock()
|
||||
@@ -944,13 +944,13 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) {
|
||||
if shouldForward {
|
||||
client.srv.broadcastUnSubscribe(sub)
|
||||
}
|
||||
return
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if client.nc == nil {
|
||||
client.mu.Unlock()
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
// Update statistics
|
||||
@@ -999,7 +999,7 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) {
|
||||
|
||||
client.mu.Unlock()
|
||||
c.pcd[client] = needFlush
|
||||
return
|
||||
return true
|
||||
|
||||
writeErr:
|
||||
if deadlineSet {
|
||||
@@ -1014,6 +1014,10 @@ writeErr:
|
||||
} else {
|
||||
c.Debugf("Error writing msg: %v", err)
|
||||
}
|
||||
// Honor at most once semantic:
|
||||
// treat message that we attempted to send as actually sent
|
||||
// and don't let a higher-level code an attempt to resend it.
|
||||
return true
|
||||
}
|
||||
|
||||
// processMsg is called to process an inbound msg from a client.
|
||||
@@ -1131,6 +1135,7 @@ func (c *client) processMsg(msg []byte) {
|
||||
si := len(msgh)
|
||||
|
||||
isRoute := c.typ == ROUTER
|
||||
isRouteQsub := false
|
||||
|
||||
// If we are a route and we have a queue subscription, deliver direct
|
||||
// since they are sent direct via L2 semantics. If the match is a queue
|
||||
@@ -1139,53 +1144,69 @@ func (c *client) processMsg(msg []byte) {
|
||||
if sub, ok := srv.routeSidQueueSubscriber(c.pa.sid); ok {
|
||||
if sub != nil {
|
||||
mh := c.msgHeader(msgh[:si], sub)
|
||||
c.deliverMsg(sub, mh, msg)
|
||||
if c.deliverMsg(sub, mh, msg) {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
isRouteQsub = true
|
||||
// for queue subscription try hard to deliver a message at least once.
|
||||
// Right now we know fo sure that it's a queue subscription and
|
||||
// we didn't make a delivery attempt, because either a subscriber limit
|
||||
// was exceeded or a subscription is already gone.
|
||||
// So, let a code below find yet another matching subscription.
|
||||
// We are at risk that a message might go forth and back
|
||||
// between routes during these attempts, but at the end
|
||||
// it shall either be delivered (at most once) or drop.
|
||||
c.Debugf("Re-sending message of a detached queue sid %s", c.pa.sid)
|
||||
}
|
||||
}
|
||||
|
||||
// Used to only send normal subscriptions once across a given route.
|
||||
var rmap map[string]struct{}
|
||||
// Don't process normal subscriptions in case of a queue subscription resend.
|
||||
// Otherwise, we'd end up with potentially delivering the same message twice.
|
||||
if !isRouteQsub {
|
||||
// Used to only send normal subscriptions once across a given route.
|
||||
var rmap map[string]struct{}
|
||||
|
||||
// Loop over all normal subscriptions that match.
|
||||
// Loop over all normal subscriptions that match.
|
||||
|
||||
for _, sub := range r.psubs {
|
||||
// Check if this is a send to a ROUTER, make sure we only send it
|
||||
// once. The other side will handle the appropriate re-processing
|
||||
// and fan-out. Also enforce 1-Hop semantics, so no routing to another.
|
||||
if sub.client.typ == ROUTER {
|
||||
// Skip if sourced from a ROUTER and going to another ROUTER.
|
||||
// This is 1-Hop semantics for ROUTERs.
|
||||
if isRoute {
|
||||
continue
|
||||
}
|
||||
// Check to see if we have already sent it here.
|
||||
if rmap == nil {
|
||||
rmap = make(map[string]struct{}, srv.numRoutes())
|
||||
}
|
||||
sub.client.mu.Lock()
|
||||
if sub.client.nc == nil || sub.client.route == nil ||
|
||||
sub.client.route.remoteID == "" {
|
||||
c.Debugf("Bad or Missing ROUTER Identity, not processing msg")
|
||||
for _, sub := range r.psubs {
|
||||
// Check if this is a send to a ROUTER, make sure we only send it
|
||||
// once. The other side will handle the appropriate re-processing
|
||||
// and fan-out. Also enforce 1-Hop semantics, so no routing to another.
|
||||
if sub.client.typ == ROUTER {
|
||||
// Skip if sourced from a ROUTER and going to another ROUTER.
|
||||
// This is 1-Hop semantics for ROUTERs.
|
||||
if isRoute {
|
||||
continue
|
||||
}
|
||||
// Check to see if we have already sent it here.
|
||||
if rmap == nil {
|
||||
rmap = make(map[string]struct{}, srv.numRoutes())
|
||||
}
|
||||
sub.client.mu.Lock()
|
||||
if sub.client.nc == nil || sub.client.route == nil ||
|
||||
sub.client.route.remoteID == "" {
|
||||
c.Debugf("Bad or Missing ROUTER Identity, not processing msg")
|
||||
sub.client.mu.Unlock()
|
||||
continue
|
||||
}
|
||||
if _, ok := rmap[sub.client.route.remoteID]; ok {
|
||||
c.Debugf("Ignoring route, already processed")
|
||||
sub.client.mu.Unlock()
|
||||
continue
|
||||
}
|
||||
rmap[sub.client.route.remoteID] = routeSeen
|
||||
sub.client.mu.Unlock()
|
||||
continue
|
||||
}
|
||||
if _, ok := rmap[sub.client.route.remoteID]; ok {
|
||||
c.Debugf("Ignoring route, already processed")
|
||||
sub.client.mu.Unlock()
|
||||
continue
|
||||
}
|
||||
rmap[sub.client.route.remoteID] = routeSeen
|
||||
sub.client.mu.Unlock()
|
||||
// Normal delivery
|
||||
mh := c.msgHeader(msgh[:si], sub)
|
||||
c.deliverMsg(sub, mh, msg)
|
||||
}
|
||||
// Normal delivery
|
||||
mh := c.msgHeader(msgh[:si], sub)
|
||||
c.deliverMsg(sub, mh, msg)
|
||||
}
|
||||
|
||||
// Now process any queue subs we have if not a route
|
||||
if !isRoute {
|
||||
// Now process any queue subs we have if not a route.
|
||||
// ... Or if it's a route and we need to resend.
|
||||
if isRouteQsub || !isRoute {
|
||||
// Check to see if we have our own rand yet. Global rand
|
||||
// has contention with lots of clients, etc.
|
||||
if c.cache.prand == nil {
|
||||
@@ -1194,11 +1215,22 @@ func (c *client) processMsg(msg []byte) {
|
||||
// Process queue subs
|
||||
for i := 0; i < len(r.qsubs); i++ {
|
||||
qsubs := r.qsubs[i]
|
||||
index := c.cache.prand.Intn(len(qsubs))
|
||||
sub := qsubs[index]
|
||||
if sub != nil {
|
||||
mh := c.msgHeader(msgh[:si], sub)
|
||||
c.deliverMsg(sub, mh, msg)
|
||||
// Iterate over all subscribed clients starting at a random index
|
||||
// until we find one that's able to deliver a message.
|
||||
// Drop a message on the floor if there are noone.
|
||||
start_index := c.cache.prand.Intn(len(qsubs))
|
||||
for i := 0; i < len(qsubs); i++ {
|
||||
index := (start_index + i) % len(qsubs)
|
||||
sub := qsubs[index]
|
||||
if sub != nil {
|
||||
mh := c.msgHeader(msgh[:si], sub)
|
||||
if isRouteQsub {
|
||||
c.Tracef("Re-sending msg of %s to %s", c.pa.sid, sub.sid)
|
||||
}
|
||||
if c.deliverMsg(sub, mh, msg) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user