PubAck details that provide stream name and sequence assigned

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-04-13 17:20:24 -07:00
parent 4139777ea8
commit 0a36706958
3 changed files with 75 additions and 4 deletions

View File

@@ -16,6 +16,7 @@ package server
import (
"encoding/json"
"fmt"
"math"
"path"
"strconv"
"sync"
@@ -39,6 +40,14 @@ type StreamConfig struct {
Template string `json:"template_owner,omitempty"`
}
// PubAck is the detail you get back from a publish to a stream that was successful.
// e.g. +OK {"stream": "my_stream", "seq": 22}
type PubAck struct {
Stream string `json:"stream"`
Seq uint64 `json:"seq"`
}
// StreamInfo shows config and current state for this stream.
type StreamInfo struct {
Config StreamConfig `json:"config"`
State StreamState `json:"state"`
@@ -53,6 +62,7 @@ type Stream struct {
jsa *jsAccount
client *client
sid int
pubAck []byte
sendq chan *jsPubMsg
store StreamStore
consumers map[string]*Consumer
@@ -137,6 +147,14 @@ func (a *Account) AddStreamWithStore(config *StreamConfig, fsConfig *FileStoreCo
return nil, err
}
// Create our pubAck here. This will be reused and for +OK will contain JSON
// for stream name and sequence.
longestSeq := strconv.FormatUint(math.MaxUint64, 10)
lpubAck := len(AckAck) + len(cfg.Name) + len("{\"stream\": ,\"seq\": }") + len(longestSeq)
mset.pubAck = make([]byte, 0, lpubAck)
mset.pubAck = append(mset.pubAck, AckAck...)
mset.pubAck = append(mset.pubAck, fmt.Sprintf(" {\"stream\": %q, \"seq\": ", cfg.Name)...)
return mset, nil
}
@@ -507,6 +525,7 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, _ *client, subje
accName = c.acc.Name
}
doAck := !mset.config.NoAck
pubAck := mset.pubAck
jsa := mset.jsa
stype := mset.config.Storage
name := mset.config.Name
@@ -519,7 +538,7 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, _ *client, subje
}
// Response to send.
response := AckAck
var response []byte
var seq uint64
var err error
@@ -538,6 +557,9 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, _ *client, subje
response = []byte("-ERR 'resource limits exceeded for account'")
store.RemoveMsg(seq)
seq = 0
} else if err == nil && doAck && len(reply) > 0 {
response = append(pubAck, strconv.FormatUint(seq, 10)...)
response = append(response, '}')
}
}

View File

@@ -14,6 +14,7 @@
package test
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
@@ -221,6 +222,55 @@ func TestJetStreamAddStream(t *testing.T) {
}
}
func TestJetStreamPubAck(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
sname := "PUBACK"
acc := s.GlobalAccount()
mconfig := &server.StreamConfig{Name: sname, Subjects: []string{"foo"}, Storage: server.MemoryStorage}
mset, err := acc.AddStream(mconfig)
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
defer mset.Delete()
nc := clientConnectToServer(t, s)
defer nc.Close()
checkRespDetails := func(resp *nats.Msg, err error, seq uint64) {
if err != nil {
t.Fatalf("Unexpected error from send stream msg: %v", err)
}
if resp == nil {
t.Fatalf("No response from send stream msg")
}
if !bytes.HasPrefix(resp.Data, []byte("+OK {")) {
t.Fatalf("Did not get a correct response: %q", resp.Data)
}
var pubAck server.PubAck
if err := json.Unmarshal(resp.Data[3:], &pubAck); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if pubAck.Stream != sname {
t.Fatalf("Expected %q for stream name, got %q", sname, pubAck.Stream)
}
if pubAck.Seq != seq {
t.Fatalf("Expected %d for sequence, got %d", seq, pubAck.Seq)
}
}
// Send messages and make sure pubAck details are correct.
for i := uint64(1); i <= 1000; i++ {
resp, err := nc.Request("foo", []byte("HELLO"), 100*time.Millisecond)
checkRespDetails(resp, err, i)
}
}
func TestJetStreamConsumerWithStartTime(t *testing.T) {
subj := "my_stream"
cases := []struct {
@@ -588,7 +638,7 @@ func sendStreamMsg(t *testing.T, nc *nats.Conn, subject, msg string) {
if resp == nil {
t.Fatalf("No response, possible timeout?")
}
if string(resp.Data) != server.OK {
if !bytes.HasPrefix(resp.Data, []byte("+OK {")) {
t.Fatalf("Expected a JetStreamPubAck, got %q", resp.Data)
}
}

View File

@@ -604,8 +604,7 @@ func TestJetStreamWorkQueueLoadBalance(t *testing.T) {
sendSubj := "bar"
for i := 0; i < toSend; i++ {
resp, _ := nc.Request(sendSubj, []byte("Hello World!"), 50*time.Millisecond)
expectOKResponse(t, resp)
sendStreamMsg(t, nc, sendSubj, "Hello World!")
}
// Wait for test to complete.