mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
Enable cross account behaviors for mirrors and sources.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -3035,6 +3035,7 @@ func (c *client) deliverMsg(sub *subscription, subject, reply, mh, msg []byte, g
|
||||
srv.trackGWReply(client, c.pa.reply)
|
||||
}
|
||||
client.mu.Unlock()
|
||||
|
||||
// Internal account clients are for service imports and need the '\r\n'.
|
||||
if client.kind == ACCOUNT {
|
||||
sub.icb(sub, c, string(subject), string(reply), msg)
|
||||
@@ -3743,11 +3744,17 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
|
||||
|
||||
// Pick correct to subject. If we matched on a wildcard use the literal publish subject.
|
||||
to, subject := si.to, string(c.pa.subject)
|
||||
|
||||
hadPrevSi := c.pa.psi != nil
|
||||
if si.tr != nil {
|
||||
// FIXME(dlc) - This could be slow, may want to look at adding cache to bare transforms?
|
||||
to, _ = si.tr.transformSubject(subject)
|
||||
} else if si.usePub {
|
||||
to = subject
|
||||
if hadPrevSi && c.pa.psi.tr != nil {
|
||||
to, _ = c.pa.psi.tr.transformSubject(subject)
|
||||
} else {
|
||||
to = subject
|
||||
}
|
||||
}
|
||||
// Now check to see if this account has mappings that could affect the service import.
|
||||
// Can't use non-locked trick like in processInboundClientMsg, so just call into selectMappedSubject
|
||||
@@ -3761,7 +3768,6 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
|
||||
// Change this so that we detect recursion
|
||||
// Remember prior.
|
||||
share := si.share
|
||||
hadPrevSi := c.pa.psi != nil
|
||||
if hadPrevSi {
|
||||
share = c.pa.psi.share
|
||||
}
|
||||
|
||||
@@ -32,6 +32,9 @@ import (
|
||||
|
||||
// Request API subjects for JetStream.
|
||||
const (
|
||||
// JSApiPrefix
|
||||
JSApiPrefix = "$JS.API"
|
||||
|
||||
// JSApiInfo is for obtaining general information about JetStream for this account.
|
||||
// Will return JSON response.
|
||||
JSApiAccountInfo = "$JS.API.INFO"
|
||||
|
||||
@@ -4227,6 +4227,119 @@ func TestJetStreamClusterLeaderStepdown(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) {
|
||||
c := createJetStreamClusterWithTemplate(t, jsClusterMirrorSourceImportsTempl, "C1", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
// Create source stream under RI account.
|
||||
s := c.randomServer()
|
||||
nc, js := jsClientConnect(t, s, nats.UserInfo("rip", "pass"))
|
||||
defer nc.Close()
|
||||
|
||||
if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 2}); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
toSend := 100
|
||||
for i := 0; i < toSend; i++ {
|
||||
if _, err := js.Publish("TEST", []byte("OK")); err != nil {
|
||||
t.Fatalf("Unexpected publish error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
nc2, _ := jsClientConnect(t, s)
|
||||
defer nc2.Close()
|
||||
|
||||
// Have to do this direct until we get Go client support.
|
||||
// Need to match jsClusterMirrorSourceImportsTempl imports.
|
||||
cfg := StreamConfig{
|
||||
Name: "MY_MIRROR_TEST",
|
||||
Storage: FileStorage,
|
||||
Mirror: &StreamSource{
|
||||
Name: "TEST",
|
||||
External: &ExternalStream{
|
||||
ApiPrefix: "RI.JS.API",
|
||||
DeliverPrefix: "RI.DELIVER.SYNC.MIRRORS",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
req, err := json.Marshal(cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
resp, err := nc2.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
var scResp JSApiStreamCreateResponse
|
||||
if err := json.Unmarshal(resp.Data, &scResp); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if scResp.StreamInfo == nil || scResp.Error != nil {
|
||||
t.Fatalf("Did not receive correct response: %+v", scResp.Error)
|
||||
}
|
||||
|
||||
js2, err := nc2.JetStream(nats.MaxWait(50 * time.Millisecond))
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
|
||||
si, err := js2.StreamInfo("MY_MIRROR_TEST")
|
||||
if err != nil {
|
||||
t.Fatalf("Could not retrieve stream info")
|
||||
}
|
||||
if si.State.Msgs != uint64(toSend) {
|
||||
return fmt.Errorf("Expected %d msgs, got state: %+v", toSend, si.State)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Now do sources as well.
|
||||
cfg = StreamConfig{
|
||||
Name: "MY_SOURCE_TEST",
|
||||
Storage: FileStorage,
|
||||
Sources: []*StreamSource{
|
||||
&StreamSource{
|
||||
Name: "TEST",
|
||||
External: &ExternalStream{
|
||||
ApiPrefix: "RI.JS.API",
|
||||
DeliverPrefix: "RI.DELIVER.SYNC.SOURCES",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
req, err = json.Marshal(cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
resp, err = nc2.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
scResp.Error = nil
|
||||
if err := json.Unmarshal(resp.Data, &scResp); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if scResp.StreamInfo == nil || scResp.Error != nil {
|
||||
t.Fatalf("Did not receive correct response: %+v", scResp.Error)
|
||||
}
|
||||
|
||||
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
|
||||
si, err := js2.StreamInfo("MY_SOURCE_TEST")
|
||||
if err != nil {
|
||||
t.Fatalf("Could not retrieve stream info")
|
||||
}
|
||||
if si.State.Msgs != uint64(toSend) {
|
||||
return fmt.Errorf("Expected %d msgs, got state: %+v", toSend, si.State)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestJetStreamClusterJSAPIImport(t *testing.T) {
|
||||
c := createJetStreamClusterWithTemplate(t, jsClusterImportsTempl, "C1", 3)
|
||||
defer c.shutdown()
|
||||
@@ -4477,6 +4590,40 @@ func (sc *supercluster) randomCluster() *cluster {
|
||||
return clusters[0]
|
||||
}
|
||||
|
||||
var jsClusterMirrorSourceImportsTempl = `
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: %s
|
||||
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"}
|
||||
|
||||
cluster {
|
||||
name: %s
|
||||
listen: 127.0.0.1:%d
|
||||
routes = [%s]
|
||||
}
|
||||
|
||||
no_auth_user: dlc
|
||||
|
||||
accounts {
|
||||
JS {
|
||||
jetstream: enabled
|
||||
users = [ { user: "rip", pass: "pass" } ]
|
||||
exports [
|
||||
{ service: "$JS.API.CONSUMER.>" } # To create internal consumers to mirror/source.
|
||||
{ stream: "RI.DELIVER.SYNC.>" } # For the mirror/source consumers sending to IA via delivery subject.
|
||||
]
|
||||
}
|
||||
IA {
|
||||
jetstream: enabled
|
||||
users = [ { user: "dlc", pass: "pass" } ]
|
||||
imports [
|
||||
{ service: { account: JS, subject: "$JS.API.CONSUMER.>"}, to: "RI.JS.API.CONSUMER.>" }
|
||||
{ stream: { account: JS, subject: "RI.DELIVER.SYNC.>"} }
|
||||
]
|
||||
}
|
||||
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
|
||||
}
|
||||
`
|
||||
|
||||
var jsClusterImportsTempl = `
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: %s
|
||||
@@ -4582,9 +4729,9 @@ var skip = func(t *testing.T) {
|
||||
t.SkipNow()
|
||||
}
|
||||
|
||||
func jsClientConnect(t *testing.T, s *Server) (*nats.Conn, nats.JetStreamContext) {
|
||||
func jsClientConnect(t *testing.T, s *Server, opts ...nats.Option) (*nats.Conn, nats.JetStreamContext) {
|
||||
t.Helper()
|
||||
nc, err := nats.Connect(s.ClientURL())
|
||||
nc, err := nats.Connect(s.ClientURL(), opts...)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create client: %v", err)
|
||||
}
|
||||
|
||||
@@ -2210,7 +2210,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
|
||||
|
||||
// If we are catching up ignore old catchup subs.
|
||||
// This could happen when we stall or cancel a catchup.
|
||||
if !isNew && n.catchup != nil && sub != n.catchup.sub {
|
||||
if !isNew && catchingUp && sub != n.catchup.sub {
|
||||
n.Unlock()
|
||||
n.debug("AppendEntry ignoring old entry from previous catchup")
|
||||
return
|
||||
@@ -2344,8 +2344,11 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
|
||||
}
|
||||
}
|
||||
|
||||
// Heartbeat or do we have entries.
|
||||
isHeartbeat := len(ae.entries) == 0
|
||||
|
||||
// Save to our WAL if we have entries.
|
||||
if len(ae.entries) > 0 {
|
||||
if !isHeartbeat {
|
||||
// Only store if an original which will have sub != nil
|
||||
if sub != nil {
|
||||
if err := n.storeToWAL(ae); err != nil {
|
||||
|
||||
@@ -111,10 +111,17 @@ type StreamSourceInfo struct {
|
||||
|
||||
// StreamSource dictates how streams can source from other streams.
|
||||
type StreamSource struct {
|
||||
Name string `json:"name"`
|
||||
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
|
||||
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
|
||||
FilterSubject string `json:"filter_subject,omitempty"`
|
||||
Name string `json:"name"`
|
||||
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
|
||||
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
|
||||
FilterSubject string `json:"filter_subject,omitempty"`
|
||||
External *ExternalStream `json:"external,omitempty"`
|
||||
}
|
||||
|
||||
// ExternalStream allows you to qualify access to a stream source in another account.
|
||||
type ExternalStream struct {
|
||||
ApiPrefix string `json:"api"`
|
||||
DeliverPrefix string `json:"deliver"`
|
||||
}
|
||||
|
||||
// Stream is a jetstream stream of messages. When we receive a message internally destined
|
||||
@@ -1122,11 +1129,21 @@ func (mset *stream) setupMirrorConsumer() error {
|
||||
}
|
||||
}
|
||||
|
||||
// Determine subjects etc.
|
||||
var deliverSubject string
|
||||
ext := mset.cfg.Mirror.External
|
||||
|
||||
if ext != nil {
|
||||
deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".M"), "..", ".")
|
||||
} else {
|
||||
deliverSubject = syncSubject("$JS.M")
|
||||
}
|
||||
|
||||
mset.mirror = &sourceInfo{name: mset.cfg.Mirror.Name, msgsC: make(chan *imr, sourceMaxAckPending)}
|
||||
sub, err := mset.subscribeInternal(syncSubject("$JS.M"), func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
|
||||
sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
|
||||
mset.mu.RLock()
|
||||
// Ignore anything not current.
|
||||
if mset.mirror == nil || sub != mset.mirror.sub || mset.mirror.msgsC == nil {
|
||||
if mset.mirror == nil || !bytes.Equal(sub.subject, mset.mirror.sub.subject) || mset.mirror.msgsC == nil {
|
||||
mset.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
@@ -1151,6 +1168,10 @@ func (mset *stream) setupMirrorConsumer() error {
|
||||
|
||||
// Make sure to delete any prior durable consumers.
|
||||
subject := fmt.Sprintf(JSApiConsumerDeleteT, mset.cfg.Mirror.Name, durable)
|
||||
if ext != nil {
|
||||
subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1)
|
||||
subject = strings.ReplaceAll(subject, "..", ".")
|
||||
}
|
||||
mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0}
|
||||
|
||||
req := &CreateConsumerRequest{
|
||||
@@ -1197,6 +1218,11 @@ func (mset *stream) setupMirrorConsumer() error {
|
||||
|
||||
b, _ := json.Marshal(req)
|
||||
subject = fmt.Sprintf(JSApiDurableCreateT, mset.cfg.Mirror.Name, durable)
|
||||
if ext != nil {
|
||||
subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1)
|
||||
subject = strings.ReplaceAll(subject, "..", ".")
|
||||
}
|
||||
|
||||
mset.sendq <- &jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0}
|
||||
|
||||
go func() {
|
||||
@@ -1276,17 +1302,32 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
|
||||
si.sseq, si.dseq = 0, 0
|
||||
|
||||
durable := mset.sourceDurable(sname)
|
||||
ssi := mset.streamSource(sname)
|
||||
|
||||
// Determine subjects etc.
|
||||
var deliverSubject string
|
||||
ext := ssi.External
|
||||
|
||||
// Need to delete the old one.
|
||||
subject := fmt.Sprintf(JSApiConsumerDeleteT, sname, durable)
|
||||
if ext != nil {
|
||||
subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1)
|
||||
subject = strings.ReplaceAll(subject, "..", ".")
|
||||
}
|
||||
mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0}
|
||||
|
||||
if ext != nil {
|
||||
deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".S"), "..", ".")
|
||||
} else {
|
||||
deliverSubject = syncSubject("$JS.S")
|
||||
}
|
||||
|
||||
si.msgsC = make(chan *imr, sourceMaxAckPending)
|
||||
sub, err := mset.subscribeInternal(syncSubject("$JS.S"), func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
|
||||
sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
|
||||
mset.mu.RLock()
|
||||
defer mset.mu.RUnlock()
|
||||
// Ignore anything not current.
|
||||
if sub != si.sub || si.msgsC == nil {
|
||||
if si.msgsC == nil || !bytes.Equal(sub.subject, si.sub.subject) {
|
||||
return
|
||||
}
|
||||
hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy.
|
||||
@@ -1314,7 +1355,6 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
|
||||
},
|
||||
}
|
||||
// If starting, check any configs.
|
||||
ssi := mset.streamSource(sname)
|
||||
if seq <= 1 {
|
||||
if ssi.OptStartSeq > 0 {
|
||||
req.Config.OptStartSeq = ssi.OptStartSeq
|
||||
@@ -1347,6 +1387,11 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
|
||||
|
||||
b, _ := json.Marshal(req)
|
||||
subject = fmt.Sprintf(JSApiDurableCreateT, sname, durable)
|
||||
if ext != nil {
|
||||
subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1)
|
||||
subject = strings.ReplaceAll(subject, "..", ".")
|
||||
}
|
||||
|
||||
mset.sendq <- &jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0}
|
||||
|
||||
go func() {
|
||||
|
||||
Reference in New Issue
Block a user