From 0bd92e85dab3ec56de7db09fcdc7ca2b3603a27d Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 2 May 2021 20:46:12 -0700 Subject: [PATCH] Add in formal support for multiple JetStream domains across leafnodes. This CL adds in support for multiple JetStream domains using mapped subjects. Mapping subjects aligns well with the JetStream context APIPrefix in clients. Signed-off-by: Derek Collison --- server/accounts.go | 8 +- server/client.go | 3 +- server/jetstream.go | 23 +++- server/jetstream_api.go | 194 +++++++++++++++++++++------- server/jetstream_cluster_test.go | 209 ++++++++++++++++++++++++++++--- server/leafnode.go | 57 +++++++-- server/parser.go | 25 ++-- server/server.go | 2 +- 8 files changed, 435 insertions(+), 86 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index f3e045fd..88363274 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -574,7 +574,7 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error { if err != nil { return err } - if d.Cluster == "" { + if d.Cluster == _EMPTY_ { m.dests = append(m.dests, &destination{tr, d.Weight}) } else { // We have a cluster scoped filter. @@ -644,6 +644,12 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error { // If we did not replace add to the end. a.mappings = append(a.mappings, m) + // If we have connected leafnodes make sure to update. + if len(a.lleafs) > 0 { + for _, lc := range a.lleafs { + lc.forceAddToSmap(src) + } + } return nil } diff --git a/server/client.go b/server/client.go index cf6ff14d..2470657a 100644 --- a/server/client.go +++ b/server/client.go @@ -1141,7 +1141,7 @@ func (c *client) readLoop(pre []byte) { // Check if the account has mappings and if so set the local readcache flag. // We check here to make sure any changes such as config reload are reflected here. - if c.kind == CLIENT { + if c.kind == CLIENT || c.kind == LEAF { if acc.hasMappings() { c.in.flags.set(hasMappings) } else { @@ -3397,6 +3397,7 @@ func (c *client) processInboundMsg(msg []byte) { func (c *client) selectMappedSubject() bool { nsubj, changed := c.acc.selectMappedSubject(string(c.pa.subject)) if changed { + c.pa.mapped = c.pa.subject c.pa.subject = []byte(nsubj) } return changed diff --git a/server/jetstream.go b/server/jetstream.go index db25b1b3..8b1a0bf6 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -443,7 +443,6 @@ func (s *Server) configJetStream(acc *Account) error { s.switchAccountToInterestMode(acc.GetName()) } } - acc.jsLimits = nil } else if acc != s.SystemAccount() { if acc.JetStreamEnabled() { acc.DisableJetStream() @@ -684,6 +683,12 @@ func (s *Server) getJetStream() *jetStream { return js } +func (a *Account) assignJetStreamLimits(limits *JetStreamAccountLimits) { + a.mu.Lock() + a.jsLimits = limits + a.mu.Unlock() +} + // EnableJetStream will enable JetStream on this account with the defined limits. // This is a helper for JetStreamEnableAccount. func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { @@ -700,9 +705,16 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { s.mu.Unlock() js := s.getJetStream() + if js == nil { + // Place limits here so we know that the account is configured for JetStream. + if limits == nil { + limits = dynamicJSAccountLimits + } + a.assignJetStreamLimits(limits) return ErrJetStreamNotEnabled } + if s.SystemAccount() == a { return fmt.Errorf("jetstream can not be enabled on the system account") } @@ -711,6 +723,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { if limits == nil { limits = js.dynamicAccountLimits() } + a.assignJetStreamLimits(limits) js.mu.Lock() // Check the limits against existing reservations. @@ -1147,6 +1160,14 @@ func (js *jetStream) disableJetStream(jsa *jsAccount) error { return nil } +// jetStreamConfigured reports whether the account has JetStream configured, regardless of this +// servers JetStream status. +func (a *Account) jetStreamConfigured() bool { + a.mu.RLock() + defer a.mu.RUnlock() + return a.jsLimits != nil +} + // JetStreamEnabled is a helper to determine if jetstream is enabled for an account. func (a *Account) JetStreamEnabled() bool { if a == nil { diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 2cce01a1..e5c8d6e1 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -886,6 +886,11 @@ func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, subject, rep } if !acc.JetStreamEnabled() { + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() > 0 { + return + } resp.Error = jsNotEnabledErr } else { stats := acc.JetStreamUsage() @@ -1158,8 +1163,12 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, subject, re } if !acc.JetStreamEnabled() { - resp.Error = jsNotEnabledErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } return } var cfg StreamConfig @@ -1336,8 +1345,12 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, subject, re } if !acc.JetStreamEnabled() { - resp.Error = jsNotEnabledErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } return } var ncfg StreamConfig @@ -1416,8 +1429,12 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, rep } if !acc.JetStreamEnabled() { - resp.Error = jsNotEnabledErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } return } @@ -1541,8 +1558,12 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, repl } if !acc.JetStreamEnabled() { - resp.Error = jsNotEnabledErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } return } @@ -1625,8 +1646,12 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl if isLeader && sa == nil { // We can't find the stream, so mimic what would be the errors below. if !acc.JetStreamEnabled() { - resp.Error = jsNotEnabledErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } return } // No stream present. @@ -1657,8 +1682,12 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl } if !acc.JetStreamEnabled() { - resp.Error = jsNotEnabledErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } return } @@ -1750,8 +1779,12 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, sub } if !acc.JetStreamEnabled() { - resp.Error = jsNotEnabledErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } return } if !isEmptyRequest(msg) { @@ -1857,8 +1890,12 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, s } if !acc.JetStreamEnabled() { - resp.Error = jsNotEnabledErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } return } if !isEmptyRequest(msg) { @@ -1931,8 +1968,12 @@ func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, subject } if !acc.JetStreamEnabled() { - resp.Error = jsNotEnabledErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } return } if isEmptyRequest(msg) { @@ -2179,8 +2220,12 @@ func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, subject, re } if !acc.JetStreamEnabled() { - resp.Error = jsNotEnabledErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } return } if !isEmptyRequest(msg) { @@ -2248,8 +2293,12 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, subject, reply if isLeader && sa == nil { // We can't find the stream, so mimic what would be the errors below. if !acc.JetStreamEnabled() { - resp.Error = jsNotEnabledErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } return } // No stream present. @@ -2274,8 +2323,12 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, subject, reply } if !acc.JetStreamEnabled() { - resp.Error = jsNotEnabledErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } return } if isEmptyRequest(msg) { @@ -2353,8 +2406,12 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, subject, reply st if isLeader && sa == nil { // We can't find the stream, so mimic what would be the errors below. if !acc.JetStreamEnabled() { - resp.Error = jsNotEnabledErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } return } // No stream present. @@ -2379,8 +2436,12 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, subject, reply st } if !acc.JetStreamEnabled() { - resp.Error = jsNotEnabledErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } return } if isEmptyRequest(msg) { @@ -2448,8 +2509,12 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, subject, rep if isLeader && sa == nil { // We can't find the stream, so mimic what would be the errors below. if !acc.JetStreamEnabled() { - resp.Error = jsNotEnabledErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } return } // No stream present. @@ -2482,10 +2547,15 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, subject, rep } if !acc.JetStreamEnabled() { - resp.Error = jsNotEnabledErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } return } + if !isEmptyRequest(msg) { resp.Error = jsNotEmptyRequestErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -2560,6 +2630,16 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r return } + if !acc.JetStreamEnabled() { + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } + return + } + s.processStreamRestore(ci, acc, &req.Config, subject, reply, string(msg)) } @@ -2789,6 +2869,16 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, subject, return } + if !acc.JetStreamEnabled() { + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp)) + } + return + } + var req JSApiStreamSnapshotRequest if err := json.Unmarshal(msg, &req); err != nil { resp.Error = jsInvalidJSONErr @@ -3008,8 +3098,12 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply s } if !acc.JetStreamEnabled() { - resp.Error = jsNotEnabledErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } return } if streamName != req.Stream { @@ -3105,8 +3199,12 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r } if !acc.JetStreamEnabled() { - resp.Error = jsNotEnabledErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } return } @@ -3224,8 +3322,12 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, re } if !acc.JetStreamEnabled() { - resp.Error = jsNotEnabledErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } return } @@ -3315,8 +3417,12 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, re if isLeader && ca == nil { // We can't find the consumer, so mimic what would be the errors below. if !acc.JetStreamEnabled() { - resp.Error = jsNotEnabledErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } return } if sa == nil { @@ -3420,8 +3526,12 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, subject, } if !acc.JetStreamEnabled() { - resp.Error = jsNotEnabledErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + // This local account is not enabled, but we need to check of we are using leafnodes to bridge + // this account and if so not answer locally. + if acc.NumLeafNodes() == 0 { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } return } if !isEmptyRequest(msg) { diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 5158acff..40708337 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -3972,7 +3972,7 @@ func TestJetStreamClusterSuperClusterMetaPlacement(t *testing.T) { } // Client based API - s := sc.randomCluster().randomServer() + s := sc.randomServer() nc, err := nats.Connect(s.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) if err != nil { t.Fatalf("Failed to create system client: %v", err) @@ -4022,7 +4022,7 @@ func TestJetStreamClusterSuperClusterBasics(t *testing.T) { defer sc.shutdown() // Client based API - s := sc.randomCluster().randomServer() + s := sc.randomServer() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -4145,7 +4145,7 @@ func TestJetStreamClusterSuperClusterPeerReassign(t *testing.T) { defer sc.shutdown() // Client based API - s := sc.randomCluster().randomServer() + s := sc.randomServer() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -5688,7 +5688,7 @@ func TestJetStreamClusterSuperClusterAndSingleLeafNodeWithSharedSystemAccount(t t.Fatalf("Expected to be placed in leafnode with %q as cluster name, got %q", "LNS", si.Cluster.Name) } // Now check we can place on here as well but connect to the hub. - nc, js = jsClientConnect(t, sc.randomCluster().randomServer()) + nc, js = jsClientConnect(t, sc.randomServer()) defer nc.Close() si, err = js.AddStream(&nats.StreamConfig{ @@ -5704,6 +5704,133 @@ func TestJetStreamClusterSuperClusterAndSingleLeafNodeWithSharedSystemAccount(t } } +// Multiple JS domains. +func TestJetStreamClusterSingleLeafNodeWithoutSharedSystemAccount(t *testing.T) { + c := createJetStreamCluster(t, jsClusterAccountsTempl, "HUB", _EMPTY_, 3, 14333, true) + defer c.shutdown() + + ln := c.createSingleLeafNodeNoSystemAccount() + defer ln.Shutdown() + + // The setup here has a single leafnode server with two accounts. One has JS, the other does not. + // We want to to test the following. + // 1. For the account without JS, we simply will pass through to the HUB. Meaning since our local account + // does not have it, we simply inherit the hub's by default. + // 2. For the JS enabled account, we are isolated and use our local one only. + + // Check behavior of the account without JS. + // Normally this should fail since our local account is not enabled. However, since we are bridging + // via the leafnode we expect this to work here. + nc, js := jsClientConnect(t, ln, nats.UserInfo("n", "p")) + defer nc.Close() + + si, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 2, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.Cluster == nil || si.Cluster.Name != "HUB" { + t.Fatalf("Expected stream to be placed in the \"HUB\"") + } + // Do some other API calls. + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "C1", AckPolicy: nats.AckExplicitPolicy}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + seen := 0 + for name := range js.StreamNames() { + seen++ + if name != "TEST" { + t.Fatalf("Expected only %q but got %q", "TEST", name) + } + } + if seen != 1 { + t.Fatalf("Expected only 1 stream, got %d", seen) + } + if _, err := js.StreamInfo("TEST"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if err := js.PurgeStream("TEST"); err != nil { + t.Fatalf("Unexpected purge error: %v", err) + } + if err := js.DeleteConsumer("TEST", "C1"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if _, err := js.UpdateStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"bar"}, Replicas: 2}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if err := js.DeleteStream("TEST"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Now check the enabled account. + // Check the enabled account only talks to its local JS domain by default. + nc, js = jsClientConnect(t, ln, nats.UserInfo("y", "p")) + defer nc.Close() + + sub, err := nc.SubscribeSync(JSAdvisoryStreamCreatedPre + ".>") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + si, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.Cluster != nil { + t.Fatalf("Expected no cluster designation for stream since created on single LN server") + } + + // Wait for a bit and make sure we only get one of these. + // The HUB domain should be cut off by default. + time.Sleep(200 * time.Millisecond) + checkSubsPending(t, sub, 1) + // Drain. + for _, err := sub.NextMsg(0); err == nil; _, err = sub.NextMsg(0) { + } + + // Now try to talk to the HUB JS domain through a new context that uses a different mapped subject. + // This is similar to how we let users cross JS domains between accounts as well. + js, err = nc.JetStream(nats.APIPrefix("$JS.HUB.API")) + if err != nil { + t.Fatalf("Unexpected error getting JetStream context: %v", err) + } + // This should fail here with no responders. + if _, err := js.AccountInfo(); err != nats.ErrNoResponders { + t.Fatalf("Unexpected error: %v", err) + } + + // Now add in a mapping to the connected account in the HUB. + // This aligns with the APIPrefix context above and works across leafnodes. + // TODO(dlc) - Should we have a mapping section for leafnode solicit? + c.addSubjectMapping("ONE", "$JS.HUB.API.>", "$JS.API.>") + + // How it should work. + if _, err := js.AccountInfo(); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Make sure we can add a stream, etc. + si, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 2, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.Cluster == nil || si.Cluster.Name != "HUB" { + t.Fatalf("Expected stream to be placed in the \"HUB\"") + } + +} + func TestJetStreamClusterLeafDifferentAccounts(t *testing.T) { c := createJetStreamCluster(t, jsClusterAccountsTempl, "HUB", _EMPTY_, 2, 33133, false) defer c.shutdown() @@ -6067,6 +6194,10 @@ func (sc *supercluster) shutdown() { } } +func (sc *supercluster) randomServer() *Server { + return sc.randomCluster().randomServer() +} + var jsClusterAccountsTempl = ` listen: 127.0.0.1:-1 server_name: %s @@ -6082,13 +6213,11 @@ var jsClusterAccountsTempl = ` routes = [%s] } - no_auth_user: u + no_auth_user: one accounts { - ONE { - users = [ { user: "u", pass: "s3cr3t!" } ] - jetstream: enabled - } + ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled } + TWO { users = [ { user: "two", pass: "p" } ]; jetstream: enabled } $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } } ` @@ -6243,13 +6372,11 @@ func createJetStreamSuperCluster(t *testing.T, numServersPer, numClusters int) * func (sc *supercluster) createLeafNodes(clusterName string, numServers int) *cluster { // Create our leafnode cluster template first. - c := sc.randomCluster() - return c.createLeafNodes(clusterName, numServers) + return sc.randomCluster().createLeafNodes(clusterName, numServers) } func (sc *supercluster) createSingleLeafNode() *Server { - c := sc.randomCluster() - return c.createLeafNode() + return sc.randomCluster().createLeafNode() } func (sc *supercluster) leader() *Server { @@ -6425,12 +6552,10 @@ func createMixedModeCluster(t *testing.T, clusterName string, numJsServers, numN // This will create a cluster that is explicitly configured for the routes, etc. // and also has a defined clustername. All configs for routes and cluster name will be the same. func createJetStreamClusterExplicit(t *testing.T, clusterName string, numServers int) *cluster { - t.Helper() return createJetStreamClusterWithTemplate(t, jsClusterTempl, clusterName, numServers) } func createJetStreamClusterWithTemplate(t *testing.T, tmpl string, clusterName string, numServers int) *cluster { - t.Helper() startPorts := []int{7_022, 9_022, 11_022, 15_022} port := startPorts[rand.Intn(len(startPorts))] return createJetStreamCluster(t, tmpl, clusterName, _EMPTY_, numServers, port, true) @@ -6484,6 +6609,39 @@ func (c *cluster) addInNewServer() *Server { return s } +// This is tied to jsClusterAccountsTempl, so changes there to users needs to be reflected here. +func (c *cluster) createSingleLeafNodeNoSystemAccount() *Server { + as := c.randomServer() + lno := as.getOpts().LeafNode + ln1 := fmt.Sprintf("nats://one:p@%s:%d", lno.Host, lno.Port) + ln2 := fmt.Sprintf("nats://two:p@%s:%d", lno.Host, lno.Port) + conf := fmt.Sprintf(jsClusterSingleLeafNodeTempl, createDir(c.t, JetStreamStoreDir), ln1, ln2) + s, o := RunServerWithConfig(createConfFile(c.t, []byte(conf))) + c.servers = append(c.servers, s) + c.opts = append(c.opts, o) + + checkLeafNodeConnectedCount(c.t, as, 2) + + return s +} + +var jsClusterSingleLeafNodeTempl = ` + listen: 127.0.0.1:-1 + server_name: LNJS + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + + leaf { remotes [ + { urls: [ %s ], account: "JSY" } + { urls: [ %s ], account: "JSN" } ] + } + + accounts { + JSY { users = [ { user: "y", pass: "p" } ]; jetstream: true } + JSN { users = [ { user: "n", pass: "p" } ] } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + } +` + var jsClusterTemplWithLeafNode = ` listen: 127.0.0.1:-1 server_name: %s @@ -6524,8 +6682,7 @@ var jsLeafFrag = ` ` func (c *cluster) createLeafNodes(clusterName string, numServers int) *cluster { - const startClusterPort = 22111 - return c.createLeafNodesWithStartPort(clusterName, numServers, startClusterPort) + return c.createLeafNodesWithStartPort(clusterName, numServers, 22111) } func (c *cluster) createLeafNodesWithStartPort(clusterName string, numServers int, portStart int) *cluster { @@ -6569,6 +6726,24 @@ func (c *cluster) createLeafNodesWithStartPortAndMQTT(clusterName string, numSer return lc } +// Will add in the mapping for the account to each server. +func (c *cluster) addSubjectMapping(account, src, dest string) { + for _, s := range c.servers { + if s.ClusterName() != c.name { + continue + } + acc, err := s.LookupAccount(account) + if err != nil { + c.t.Fatalf("Unexpected error on %v: %v", s, err) + } + if err := acc.AddMapping(src, dest); err != nil { + c.t.Fatalf("Error adding mapping: %v", err) + } + } + // Make sure interest propagates. + time.Sleep(200 * time.Millisecond) +} + // Adjust limits for the given account. func (c *cluster) updateLimits(account string, newLimits *JetStreamAccountLimits) { c.t.Helper() diff --git a/server/leafnode.go b/server/leafnode.go index 7eab112d..fd477733 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -140,18 +140,26 @@ func (s *Server) checkForSystemRemoteLeaf(remotes []*RemoteLeafOpts) { sysShared, sysAcc := false, s.SystemAccount().GetName() for _, r := range remotes { if r.LocalAccount == sysAcc { + s.Noticef("Detected sharing of the system account across a leafnode") sysShared = true break } } - if !sysShared { - return - } - s.Noticef("Detected sharing of the system account across a leafnode") for _, r := range remotes { - if r.LocalAccount != sysAcc { + if r.LocalAccount == sysAcc { + continue + } + + // We want to deny normal JS API if we share the system account or our local account has JS enabled. + if sysShared { s.addInJSDeny(r) + } else { + // Here we want to suppress if this local account has JS enabled. + // This is regardless of whether or not this server is actually running JS. + if acc, _ := s.lookupAccount(r.LocalAccount); acc != nil && acc.jetStreamConfigured() { + s.addInJSDeny(r) + } } } } @@ -1284,6 +1292,7 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) { _subs := [32]*subscription{} subs := _subs[:0] ims := []string{} + acc.mu.Lock() accName := acc.Name accNTag := acc.nameTag @@ -1306,6 +1315,11 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) { } ims = append(ims, isubj) } + // Likewise for mappings. + for _, m := range acc.mappings { + ims = append(ims, m.src) + } + // Create a unique subject that will be used for loop detection. lds := acc.lds if lds == _EMPTY_ { @@ -1494,6 +1508,20 @@ func (c *client) updateSmap(sub *subscription, delta int32) { c.mu.Unlock() } +// Used to force add subjects to the subject map. +func (c *client) forceAddToSmap(subj string) { + c.mu.Lock() + defer c.mu.Unlock() + + n := c.leaf.smap[subj] + if n != 0 { + return + } + // Place into the map since it was not there. + c.leaf.smap[subj] = 1 + c.sendLeafNodeSubUpdate(subj, 1) +} + // Send the subscription interest change to the other side. // Lock should be held. func (c *client) sendLeafNodeSubUpdate(key string, n int32) { @@ -1908,13 +1936,20 @@ func (c *client) processInboundLeafMsg(msg []byte) { c.in.bytes += int32(len(msg) - LEN_CR_LF) // Check pub permissions - if c.perms != nil && (c.perms.pub.allow != nil || c.perms.pub.deny != nil) && !c.pubAllowed(string(c.pa.subject)) { - if c.isHubLeafNode() { - c.leafPubPermViolation(c.pa.subject) - } else { - c.Debugf("Not permitted to receive from %q", c.pa.subject) + if c.perms != nil && (c.perms.pub.allow != nil || c.perms.pub.deny != nil) { + subject := c.pa.subject + // If this subject was mapped we need to check the original subject, not the new one. + if len(c.pa.mapped) > 0 { + subject = c.pa.mapped + } + if !c.pubAllowed(string(subject)) { + if c.isHubLeafNode() { + c.leafPubPermViolation(subject) + } else { + c.Debugf("Not permitted to receive from %q", subject) + } + return } - return } srv := c.srv diff --git a/server/parser.go b/server/parser.go index 1b8ea0c7..b7dd7634 100644 --- a/server/parser.go +++ b/server/parser.go @@ -41,6 +41,7 @@ type pubArg struct { account []byte subject []byte deliver []byte + mapped []byte reply []byte szb []byte hdb []byte @@ -267,6 +268,7 @@ func (c *client) parse(buf []byte) error { if err := c.processHeaderPub(arg); err != nil { return err } + c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD // If we don't have a saved buffer then jump ahead with // the index. If this overruns what is left we fall out @@ -405,17 +407,7 @@ func (c *client) parse(buf []byte) error { if err := c.processPub(arg); err != nil { return err } - // Check if we have and account mappings or tees or filters. - // FIXME(dlc) - Probably better way to do this. - // Could add in cache but will be tricky since results based on pub subject are dynamic - // due to wildcard matching and weight sets. - if c.kind == CLIENT && c.in.flags.isSet(hasMappings) { - old := c.pa.subject - changed := c.selectMappedSubject() - if trace && changed { - c.traceInOp("MAPPING", []byte(fmt.Sprintf("%s -> %s", old, c.pa.subject))) - } - } + c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD // If we don't have a saved buffer then jump ahead with // the index. If this overruns what is left we fall out @@ -470,14 +462,23 @@ func (c *client) parse(buf []byte) error { } else { c.msgBuf = buf[c.as : i+1] } + + // Check for mappings. + if (c.kind == CLIENT || c.kind == LEAF) && c.in.flags.isSet(hasMappings) { + changed := c.selectMappedSubject() + if trace && changed { + c.traceInOp("MAPPING", []byte(fmt.Sprintf("%s -> %s", c.pa.mapped, c.pa.subject))) + } + } if trace { c.traceMsg(c.msgBuf) } + c.processInboundMsg(c.msgBuf) c.argBuf, c.msgBuf, c.header = nil, nil, nil c.drop, c.as, c.state = 0, i+1, OP_START // Drop all pub args - c.pa.arg, c.pa.pacache, c.pa.origin, c.pa.account, c.pa.subject = nil, nil, nil, nil, nil + c.pa.arg, c.pa.pacache, c.pa.origin, c.pa.account, c.pa.subject, c.pa.mapped = nil, nil, nil, nil, nil, nil c.pa.reply, c.pa.hdr, c.pa.size, c.pa.szb, c.pa.hdb, c.pa.queues = nil, -1, 0, nil, nil, nil lmsg = false case OP_A: diff --git a/server/server.go b/server/server.go index a8443330..96b0aab5 100644 --- a/server/server.go +++ b/server/server.go @@ -1304,7 +1304,7 @@ func (s *Server) lookupAccount(name string) (*Account, error) { } // If we have a resolver see if it can fetch the account. if s.AccountResolver() == nil { - return nil, ErrNoAccountResolver + return nil, ErrMissingAccount } return s.fetchAccount(name) }