JetStream Clustering WIP

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-12-11 09:22:43 -08:00
parent 9c2bf8e4a9
commit f0cdf89c61
294 changed files with 43709 additions and 95909 deletions

View File

@@ -373,6 +373,8 @@ func TestNoRaceQueueSubWeightOrderMultipleConnections(t *testing.T) {
info := checkInfoMsg(t, rc)
info.ID = routeID
info.Name = routeID
b, err := json.Marshal(info)
if err != nil {
t.Fatalf("Could not marshal test route info: %v", err)
@@ -442,10 +444,14 @@ func TestNoRaceClusterLeaksSubscriptions(t *testing.T) {
numResponses := 100
repliers := make([]*nats.Conn, 0, numResponses)
var noOpErrHandler = func(_ *nats.Conn, _ *nats.Subscription, _ error) {}
// Create 100 repliers
for i := 0; i < 50; i++ {
nc1, _ := nats.Connect(urlA)
nc1.SetErrorHandler(noOpErrHandler)
nc2, _ := nats.Connect(urlB)
nc2.SetErrorHandler(noOpErrHandler)
repliers = append(repliers, nc1, nc2)
nc1.Subscribe("test.reply", func(m *nats.Msg) {
m.Respond([]byte("{\"sender\": 22 }"))
@@ -469,11 +475,13 @@ func TestNoRaceClusterLeaksSubscriptions(t *testing.T) {
// Create 8 queue Subscribers for responses.
for i := 0; i < 8; i++ {
nc, _ := nats.Connect(servers)
nc.SetErrorHandler(noOpErrHandler)
nc.ChanQueueSubscribe(inbox, grp, msgs)
nc.Flush()
defer nc.Close()
}
nc, _ := nats.Connect(servers)
nc.SetErrorHandler(noOpErrHandler)
nc.PublishRequest("test.reply", inbox, req)
defer nc.Close()