Cross account pull consumers would not work correctly due to reply subject rewrite.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-02-24 13:33:24 -08:00
parent 338e017dbe
commit 36a2cbeff7
3 changed files with 61 additions and 7 deletions

View File

@@ -117,7 +117,10 @@ var readLoopReportThreshold = readLoopReport
// Represent client booleans with a bitmask
type clientFlag uint16
const hdrLine = "NATS/1.0\r\n"
const (
hdrLine = "NATS/1.0\r\n"
emptyHdrLine = "NATS/1.0\r\n\r\n"
)
// Some client state represented as flags
const (
@@ -3587,7 +3590,11 @@ func removeHeaderIfPresent(hdr []byte, key string) []byte {
if end < 0 {
return hdr
}
return append(hdr[:start], hdr[start+end+len(_CRLF_):]...)
nhdr := append(hdr[:start], hdr[start+end+len(_CRLF_):]...)
if len(nhdr) <= len(emptyHdrLine) {
return nil
}
return nhdr
}
// Generate a new header based on optional original header and key value.
@@ -3703,8 +3710,15 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
// TODO(dlc) - restrict to configured service imports and not responses?
tracking, headers := shouldSample(si.latency, c)
if len(c.pa.reply) > 0 {
if rsi = c.setupResponseServiceImport(acc, si, tracking, headers); rsi != nil {
nrr = []byte(rsi.from)
// Special case for now, need to formalize.
// TODO(dlc) - Formalize as a service import option for reply rewrite.
// For now we can't do $JS.ACK since that breaks pull consumers across accounts.
if !bytes.HasPrefix(c.pa.reply, []byte(jsAckPre)) {
if rsi = c.setupResponseServiceImport(acc, si, tracking, headers); rsi != nil {
nrr = []byte(rsi.from)
}
} else {
nrr = c.pa.reply
}
} else {
// Check to see if this was a bad request with no reply and we were supposed to be tracking.

View File

@@ -3910,6 +3910,34 @@ func TestJetStreamClusterJSAPIImport(t *testing.T) {
if len(streams.Streams) != 1 {
t.Fatalf("Expected only 1 stream but got %d", len(streams.Streams))
}
// Now send to stream.
if _, err := js.Publish("TEST", []byte("OK")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
sub, err = js.SubscribeSync("TEST", nats.Durable("tr"), nats.Pull(1))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkSubsPending(t, sub, 1)
m, err := sub.NextMsg(0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if m.Subject != "TEST" {
t.Fatalf("Expected subject of %q, got %q", "TEST", m.Subject)
}
if m.Header != nil {
t.Fatalf("Expected no header on the message, got: %v", m.Header)
}
meta, err := m.MetaData()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if meta.Consumer != 1 || meta.Stream != 1 || meta.Delivered != 1 || meta.Pending != 0 {
t.Fatalf("Bad meta: %+v", meta)
}
}
// Support functions
@@ -4111,11 +4139,17 @@ var jsClusterImportsTempl = `
JS {
jetstream: enabled
users = [ { user: "rip", pass: "pass" } ]
exports [ { service: "$JS.API.>" } ]
exports [
{ service: "$JS.API.>" }
{ service: "TEST" } # For publishing to the stream.
]
}
IA {
users = [ { user: "dlc", pass: "pass" } ]
imports [ { service: { subject: "$JS.API.>", account: JS }} ]
imports [
{ service: { subject: "$JS.API.>", account: JS }}
{ service: { subject: "TEST", account: JS }}
]
}
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}

View File

@@ -1726,7 +1726,13 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
}
// Process msg headers if present.
// If we have received this message across an account we may have request information attached.
// For now remove. TODO(dlc) - Should this be opt-in or opt-out?
if len(hdr) > 0 {
hdr = removeHeaderIfPresent(hdr, ClientInfoHdr)
}
// Process additional msg headers if still present.
var msgId string
if len(hdr) > 0 {
msgId = getMsgId(hdr)