mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
Be more forgiving on empty api requests, force json request for delete msg api
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -14,6 +14,7 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
@@ -192,6 +193,11 @@ type JSApiStreamUpdateResponse struct {
|
||||
*StreamInfo
|
||||
}
|
||||
|
||||
// JSApiMsgDeleteRequest delete message request.
|
||||
type JSApiMsgDeleteRequest struct {
|
||||
Seq uint64 `json:"seq"`
|
||||
}
|
||||
|
||||
// JSApiMsgDeleteResponse.
|
||||
type JSApiMsgDeleteResponse struct {
|
||||
Error *ApiError `json:"error,omitempty"`
|
||||
@@ -413,7 +419,7 @@ func (s *Server) jsTemplateInfoRequest(sub *subscription, c *client, subject, re
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
if len(msg) != 0 {
|
||||
if !isEmptyRequest(msg) {
|
||||
resp.Error = jsBadRequestErr
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
@@ -445,7 +451,7 @@ func (s *Server) jsTemplateDeleteRequest(sub *subscription, c *client, subject,
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
if len(msg) != 0 {
|
||||
if !isEmptyRequest(msg) {
|
||||
resp.Error = jsBadRequestErr
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
@@ -579,7 +585,7 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
if len(msg) != 0 {
|
||||
if !isEmptyRequest(msg) {
|
||||
resp.Error = jsBadRequestErr
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
@@ -595,6 +601,25 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(resp))
|
||||
}
|
||||
|
||||
func isEmptyRequest(req []byte) bool {
|
||||
if len(req) == 0 {
|
||||
return true
|
||||
}
|
||||
if bytes.Equal(req, []byte("{}")) {
|
||||
return true
|
||||
}
|
||||
// If we are here we didn't get our simple match, but still could be valid.
|
||||
var v interface{}
|
||||
if err := json.Unmarshal(req, &v); err != nil {
|
||||
return false
|
||||
}
|
||||
vm, ok := v.(map[string]interface{})
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
return len(vm) == 0
|
||||
}
|
||||
|
||||
// Request to delete a stream.
|
||||
func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, subject, reply string, msg []byte) {
|
||||
if c == nil || c.acc == nil {
|
||||
@@ -606,7 +631,7 @@ func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, subject, re
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
if len(msg) != 0 {
|
||||
if !isEmptyRequest(msg) {
|
||||
resp.Error = jsBadRequestErr
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
@@ -644,6 +669,13 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, subject, reply
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
var req JSApiMsgDeleteRequest
|
||||
if err := json.Unmarshal(msg, &req); err != nil {
|
||||
resp.Error = jsBadRequestErr
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
stream := subjectToken(subject, 2)
|
||||
mset, err := c.acc.LookupStream(stream)
|
||||
if err != nil {
|
||||
@@ -651,14 +683,14 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, subject, reply
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
seq, _ := strconv.Atoi(string(msg))
|
||||
removed, err := mset.EraseMsg(uint64(seq))
|
||||
|
||||
removed, err := mset.EraseMsg(req.Seq)
|
||||
if err != nil {
|
||||
resp.Error = jsError(err)
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
} else if !removed {
|
||||
resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("sequence [%d] not found", seq)}
|
||||
resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("sequence [%d] not found", req.Seq)}
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
@@ -677,7 +709,7 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, subject, rep
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
if len(msg) != 0 {
|
||||
if !isEmptyRequest(msg) {
|
||||
resp.Error = jsBadRequestErr
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
@@ -803,7 +835,7 @@ func (s *Server) jsConsumersRequest(sub *subscription, c *client, subject, reply
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
if len(msg) != 0 {
|
||||
if !isEmptyRequest(msg) {
|
||||
resp.Error = jsBadRequestErr
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
@@ -833,7 +865,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, re
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
if len(msg) != 0 {
|
||||
if !isEmptyRequest(msg) {
|
||||
resp.Error = jsBadRequestErr
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
@@ -868,7 +900,7 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, subject,
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
if len(msg) != 0 {
|
||||
if !isEmptyRequest(msg) {
|
||||
resp.Error = jsBadRequestErr
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
|
||||
@@ -4419,13 +4419,18 @@ func TestJetStreamRequestAPI(t *testing.T) {
|
||||
checkBadRequest(ccResp.Error, "consumer name in subject does not match durable name in request")
|
||||
|
||||
// Now delete a msg.
|
||||
resp, _ = nc.Request(fmt.Sprintf(server.JetStreamDeleteMsgT, msetCfg.Name), []byte("2"), time.Second)
|
||||
dreq := server.JSApiMsgDeleteRequest{Seq: 2}
|
||||
dreqj, err := json.Marshal(dreq)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
resp, _ = nc.Request(fmt.Sprintf(server.JetStreamDeleteMsgT, msetCfg.Name), dreqj, time.Second)
|
||||
var delMsgResp server.JSApiMsgDeleteResponse
|
||||
if err = json.Unmarshal(resp.Data, &delMsgResp); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if !delMsgResp.Success || delMsgResp.Error != nil {
|
||||
t.Fatalf("Got a bad response %+v", ccResp)
|
||||
t.Fatalf("Got a bad response %+v", delMsgResp.Error)
|
||||
}
|
||||
|
||||
// Now purge the stream.
|
||||
@@ -4435,7 +4440,7 @@ func TestJetStreamRequestAPI(t *testing.T) {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if !pResp.Success || pResp.Error != nil {
|
||||
t.Fatalf("Got a bad response %+v", ccResp)
|
||||
t.Fatalf("Got a bad response %+v", pResp)
|
||||
}
|
||||
if pResp.Purged != 9 {
|
||||
t.Fatalf("Expected 9 purged, got %d", pResp.Purged)
|
||||
@@ -4582,6 +4587,29 @@ func TestJetStreamRequestAPI(t *testing.T) {
|
||||
if ti.Streams[0] != server.CanonicalName("kv.22") {
|
||||
t.Fatalf("Expected stream with name %q, but got %q", server.CanonicalName("kv.22"), ti.Streams[0])
|
||||
}
|
||||
|
||||
// Test that we can send nil or an empty legal json for requests that take no args.
|
||||
// We know this stream does not exist, this just checking request processing.
|
||||
checkEmptyReqArg := func(arg string) {
|
||||
t.Helper()
|
||||
var req []byte
|
||||
if len(arg) > 0 {
|
||||
req = []byte(arg)
|
||||
}
|
||||
resp, err = nc.Request(fmt.Sprintf(server.JetStreamDeleteStreamT, "foo_bar_baz"), req, time.Second)
|
||||
var dResp server.JSApiStreamDeleteResponse
|
||||
if err = json.Unmarshal(resp.Data, &dResp); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if dResp.Error == nil || dResp.Error.Code != 500 {
|
||||
t.Fatalf("Got a bad response, expected a non 400 response %+v", dResp.Error)
|
||||
}
|
||||
}
|
||||
|
||||
checkEmptyReqArg("")
|
||||
checkEmptyReqArg("{}")
|
||||
checkEmptyReqArg(" {} ")
|
||||
checkEmptyReqArg(" { } ")
|
||||
}
|
||||
|
||||
func TestJetStreamUpdateStream(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user