diff --git a/server/consumer.go b/server/consumer.go index 70a0fb43..60a952d5 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -350,8 +350,9 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) { // We will remember the template to generate replies with sequence numbers and use // that to scanf them back in. mn := mset.config.Name - o.ackReplyT = fmt.Sprintf("%s.%s.%s.%%d.%%d.%%d", JetStreamAckPre, mn, o.name) - ackSubj := fmt.Sprintf("%s.%s.%s.*.*.*", JetStreamAckPre, mn, o.name) + pre := fmt.Sprintf(JetStreamAckT, mn, o.name) + o.ackReplyT = fmt.Sprintf("%s.%%d.%%d.%%d", pre) + ackSubj := fmt.Sprintf("%s.*.*.*", pre) if sub, err := mset.subscribeInternal(ackSubj, o.processAck); err != nil { return nil, err } else { @@ -359,7 +360,7 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) { } // Setup the internal sub for next message requests. if !o.isPushMode() { - o.nextMsgSubj = fmt.Sprintf("%s.%s.%s", JetStreamRequestNextPre, mn, o.name) + o.nextMsgSubj = fmt.Sprintf(JetStreamRequestNextT, mn, o.name) if sub, err := mset.subscribeInternal(o.nextMsgSubj, o.processNextMsgReq); err != nil { o.Delete() return nil, err diff --git a/server/jetstream.go b/server/jetstream.go index 43e73a19..76c8f1e7 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -127,14 +127,15 @@ const ( JetStreamDeleteConsumer = "$JS.STREAM.*.CONSUMER.*.DELETE" JetStreamDeleteConsumerT = "$JS.STREAM.%s.CONSUMER.%s.DELETE" - // JetStreamAckPre is the prefix for the ack stream coming back to an consumer. - JetStreamAckPre = "$JS.A" + // JetStreamAckT is the template for the ack message stream coming back from an consumer + // when they ACK/NAK, etc a message. + JetStreamAckT = "$JS.STREAM.%s.CONSUMER.%s.ACK" - // JetStreamRequestNextPre is the prefix for the request next message(s) for a consumer in worker/pull mode. - JetStreamRequestNextPre = "$JS.NEXT" + // JetStreamRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode. + JetStreamRequestNextT = "$JS.STREAM.%s.CONSUMER.%s.NEXT" - // JetStreamMsgBySeqPre is the prefix for direct requests for a message by its stream sequence number. - JetStreamMsgBySeqPre = "$JS.BYSEQ" + // JetStreamMsgBySeqT is the template for direct requests for a message by its stream sequence number. + JetStreamMsgBySeqT = "$JS.STREAM.%s.MSG.BYSEQ" // JetStreamAdvisoryPrefix is a prefix for all JetStream advisories JetStreamAdvisoryPrefix = "$JS.EVENT.ADVISORY" diff --git a/server/stream.go b/server/stream.go index f4250ad3..4cd37bd1 100644 --- a/server/stream.go +++ b/server/stream.go @@ -245,7 +245,7 @@ func (mset *Stream) subscribeToStream() error { } } // Now subscribe for direct access - subj := fmt.Sprintf("%s.%s", JetStreamMsgBySeqPre, mset.config.Name) + subj := fmt.Sprintf(JetStreamMsgBySeqT, mset.config.Name) if _, err := mset.subscribeInternal(subj, mset.processMsgBySeq); err != nil { return err } diff --git a/test/jetstream_test.go b/test/jetstream_test.go index cf8f3ae6..865ba8c3 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -187,6 +187,7 @@ func TestJetStreamAddStream(t *testing.T) { if err != nil { t.Fatalf("Unexpected error adding stream: %v", err) } + defer mset.Delete() nc := clientConnectToServer(t, s) defer nc.Close()