mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
@@ -2922,22 +2922,18 @@ func respondToUpdate(s *Server, respSubj string, acc string, message string, err
|
||||
}
|
||||
server := &ServerInfo{}
|
||||
response := map[string]interface{}{"server": server}
|
||||
m := map[string]interface{}{}
|
||||
if acc != "" {
|
||||
m["account"] = acc
|
||||
}
|
||||
if err == nil {
|
||||
response["data"] = map[string]interface{}{
|
||||
"code": http.StatusOK,
|
||||
"message": message,
|
||||
}
|
||||
if acc != "" {
|
||||
response["data"].(map[string]interface{})["account"] = acc
|
||||
}
|
||||
m["code"] = http.StatusOK
|
||||
m["message"] = message
|
||||
response["data"] = m
|
||||
} else {
|
||||
response["error"] = map[string]interface{}{
|
||||
"code": http.StatusInternalServerError,
|
||||
"description": fmt.Sprintf("%s - %v", message, err),
|
||||
}
|
||||
if acc != "" {
|
||||
response["error"].(map[string]interface{})["account"] = acc
|
||||
}
|
||||
m["code"] = http.StatusInternalServerError
|
||||
m["description"] = fmt.Sprintf("%s - %v", message, err)
|
||||
response["error"] = m
|
||||
}
|
||||
s.sendInternalMsgLocked(respSubj, _EMPTY_, server, response)
|
||||
}
|
||||
@@ -3035,8 +3031,8 @@ func (dr *DirAccResolver) Start(s *Server) error {
|
||||
}); err != nil {
|
||||
return fmt.Errorf("error setting up update handling: %v", err)
|
||||
}
|
||||
// respond to lookups with our version
|
||||
if _, err := s.sysSubscribe(fmt.Sprintf(accLookupReqSubj, "*"), func(_ *subscription, _ *client, subj, reply string, msg []byte) {
|
||||
// respond to lookups with our version
|
||||
if reply == "" {
|
||||
return
|
||||
}
|
||||
@@ -3051,41 +3047,42 @@ func (dr *DirAccResolver) Start(s *Server) error {
|
||||
}
|
||||
}); err != nil {
|
||||
return fmt.Errorf("error setting up lookup request handling: %v", err)
|
||||
} else if _, err = s.sysSubscribeQ(accPackReqSubj, "responder",
|
||||
// respond to pack requests with one or more pack messages
|
||||
// an empty message signifies the end of the response responder
|
||||
func(_ *subscription, _ *client, _, reply string, theirHash []byte) {
|
||||
if reply == "" {
|
||||
return
|
||||
}
|
||||
ourHash := dr.DirJWTStore.Hash()
|
||||
if bytes.Equal(theirHash, ourHash[:]) {
|
||||
s.sendInternalMsgLocked(reply, "", nil, []byte{})
|
||||
s.Debugf("pack request matches hash %x", ourHash[:])
|
||||
} else if err := dr.DirJWTStore.PackWalk(1, func(partialPackMsg string) {
|
||||
s.sendInternalMsgLocked(reply, "", nil, []byte(partialPackMsg))
|
||||
}); err != nil {
|
||||
// let them timeout
|
||||
s.Errorf("pack request error: %v", err)
|
||||
} else {
|
||||
s.Debugf("pack request hash %x - finished responding with hash %x", theirHash, ourHash)
|
||||
s.sendInternalMsgLocked(reply, "", nil, []byte{})
|
||||
}
|
||||
}
|
||||
// respond to pack requests with one or more pack messages
|
||||
// an empty message signifies the end of the response responder
|
||||
if _, err := s.sysSubscribeQ(accPackReqSubj, "responder", func(_ *subscription, _ *client, _, reply string, theirHash []byte) {
|
||||
if reply == "" {
|
||||
return
|
||||
}
|
||||
ourHash := dr.DirJWTStore.Hash()
|
||||
if bytes.Equal(theirHash, ourHash[:]) {
|
||||
s.sendInternalMsgLocked(reply, "", nil, []byte{})
|
||||
s.Debugf("pack request matches hash %x", ourHash[:])
|
||||
} else if err := dr.DirJWTStore.PackWalk(1, func(partialPackMsg string) {
|
||||
s.sendInternalMsgLocked(reply, "", nil, []byte(partialPackMsg))
|
||||
}); err != nil {
|
||||
// let them timeout
|
||||
s.Errorf("pack request error: %v", err)
|
||||
} else {
|
||||
s.Debugf("pack request hash %x - finished responding with hash %x", theirHash, ourHash)
|
||||
s.sendInternalMsgLocked(reply, "", nil, []byte{})
|
||||
}
|
||||
}); err != nil {
|
||||
return fmt.Errorf("error setting up pack request handling: %v", err)
|
||||
} else if _, err = s.sysSubscribe(accListReqSubj,
|
||||
// respond to list requests with one message containing all account ids
|
||||
func(_ *subscription, _ *client, _, reply string, _ []byte) {
|
||||
handleListRequest(dr.DirJWTStore, s, reply)
|
||||
}); err != nil {
|
||||
}
|
||||
// respond to list requests with one message containing all account ids
|
||||
if _, err := s.sysSubscribe(accListReqSubj, func(_ *subscription, _ *client, _, reply string, _ []byte) {
|
||||
handleListRequest(dr.DirJWTStore, s, reply)
|
||||
}); err != nil {
|
||||
return fmt.Errorf("error setting up list request handling: %v", err)
|
||||
} else if _, err := s.sysSubscribe(accDeleteReqSubj,
|
||||
func(_ *subscription, _ *client, _, reply string, msg []byte) {
|
||||
handleDeleteRequest(dr.DirJWTStore, s, msg, reply)
|
||||
}); err != nil {
|
||||
}
|
||||
if _, err := s.sysSubscribe(accDeleteReqSubj, func(_ *subscription, _ *client, _, reply string, msg []byte) {
|
||||
handleDeleteRequest(dr.DirJWTStore, s, msg, reply)
|
||||
}); err != nil {
|
||||
return fmt.Errorf("error setting up delete request handling: %v", err)
|
||||
} else if _, err = s.sysSubscribe(packRespIb, func(_ *subscription, _ *client, _, _ string, msg []byte) {
|
||||
// embed pack responses into store
|
||||
}
|
||||
// embed pack responses into store
|
||||
if _, err := s.sysSubscribe(packRespIb, func(_ *subscription, _ *client, _, _ string, msg []byte) {
|
||||
hash := dr.DirJWTStore.Hash()
|
||||
if len(msg) == 0 { // end of response stream
|
||||
s.Debugf("Merging Finished and resulting in: %x", dr.DirJWTStore.Hash())
|
||||
@@ -3270,17 +3267,15 @@ func (dr *CacheDirAccResolver) Start(s *Server) error {
|
||||
}); err != nil {
|
||||
return fmt.Errorf("error setting up update handling: %v", err)
|
||||
}
|
||||
if _, err := s.sysSubscribe(accListReqSubj,
|
||||
// respond to list requests with one message containing all account ids
|
||||
func(_ *subscription, _ *client, _, reply string, _ []byte) {
|
||||
handleListRequest(dr.DirJWTStore, s, reply)
|
||||
}); err != nil {
|
||||
// respond to list requests with one message containing all account ids
|
||||
if _, err := s.sysSubscribe(accListReqSubj, func(_ *subscription, _ *client, _, reply string, _ []byte) {
|
||||
handleListRequest(dr.DirJWTStore, s, reply)
|
||||
}); err != nil {
|
||||
return fmt.Errorf("error setting up list request handling: %v", err)
|
||||
}
|
||||
if _, err := s.sysSubscribe(accDeleteReqSubj,
|
||||
func(_ *subscription, _ *client, _, reply string, msg []byte) {
|
||||
handleDeleteRequest(dr.DirJWTStore, s, msg, reply)
|
||||
}); err != nil {
|
||||
if _, err := s.sysSubscribe(accDeleteReqSubj, func(_ *subscription, _ *client, _, reply string, msg []byte) {
|
||||
handleDeleteRequest(dr.DirJWTStore, s, msg, reply)
|
||||
}); err != nil {
|
||||
return fmt.Errorf("error setting up list request handling: %v", err)
|
||||
}
|
||||
s.Noticef("Managing some jwt in exclusive directory %s", dr.directory)
|
||||
|
||||
@@ -431,9 +431,8 @@ func (store *DirJWTStore) delete(publicKey string) error {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
} else {
|
||||
store.expiration.unTrack(publicKey)
|
||||
}
|
||||
store.expiration.unTrack(publicKey)
|
||||
// TODO do cb
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -875,9 +875,7 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error
|
||||
*errors = append(*errors, &configErr{tk, "dir has no value and needs to point to a directory"})
|
||||
return
|
||||
}
|
||||
if info, err := os.Stat(dir); err != nil {
|
||||
|
||||
} else if !info.IsDir() || info.Mode().Perm()&(1<<(uint(7))) == 0 {
|
||||
if info, err := os.Stat(dir); err != nil && (!info.IsDir() || info.Mode().Perm()&(1<<(uint(7))) == 0) {
|
||||
*errors = append(*errors, &configErr{tk, "dir needs to point to an accessible directory"})
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user