mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Remove hdrs for now, find better way to deliver in client
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1276,40 +1276,14 @@ func (o *Consumer) deliverCurrentMsg(subj string, hdr, msg []byte, seq uint64, t
|
||||
return true
|
||||
}
|
||||
|
||||
// Some constants for headers.
|
||||
// TODO(dlc) - Move these to a more generic place.
|
||||
const (
|
||||
hdrLine = "NATS/1.0\r\n"
|
||||
crlf = "\r\n"
|
||||
cHdrsT = "Jetstream-Stream-Sequence: %d\r\nJetstream-Sequence: %d\r\nJetstream-Deliver-Count: %d\r\n\r\n"
|
||||
)
|
||||
|
||||
// createMsgHeader will add on custom headers.
|
||||
// TODO(dlc) - if we know client can not receive could avoid.
|
||||
func createMsgHeader(ohdr []byte, sseq, dseq, dcount uint64) []byte {
|
||||
var hdr []byte
|
||||
if len(ohdr) > 0 {
|
||||
// Strip ending CRLF, will put it back on at end.
|
||||
hdr = ohdr[:len(ohdr)-len(crlf)]
|
||||
} else {
|
||||
hdr = []byte(hdrLine)
|
||||
}
|
||||
// Now add in the consumer fields.
|
||||
// TODO(dlc) - Make more efficient.
|
||||
hdr = append(hdr, []byte(fmt.Sprintf(cHdrsT, sseq, dseq, dcount))...)
|
||||
return hdr
|
||||
}
|
||||
|
||||
// Deliver a msg to the observable.
|
||||
// Lock should be held and o.mset validated to be non-nil.
|
||||
func (o *Consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dcount uint64, ts int64) {
|
||||
if o.mset == nil {
|
||||
return
|
||||
}
|
||||
// Create the headers.
|
||||
ahdr := createMsgHeader(hdr, seq, o.dseq, dcount)
|
||||
pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, o.dseq, dcount, ts), ahdr, msg, o, seq}
|
||||
|
||||
pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, o.dseq, dcount, ts), hdr, msg, o, seq}
|
||||
sendq := o.mset.sendq
|
||||
|
||||
// This needs to be unlocked since the other side may need this lock on failed delivery.
|
||||
|
||||
@@ -6345,12 +6345,6 @@ func TestJetStreamMsgHeaders(t *testing.T) {
|
||||
cm.Header.Get("Authorization") != "s3cr3t" {
|
||||
t.Fatalf("Original headers not present")
|
||||
}
|
||||
// Now check for jetstream headers.
|
||||
if cm.Header.Get("Jetstream-Stream-Sequence") != "1" ||
|
||||
cm.Header.Get("Jetstream-Sequence") != "1" ||
|
||||
cm.Header.Get("Jetstream-Deliver-Count") != "1" {
|
||||
t.Fatalf("Did not get proper Jetstream headers: %+v", cm.Header)
|
||||
}
|
||||
if !bytes.Equal(m.Data, cm.Data) {
|
||||
t.Fatalf("Message payloads are not the same: %q vs %q", cm.Data, m.Data)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user