Formalize requests for next msg, support NoWait and Expires

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-10-20 16:25:19 -07:00
parent a37f53e236
commit c2b8de377c
3 changed files with 153 additions and 18 deletions

View File

@@ -1051,15 +1051,25 @@ func (o *Consumer) needAck(sseq uint64) bool {
return needAck
}
// Default is 1 if msg is nil.
func batchSizeFromMsg(msg []byte) int {
bs := 1
if len(msg) > 0 {
if n, err := strconv.Atoi(string(msg)); err == nil {
bs = n
}
// Helper for the next message requests.
func nextReqFromMsg(msg []byte) (time.Time, int, bool, error) {
req := strings.TrimSpace(string(msg))
if len(req) == 0 {
return time.Time{}, 1, false, nil
}
return bs
if req[0] == '{' {
var cr JSApiConsumerGetNextRequest
if err := json.Unmarshal(msg, &cr); err != nil {
return time.Time{}, -1, false, err
}
return cr.Expires, cr.Batch, cr.NoWait, nil
}
// Naked batch size here for backward compatibility.
bs := 1
if n, err := strconv.Atoi(req); err == nil {
bs = n
}
return time.Time{}, bs, false, nil
}
// Represents a request that is on the internal waiting queue
@@ -1068,6 +1078,7 @@ type waitingRequest struct {
reply string
n int // For batching
expires time.Time
noWait bool
}
// waiting queue for requests that are waiting for new messages to arrive.
@@ -1153,9 +1164,6 @@ func (wq *waitQueue) pop() *waitingRequest {
// a single message. If the payload is a number parseable with Atoi(), then we will send a batch of messages without
// requiring another request to this endpoint, or an ACK.
func (o *Consumer) processNextMsgReq(_ *subscription, c *client, _, reply string, msg []byte) {
// Check payload here to see if they sent in batch size.
batchSize := batchSizeFromMsg(msg)
o.mu.Lock()
mset := o.mset
if mset == nil || o.isPushMode() {
@@ -1163,20 +1171,30 @@ func (o *Consumer) processNextMsgReq(_ *subscription, c *client, _, reply string
return
}
sendErr := func(status int, description string) {
sendq := mset.sendq
o.mu.Unlock()
hdr := []byte(fmt.Sprintf("NATS/1.0 %d %s\r\n\r\n", status, description))
pmsg := &jsPubMsg{reply, reply, _EMPTY_, hdr, nil, nil, 0}
sendq <- pmsg // Send message.
}
if o.waiting.isFull() {
// If our waiting queue is full return an empty response with the proper header.
// FIXME(dlc) - Should we do advisory here as well?
sendq := mset.sendq
o.mu.Unlock()
hdr := []byte("NATS/1.0 500 WaitQueue Exceeded\r\n\r\n")
pmsg := &jsPubMsg{reply, reply, _EMPTY_, hdr, nil, nil, 0}
// Send message.
sendq <- pmsg
sendErr(500, "WaitQueue Exceeded")
return
}
// Check payload here to see if they sent in batch size or a formal request.
expires, batchSize, noWait, err := nextReqFromMsg(msg)
if err != nil {
sendErr(400, "Bad Request")
return
}
// In case we have to queue up this request. This is all on stack pre-allocated.
wr := waitingRequest{client: c, reply: reply, n: batchSize}
wr := waitingRequest{client: c, reply: reply, n: batchSize, noWait: noWait, expires: expires}
// If we are in replay mode, defer to processReplay for delivery.
if o.replay {
@@ -1192,6 +1210,10 @@ func (o *Consumer) processNextMsgReq(_ *subscription, c *client, _, reply string
// Need to discount this from the total n for the request.
wr.n--
} else {
if wr.noWait {
sendErr(404, "No Messages")
return
}
o.waiting.add(&wr)
break
}

View File

@@ -416,6 +416,13 @@ type JSApiConsumerListResponse struct {
const JSApiConsumerListResponseType = "io.nats.jetstream.api.v1.consumer_list_response"
// JSApiConsumerGetNextRequest is for getting next messages for pull based consumers.
type JSApiConsumerGetNextRequest struct {
Expires time.Time `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
}
// JSApiStreamTemplateCreateResponse for creating templates.
type JSApiStreamTemplateCreateResponse struct {
ApiResponse

View File

@@ -1558,6 +1558,112 @@ func TestJetStreamWorkQueueWrapWaiting(t *testing.T) {
}
}
func TestJetStreamWorkQueueRequest(t *testing.T) {
cases := []struct {
name string
mconfig *server.StreamConfig
}{
{"MemoryStore", &server.StreamConfig{Name: "MY_MSG_SET", Storage: server.MemoryStorage, Subjects: []string{"foo", "bar"}}},
{"FileStore", &server.StreamConfig{Name: "MY_MSG_SET", Storage: server.FileStorage, Subjects: []string{"foo", "bar"}}},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
mset, err := s.GlobalAccount().AddStream(c.mconfig)
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
defer mset.Delete()
o, err := mset.AddConsumer(workerModeConfig("WRAP"))
if err != nil {
t.Fatalf("Expected no error with registered interest, got %v", err)
}
defer o.Delete()
nc := clientConnectToServer(t, s)
defer nc.Close()
toSend := 25
for i := 0; i < toSend; i++ {
sendStreamMsg(t, nc, "bar", "Hello World!")
}
reply := "_.consumer._"
sub, _ := nc.SubscribeSync(reply)
defer sub.Unsubscribe()
getSubj := o.RequestNextMsgSubject()
checkSubPending := func(numExpected int) {
t.Helper()
checkFor(t, 200*time.Millisecond, 10*time.Millisecond, func() error {
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != numExpected {
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, numExpected)
}
return nil
})
}
// Create a formal request object.
req := &server.JSApiConsumerGetNextRequest{Batch: toSend}
jreq, _ := json.Marshal(req)
nc.PublishRequest(getSubj, reply, jreq)
checkSubPending(toSend)
// Now check that we can ask for NoWait
req.Batch = 1
req.NoWait = true
jreq, _ = json.Marshal(req)
resp, err := nc.Request(getSubj, jreq, 50*time.Millisecond)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if status := resp.Header.Get("Status"); !strings.HasPrefix(status, "404") {
t.Fatalf("Expected status code of 404")
}
// Load up more messages.
for i := 0; i < toSend; i++ {
sendStreamMsg(t, nc, "foo", "Hello World!")
}
// Now we will ask for a batch larger then what is queued up.
req.Batch = toSend + 10
req.NoWait = true
jreq, _ = json.Marshal(req)
nc.PublishRequest(getSubj, reply, jreq)
// We should now have 2 * toSend + the 404 message.
checkSubPending(2*toSend + 1)
for i := 0; i < 2*toSend+1; i++ {
sub.NextMsg(time.Millisecond)
}
checkSubPending(0)
mset.Purge()
// Now do expiration
req.Batch = 1
req.NoWait = false
req.Expires = time.Now().Add(10 * time.Millisecond)
jreq, _ = json.Marshal(req)
nc.PublishRequest(getSubj, reply, jreq)
// Let it expire
time.Sleep(20 * time.Millisecond)
// Send a few more messages. These should not be delivered to the sub.
sendStreamMsg(t, nc, "foo", "Hello World!")
sendStreamMsg(t, nc, "bar", "Hello World!")
checkSubPending(0)
})
}
}
func TestJetStreamSubjectFiltering(t *testing.T) {
cases := []struct {
name string