mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 03:00:42 -07:00
Merge pull request #1090 from nats-io/streams
Add ability for cross account import services to return streams
This commit is contained in:
@@ -53,12 +53,14 @@ type Account struct {
|
||||
lqws map[string]int32
|
||||
usersRevoked map[string]int64
|
||||
actsRevoked map[string]int64
|
||||
respMap map[string][]*serviceRespEntry
|
||||
lleafs []*client
|
||||
imports importMap
|
||||
exports exportMap
|
||||
limits
|
||||
nae int32
|
||||
pruning bool
|
||||
rmPruning bool
|
||||
expired bool
|
||||
signingKeys []string
|
||||
srv *Server // server this account is registered with (possibly nil)
|
||||
@@ -71,6 +73,7 @@ type limits struct {
|
||||
mconns int32
|
||||
mleafs int32
|
||||
maxnae int32
|
||||
maxnrm int32
|
||||
maxaettl time.Duration
|
||||
}
|
||||
|
||||
@@ -91,13 +94,47 @@ type streamImport struct {
|
||||
|
||||
// Import service mapping struct
|
||||
type serviceImport struct {
|
||||
acc *Account
|
||||
from string
|
||||
to string
|
||||
ae bool
|
||||
ts int64
|
||||
claim *jwt.Import
|
||||
invalid bool
|
||||
acc *Account
|
||||
claim *jwt.Import
|
||||
from string
|
||||
to string
|
||||
rt ServiceRespType
|
||||
ts int64
|
||||
ae bool
|
||||
internal bool
|
||||
invalid bool
|
||||
}
|
||||
|
||||
// This is used to record when we create a mapping for implicit service
|
||||
// imports. We use this to clean up entries that are not singeltons when
|
||||
// we detect that interest is no longer present. The key to the map will
|
||||
// be the actual interest. We record the mapped subject and the serviceImport
|
||||
type serviceRespEntry struct {
|
||||
acc *Account
|
||||
msub string
|
||||
}
|
||||
|
||||
// ServiceRespType represents the types of service request response types.
|
||||
type ServiceRespType uint8
|
||||
|
||||
// Service response types. Defaults to a singelton.
|
||||
const (
|
||||
Singelton ServiceRespType = iota
|
||||
Stream
|
||||
Chunked
|
||||
)
|
||||
|
||||
// String helper.
|
||||
func (rt ServiceRespType) String() string {
|
||||
switch rt {
|
||||
case Singelton:
|
||||
return "Singelton"
|
||||
case Stream:
|
||||
return "Stream"
|
||||
case Chunked:
|
||||
return "Chunked"
|
||||
}
|
||||
return "Unknown ServiceResType"
|
||||
}
|
||||
|
||||
// exportAuth holds configured approvals or boolean indicating an
|
||||
@@ -105,6 +142,7 @@ type serviceImport struct {
|
||||
type exportAuth struct {
|
||||
tokenReq bool
|
||||
approved map[string]*Account
|
||||
respType ServiceRespType
|
||||
}
|
||||
|
||||
// importMap tracks the imported streams and services.
|
||||
@@ -123,7 +161,7 @@ type exportMap struct {
|
||||
func NewAccount(name string) *Account {
|
||||
a := &Account{
|
||||
Name: name,
|
||||
limits: limits{-1, -1, -1, -1, 0, 0},
|
||||
limits: limits{-1, -1, -1, -1, 0, 0, 0},
|
||||
}
|
||||
return a
|
||||
}
|
||||
@@ -322,6 +360,11 @@ func (a *Account) randomClient() *client {
|
||||
|
||||
// AddServiceExport will configure the account with the defined export.
|
||||
func (a *Account) AddServiceExport(subject string, accounts []*Account) error {
|
||||
return a.AddServiceExportWithResponse(subject, Singelton, accounts)
|
||||
}
|
||||
|
||||
// AddServiceExportWithresponse will configure the account with the defined export and response type.
|
||||
func (a *Account) AddServiceExportWithResponse(subject string, respType ServiceRespType, accounts []*Account) error {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
if a == nil {
|
||||
@@ -330,7 +373,16 @@ func (a *Account) AddServiceExport(subject string, accounts []*Account) error {
|
||||
if a.exports.services == nil {
|
||||
a.exports.services = make(map[string]*exportAuth)
|
||||
}
|
||||
|
||||
ea := a.exports.services[subject]
|
||||
|
||||
if respType != Singelton {
|
||||
if ea == nil {
|
||||
ea = &exportAuth{}
|
||||
}
|
||||
ea.respType = respType
|
||||
}
|
||||
|
||||
if accounts != nil {
|
||||
if ea == nil {
|
||||
ea = &exportAuth{}
|
||||
@@ -375,7 +427,8 @@ func (a *Account) AddServiceImportWithClaim(destination *Account, from, to strin
|
||||
return ErrServiceImportAuthorization
|
||||
}
|
||||
|
||||
return a.addImplicitServiceImport(destination, from, to, false, imClaim)
|
||||
a.addServiceImport(destination, from, to, imClaim)
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddServiceImport will add a route to an account to send published messages / requests
|
||||
@@ -400,6 +453,57 @@ func (a *Account) removeServiceImport(subject string) {
|
||||
}
|
||||
}
|
||||
|
||||
// This tracks responses to service requests mappings. This is used for cleanup.
|
||||
func (a *Account) addRespMapEntry(acc *Account, reply, from string) {
|
||||
a.mu.Lock()
|
||||
if a.respMap == nil {
|
||||
a.respMap = make(map[string][]*serviceRespEntry)
|
||||
}
|
||||
sre := &serviceRespEntry{acc, from}
|
||||
sra := a.respMap[reply]
|
||||
a.respMap[reply] = append(sra, sre)
|
||||
if len(a.respMap) > int(a.maxnrm) && !a.rmPruning {
|
||||
a.rmPruning = true
|
||||
go a.pruneNonAutoExpireResponseMaps()
|
||||
}
|
||||
a.mu.Unlock()
|
||||
}
|
||||
|
||||
// This checks for any response map entries.
|
||||
func (a *Account) checkForRespEntry(reply string) {
|
||||
a.mu.RLock()
|
||||
if len(a.imports.services) == 0 || len(a.respMap) == 0 {
|
||||
a.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
sra := a.respMap[reply]
|
||||
if sra == nil {
|
||||
a.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
// If we are here we have an entry we should check. We will first check
|
||||
// if there is any interest for this subject for the entire account. If
|
||||
// there is we can not delete any entries yet.
|
||||
rr := a.sl.Match(reply)
|
||||
a.mu.RUnlock()
|
||||
|
||||
// No interest.
|
||||
if len(rr.psubs)+len(rr.qsubs) > 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Delete all the entries here.
|
||||
a.mu.Lock()
|
||||
delete(a.respMap, reply)
|
||||
a.mu.Unlock()
|
||||
|
||||
// If we are here we no longer have interest and we have a respMap entry
|
||||
// that we should clean up.
|
||||
for _, sre := range sra {
|
||||
sre.acc.removeServiceImport(sre.msub)
|
||||
}
|
||||
}
|
||||
|
||||
// Return the number of AutoExpireResponseMaps for request/reply. These are mapped to the account that
|
||||
// has the service import.
|
||||
func (a *Account) numAutoExpireResponseMaps() int {
|
||||
@@ -440,8 +544,8 @@ func (a *Account) SetAutoExpireTTL(ttl time.Duration) {
|
||||
// Return a list of the current autoExpireResponseMaps.
|
||||
func (a *Account) autoExpireResponseMaps() []*serviceImport {
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
if len(a.imports.services) == 0 {
|
||||
a.mu.RUnlock()
|
||||
return nil
|
||||
}
|
||||
aesis := make([]*serviceImport, 0, len(a.imports.services))
|
||||
@@ -453,20 +557,58 @@ func (a *Account) autoExpireResponseMaps() []*serviceImport {
|
||||
sort.Slice(aesis, func(i, j int) bool {
|
||||
return aesis[i].ts < aesis[j].ts
|
||||
})
|
||||
|
||||
a.mu.RUnlock()
|
||||
return aesis
|
||||
}
|
||||
|
||||
// MaxResponseMaps return the maximum number of
|
||||
// non auto-expire response maps we will allow.
|
||||
func (a *Account) MaxResponseMaps() int {
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
return int(a.maxnrm)
|
||||
}
|
||||
|
||||
// SetMaxResponseMaps sets the max outstanding non auto-expire response maps.
|
||||
func (a *Account) SetMaxResponseMaps(max int) {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
a.maxnrm = int32(max)
|
||||
}
|
||||
|
||||
// Add a route to connect from an implicit route created for a response to a request.
|
||||
// This does no checks and should be only called by the msg processing code. Use
|
||||
// addServiceImport from above if responding to user input or config changes, etc.
|
||||
func (a *Account) addImplicitServiceImport(destination *Account, from, to string, autoexpire bool, claim *jwt.Import) error {
|
||||
// AddServiceImport from above if responding to user input or config changes, etc.
|
||||
func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Import) *serviceImport {
|
||||
rt := Singelton
|
||||
dest.mu.Lock()
|
||||
if ae := dest.exports.services[to]; ae != nil {
|
||||
rt = ae.respType
|
||||
}
|
||||
dest.mu.Unlock()
|
||||
|
||||
a.mu.Lock()
|
||||
if a.imports.services == nil {
|
||||
a.imports.services = make(map[string]*serviceImport)
|
||||
}
|
||||
si := &serviceImport{destination, from, to, autoexpire, 0, claim, false}
|
||||
si := &serviceImport{dest, claim, from, to, rt, 0, false, false, false}
|
||||
a.imports.services[from] = si
|
||||
if autoexpire {
|
||||
a.mu.Unlock()
|
||||
|
||||
return si
|
||||
}
|
||||
|
||||
// This is for internal responses.
|
||||
func (a *Account) addResponseServiceImport(dest *Account, from, to string, rt ServiceRespType) *serviceImport {
|
||||
a.mu.Lock()
|
||||
if a.imports.services == nil {
|
||||
a.imports.services = make(map[string]*serviceImport)
|
||||
}
|
||||
ae := rt == Singelton
|
||||
si := &serviceImport{dest, nil, from, to, rt, 0, ae, true, false}
|
||||
a.imports.services[from] = si
|
||||
if ae {
|
||||
a.nae++
|
||||
si.ts = time.Now().Unix()
|
||||
if a.nae > a.maxnae && !a.pruning {
|
||||
@@ -475,7 +617,26 @@ func (a *Account) addImplicitServiceImport(destination *Account, from, to string
|
||||
}
|
||||
}
|
||||
a.mu.Unlock()
|
||||
return nil
|
||||
return si
|
||||
}
|
||||
|
||||
// This will prune off the non auto-expire (non singelton) response maps.
|
||||
func (a *Account) pruneNonAutoExpireResponseMaps() {
|
||||
var sres []*serviceRespEntry
|
||||
a.mu.Lock()
|
||||
for subj, sra := range a.respMap {
|
||||
rr := a.sl.Match(subj)
|
||||
if len(rr.psubs)+len(rr.qsubs) == 0 {
|
||||
delete(a.respMap, subj)
|
||||
sres = append(sres, sra...)
|
||||
}
|
||||
}
|
||||
a.rmPruning = false
|
||||
a.mu.Unlock()
|
||||
|
||||
for _, sre := range sres {
|
||||
sre.acc.removeServiceImport(sre.msub)
|
||||
}
|
||||
}
|
||||
|
||||
// This will prune the list to below the threshold and remove all ttl'd maps.
|
||||
@@ -604,8 +765,8 @@ func (a *Account) checkExportApproved(account *Account, subject string, imClaim
|
||||
// Check direct match of subject first
|
||||
ea, ok := m[subject]
|
||||
if ok {
|
||||
// if ea is nil that denotes a public export
|
||||
if ea == nil {
|
||||
// if ea is nil or eq.approved is nil, that denotes a public export
|
||||
if ea == nil || (ea.approved == nil && !ea.tokenReq) {
|
||||
return true
|
||||
}
|
||||
// Check if token required
|
||||
|
||||
@@ -421,13 +421,35 @@ func TestAccountParseConfigImportsExports(t *testing.T) {
|
||||
if lis := len(natsAcc.imports.services); lis != 1 {
|
||||
t.Fatalf("Expected 1 imported service, got %d\n", lis)
|
||||
}
|
||||
if les := len(natsAcc.exports.services); les != 1 {
|
||||
t.Fatalf("Expected 1 exported service, got %d\n", les)
|
||||
if les := len(natsAcc.exports.services); les != 4 {
|
||||
t.Fatalf("Expected 4 exported services, got %d\n", les)
|
||||
}
|
||||
if les := len(natsAcc.exports.streams); les != 0 {
|
||||
t.Fatalf("Expected no exported streams, got %d\n", les)
|
||||
}
|
||||
|
||||
ea := natsAcc.exports.services["nats.time"]
|
||||
if ea == nil {
|
||||
t.Fatalf("Expected to get a non-nil exportAuth for service")
|
||||
}
|
||||
if ea.respType != Stream {
|
||||
t.Fatalf("Expected to get a Stream response type, got %q", ea.respType)
|
||||
}
|
||||
ea = natsAcc.exports.services["nats.photo"]
|
||||
if ea == nil {
|
||||
t.Fatalf("Expected to get a non-nil exportAuth for service")
|
||||
}
|
||||
if ea.respType != Chunked {
|
||||
t.Fatalf("Expected to get a Chunked response type, got %q", ea.respType)
|
||||
}
|
||||
ea = natsAcc.exports.services["nats.add"]
|
||||
if ea == nil {
|
||||
t.Fatalf("Expected to get a non-nil exportAuth for service")
|
||||
}
|
||||
if ea.respType != Singelton {
|
||||
t.Fatalf("Expected to get a Singelton response type, got %q", ea.respType)
|
||||
}
|
||||
|
||||
if synAcc == nil {
|
||||
t.Fatalf("Error retrieving account for 'synadia'")
|
||||
}
|
||||
@@ -673,6 +695,46 @@ func TestSimpleMapping(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestShadowSubsCleanupOnClientClose(t *testing.T) {
|
||||
s, fooAcc, barAcc := simpleAccountServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
// Now map the subject space between foo and bar.
|
||||
// Need to do export first.
|
||||
if err := fooAcc.AddStreamExport("foo", nil); err != nil { // Public with no accounts defined.
|
||||
t.Fatalf("Error adding account export to client foo: %v", err)
|
||||
}
|
||||
if err := barAcc.AddStreamImport(fooAcc, "foo", "import"); err != nil {
|
||||
t.Fatalf("Error adding account import to client bar: %v", err)
|
||||
}
|
||||
|
||||
cbar, _, _ := newClientForServer(s)
|
||||
defer cbar.nc.Close()
|
||||
|
||||
if err := cbar.registerWithAccount(barAcc); err != nil {
|
||||
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
||||
}
|
||||
|
||||
// Normal and Queue Subscription on bar client.
|
||||
if err := cbar.parse([]byte("SUB import.foo 1\r\nSUB import.foo bar 2\r\n")); err != nil {
|
||||
t.Fatalf("Error for client 'bar' from server: %v", err)
|
||||
}
|
||||
|
||||
if fslc := fooAcc.sl.Count(); fslc != 2 {
|
||||
t.Fatalf("Expected 2 shadowed subscriptions on fooAcc, got %d", fslc)
|
||||
}
|
||||
|
||||
// Now close cbar and make sure we remove shadows.
|
||||
cbar.closeConnection(ClientClosed)
|
||||
|
||||
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
||||
if fslc := fooAcc.sl.Count(); fslc != 0 {
|
||||
return fmt.Errorf("Number of shadow subscriptions is %d", fslc)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func TestNoPrefixWildcardMapping(t *testing.T) {
|
||||
s, fooAcc, barAcc := simpleAccountServer(t)
|
||||
defer s.Shutdown()
|
||||
@@ -1142,7 +1204,7 @@ func TestCrossAccountRequestReply(t *testing.T) {
|
||||
t.Fatalf("Expected ErrInvalidSubject but received %v.", err)
|
||||
}
|
||||
|
||||
// Now add in the Route for request to be routed to the foo account.
|
||||
// Now add in the route mapping for request to be routed to the foo account.
|
||||
if err := cbar.acc.AddServiceImport(fooAcc, "foo", "test.request"); err != nil {
|
||||
t.Fatalf("Error adding account service import to client bar: %v", err)
|
||||
}
|
||||
@@ -1265,6 +1327,209 @@ func TestCrossAccountRequestReplyResponseMaps(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestCrossAccountServiceResponseTypes(t *testing.T) {
|
||||
s, fooAcc, barAcc := simpleAccountServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
cfoo, crFoo, _ := newClientForServer(s)
|
||||
defer cfoo.nc.Close()
|
||||
|
||||
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
||||
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
||||
}
|
||||
cbar, crBar, _ := newClientForServer(s)
|
||||
defer cbar.nc.Close()
|
||||
|
||||
if err := cbar.registerWithAccount(barAcc); err != nil {
|
||||
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
||||
}
|
||||
|
||||
// Add in the service export for the requests. Make it public.
|
||||
if err := cfoo.acc.AddServiceExportWithResponse("test.request", Stream, nil); err != nil {
|
||||
t.Fatalf("Error adding account service export to client foo: %v", err)
|
||||
}
|
||||
// Now add in the route mapping for request to be routed to the foo account.
|
||||
if err := cbar.acc.AddServiceImport(fooAcc, "foo", "test.request"); err != nil {
|
||||
t.Fatalf("Error adding account service import to client bar: %v", err)
|
||||
}
|
||||
|
||||
// Now setup the resonder under cfoo
|
||||
cfoo.parse([]byte("SUB test.request 1\r\n"))
|
||||
|
||||
// Now send the request. Remember we expect the request on our local foo. We added the route
|
||||
// with that "from" and will map it to "test.request"
|
||||
go cbar.parseAndFlush([]byte("SUB bar 11\r\nPUB foo bar 4\r\nhelp\r\n"))
|
||||
|
||||
// Now read the request from crFoo
|
||||
l, err := crFoo.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading from client 'bar': %v", err)
|
||||
}
|
||||
|
||||
mraw := msgPat.FindAllStringSubmatch(l, -1)
|
||||
if len(mraw) == 0 {
|
||||
t.Fatalf("No message received")
|
||||
}
|
||||
matches := mraw[0]
|
||||
reply := matches[REPLY_INDEX]
|
||||
if !strings.HasPrefix(reply, "_R_.") {
|
||||
t.Fatalf("Expected an _R_.* like reply, got '%s'", reply)
|
||||
}
|
||||
crFoo.ReadString('\n')
|
||||
|
||||
replyOp := []byte(fmt.Sprintf("PUB %s 2\r\n22\r\n", matches[REPLY_INDEX]))
|
||||
var mReply []byte
|
||||
for i := 0; i < 10; i++ {
|
||||
mReply = append(mReply, replyOp...)
|
||||
}
|
||||
|
||||
go cfoo.parseAndFlush(mReply)
|
||||
|
||||
var b [256]byte
|
||||
n, err := crBar.Read(b[:])
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading response: %v", err)
|
||||
}
|
||||
mraw = msgPat.FindAllStringSubmatch(string(b[:n]), -1)
|
||||
if len(mraw) != 10 {
|
||||
t.Fatalf("Expected a response but got %d", len(mraw))
|
||||
}
|
||||
|
||||
// Also make sure the response map gets cleaned up when interest goes away.
|
||||
cbar.closeConnection(ClientClosed)
|
||||
|
||||
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
||||
if nr := fooAcc.numServiceRoutes(); nr != 0 {
|
||||
return fmt.Errorf("Number of implicit service imports is %d", nr)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Now test bogus reply subjects are handled and do not accumulate the response maps.
|
||||
|
||||
cbar, _, _ = newClientForServer(s)
|
||||
defer cbar.nc.Close()
|
||||
|
||||
if err := cbar.registerWithAccount(barAcc); err != nil {
|
||||
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
||||
}
|
||||
|
||||
// Do not create any interest in the reply subject 'bar'. Just send a request.
|
||||
go cbar.parseAndFlush([]byte("PUB foo bar 4\r\nhelp\r\n"))
|
||||
|
||||
// Now read the request from crFoo
|
||||
l, err = crFoo.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading from client 'bar': %v", err)
|
||||
}
|
||||
mraw = msgPat.FindAllStringSubmatch(l, -1)
|
||||
if len(mraw) == 0 {
|
||||
t.Fatalf("No message received")
|
||||
}
|
||||
matches = mraw[0]
|
||||
reply = matches[REPLY_INDEX]
|
||||
if !strings.HasPrefix(reply, "_R_.") {
|
||||
t.Fatalf("Expected an _R_.* like reply, got '%s'", reply)
|
||||
}
|
||||
crFoo.ReadString('\n')
|
||||
|
||||
replyOp = []byte(fmt.Sprintf("PUB %s 2\r\n22\r\n", matches[REPLY_INDEX]))
|
||||
|
||||
// Make sure we have the response map.
|
||||
if nr := fooAcc.numServiceRoutes(); nr != 1 {
|
||||
t.Fatalf("Expected a response map to be present, got %d", nr)
|
||||
}
|
||||
|
||||
go cfoo.parseAndFlush(replyOp)
|
||||
|
||||
// Now wait for a bit, the reply should trip a no interest condition
|
||||
// which should clean this up.
|
||||
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
||||
if nr := fooAcc.numServiceRoutes(); nr != 0 {
|
||||
return fmt.Errorf("Number of implicit service imports is %d", nr)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Also make sure the response map entry is gone as well.
|
||||
barAcc.mu.RLock()
|
||||
lrm := len(barAcc.respMap)
|
||||
barAcc.mu.RUnlock()
|
||||
|
||||
if lrm != 0 {
|
||||
t.Fatalf("Expected the respMap tp be cleared, got %d entries", lrm)
|
||||
}
|
||||
}
|
||||
|
||||
// This is for bogus reply subjects and no responses from a service provider.
|
||||
func TestCrossAccountServiceResponseLeaks(t *testing.T) {
|
||||
s, fooAcc, barAcc := simpleAccountServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
// Set max response maps to < 100
|
||||
barAcc.SetMaxResponseMaps(99)
|
||||
|
||||
cfoo, crFoo, _ := newClientForServer(s)
|
||||
defer cfoo.nc.Close()
|
||||
|
||||
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
||||
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
||||
}
|
||||
cbar, _, _ := newClientForServer(s)
|
||||
defer cbar.nc.Close()
|
||||
|
||||
if err := cbar.registerWithAccount(barAcc); err != nil {
|
||||
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
||||
}
|
||||
|
||||
// Add in the service export for the requests. Make it public.
|
||||
if err := cfoo.acc.AddServiceExportWithResponse("test.request", Stream, nil); err != nil {
|
||||
t.Fatalf("Error adding account service export to client foo: %v", err)
|
||||
}
|
||||
// Now add in the route mapping for request to be routed to the foo account.
|
||||
if err := cbar.acc.AddServiceImport(fooAcc, "foo", "test.request"); err != nil {
|
||||
t.Fatalf("Error adding account service import to client bar: %v", err)
|
||||
}
|
||||
|
||||
// Now setup the resonder under cfoo
|
||||
cfoo.parse([]byte("SUB test.request 1\r\n"))
|
||||
|
||||
// Now send some requests..We will not respond.
|
||||
var sb strings.Builder
|
||||
for i := 0; i < 100; i++ {
|
||||
sb.WriteString(fmt.Sprintf("PUB foo REPLY.%d 4\r\nhelp\r\n", i))
|
||||
}
|
||||
go cbar.parseAndFlush([]byte(sb.String()))
|
||||
|
||||
// Make sure requests are processed.
|
||||
_, err := crFoo.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading from client 'bar': %v", err)
|
||||
}
|
||||
|
||||
// We should have leaked response maps.
|
||||
if nr := fooAcc.numServiceRoutes(); nr != 100 {
|
||||
t.Fatalf("Expected response maps to be present, got %d", nr)
|
||||
}
|
||||
|
||||
// They should be gone here eventually.
|
||||
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
||||
if nr := fooAcc.numServiceRoutes(); nr != 0 {
|
||||
return fmt.Errorf("Number of implicit service imports is %d", nr)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Also make sure the response map entry is gone as well.
|
||||
barAcc.mu.RLock()
|
||||
lrm := len(barAcc.respMap)
|
||||
barAcc.mu.RUnlock()
|
||||
|
||||
if lrm != 0 {
|
||||
t.Fatalf("Expected the respMap tp be cleared, got %d entries", lrm)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAccountMapsUsers(t *testing.T) {
|
||||
// Used for the nkey users to properly sign.
|
||||
seed1 := "SUAPM67TC4RHQLKBX55NIQXSMATZDOZK6FNEOSS36CAYA7F7TY66LP4BOM"
|
||||
|
||||
@@ -341,7 +341,7 @@ func (c *client) GetTLSConnectionState() *tls.ConnectionState {
|
||||
// FIXME(dlc) - This is getting bloated for normal subs, need
|
||||
// to optionally have an opts section for non-normal stuff.
|
||||
type subscription struct {
|
||||
nm int64 // Will atomicall be set to -1 on unsub or connection close
|
||||
nm int64 // Will atomically be set to -1 on unsub or connection close
|
||||
client *client
|
||||
im *streamImport // This is for import stream support.
|
||||
shadow []*subscription // This is to track shadowed accounts.
|
||||
@@ -1932,7 +1932,7 @@ func (c *client) canSubscribe(subject string) bool {
|
||||
}
|
||||
|
||||
// Low level unsubscribe for a given client.
|
||||
func (c *client) unsubscribe(acc *Account, sub *subscription, force bool) {
|
||||
func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool) {
|
||||
c.mu.Lock()
|
||||
if !force && sub.max > 0 && sub.nm < sub.max {
|
||||
c.Debugf(
|
||||
@@ -1943,13 +1943,17 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force bool) {
|
||||
}
|
||||
c.traceOp("<-> %s", "DELSUB", sub.sid)
|
||||
|
||||
delete(c.subs, string(sub.sid))
|
||||
if c.kind != CLIENT && c.kind != SYSTEM {
|
||||
c.removeReplySubTimeout(sub)
|
||||
}
|
||||
|
||||
if acc != nil {
|
||||
acc.sl.Remove(sub)
|
||||
// Remove accounting if requested. This will be false when we close a connection
|
||||
// with open subscriptions.
|
||||
if remove {
|
||||
delete(c.subs, string(sub.sid))
|
||||
if acc != nil {
|
||||
acc.sl.Remove(sub)
|
||||
}
|
||||
}
|
||||
|
||||
// Check to see if we have shadow subscriptions.
|
||||
@@ -1962,6 +1966,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force bool) {
|
||||
sub.close()
|
||||
c.mu.Unlock()
|
||||
|
||||
// Process shadow subs if we have them.
|
||||
for _, nsub := range shadowSubs {
|
||||
if err := nsub.im.acc.sl.Remove(nsub); err != nil {
|
||||
c.Debugf("Could not remove shadow import subscription for account %q", nsub.im.acc.Name)
|
||||
@@ -1971,6 +1976,11 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force bool) {
|
||||
// Now check on leafnode updates.
|
||||
c.srv.updateLeafNodes(nsub.im.acc, nsub, -1)
|
||||
}
|
||||
|
||||
// Now check to see if this was part of a respMap entry for service imports.
|
||||
if acc != nil {
|
||||
acc.checkForRespEntry(string(sub.subject))
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) processUnsub(arg []byte) error {
|
||||
@@ -2020,7 +2030,7 @@ func (c *client) processUnsub(arg []byte) error {
|
||||
}
|
||||
|
||||
if unsub {
|
||||
c.unsubscribe(acc, sub, false)
|
||||
c.unsubscribe(acc, sub, false, true)
|
||||
if acc != nil && kind == CLIENT || kind == SYSTEM {
|
||||
srv.updateRouteSubscriptionMap(acc, sub, -1)
|
||||
if updateGWs {
|
||||
@@ -2127,11 +2137,11 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
|
||||
if shouldForward {
|
||||
defer srv.updateRouteSubscriptionMap(client.acc, sub, -1)
|
||||
}
|
||||
defer client.unsubscribe(client.acc, sub, true)
|
||||
defer client.unsubscribe(client.acc, sub, true, true)
|
||||
} else if sub.nm > sub.max {
|
||||
client.Debugf("Auto-unsubscribe limit [%d] exceeded", sub.max)
|
||||
client.mu.Unlock()
|
||||
client.unsubscribe(client.acc, sub, true)
|
||||
client.unsubscribe(client.acc, sub, true, true)
|
||||
if shouldForward {
|
||||
srv.updateRouteSubscriptionMap(client.acc, sub, -1)
|
||||
}
|
||||
@@ -2463,45 +2473,57 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
|
||||
}
|
||||
|
||||
acc.mu.RLock()
|
||||
rm := acc.imports.services[string(c.pa.subject)]
|
||||
invalid := rm != nil && rm.invalid
|
||||
si := acc.imports.services[string(c.pa.subject)]
|
||||
invalid := si != nil && si.invalid
|
||||
acc.mu.RUnlock()
|
||||
|
||||
// Get the results from the other account for the mapped "to" subject.
|
||||
// If we have been marked invalid simply return here.
|
||||
if rm != nil && !invalid && rm.acc != nil && rm.acc.sl != nil {
|
||||
if si != nil && !invalid && si.acc != nil && si.acc.sl != nil {
|
||||
var nrr []byte
|
||||
if rm.ae {
|
||||
acc.removeServiceImport(rm.from)
|
||||
if si.ae {
|
||||
acc.removeServiceImport(si.from)
|
||||
}
|
||||
if c.pa.reply != nil {
|
||||
// We want to remap this to provide anonymity.
|
||||
nrr = c.newServiceReply()
|
||||
rm.acc.addImplicitServiceImport(acc, string(nrr), string(c.pa.reply), true, nil)
|
||||
// If this is a client connection and we are in
|
||||
// gateway mode, we need to send RS+ to local cluster
|
||||
// and possibly to inbound GW connections for
|
||||
// which we are in interest-only mode.
|
||||
si.acc.addResponseServiceImport(acc, string(nrr), string(c.pa.reply), si.rt)
|
||||
|
||||
// Track our responses for cleanup if not auto-expire.
|
||||
if si.rt != Singelton {
|
||||
acc.addRespMapEntry(si.acc, string(c.pa.reply), string(nrr))
|
||||
}
|
||||
|
||||
// If this is a client or leaf connection and we are in gateway mode,
|
||||
// we need to send RS+ to our local cluster and possibly to inbound
|
||||
// GW connections for which we are in interest-only mode.
|
||||
if c.srv.gateway.enabled && (c.kind == CLIENT || c.kind == LEAF) {
|
||||
c.srv.gatewayHandleServiceImport(rm.acc, nrr, c, 1)
|
||||
c.srv.gatewayHandleServiceImport(si.acc, nrr, c, 1)
|
||||
}
|
||||
}
|
||||
// FIXME(dlc) - Do L1 cache trick from above.
|
||||
rr := rm.acc.sl.Match(rm.to)
|
||||
rr := si.acc.sl.Match(si.to)
|
||||
|
||||
// Check to see if we have no results and this is an internal serviceImport. If so we
|
||||
// need to clean that up.
|
||||
if len(rr.psubs)+len(rr.qsubs) == 0 && si.internal {
|
||||
// We may also have a response entry, so go through that way.
|
||||
si.acc.checkForRespEntry(si.to)
|
||||
}
|
||||
|
||||
// If we are a route or gateway or leafnode and this message is flipped to a queue subscriber we
|
||||
// need to handle that since the processMsgResults will want a queue filter.
|
||||
if (c.kind == ROUTER || c.kind == GATEWAY || c.kind == LEAF) && c.pa.queues == nil && len(rr.qsubs) > 0 {
|
||||
if len(rr.qsubs) > 0 && c.pa.queues == nil && (c.kind == ROUTER || c.kind == GATEWAY || c.kind == LEAF) {
|
||||
c.makeQFilter(rr.qsubs)
|
||||
}
|
||||
|
||||
// If this is not a gateway connection but gateway is enabled,
|
||||
// try to send this converted message to all gateways.
|
||||
if c.srv.gateway.enabled && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF) {
|
||||
queues := c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, pmrCollectQueueNames)
|
||||
c.sendMsgToGateways(rm.acc, msg, []byte(rm.to), nrr, queues)
|
||||
queues := c.processMsgResults(si.acc, rr, msg, []byte(si.to), nrr, pmrCollectQueueNames)
|
||||
c.sendMsgToGateways(si.acc, msg, []byte(si.to), nrr, queues)
|
||||
} else {
|
||||
c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, pmrNoFlag)
|
||||
c.processMsgResults(si.acc, rr, msg, []byte(si.to), nrr, pmrNoFlag)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2968,7 +2990,7 @@ func (c *client) processSubsOnConfigReload(awcsti map[string]struct{}) {
|
||||
|
||||
// Unsubscribe all that need to be removed and report back to client and logs.
|
||||
for _, sub := range removed {
|
||||
c.unsubscribe(acc, sub, true)
|
||||
c.unsubscribe(acc, sub, true, true)
|
||||
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %q)",
|
||||
sub.subject, sub.sid))
|
||||
srv.Noticef("Removed sub %q (sid %q) for %s - not authorized",
|
||||
@@ -3067,6 +3089,9 @@ func (c *client) closeConnection(reason ClosedState) {
|
||||
if acc != nil && (kind == CLIENT || kind == LEAF) {
|
||||
qsubs := map[string]*qsub{}
|
||||
for _, sub := range subs {
|
||||
// Call unsubscribe here to cleanup shadow subscriptions and such.
|
||||
c.unsubscribe(acc, sub, true, false)
|
||||
// Update route as normal for a normal subscriber.
|
||||
if sub.queue == nil {
|
||||
srv.updateRouteSubscriptionMap(acc, sub, -1)
|
||||
} else {
|
||||
|
||||
@@ -66,6 +66,7 @@ func newClientForServer(s *Server) (*client, *bufio.Reader, string) {
|
||||
}
|
||||
|
||||
var defaultServerOptions = Options{
|
||||
Host: "127.0.0.1",
|
||||
Trace: false,
|
||||
Debug: false,
|
||||
NoLog: true,
|
||||
@@ -295,7 +296,7 @@ func TestClientPing(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
var msgPat = regexp.MustCompile(`\AMSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\r\n`)
|
||||
var msgPat = regexp.MustCompile(`MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\r\n`)
|
||||
|
||||
const (
|
||||
SUB_INDEX = 1
|
||||
|
||||
@@ -39,7 +39,10 @@ accounts: {
|
||||
]
|
||||
|
||||
exports = [
|
||||
{service: "nats.time"}
|
||||
{service: "nats.time", response: stream}
|
||||
{service: "nats.photo", response: chunked}
|
||||
{service: "nats.add", response: singelton, accounts: [cncf]}
|
||||
{service: "nats.sub"}
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
@@ -133,6 +133,10 @@ const (
|
||||
// DEFAULT_MAX_ACCOUNT_AE_RESPONSE_MAPS is for auto-expire response maps for imports.
|
||||
DEFAULT_MAX_ACCOUNT_AE_RESPONSE_MAPS = 100000
|
||||
|
||||
// DEFAULT_MAX_ACCOUNT_INTERNAL_RESPONSE_MAPS is for non auto-expire response maps for imports.
|
||||
// These are present for non-singelton response types.
|
||||
DEFAULT_MAX_ACCOUNT_INTERNAL_RESPONSE_MAPS = 100000
|
||||
|
||||
// DEFAULT_TTL_AE_RESPONSE_MAP is the default time to expire auto-response map entries.
|
||||
DEFAULT_TTL_AE_RESPONSE_MAP = 10 * time.Minute
|
||||
|
||||
|
||||
@@ -934,7 +934,7 @@ func (s *Server) sysUnsubscribe(sub *subscription) {
|
||||
c := s.sys.client
|
||||
delete(s.sys.subs, string(sub.sid))
|
||||
s.mu.Unlock()
|
||||
c.unsubscribe(acc, sub, true)
|
||||
c.unsubscribe(acc, sub, true, true)
|
||||
}
|
||||
|
||||
// Helper to grab name for a client.
|
||||
|
||||
@@ -1189,7 +1189,7 @@ func (c *client) processLeafUnsub(arg []byte) error {
|
||||
c.mu.Unlock()
|
||||
|
||||
if ok {
|
||||
c.unsubscribe(acc, sub, true)
|
||||
c.unsubscribe(acc, sub, true, true)
|
||||
updateGWs = srv.gateway.enabled
|
||||
}
|
||||
|
||||
|
||||
@@ -103,18 +103,20 @@ type RemoteGatewayOpts struct {
|
||||
|
||||
// LeafNodeOpts are options for a given server to accept leaf node connections and/or connect to a remote cluster.
|
||||
type LeafNodeOpts struct {
|
||||
Host string `json:"addr,omitempty"`
|
||||
Port int `json:"port,omitempty"`
|
||||
Username string `json:"-"`
|
||||
Password string `json:"-"`
|
||||
AuthTimeout float64 `json:"auth_timeout,omitempty"`
|
||||
TLSConfig *tls.Config `json:"-"`
|
||||
TLSTimeout float64 `json:"tls_timeout,omitempty"`
|
||||
TLSMap bool `json:"-"`
|
||||
Remotes []*RemoteLeafOpts `json:"remotes,omitempty"`
|
||||
Advertise string `json:"-"`
|
||||
NoAdvertise bool `json:"-"`
|
||||
ReconnectInterval time.Duration `json:"-"`
|
||||
Host string `json:"addr,omitempty"`
|
||||
Port int `json:"port,omitempty"`
|
||||
Username string `json:"-"`
|
||||
Password string `json:"-"`
|
||||
AuthTimeout float64 `json:"auth_timeout,omitempty"`
|
||||
TLSConfig *tls.Config `json:"-"`
|
||||
TLSTimeout float64 `json:"tls_timeout,omitempty"`
|
||||
TLSMap bool `json:"-"`
|
||||
Advertise string `json:"-"`
|
||||
NoAdvertise bool `json:"-"`
|
||||
ReconnectInterval time.Duration `json:"-"`
|
||||
|
||||
// For solicited connections to other clusters/superclusters.
|
||||
Remotes []*RemoteLeafOpts `json:"remotes,omitempty"`
|
||||
|
||||
// Not exported, for tests.
|
||||
resolver netResolver
|
||||
@@ -1247,6 +1249,7 @@ type export struct {
|
||||
acc *Account
|
||||
sub string
|
||||
accs []string
|
||||
rt ServiceRespType
|
||||
}
|
||||
|
||||
type importStream struct {
|
||||
@@ -1439,7 +1442,7 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er
|
||||
}
|
||||
accounts = append(accounts, ta)
|
||||
}
|
||||
if err := service.acc.AddServiceExport(service.sub, accounts); err != nil {
|
||||
if err := service.acc.AddServiceExportWithResponse(service.sub, service.rt, accounts); err != nil {
|
||||
msg := fmt.Sprintf("Error adding service export %q: %v", service.sub, err)
|
||||
*errors = append(*errors, &configErr{tk, msg})
|
||||
continue
|
||||
@@ -1576,6 +1579,8 @@ func parseExportStreamOrService(v interface{}, errors, warnings *[]error) (*expo
|
||||
curStream *export
|
||||
curService *export
|
||||
accounts []string
|
||||
rt ServiceRespType
|
||||
rtSeen bool
|
||||
)
|
||||
tk, v := unwrapValue(v)
|
||||
vv, ok := v.(map[string]interface{})
|
||||
@@ -1591,7 +1596,11 @@ func parseExportStreamOrService(v interface{}, errors, warnings *[]error) (*expo
|
||||
*errors = append(*errors, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if rtSeen {
|
||||
err := &configErr{tk, "Detected response directive on non-service"}
|
||||
*errors = append(*errors, err)
|
||||
continue
|
||||
}
|
||||
mvs, ok := mv.(string)
|
||||
if !ok {
|
||||
err := &configErr{tk, fmt.Sprintf("Expected stream name to be string, got %T", mv)}
|
||||
@@ -1602,6 +1611,33 @@ func parseExportStreamOrService(v interface{}, errors, warnings *[]error) (*expo
|
||||
if accounts != nil {
|
||||
curStream.accs = accounts
|
||||
}
|
||||
case "response", "response_type":
|
||||
rtSeen = true
|
||||
mvs, ok := mv.(string)
|
||||
if !ok {
|
||||
err := &configErr{tk, fmt.Sprintf("Expected response type to be string, got %T", mv)}
|
||||
*errors = append(*errors, err)
|
||||
continue
|
||||
}
|
||||
switch strings.ToLower(mvs) {
|
||||
case "single", "singelton":
|
||||
rt = Singelton
|
||||
case "stream":
|
||||
rt = Stream
|
||||
case "chunk", "chunked":
|
||||
rt = Chunked
|
||||
default:
|
||||
err := &configErr{tk, fmt.Sprintf("Unknown response type: %q", mvs)}
|
||||
*errors = append(*errors, err)
|
||||
continue
|
||||
}
|
||||
if curService != nil {
|
||||
curService.rt = rt
|
||||
}
|
||||
if curStream != nil {
|
||||
err := &configErr{tk, "Detected response directive on non-service"}
|
||||
*errors = append(*errors, err)
|
||||
}
|
||||
case "service":
|
||||
if curStream != nil {
|
||||
err := &configErr{tk, fmt.Sprintf("Detected service %q but already saw a stream", mv)}
|
||||
@@ -1618,6 +1654,9 @@ func parseExportStreamOrService(v interface{}, errors, warnings *[]error) (*expo
|
||||
if accounts != nil {
|
||||
curService.accs = accounts
|
||||
}
|
||||
if rtSeen {
|
||||
curService.rt = rt
|
||||
}
|
||||
case "accounts":
|
||||
for _, iv := range mv.([]interface{}) {
|
||||
_, mv := unwrapValue(iv)
|
||||
|
||||
@@ -884,6 +884,7 @@ func (s *Server) reloadAuthorization() {
|
||||
}
|
||||
newAcc.sl = acc.sl
|
||||
newAcc.rm = acc.rm
|
||||
newAcc.respMap = acc.respMap
|
||||
acc.mu.RUnlock()
|
||||
|
||||
// Check if current and new config of this account are same
|
||||
|
||||
@@ -116,7 +116,7 @@ func (c *client) addReplySubTimeout(acc *Account, sub *subscription, d time.Dura
|
||||
delete(rs, sub)
|
||||
sub.max = 0
|
||||
c.mu.Unlock()
|
||||
c.unsubscribe(acc, sub, true)
|
||||
c.unsubscribe(acc, sub, true, true)
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -773,6 +773,9 @@ func (s *Server) registerAccount(acc *Account) {
|
||||
if acc.maxaettl == 0 {
|
||||
acc.maxaettl = DEFAULT_TTL_AE_RESPONSE_MAP
|
||||
}
|
||||
if acc.maxnrm == 0 {
|
||||
acc.maxnrm = DEFAULT_MAX_ACCOUNT_INTERNAL_RESPONSE_MAPS
|
||||
}
|
||||
if acc.clients == nil {
|
||||
acc.clients = make(map[*client]*client)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user