diff --git a/server/client.go b/server/client.go index aab8306b..c48405cd 100644 --- a/server/client.go +++ b/server/client.go @@ -117,7 +117,10 @@ var readLoopReportThreshold = readLoopReport // Represent client booleans with a bitmask type clientFlag uint16 -const hdrLine = "NATS/1.0\r\n" +const ( + hdrLine = "NATS/1.0\r\n" + emptyHdrLine = "NATS/1.0\r\n\r\n" +) // Some client state represented as flags const ( @@ -3587,7 +3590,11 @@ func removeHeaderIfPresent(hdr []byte, key string) []byte { if end < 0 { return hdr } - return append(hdr[:start], hdr[start+end+len(_CRLF_):]...) + nhdr := append(hdr[:start], hdr[start+end+len(_CRLF_):]...) + if len(nhdr) <= len(emptyHdrLine) { + return nil + } + return nhdr } // Generate a new header based on optional original header and key value. @@ -3703,8 +3710,15 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt // TODO(dlc) - restrict to configured service imports and not responses? tracking, headers := shouldSample(si.latency, c) if len(c.pa.reply) > 0 { - if rsi = c.setupResponseServiceImport(acc, si, tracking, headers); rsi != nil { - nrr = []byte(rsi.from) + // Special case for now, need to formalize. + // TODO(dlc) - Formalize as a service import option for reply rewrite. + // For now we can't do $JS.ACK since that breaks pull consumers across accounts. + if !bytes.HasPrefix(c.pa.reply, []byte(jsAckPre)) { + if rsi = c.setupResponseServiceImport(acc, si, tracking, headers); rsi != nil { + nrr = []byte(rsi.from) + } + } else { + nrr = c.pa.reply } } else { // Check to see if this was a bad request with no reply and we were supposed to be tracking. diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index d6dcfa39..b185e515 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -3910,6 +3910,34 @@ func TestJetStreamClusterJSAPIImport(t *testing.T) { if len(streams.Streams) != 1 { t.Fatalf("Expected only 1 stream but got %d", len(streams.Streams)) } + + // Now send to stream. + if _, err := js.Publish("TEST", []byte("OK")); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + + sub, err = js.SubscribeSync("TEST", nats.Durable("tr"), nats.Pull(1)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + checkSubsPending(t, sub, 1) + m, err := sub.NextMsg(0) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if m.Subject != "TEST" { + t.Fatalf("Expected subject of %q, got %q", "TEST", m.Subject) + } + if m.Header != nil { + t.Fatalf("Expected no header on the message, got: %v", m.Header) + } + meta, err := m.MetaData() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if meta.Consumer != 1 || meta.Stream != 1 || meta.Delivered != 1 || meta.Pending != 0 { + t.Fatalf("Bad meta: %+v", meta) + } } // Support functions @@ -4111,11 +4139,17 @@ var jsClusterImportsTempl = ` JS { jetstream: enabled users = [ { user: "rip", pass: "pass" } ] - exports [ { service: "$JS.API.>" } ] + exports [ + { service: "$JS.API.>" } + { service: "TEST" } # For publishing to the stream. + ] } IA { users = [ { user: "dlc", pass: "pass" } ] - imports [ { service: { subject: "$JS.API.>", account: JS }} ] + imports [ + { service: { subject: "$JS.API.>", account: JS }} + { service: { subject: "TEST", account: JS }} + ] } $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } } diff --git a/server/stream.go b/server/stream.go index e498e044..7330cc31 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1726,7 +1726,13 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } } - // Process msg headers if present. + // If we have received this message across an account we may have request information attached. + // For now remove. TODO(dlc) - Should this be opt-in or opt-out? + if len(hdr) > 0 { + hdr = removeHeaderIfPresent(hdr, ClientInfoHdr) + } + + // Process additional msg headers if still present. var msgId string if len(hdr) > 0 { msgId = getMsgId(hdr)