From 98c1f0ecb2c683df08bddd5c9590c7fda2c0b3ee Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 29 Mar 2022 19:02:41 -0600 Subject: [PATCH] Fixed some data race and some flappers Got a data race: ``` ================== WARNING: DATA RACE Write at 0x00c001c736b0 by goroutine 605: runtime.mapassign_faststr() /home/travis/.gimme/versions/go1.17.8.linux.amd64/src/runtime/map_faststr.go:202 +0x0 github.com/nats-io/nats-server/v2/server.(*Account).addServiceImport() /home/travis/gopath/src/github.com/nats-io/nats-server/server/accounts.go:1868 +0xb7b github.com/nats-io/nats-server/v2/server.(*Account).AddServiceImportWithClaim() ... Previous read at 0x00c001c736b0 by goroutine 301: runtime.mapaccess2_faststr() /home/travis/.gimme/versions/go1.17.8.linux.amd64/src/runtime/map_faststr.go:107 +0x0 github.com/nats-io/nats-server/v2/server.(*Server).registerSystemImports() /home/travis/gopath/src/github.com/nats-io/nats-server/server/events.go:1577 +0x284 github.com/nats-io/nats-server/v2/server.(*Server).updateAccountClaimsWithRefresh() ... ``` Also, remove some condition in gateway.go on how we were checking if a subject was a serviec reply, which was causing a test to flap. Finally, used AckSync() in a rest (instead of m.Respond(nil)) to prevent it from flapping. Signed-off-by: Ivan Kozlovic --- server/events.go | 4 ++-- server/gateway.go | 2 +- server/gateway_test.go | 6 +++--- server/jetstream_test.go | 6 +++--- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/server/events.go b/server/events.go index b490a2cb..8955812d 100644 --- a/server/events.go +++ b/server/events.go @@ -1574,12 +1574,12 @@ func (s *Server) registerSystemImports(a *Account) { // Add in this to the account in 2 places. // "$SYS.REQ.SERVER.PING.CONNZ" and "$SYS.REQ.ACCOUNT.PING.CONNZ" - if _, ok := a.imports.services[connzSubj]; !ok { + if !a.serviceImportExists(connzSubj) { if err := a.AddServiceImport(sacc, connzSubj, mappedSubj); err != nil { s.Errorf("Error setting up system service imports for account: %v", err) } } - if _, ok := a.imports.services[accConnzReqSubj]; !ok { + if !a.serviceImportExists(accConnzReqSubj) { if err := a.AddServiceImport(sacc, accConnzReqSubj, mappedSubj); err != nil { s.Errorf("Error setting up system service imports for account: %v", err) } diff --git a/server/gateway.go b/server/gateway.go index 75022d03..8e4939d8 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -2824,7 +2824,7 @@ func (c *client) handleGatewayReply(msg []byte) (processed bool) { // If route is nil, we will process the incoming message locally. if route == nil { // Check if this is a service reply subject (_R_) - isServiceReply := len(acc.imports.services) > 0 && isServiceReply(c.pa.subject) + isServiceReply := isServiceReply(c.pa.subject) var queues [][]byte if len(r.psubs)+len(r.qsubs) > 0 { diff --git a/server/gateway_test.go b/server/gateway_test.go index 234462c3..bbde745c 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -3995,13 +3995,13 @@ func TestGatewayServiceImportWithQueue(t *testing.T) { // For B, we expect it to send to gateway on the two subjects: test.request // and foo.request then send the reply to the client and optimistically - // to the other gateway. Also send on _R_. + // to the other gateway. if i == 0 { - expected = 5 + expected = 4 } else { // The second time, one of the accounts will be suppressed and the reply going // back so we should get only 2 more messages. - expected = 7 + expected = 6 } vz, _ = sb.Varz(nil) if vz.OutMsgs != expected { diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 883be034..9e99d9df 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -2817,7 +2817,7 @@ func TestJetStreamWorkQueueAckWaitRedelivery(t *testing.T) { t.Fatalf("Expected these to be marked as redelivered") } // Ack the message here. - m.Respond(nil) + m.AckSync() } if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != 0 { @@ -16714,8 +16714,8 @@ func TestJetStreamLimits(t *testing.T) { listen: 127.0.0.1:-1 server_name: %s jetstream: { - max_mem_store: 2MB, - max_file_store: 8MB, + max_mem_store: 2MB, + max_file_store: 8MB, store_dir: '%s', limits: {max_ack_pending: 1000, duplicate_window: "1m"} }