Formalized PubAckResponse

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-11-22 16:31:37 -08:00
parent a50f96461b
commit afa5cae58c
2 changed files with 105 additions and 55 deletions

View File

@@ -195,13 +195,12 @@ func (a *Account) AddStreamWithStore(config *StreamConfig, fsConfig *FileStoreCo
// Setup our internal send go routine.
mset.setupSendCapabilities()
// Create our pubAck here. This will be reused and for +OK will contain JSON
// for stream name and sequence.
longestSeq := strconv.FormatUint(math.MaxUint64, 10)
lpubAck := len(OK) + len(cfg.Name) + len("{\"stream\": ,\"seq\": }") + len(longestSeq)
mset.pubAck = make([]byte, 0, lpubAck)
mset.pubAck = append(mset.pubAck, OK...)
mset.pubAck = append(mset.pubAck, fmt.Sprintf(" {\"stream\": %q, \"seq\": ", cfg.Name)...)
// Create our pubAck template here. Better than json marshal each time on success.
b, _ := json.Marshal(&JSPubAckResponse{
ApiResponse: ApiResponse{Type: JSApiPubAckResponseType},
PubAck: &PubAck{Stream: cfg.Name, Sequence: math.MaxUint64},
})
mset.pubAck = b[:bytes.Index(b, []byte(strconv.FormatUint(math.MaxUint64, 10)))]
// Rebuild dedupe as needed.
mset.rebuildDedupe()
@@ -921,6 +920,8 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj
numConsumers := len(mset.consumers)
interestRetention := mset.config.Retention == InterestPolicy
var resp = &JSPubAckResponse{ApiResponse: ApiResponse{Type: JSApiPubAckResponseType}}
// Process msg headers if present.
var msgId string
if pc != nil && pc.pa.hdr > 0 {
@@ -931,7 +932,7 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj
mset.mu.Unlock()
if doAck && len(reply) > 0 {
response := append(pubAck, strconv.FormatUint(dde.seq, 10)...)
response = append(response, ", \"duplicate\": true}"...)
response = append(response, ",\"duplicate\": true}"...)
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
}
return
@@ -940,8 +941,9 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj
if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name {
mset.mu.Unlock()
if doAck && len(reply) > 0 {
response := []byte("-ERR 'wrong expected stream'")
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
resp.Error = &ApiError{Code: 400, Description: "expected stream does not match"}
b, _ := json.Marshal(resp)
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0}
}
return
}
@@ -950,8 +952,9 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj
lseq := mset.lseq
mset.mu.Unlock()
if doAck && len(reply) > 0 {
response := []byte(fmt.Sprintf("-ERR 'wrong last sequence: %d'", lseq))
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("wrong last sequence: %d", lseq)}
b, _ := json.Marshal(resp)
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0}
}
return
}
@@ -960,8 +963,9 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj
last := mset.lmsgId
mset.mu.Unlock()
if doAck && len(reply) > 0 {
response := []byte(fmt.Sprintf("-ERR 'wrong last msg ID: %s'", last))
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("wrong last msg ID: %s", last)}
b, _ := json.Marshal(resp)
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0}
}
return
}
@@ -986,9 +990,10 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj
// Check to see if we are over the max msg size.
if maxMsgSize >= 0 && len(msg) > maxMsgSize {
mset.mu.Unlock()
response = []byte("-ERR 'message size exceeds maximum allowed'")
if doAck && len(reply) > 0 {
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
resp.Error = &ApiError{Code: 400, Description: "message size exceeds maximum allowed"}
b, _ := json.Marshal(resp)
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0}
}
return
}
@@ -1048,10 +1053,16 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj
if err != ErrStoreClosed {
c.Errorf("JetStream failed to store a msg on account: %q stream: %q - %v", accName, name, err)
}
response = []byte(fmt.Sprintf("-ERR '%v'", err))
if doAck && len(reply) > 0 {
resp.Error = &ApiError{Code: 400, Description: err.Error()}
response, _ = json.Marshal(resp)
}
} else if jsa.limitsExceeded(stype) {
c.Warnf("JetStream resource limits exceeded for account: %q", accName)
response = []byte("-ERR 'resource limits exceeded for account'")
if doAck && len(reply) > 0 {
resp.Error = &ApiError{Code: 400, Description: "resource limits exceeded for account"}
response, _ = json.Marshal(resp)
}
store.RemoveMsg(seq)
seq = 0
} else {

View File

@@ -285,8 +285,8 @@ func TestJetStreamAddStreamDiscardNew(t *testing.T) {
if resp == nil {
t.Fatalf("No response, possible timeout?")
}
if string(resp.Data) != "-ERR 'maximum messages exceeded'" {
t.Fatalf("Expected to get an error about maximum messages, got %q", resp.Data)
if pa := getPubAckResponse(resp.Data); pa == nil || pa.Error.Description != "maximum messages exceeded" {
t.Fatalf("Expected to get an error about maximum messages, got %q", pa.Error)
}
// Now do bytes.
@@ -297,7 +297,7 @@ func TestJetStreamAddStreamDiscardNew(t *testing.T) {
if resp == nil {
t.Fatalf("No response, possible timeout?")
}
if string(resp.Data) != "-ERR 'maximum bytes exceeded'" {
if pa := getPubAckResponse(resp.Data); pa == nil || pa.Error.Description != "maximum bytes exceeded" {
t.Fatalf("Expected to get an error about maximum bytes, got %q", resp.Data)
}
})
@@ -382,18 +382,15 @@ func TestJetStreamPubAck(t *testing.T) {
if resp == nil {
t.Fatalf("No response from send stream msg")
}
if !bytes.HasPrefix(resp.Data, []byte("+OK {")) {
t.Fatalf("Did not get a correct response: %q", resp.Data)
pa := getPubAckResponse(resp.Data)
if pa == nil || pa.Error != nil {
t.Fatalf("Expected a valid JetStreamPubAck, got %q", resp.Data)
}
var pubAck server.PubAck
if err := json.Unmarshal(resp.Data[3:], &pubAck); err != nil {
t.Fatalf("Unexpected error: %v", err)
if pa.Stream != sname {
t.Fatalf("Expected %q for stream name, got %q", sname, pa.Stream)
}
if pubAck.Stream != sname {
t.Fatalf("Expected %q for stream name, got %q", sname, pubAck.Stream)
}
if pubAck.Sequence != seq {
t.Fatalf("Expected %d for sequence, got %d", seq, pubAck.Sequence)
if pa.Sequence != seq {
t.Fatalf("Expected %d for sequence, got %d", seq, pa.Sequence)
}
}
@@ -914,8 +911,8 @@ func TestJetStreamAddStreamMaxMsgSize(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if string(resp.Data) != "-ERR 'message size exceeds maximum allowed'" {
t.Fatalf("Expected to get an error for maximum message size, got %q", resp.Data)
if pa := getPubAckResponse(resp.Data); pa == nil || pa.Error.Description != "message size exceeds maximum allowed" {
t.Fatalf("Expected to get an error for maximum message size, got %q", pa.Error)
}
})
}
@@ -1105,14 +1102,11 @@ func sendStreamMsg(t *testing.T, nc *nats.Conn, subject, msg string) *server.Pub
if resp == nil {
t.Fatalf("No response for %q, possible timeout?", msg)
}
if !bytes.HasPrefix(resp.Data, []byte("+OK {")) {
t.Fatalf("Expected a JetStreamPubAck, got %q", resp.Data)
pa := getPubAckResponse(resp.Data)
if pa == nil || pa.Error != nil {
t.Fatalf("Expected a valid JetStreamPubAck, got %q", resp.Data)
}
var pubAck server.PubAck
if err := json.Unmarshal(resp.Data[3:], &pubAck); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
return &pubAck
return pa.PubAck
}
func TestJetStreamBasicAckPublish(t *testing.T) {
@@ -3350,17 +3344,14 @@ func TestJetStreamPublishDeDupe(t *testing.T) {
if resp == nil {
t.Fatalf("No response for %q, possible timeout?", msg)
}
if !bytes.HasPrefix(resp.Data, []byte("+OK {")) {
pa := getPubAckResponse(resp.Data)
if pa == nil || pa.Error != nil {
t.Fatalf("Expected a JetStreamPubAck, got %q", resp.Data)
}
var pubAck server.PubAck
if err := json.Unmarshal(resp.Data[3:], &pubAck); err != nil {
t.Fatalf("Unexpected error: %v", err)
if pa.Sequence != seq {
t.Fatalf("Did not get correct sequence in PubAck, expected %d, got %d", seq, pa.Sequence)
}
if pubAck.Sequence != seq {
t.Fatalf("Did not get correct sequence in PubAck, expected %d, got %d", seq, pubAck.Sequence)
}
return &pubAck
return pa.PubAck
}
expect := func(n uint64) {
@@ -3464,6 +3455,14 @@ func TestJetStreamPublishDeDupe(t *testing.T) {
nmids(0)
}
func getPubAckResponse(msg []byte) *server.JSPubAckResponse {
var par server.JSPubAckResponse
if err := json.Unmarshal(msg, &par); err != nil {
return nil
}
return &par
}
func TestJetStreamPublishExpect(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
@@ -3490,8 +3489,8 @@ func TestJetStreamPublishExpect(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !bytes.HasPrefix(resp.Data, []byte("+OK {")) {
t.Fatalf("Expected a JetStreamPubAck, got %q", resp.Data)
if pa := getPubAckResponse(resp.Data); pa == nil || pa.Error != nil {
t.Fatalf("Expected a valid JetStreamPubAck, got %q", resp.Data)
}
// Now test that we get an error back when expecting a different stream.
@@ -3500,7 +3499,7 @@ func TestJetStreamPublishExpect(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !bytes.HasPrefix(resp.Data, []byte("-ERR '")) {
if pa := getPubAckResponse(resp.Data); pa == nil || pa.Error == nil {
t.Fatalf("Expected an error, got %q", resp.Data)
}
@@ -3511,7 +3510,7 @@ func TestJetStreamPublishExpect(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !bytes.HasPrefix(resp.Data, []byte("-ERR '")) {
if pa := getPubAckResponse(resp.Data); pa == nil || pa.Error == nil {
t.Fatalf("Expected an error, got %q", resp.Data)
}
@@ -3530,7 +3529,7 @@ func TestJetStreamPublishExpect(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !bytes.HasPrefix(resp.Data, []byte("-ERR '")) {
if pa := getPubAckResponse(resp.Data); pa == nil || pa.Error == nil {
t.Fatalf("Expected an error, got %q", resp.Data)
}
@@ -3555,8 +3554,8 @@ func TestJetStreamPublishExpect(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !bytes.HasPrefix(resp.Data, []byte("+OK {")) {
t.Fatalf("Expected a JetStreamPubAck, got %q", resp.Data)
if pa := getPubAckResponse(resp.Data); pa == nil || pa.Error != nil {
t.Fatalf("Expected a valid JetStreamPubAck, got %q", resp.Data)
}
}
@@ -8851,6 +8850,46 @@ func TestJetStreamPubPerf(t *testing.T) {
fmt.Printf("%.0f msgs/sec\n", float64(toSend)/tt.Seconds())
}
func TestJetStreamPubWithAsyncResponsePerf(t *testing.T) {
// Comment out to run, holding place for now.
t.SkipNow()
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
acc := s.GlobalAccount()
msetConfig := server.StreamConfig{
Name: "sr33",
Storage: server.MemoryStorage,
Subjects: []string{"foo"},
}
if _, err := acc.AddStream(&msetConfig); err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
nc := clientConnectToServer(t, s)
defer nc.Close()
toSend := 1000000
payload := []byte("Hello World")
start := time.Now()
for i := 0; i < toSend; i++ {
nc.PublishRequest("foo", "bar", payload)
}
nc.Flush()
tt := time.Since(start)
fmt.Printf("time is %v\n", tt)
fmt.Printf("%.0f msgs/sec\n", float64(toSend)/tt.Seconds())
}
func TestJetStreamConsumerPerf(t *testing.T) {
// Comment out to run, holding place for now.
t.SkipNow()