From 49cd38c0648fb3a6b39cfdf4e068f5d4cd79dcb6 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 2 Mar 2021 06:36:57 -0800 Subject: [PATCH] Enable cross account behaviors for mirrors and sources. Signed-off-by: Derek Collison --- server/client.go | 10 +- server/jetstream_api.go | 3 + server/jetstream_cluster_test.go | 151 ++++++++++++++++++++++++++++++- server/raft.go | 7 +- server/stream.go | 63 +++++++++++-- 5 files changed, 219 insertions(+), 15 deletions(-) diff --git a/server/client.go b/server/client.go index 9705ddf3..63d5c024 100644 --- a/server/client.go +++ b/server/client.go @@ -3035,6 +3035,7 @@ func (c *client) deliverMsg(sub *subscription, subject, reply, mh, msg []byte, g srv.trackGWReply(client, c.pa.reply) } client.mu.Unlock() + // Internal account clients are for service imports and need the '\r\n'. if client.kind == ACCOUNT { sub.icb(sub, c, string(subject), string(reply), msg) @@ -3743,11 +3744,17 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt // Pick correct to subject. If we matched on a wildcard use the literal publish subject. to, subject := si.to, string(c.pa.subject) + + hadPrevSi := c.pa.psi != nil if si.tr != nil { // FIXME(dlc) - This could be slow, may want to look at adding cache to bare transforms? to, _ = si.tr.transformSubject(subject) } else if si.usePub { - to = subject + if hadPrevSi && c.pa.psi.tr != nil { + to, _ = c.pa.psi.tr.transformSubject(subject) + } else { + to = subject + } } // Now check to see if this account has mappings that could affect the service import. // Can't use non-locked trick like in processInboundClientMsg, so just call into selectMappedSubject @@ -3761,7 +3768,6 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt // Change this so that we detect recursion // Remember prior. share := si.share - hadPrevSi := c.pa.psi != nil if hadPrevSi { share = c.pa.psi.share } diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 9b8f6289..6278b08f 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -32,6 +32,9 @@ import ( // Request API subjects for JetStream. const ( + // JSApiPrefix + JSApiPrefix = "$JS.API" + // JSApiInfo is for obtaining general information about JetStream for this account. // Will return JSON response. JSApiAccountInfo = "$JS.API.INFO" diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 3dcb7282..b44349a0 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -4227,6 +4227,119 @@ func TestJetStreamClusterLeaderStepdown(t *testing.T) { } } +func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) { + c := createJetStreamClusterWithTemplate(t, jsClusterMirrorSourceImportsTempl, "C1", 3) + defer c.shutdown() + + // Create source stream under RI account. + s := c.randomServer() + nc, js := jsClientConnect(t, s, nats.UserInfo("rip", "pass")) + defer nc.Close() + + if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 2}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + toSend := 100 + for i := 0; i < toSend; i++ { + if _, err := js.Publish("TEST", []byte("OK")); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } + + nc2, _ := jsClientConnect(t, s) + defer nc2.Close() + + // Have to do this direct until we get Go client support. + // Need to match jsClusterMirrorSourceImportsTempl imports. + cfg := StreamConfig{ + Name: "MY_MIRROR_TEST", + Storage: FileStorage, + Mirror: &StreamSource{ + Name: "TEST", + External: &ExternalStream{ + ApiPrefix: "RI.JS.API", + DeliverPrefix: "RI.DELIVER.SYNC.MIRRORS", + }, + }, + } + + req, err := json.Marshal(cfg) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + resp, err := nc2.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + var scResp JSApiStreamCreateResponse + if err := json.Unmarshal(resp.Data, &scResp); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if scResp.StreamInfo == nil || scResp.Error != nil { + t.Fatalf("Did not receive correct response: %+v", scResp.Error) + } + + js2, err := nc2.JetStream(nats.MaxWait(50 * time.Millisecond)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + si, err := js2.StreamInfo("MY_MIRROR_TEST") + if err != nil { + t.Fatalf("Could not retrieve stream info") + } + if si.State.Msgs != uint64(toSend) { + return fmt.Errorf("Expected %d msgs, got state: %+v", toSend, si.State) + } + return nil + }) + + // Now do sources as well. + cfg = StreamConfig{ + Name: "MY_SOURCE_TEST", + Storage: FileStorage, + Sources: []*StreamSource{ + &StreamSource{ + Name: "TEST", + External: &ExternalStream{ + ApiPrefix: "RI.JS.API", + DeliverPrefix: "RI.DELIVER.SYNC.SOURCES", + }, + }, + }, + } + + req, err = json.Marshal(cfg) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + resp, err = nc2.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + scResp.Error = nil + if err := json.Unmarshal(resp.Data, &scResp); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if scResp.StreamInfo == nil || scResp.Error != nil { + t.Fatalf("Did not receive correct response: %+v", scResp.Error) + } + + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + si, err := js2.StreamInfo("MY_SOURCE_TEST") + if err != nil { + t.Fatalf("Could not retrieve stream info") + } + if si.State.Msgs != uint64(toSend) { + return fmt.Errorf("Expected %d msgs, got state: %+v", toSend, si.State) + } + return nil + }) + +} + func TestJetStreamClusterJSAPIImport(t *testing.T) { c := createJetStreamClusterWithTemplate(t, jsClusterImportsTempl, "C1", 3) defer c.shutdown() @@ -4477,6 +4590,40 @@ func (sc *supercluster) randomCluster() *cluster { return clusters[0] } +var jsClusterMirrorSourceImportsTempl = ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + no_auth_user: dlc + + accounts { + JS { + jetstream: enabled + users = [ { user: "rip", pass: "pass" } ] + exports [ + { service: "$JS.API.CONSUMER.>" } # To create internal consumers to mirror/source. + { stream: "RI.DELIVER.SYNC.>" } # For the mirror/source consumers sending to IA via delivery subject. + ] + } + IA { + jetstream: enabled + users = [ { user: "dlc", pass: "pass" } ] + imports [ + { service: { account: JS, subject: "$JS.API.CONSUMER.>"}, to: "RI.JS.API.CONSUMER.>" } + { stream: { account: JS, subject: "RI.DELIVER.SYNC.>"} } + ] + } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + } +` + var jsClusterImportsTempl = ` listen: 127.0.0.1:-1 server_name: %s @@ -4582,9 +4729,9 @@ var skip = func(t *testing.T) { t.SkipNow() } -func jsClientConnect(t *testing.T, s *Server) (*nats.Conn, nats.JetStreamContext) { +func jsClientConnect(t *testing.T, s *Server, opts ...nats.Option) (*nats.Conn, nats.JetStreamContext) { t.Helper() - nc, err := nats.Connect(s.ClientURL()) + nc, err := nats.Connect(s.ClientURL(), opts...) if err != nil { t.Fatalf("Failed to create client: %v", err) } diff --git a/server/raft.go b/server/raft.go index abbce9f9..c6aa4312 100644 --- a/server/raft.go +++ b/server/raft.go @@ -2210,7 +2210,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { // If we are catching up ignore old catchup subs. // This could happen when we stall or cancel a catchup. - if !isNew && n.catchup != nil && sub != n.catchup.sub { + if !isNew && catchingUp && sub != n.catchup.sub { n.Unlock() n.debug("AppendEntry ignoring old entry from previous catchup") return @@ -2344,8 +2344,11 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } } + // Heartbeat or do we have entries. + isHeartbeat := len(ae.entries) == 0 + // Save to our WAL if we have entries. - if len(ae.entries) > 0 { + if !isHeartbeat { // Only store if an original which will have sub != nil if sub != nil { if err := n.storeToWAL(ae); err != nil { diff --git a/server/stream.go b/server/stream.go index c7e821e3..c76af3ce 100644 --- a/server/stream.go +++ b/server/stream.go @@ -111,10 +111,17 @@ type StreamSourceInfo struct { // StreamSource dictates how streams can source from other streams. type StreamSource struct { - Name string `json:"name"` - OptStartSeq uint64 `json:"opt_start_seq,omitempty"` - OptStartTime *time.Time `json:"opt_start_time,omitempty"` - FilterSubject string `json:"filter_subject,omitempty"` + Name string `json:"name"` + OptStartSeq uint64 `json:"opt_start_seq,omitempty"` + OptStartTime *time.Time `json:"opt_start_time,omitempty"` + FilterSubject string `json:"filter_subject,omitempty"` + External *ExternalStream `json:"external,omitempty"` +} + +// ExternalStream allows you to qualify access to a stream source in another account. +type ExternalStream struct { + ApiPrefix string `json:"api"` + DeliverPrefix string `json:"deliver"` } // Stream is a jetstream stream of messages. When we receive a message internally destined @@ -1122,11 +1129,21 @@ func (mset *stream) setupMirrorConsumer() error { } } + // Determine subjects etc. + var deliverSubject string + ext := mset.cfg.Mirror.External + + if ext != nil { + deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".M"), "..", ".") + } else { + deliverSubject = syncSubject("$JS.M") + } + mset.mirror = &sourceInfo{name: mset.cfg.Mirror.Name, msgsC: make(chan *imr, sourceMaxAckPending)} - sub, err := mset.subscribeInternal(syncSubject("$JS.M"), func(sub *subscription, c *client, subject, reply string, rmsg []byte) { + sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) { mset.mu.RLock() // Ignore anything not current. - if mset.mirror == nil || sub != mset.mirror.sub || mset.mirror.msgsC == nil { + if mset.mirror == nil || !bytes.Equal(sub.subject, mset.mirror.sub.subject) || mset.mirror.msgsC == nil { mset.mu.RUnlock() return } @@ -1151,6 +1168,10 @@ func (mset *stream) setupMirrorConsumer() error { // Make sure to delete any prior durable consumers. subject := fmt.Sprintf(JSApiConsumerDeleteT, mset.cfg.Mirror.Name, durable) + if ext != nil { + subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1) + subject = strings.ReplaceAll(subject, "..", ".") + } mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0} req := &CreateConsumerRequest{ @@ -1197,6 +1218,11 @@ func (mset *stream) setupMirrorConsumer() error { b, _ := json.Marshal(req) subject = fmt.Sprintf(JSApiDurableCreateT, mset.cfg.Mirror.Name, durable) + if ext != nil { + subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1) + subject = strings.ReplaceAll(subject, "..", ".") + } + mset.sendq <- &jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0} go func() { @@ -1276,17 +1302,32 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { si.sseq, si.dseq = 0, 0 durable := mset.sourceDurable(sname) + ssi := mset.streamSource(sname) + + // Determine subjects etc. + var deliverSubject string + ext := ssi.External // Need to delete the old one. subject := fmt.Sprintf(JSApiConsumerDeleteT, sname, durable) + if ext != nil { + subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1) + subject = strings.ReplaceAll(subject, "..", ".") + } mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0} + if ext != nil { + deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".S"), "..", ".") + } else { + deliverSubject = syncSubject("$JS.S") + } + si.msgsC = make(chan *imr, sourceMaxAckPending) - sub, err := mset.subscribeInternal(syncSubject("$JS.S"), func(sub *subscription, c *client, subject, reply string, rmsg []byte) { + sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) { mset.mu.RLock() defer mset.mu.RUnlock() // Ignore anything not current. - if sub != si.sub || si.msgsC == nil { + if si.msgsC == nil || !bytes.Equal(sub.subject, si.sub.subject) { return } hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy. @@ -1314,7 +1355,6 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { }, } // If starting, check any configs. - ssi := mset.streamSource(sname) if seq <= 1 { if ssi.OptStartSeq > 0 { req.Config.OptStartSeq = ssi.OptStartSeq @@ -1347,6 +1387,11 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { b, _ := json.Marshal(req) subject = fmt.Sprintf(JSApiDurableCreateT, sname, durable) + if ext != nil { + subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1) + subject = strings.ReplaceAll(subject, "..", ".") + } + mset.sendq <- &jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0} go func() {