Add in headers to consumer delivered messages

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-05-30 14:10:58 -07:00
parent eca04c6fce
commit dbde2479c2
8 changed files with 62 additions and 12 deletions

View File

@@ -94,7 +94,10 @@ func RunJetStreamServerOnPort(port int, sd string) *server.Server {
}
func clientConnectToServer(t *testing.T, s *server.Server) *nats.Conn {
nc, err := nats.Connect(s.ClientURL(), nats.Name("JS-TEST"), nats.ReconnectWait(5*time.Millisecond), nats.MaxReconnects(-1))
nc, err := nats.Connect(s.ClientURL(),
nats.Name("JS-TEST"),
nats.ReconnectWait(5*time.Millisecond),
nats.MaxReconnects(-1))
if err != nil {
t.Fatalf("Failed to create client: %v", err)
}
@@ -793,7 +796,7 @@ func sendStreamMsg(t *testing.T, nc *nats.Conn, subject, msg string) {
t.Helper()
resp, _ := nc.Request(subject, []byte(msg), 100*time.Millisecond)
if resp == nil {
t.Fatalf("No response, possible timeout?")
t.Fatalf("No response for %q, possible timeout?", msg)
}
if !bytes.HasPrefix(resp.Data, []byte("+OK {")) {
t.Fatalf("Expected a JetStreamPubAck, got %q", resp.Data)
@@ -6245,10 +6248,20 @@ func TestJetStreamMsgHeaders(t *testing.T) {
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
// Remove reply subject and sub for comparison.
cm.Sub, cm.Reply = nil, ""
if !reflect.DeepEqual(cm, m) {
t.Fatalf("Messages do not match: %+v vs %+v", cm, m)
// Check the message.
// Check out original headers.
if cm.Header.Get("Accept-Encoding") != "json" ||
cm.Header.Get("Authorization") != "s3cr3t" {
t.Fatalf("Original headers not present")
}
// Now check for jetstream headers.
if cm.Header.Get("Jetstream-Stream-Sequence") != "1" ||
cm.Header.Get("Jetstream-Sequence") != "1" ||
cm.Header.Get("Jetstream-Deliver-Count") != "1" {
t.Fatalf("Did not get proper Jetstream headers: %+v", cm.Header)
}
if !bytes.Equal(m.Data, cm.Data) {
t.Fatalf("Message payloads are not the same: %q vs %q", cm.Data, m.Data)
}
})
}