Fixed panic when server needs to send message to more than 8 routes

Resolves #955

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2019-04-17 13:02:41 -06:00
parent 0c8bf0ee8b
commit 288f00ff81
2 changed files with 53 additions and 4 deletions

View File

@@ -2352,15 +2352,15 @@ func (c *client) addSubToRouteTargets(sub *subscription) {
}
}
var rt *routeTarget
lrts := len(c.in.rts)
// If we are here we do not have the sub yet in our list
// If we have to grow do so here.
if len(c.in.rts) == cap(c.in.rts) {
if lrts == cap(c.in.rts) {
c.in.rts = append(c.in.rts, routeTarget{})
}
var rt *routeTarget
lrts := len(c.in.rts)
c.in.rts = c.in.rts[:lrts+1]
rt = &c.in.rts[lrts]
rt.sub = sub

View File

@@ -20,6 +20,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@@ -1025,3 +1026,51 @@ func TestRouteSendLocalSubsWithLowMaxPending(t *testing.T) {
// Check that all subs have been sent ok
checkExpectedSubs(t, numSubs, srvA, srvB)
}
func TestRouteNoCrashOnAddingSubToRoute(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
numRoutes := routeTargetInit + 2
total := int32(numRoutes)
count := int32(0)
ch := make(chan bool, 1)
cb := func(_ *nats.Msg) {
if n := atomic.AddInt32(&count, 1); n == total {
ch <- true
}
}
var servers []*Server
servers = append(servers, s)
seedURL := fmt.Sprintf("nats://%s:%d", opts.Cluster.Host, opts.Cluster.Port)
for i := 0; i < numRoutes; i++ {
ropts := DefaultOptions()
ropts.Routes = RoutesFromStr(seedURL)
rs := RunServer(ropts)
defer rs.Shutdown()
servers = append(servers, rs)
// Create a sub on each routed server
nc := natsConnect(t, fmt.Sprintf("nats://%s:%d", ropts.Host, ropts.Port))
defer nc.Close()
natsSub(t, nc, "foo", cb)
}
checkClusterFormed(t, servers...)
// Make sure all subs are registered in s.
checkFor(t, time.Second, 15*time.Millisecond, func() error {
if s.globalAccount().TotalSubs() != int(numRoutes) {
return fmt.Errorf("Not all %v routed subs were registered", numRoutes)
}
return nil
})
pubNC := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
defer pubNC.Close()
natsPub(t, pubNC, "foo", []byte("hello world!"))
waitCh(t, ch, "Did not get all messages")
}