Add in formal support for multiple JetStream domains across leafnodes.

This CL adds in support for multiple JetStream domains using mapped subjects.
Mapping subjects aligns well with the JetStream context APIPrefix in clients.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-05-02 20:46:12 -07:00
committed by Ivan Kozlovic
parent 9eb12b6e1c
commit 0bd92e85da
8 changed files with 435 additions and 86 deletions

View File

@@ -41,6 +41,7 @@ type pubArg struct {
account []byte
subject []byte
deliver []byte
mapped []byte
reply []byte
szb []byte
hdb []byte
@@ -267,6 +268,7 @@ func (c *client) parse(buf []byte) error {
if err := c.processHeaderPub(arg); err != nil {
return err
}
c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
// If we don't have a saved buffer then jump ahead with
// the index. If this overruns what is left we fall out
@@ -405,17 +407,7 @@ func (c *client) parse(buf []byte) error {
if err := c.processPub(arg); err != nil {
return err
}
// Check if we have and account mappings or tees or filters.
// FIXME(dlc) - Probably better way to do this.
// Could add in cache but will be tricky since results based on pub subject are dynamic
// due to wildcard matching and weight sets.
if c.kind == CLIENT && c.in.flags.isSet(hasMappings) {
old := c.pa.subject
changed := c.selectMappedSubject()
if trace && changed {
c.traceInOp("MAPPING", []byte(fmt.Sprintf("%s -> %s", old, c.pa.subject)))
}
}
c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
// If we don't have a saved buffer then jump ahead with
// the index. If this overruns what is left we fall out
@@ -470,14 +462,23 @@ func (c *client) parse(buf []byte) error {
} else {
c.msgBuf = buf[c.as : i+1]
}
// Check for mappings.
if (c.kind == CLIENT || c.kind == LEAF) && c.in.flags.isSet(hasMappings) {
changed := c.selectMappedSubject()
if trace && changed {
c.traceInOp("MAPPING", []byte(fmt.Sprintf("%s -> %s", c.pa.mapped, c.pa.subject)))
}
}
if trace {
c.traceMsg(c.msgBuf)
}
c.processInboundMsg(c.msgBuf)
c.argBuf, c.msgBuf, c.header = nil, nil, nil
c.drop, c.as, c.state = 0, i+1, OP_START
// Drop all pub args
c.pa.arg, c.pa.pacache, c.pa.origin, c.pa.account, c.pa.subject = nil, nil, nil, nil, nil
c.pa.arg, c.pa.pacache, c.pa.origin, c.pa.account, c.pa.subject, c.pa.mapped = nil, nil, nil, nil, nil, nil
c.pa.reply, c.pa.hdr, c.pa.size, c.pa.szb, c.pa.hdb, c.pa.queues = nil, -1, 0, nil, nil, nil
lmsg = false
case OP_A: