Merge pull request #1733 from nats-io/jsimport

Allow complete $JS.API to be imported from another account.
This commit is contained in:
Derek Collison
2020-11-23 07:35:24 -08:00
committed by GitHub
4 changed files with 110 additions and 11 deletions

View File

@@ -131,6 +131,7 @@ type serviceImport struct {
share bool
tracking bool
didDeliver bool
isSysAcc bool
trackingHdr http.Header // header from request
}
@@ -1606,8 +1607,19 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im
rt = se.respType
lat = se.latency
}
s := dest.srv
dest.mu.RUnlock()
// Track if this maps us to the system account.
var isSysAcc bool
if s != nil {
s.mu.Lock()
if s.sys != nil && dest == s.sys.account {
isSysAcc = true
}
s.mu.Unlock()
}
a.mu.Lock()
if a.imports.services == nil {
a.imports.services = make(map[string]*serviceImport)
@@ -1640,8 +1652,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im
}
}
}
si := &serviceImport{dest, claim, se, nil, from, to, "", tr, 0, rt, lat, nil, nil, usePub, false, false, false, false, false, nil}
si := &serviceImport{dest, claim, se, nil, from, to, "", tr, 0, rt, lat, nil, nil, usePub, false, false, false, false, false, isSysAcc, nil}
a.imports.services[from] = si
a.mu.Unlock()
@@ -2056,7 +2067,7 @@ func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImp
// dest is the requestor's account. a is the service responder with the export.
// Marked as internal here, that is how we distinguish.
si := &serviceImport{dest, nil, osi.se, nil, nrr, to, osi.to, nil, 0, rt, nil, nil, nil, false, true, false, osi.share, false, false, nil}
si := &serviceImport{dest, nil, osi.se, nil, nrr, to, osi.to, nil, 0, rt, nil, nil, nil, false, true, false, osi.share, false, false, false, nil}
if a.exports.responses == nil {
a.exports.responses = make(map[string]*serviceImport)

View File

@@ -3534,6 +3534,13 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
// so we only lock once.
to, _ = si.acc.selectMappedSubject(to)
oreply, oacc := c.pa.reply, c.acc
c.pa.reply = nrr
if !si.isSysAcc {
c.mu.Lock()
c.acc = si.acc
c.mu.Unlock()
}
// FIXME(dlc) - Do L1 cache trick like normal client?
rr := si.acc.sl.Match(to)
@@ -3568,6 +3575,13 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
// Put what was there back now.
c.in.rts = orts
c.pa.reply = oreply
if !si.isSysAcc {
c.mu.Lock()
c.acc = oacc
c.mu.Unlock()
}
// Determine if we should remove this service import. This is for response service imports.
// We will remove if we did not deliver, or if we are a response service import and we are

View File

@@ -322,6 +322,15 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) {
}
}
// Grab the client, account and server reference.
c := mset.client
if c == nil {
return nil, fmt.Errorf("stream not valid")
}
c.mu.Lock()
s, a := c.srv, c.acc
c.mu.Unlock()
// Hold mset lock here.
mset.mu.Lock()
@@ -387,6 +396,7 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) {
// Set name, which will be durable name if set, otherwise we create one at random.
o := &Consumer{
mset: mset,
acc: a,
config: *config,
dsubj: config.DeliverSubject,
active: true,
@@ -459,14 +469,6 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) {
o.selectStartingSeqNo()
// Now register with mset and create the ack subscription.
c := mset.client
if c == nil {
mset.mu.Unlock()
return nil, fmt.Errorf("stream not valid")
}
s, a := c.srv, c.acc
o.acc = a
// Check if we already have this one registered.
if eo, ok := mset.consumers[o.name]; ok {
mset.mu.Unlock()

View File

@@ -9944,3 +9944,75 @@ func TestJetStreamAccountImportBasics(t *testing.T) {
t.Fatalf("Did not receive the ack properly")
}
}
// This is for importing all of JetStream into another account for admin purposes.
func TestJetStreamAccountImportAll(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
no_auth_user: rip
jetstream: {max_mem_store: 64GB, max_file_store: 10TB}
accounts: {
JS: {
jetstream: enabled
users: [ {user: dlc, password: foo} ]
exports [ { service: "$JS.API.>" } ]
},
IU: {
users: [ {user: rip, password: bar} ]
imports [ { service: { subject: "$JS.API.>", account: JS } , to: "jsapi.>" } ]
},
}
`))
defer os.Remove(conf)
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
acc, err := s.LookupAccount("JS")
if err != nil {
t.Fatalf("Unexpected error looking up account: %v", err)
}
mset, err := acc.AddStream(&server.StreamConfig{Name: "ORDERS", Subjects: []string{"ORDERS.*"}})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
defer mset.Delete()
// This should be the rip user, the one that imports all of JS.
nc := clientConnectToServer(t, s)
defer nc.Close()
mapSubj := func(subject string) string {
return strings.Replace(subject, "$JS.API.", "jsapi.", 1)
}
// This will get the current information about usage and limits for this account.
resp, err := nc.Request(mapSubj(server.JSApiAccountInfo), nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var info server.JSApiAccountInfoResponse
if err := json.Unmarshal(resp.Data, &info); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if info.Error != nil {
t.Fatalf("Unexpected error: %+v", info.Error)
}
// Lookup streams.
resp, err = nc.Request(mapSubj(server.JSApiStreams), nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var namesResponse server.JSApiStreamNamesResponse
if err = json.Unmarshal(resp.Data, &namesResponse); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if namesResponse.Error != nil {
t.Fatalf("Unexpected error: %+v", namesResponse.Error)
}
}