mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 10:10:42 -07:00
Incorporating comments and renaming claims pack as well
Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
@@ -2936,9 +2936,6 @@ func (dr *DirAccResolver) Start(s *Server) error {
|
||||
s.Errorf("update resulted in error %v", err)
|
||||
}
|
||||
}
|
||||
const accountPackRequest = "$SYS.ACCOUNT.CLAIMS.PACK"
|
||||
const accountLookupRequest = "$SYS.REQ.ACCOUNT.*.CLAIMS.LOOKUP"
|
||||
const accountLookupTokens = 6
|
||||
packRespIb := s.newRespInbox()
|
||||
// subscribe to account jwt update requests
|
||||
if _, err := s.sysSubscribe(fmt.Sprintf(accUpdateEventSubj, "*"), func(_ *subscription, _ *client, subj, resp string, msg []byte) {
|
||||
@@ -2959,13 +2956,13 @@ func (dr *DirAccResolver) Start(s *Server) error {
|
||||
}
|
||||
}); err != nil {
|
||||
return fmt.Errorf("error setting up update handling: %v", err)
|
||||
} else if _, err := s.sysSubscribe(accountLookupRequest, func(_ *subscription, _ *client, subj, reply string, msg []byte) {
|
||||
} else if _, err := s.sysSubscribe(fmt.Sprintf(accLookupReqSubj, "*"), func(_ *subscription, _ *client, subj, reply string, msg []byte) {
|
||||
// respond to lookups with our version
|
||||
if reply == "" {
|
||||
return
|
||||
}
|
||||
tk := strings.Split(subj, tsep)
|
||||
if len(tk) != accountLookupTokens {
|
||||
if len(tk) != accLookupReqTokens {
|
||||
return
|
||||
}
|
||||
if theJWT, err := dr.DirJWTStore.LoadAcc(tk[accReqAccIndex]); err != nil {
|
||||
@@ -2975,7 +2972,7 @@ func (dr *DirAccResolver) Start(s *Server) error {
|
||||
}
|
||||
}); err != nil {
|
||||
return fmt.Errorf("error setting up lookup request handling: %v", err)
|
||||
} else if _, err = s.sysSubscribeQ(accountPackRequest, "responder",
|
||||
} else if _, err = s.sysSubscribeQ(accPackReqSubj, "responder",
|
||||
// respond to pack requests with one or more pack messages
|
||||
// an empty message signifies the end of the response responder
|
||||
func(_ *subscription, _ *client, _, reply string, theirHash []byte) {
|
||||
@@ -3025,7 +3022,7 @@ func (dr *DirAccResolver) Start(s *Server) error {
|
||||
}
|
||||
ourHash := dr.DirJWTStore.Hash()
|
||||
s.Debugf("Checking store state: %x", ourHash)
|
||||
s.sendInternalMsgLocked(accountPackRequest, packRespIb, nil, ourHash[:])
|
||||
s.sendInternalMsgLocked(accPackReqSubj, packRespIb, nil, ourHash[:])
|
||||
}
|
||||
})
|
||||
s.Noticef("Managing all jwt in exclusive directory %s", dr.directory)
|
||||
@@ -3071,7 +3068,7 @@ func (dr *CacheDirAccResolver) Fetch(name string) (string, error) {
|
||||
return "", ErrNoAccountResolver
|
||||
}
|
||||
respC := make(chan []byte, 1)
|
||||
accountLookupRequest := fmt.Sprintf("$SYS.REQ.ACCOUNT.%s.CLAIMS.LOOKUP", name)
|
||||
accountLookupRequest := fmt.Sprintf(accLookupReqSubj, name)
|
||||
s.mu.Lock()
|
||||
replySubj := s.newRespInbox()
|
||||
if s.sys == nil || s.sys.replies == nil {
|
||||
|
||||
@@ -32,6 +32,10 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
accLookupReqTokens = 6
|
||||
accLookupReqSubj = "$SYS.REQ.ACCOUNT.%s.CLAIMS.LOOKUP"
|
||||
accPackReqSubj = "$SYS.REQ.ACCOUNT.CLAIMS.PACK"
|
||||
|
||||
connectEventSubj = "$SYS.ACCOUNT.%s.CONNECT"
|
||||
disconnectEventSubj = "$SYS.ACCOUNT.%s.DISCONNECT"
|
||||
accConnsReqSubj = "$SYS.REQ.ACCOUNT.%s.CONNS"
|
||||
@@ -41,8 +45,9 @@ const (
|
||||
shutdownEventSubj = "$SYS.SERVER.%s.SHUTDOWN"
|
||||
authErrorEventSubj = "$SYS.SERVER.%s.CLIENT.AUTH.ERR"
|
||||
serverStatsSubj = "$SYS.SERVER.%s.STATSZ"
|
||||
serverStatsReqSubj = "$SYS.REQ.SERVER.%s.STATSZ"
|
||||
serverStatsPingReqSubj = "$SYS.REQ.SERVER.PING"
|
||||
serverDirectReqSubj = "$SYS.REQ.SERVER.%s.%s"
|
||||
serverPingReqSubj = "$SYS.REQ.SERVER.PING.%s"
|
||||
serverStatsPingReqSubj = "$SYS.REQ.SERVER.PING" // use $SYS.REQ.SERVER.PING.STATSZ instead
|
||||
leafNodeConnectEventSubj = "$SYS.ACCOUNT.%s.LEAFNODE.CONNECT"
|
||||
remoteLatencyEventSubj = "$SYS.LATENCY.M2.%s"
|
||||
inboxRespSubj = "$SYS._INBOX.%s.%s"
|
||||
@@ -630,13 +635,12 @@ func (s *Server) initEventTracking() {
|
||||
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Leafz(&optz.LeafzOptions) })
|
||||
},
|
||||
}
|
||||
|
||||
for name, req := range monSrvc {
|
||||
subject = fmt.Sprintf("$SYS.REQ.SERVER.%s.%s", s.info.ID, name)
|
||||
subject = fmt.Sprintf(serverDirectReqSubj, s.info.ID, name)
|
||||
if _, err := s.sysSubscribe(subject, req); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
subject = fmt.Sprintf("$SYS.REQ.SERVER.PING.%s", name)
|
||||
subject = fmt.Sprintf(serverPingReqSubj, name)
|
||||
if _, err := s.sysSubscribe(subject, req); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
|
||||
@@ -1380,6 +1380,7 @@ func TestSystemAccountWithGateways(t *testing.T) {
|
||||
}
|
||||
}
|
||||
func TestServerEventsStatsZ(t *testing.T) {
|
||||
serverStatsReqSubj := "$SYS.REQ.SERVER.%s.STATSZ"
|
||||
preStart := time.Now()
|
||||
// Add little bit of delay to make sure that time check
|
||||
// between pre-start and actual start does not fail.
|
||||
|
||||
Reference in New Issue
Block a user