diff --git a/server/stream.go b/server/stream.go index ee6dcd6c..14bb3363 100644 --- a/server/stream.go +++ b/server/stream.go @@ -195,13 +195,12 @@ func (a *Account) AddStreamWithStore(config *StreamConfig, fsConfig *FileStoreCo // Setup our internal send go routine. mset.setupSendCapabilities() - // Create our pubAck here. This will be reused and for +OK will contain JSON - // for stream name and sequence. - longestSeq := strconv.FormatUint(math.MaxUint64, 10) - lpubAck := len(OK) + len(cfg.Name) + len("{\"stream\": ,\"seq\": }") + len(longestSeq) - mset.pubAck = make([]byte, 0, lpubAck) - mset.pubAck = append(mset.pubAck, OK...) - mset.pubAck = append(mset.pubAck, fmt.Sprintf(" {\"stream\": %q, \"seq\": ", cfg.Name)...) + // Create our pubAck template here. Better than json marshal each time on success. + b, _ := json.Marshal(&JSPubAckResponse{ + ApiResponse: ApiResponse{Type: JSApiPubAckResponseType}, + PubAck: &PubAck{Stream: cfg.Name, Sequence: math.MaxUint64}, + }) + mset.pubAck = b[:bytes.Index(b, []byte(strconv.FormatUint(math.MaxUint64, 10)))] // Rebuild dedupe as needed. mset.rebuildDedupe() @@ -921,6 +920,8 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj numConsumers := len(mset.consumers) interestRetention := mset.config.Retention == InterestPolicy + var resp = &JSPubAckResponse{ApiResponse: ApiResponse{Type: JSApiPubAckResponseType}} + // Process msg headers if present. var msgId string if pc != nil && pc.pa.hdr > 0 { @@ -931,7 +932,7 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj mset.mu.Unlock() if doAck && len(reply) > 0 { response := append(pubAck, strconv.FormatUint(dde.seq, 10)...) - response = append(response, ", \"duplicate\": true}"...) + response = append(response, ",\"duplicate\": true}"...) sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} } return @@ -940,8 +941,9 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name { mset.mu.Unlock() if doAck && len(reply) > 0 { - response := []byte("-ERR 'wrong expected stream'") - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + resp.Error = &ApiError{Code: 400, Description: "expected stream does not match"} + b, _ := json.Marshal(resp) + sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0} } return } @@ -950,8 +952,9 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj lseq := mset.lseq mset.mu.Unlock() if doAck && len(reply) > 0 { - response := []byte(fmt.Sprintf("-ERR 'wrong last sequence: %d'", lseq)) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("wrong last sequence: %d", lseq)} + b, _ := json.Marshal(resp) + sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0} } return } @@ -960,8 +963,9 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj last := mset.lmsgId mset.mu.Unlock() if doAck && len(reply) > 0 { - response := []byte(fmt.Sprintf("-ERR 'wrong last msg ID: %s'", last)) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("wrong last msg ID: %s", last)} + b, _ := json.Marshal(resp) + sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0} } return } @@ -986,9 +990,10 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj // Check to see if we are over the max msg size. if maxMsgSize >= 0 && len(msg) > maxMsgSize { mset.mu.Unlock() - response = []byte("-ERR 'message size exceeds maximum allowed'") if doAck && len(reply) > 0 { - mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + resp.Error = &ApiError{Code: 400, Description: "message size exceeds maximum allowed"} + b, _ := json.Marshal(resp) + mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0} } return } @@ -1048,10 +1053,16 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj if err != ErrStoreClosed { c.Errorf("JetStream failed to store a msg on account: %q stream: %q - %v", accName, name, err) } - response = []byte(fmt.Sprintf("-ERR '%v'", err)) + if doAck && len(reply) > 0 { + resp.Error = &ApiError{Code: 400, Description: err.Error()} + response, _ = json.Marshal(resp) + } } else if jsa.limitsExceeded(stype) { c.Warnf("JetStream resource limits exceeded for account: %q", accName) - response = []byte("-ERR 'resource limits exceeded for account'") + if doAck && len(reply) > 0 { + resp.Error = &ApiError{Code: 400, Description: "resource limits exceeded for account"} + response, _ = json.Marshal(resp) + } store.RemoveMsg(seq) seq = 0 } else { diff --git a/test/jetstream_test.go b/test/jetstream_test.go index e4bf5222..4cae4f65 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -285,8 +285,8 @@ func TestJetStreamAddStreamDiscardNew(t *testing.T) { if resp == nil { t.Fatalf("No response, possible timeout?") } - if string(resp.Data) != "-ERR 'maximum messages exceeded'" { - t.Fatalf("Expected to get an error about maximum messages, got %q", resp.Data) + if pa := getPubAckResponse(resp.Data); pa == nil || pa.Error.Description != "maximum messages exceeded" { + t.Fatalf("Expected to get an error about maximum messages, got %q", pa.Error) } // Now do bytes. @@ -297,7 +297,7 @@ func TestJetStreamAddStreamDiscardNew(t *testing.T) { if resp == nil { t.Fatalf("No response, possible timeout?") } - if string(resp.Data) != "-ERR 'maximum bytes exceeded'" { + if pa := getPubAckResponse(resp.Data); pa == nil || pa.Error.Description != "maximum bytes exceeded" { t.Fatalf("Expected to get an error about maximum bytes, got %q", resp.Data) } }) @@ -382,18 +382,15 @@ func TestJetStreamPubAck(t *testing.T) { if resp == nil { t.Fatalf("No response from send stream msg") } - if !bytes.HasPrefix(resp.Data, []byte("+OK {")) { - t.Fatalf("Did not get a correct response: %q", resp.Data) + pa := getPubAckResponse(resp.Data) + if pa == nil || pa.Error != nil { + t.Fatalf("Expected a valid JetStreamPubAck, got %q", resp.Data) } - var pubAck server.PubAck - if err := json.Unmarshal(resp.Data[3:], &pubAck); err != nil { - t.Fatalf("Unexpected error: %v", err) + if pa.Stream != sname { + t.Fatalf("Expected %q for stream name, got %q", sname, pa.Stream) } - if pubAck.Stream != sname { - t.Fatalf("Expected %q for stream name, got %q", sname, pubAck.Stream) - } - if pubAck.Sequence != seq { - t.Fatalf("Expected %d for sequence, got %d", seq, pubAck.Sequence) + if pa.Sequence != seq { + t.Fatalf("Expected %d for sequence, got %d", seq, pa.Sequence) } } @@ -914,8 +911,8 @@ func TestJetStreamAddStreamMaxMsgSize(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - if string(resp.Data) != "-ERR 'message size exceeds maximum allowed'" { - t.Fatalf("Expected to get an error for maximum message size, got %q", resp.Data) + if pa := getPubAckResponse(resp.Data); pa == nil || pa.Error.Description != "message size exceeds maximum allowed" { + t.Fatalf("Expected to get an error for maximum message size, got %q", pa.Error) } }) } @@ -1105,14 +1102,11 @@ func sendStreamMsg(t *testing.T, nc *nats.Conn, subject, msg string) *server.Pub if resp == nil { t.Fatalf("No response for %q, possible timeout?", msg) } - if !bytes.HasPrefix(resp.Data, []byte("+OK {")) { - t.Fatalf("Expected a JetStreamPubAck, got %q", resp.Data) + pa := getPubAckResponse(resp.Data) + if pa == nil || pa.Error != nil { + t.Fatalf("Expected a valid JetStreamPubAck, got %q", resp.Data) } - var pubAck server.PubAck - if err := json.Unmarshal(resp.Data[3:], &pubAck); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - return &pubAck + return pa.PubAck } func TestJetStreamBasicAckPublish(t *testing.T) { @@ -3350,17 +3344,14 @@ func TestJetStreamPublishDeDupe(t *testing.T) { if resp == nil { t.Fatalf("No response for %q, possible timeout?", msg) } - if !bytes.HasPrefix(resp.Data, []byte("+OK {")) { + pa := getPubAckResponse(resp.Data) + if pa == nil || pa.Error != nil { t.Fatalf("Expected a JetStreamPubAck, got %q", resp.Data) } - var pubAck server.PubAck - if err := json.Unmarshal(resp.Data[3:], &pubAck); err != nil { - t.Fatalf("Unexpected error: %v", err) + if pa.Sequence != seq { + t.Fatalf("Did not get correct sequence in PubAck, expected %d, got %d", seq, pa.Sequence) } - if pubAck.Sequence != seq { - t.Fatalf("Did not get correct sequence in PubAck, expected %d, got %d", seq, pubAck.Sequence) - } - return &pubAck + return pa.PubAck } expect := func(n uint64) { @@ -3464,6 +3455,14 @@ func TestJetStreamPublishDeDupe(t *testing.T) { nmids(0) } +func getPubAckResponse(msg []byte) *server.JSPubAckResponse { + var par server.JSPubAckResponse + if err := json.Unmarshal(msg, &par); err != nil { + return nil + } + return &par +} + func TestJetStreamPublishExpect(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() @@ -3490,8 +3489,8 @@ func TestJetStreamPublishExpect(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - if !bytes.HasPrefix(resp.Data, []byte("+OK {")) { - t.Fatalf("Expected a JetStreamPubAck, got %q", resp.Data) + if pa := getPubAckResponse(resp.Data); pa == nil || pa.Error != nil { + t.Fatalf("Expected a valid JetStreamPubAck, got %q", resp.Data) } // Now test that we get an error back when expecting a different stream. @@ -3500,7 +3499,7 @@ func TestJetStreamPublishExpect(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - if !bytes.HasPrefix(resp.Data, []byte("-ERR '")) { + if pa := getPubAckResponse(resp.Data); pa == nil || pa.Error == nil { t.Fatalf("Expected an error, got %q", resp.Data) } @@ -3511,7 +3510,7 @@ func TestJetStreamPublishExpect(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - if !bytes.HasPrefix(resp.Data, []byte("-ERR '")) { + if pa := getPubAckResponse(resp.Data); pa == nil || pa.Error == nil { t.Fatalf("Expected an error, got %q", resp.Data) } @@ -3530,7 +3529,7 @@ func TestJetStreamPublishExpect(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - if !bytes.HasPrefix(resp.Data, []byte("-ERR '")) { + if pa := getPubAckResponse(resp.Data); pa == nil || pa.Error == nil { t.Fatalf("Expected an error, got %q", resp.Data) } @@ -3555,8 +3554,8 @@ func TestJetStreamPublishExpect(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - if !bytes.HasPrefix(resp.Data, []byte("+OK {")) { - t.Fatalf("Expected a JetStreamPubAck, got %q", resp.Data) + if pa := getPubAckResponse(resp.Data); pa == nil || pa.Error != nil { + t.Fatalf("Expected a valid JetStreamPubAck, got %q", resp.Data) } } @@ -8851,6 +8850,46 @@ func TestJetStreamPubPerf(t *testing.T) { fmt.Printf("%.0f msgs/sec\n", float64(toSend)/tt.Seconds()) } +func TestJetStreamPubWithAsyncResponsePerf(t *testing.T) { + // Comment out to run, holding place for now. + t.SkipNow() + + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + acc := s.GlobalAccount() + + msetConfig := server.StreamConfig{ + Name: "sr33", + Storage: server.MemoryStorage, + Subjects: []string{"foo"}, + } + + if _, err := acc.AddStream(&msetConfig); err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + + nc := clientConnectToServer(t, s) + defer nc.Close() + + toSend := 1000000 + payload := []byte("Hello World") + + start := time.Now() + for i := 0; i < toSend; i++ { + nc.PublishRequest("foo", "bar", payload) + } + nc.Flush() + + tt := time.Since(start) + fmt.Printf("time is %v\n", tt) + fmt.Printf("%.0f msgs/sec\n", float64(toSend)/tt.Seconds()) +} + func TestJetStreamConsumerPerf(t *testing.T) { // Comment out to run, holding place for now. t.SkipNow()