mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
@@ -59,7 +59,9 @@ func (ms *memStore) StoreMsg(subj string, msg []byte) (uint64, error) {
|
||||
|
||||
// Make copies - https://github.com/go101/go101/wiki
|
||||
// TODO(dlc) - Maybe be smarter here.
|
||||
msg = append(msg[:0:0], msg...)
|
||||
if len(msg) > 0 {
|
||||
msg = append(msg[:0:0], msg...)
|
||||
}
|
||||
|
||||
ms.msgs[seq] = &storedMsg{subj, msg, seq, time.Now().UnixNano()}
|
||||
ms.stats.Msgs++
|
||||
@@ -162,17 +164,10 @@ func (ms *memStore) deleteFirstMsgOrPanic() {
|
||||
}
|
||||
|
||||
func (ms *memStore) deleteFirstMsg() bool {
|
||||
sm, ok := ms.msgs[ms.stats.FirstSeq]
|
||||
if !ok || sm == nil {
|
||||
return false
|
||||
}
|
||||
delete(ms.msgs, ms.stats.FirstSeq)
|
||||
ms.stats.FirstSeq++
|
||||
ms.stats.Msgs--
|
||||
ms.stats.Bytes -= memStoreMsgSize(sm.subj, sm.msg)
|
||||
return true
|
||||
return ms.removeMsg(ms.stats.FirstSeq)
|
||||
}
|
||||
|
||||
// Lookup will lookup the message by sequence number.
|
||||
func (ms *memStore) Lookup(seq uint64) (string, []byte, int64, error) {
|
||||
ms.mu.RLock()
|
||||
sm, ok := ms.msgs[seq]
|
||||
@@ -184,6 +179,28 @@ func (ms *memStore) Lookup(seq uint64) (string, []byte, int64, error) {
|
||||
return sm.subj, sm.msg, sm.ts, nil
|
||||
}
|
||||
|
||||
// RemoveMsg will remove the message from this store.
|
||||
func (ms *memStore) RemoveMsg(seq uint64) bool {
|
||||
ms.mu.Lock()
|
||||
ok := ms.removeMsg(seq)
|
||||
ms.mu.Unlock()
|
||||
return ok
|
||||
}
|
||||
|
||||
// Removes the message referenced by seq.
|
||||
func (ms *memStore) removeMsg(seq uint64) bool {
|
||||
sm, ok := ms.msgs[seq]
|
||||
if ok {
|
||||
delete(ms.msgs, seq)
|
||||
ms.stats.Msgs--
|
||||
ms.stats.Bytes -= memStoreMsgSize(sm.subj, sm.msg)
|
||||
if seq == ms.stats.FirstSeq {
|
||||
ms.stats.FirstSeq++
|
||||
}
|
||||
}
|
||||
return ok
|
||||
}
|
||||
|
||||
func (ms *memStore) Stats() MsgSetStats {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
|
||||
@@ -43,7 +43,7 @@ const (
|
||||
StreamPolicy RetentionPolicy = iota
|
||||
// InterestPolicy specifies that when all known subscribers have acknowledged a message it can be removed.
|
||||
InterestPolicy
|
||||
// WorkQueuePolicy specifies that when the first subscriber acknowledges the message it can be removed.
|
||||
// WorkQueuePolicy specifies that when the first worker or subscriber acknowledges the message it can be removed.
|
||||
WorkQueuePolicy
|
||||
)
|
||||
|
||||
@@ -331,6 +331,7 @@ func (mset *MsgSet) delete() error {
|
||||
for _, o := range mset.obs {
|
||||
obs = append(obs, o)
|
||||
}
|
||||
mset.obs = nil
|
||||
mset.mu.Unlock()
|
||||
c.closeConnection(ClientClosed)
|
||||
|
||||
@@ -348,6 +349,13 @@ func (mset *MsgSet) cleanName() string {
|
||||
return strings.Replace(mset.config.Name, tsep, "-", -1)
|
||||
}
|
||||
|
||||
// NumObservables reports on number of active observables for this message set.
|
||||
func (mset *MsgSet) NumObservables() int {
|
||||
mset.mu.Lock()
|
||||
defer mset.mu.Unlock()
|
||||
return len(mset.obs)
|
||||
}
|
||||
|
||||
// Stats will return the current stats for this message set.
|
||||
func (mset *MsgSet) Stats() MsgSetStats {
|
||||
// Currently rely on store.
|
||||
@@ -355,6 +363,7 @@ func (mset *MsgSet) Stats() MsgSetStats {
|
||||
return mset.store.Stats()
|
||||
}
|
||||
|
||||
// waitForMsgs will have the message set wait for the arrival of new messages.
|
||||
func (mset *MsgSet) waitForMsgs() {
|
||||
mset.mu.Lock()
|
||||
defer mset.mu.Unlock()
|
||||
@@ -367,3 +376,22 @@ func (mset *MsgSet) waitForMsgs() {
|
||||
mset.sg.Wait()
|
||||
mset.sgw--
|
||||
}
|
||||
|
||||
// Determines if the new proposed partition is unique amongst all observables.
|
||||
// Lock should be held.
|
||||
func (mset *MsgSet) partitionUnique(partition string) bool {
|
||||
for _, o := range mset.obs {
|
||||
if o.config.Partition == _EMPTY_ {
|
||||
return false
|
||||
}
|
||||
if subjectIsSubsetMatch(partition, o.config.Partition) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// ackMsg is called into from an observable when we have a WorkQueue retention policy.
|
||||
func (mset *MsgSet) ackMsg(seq uint64) {
|
||||
mset.store.RemoveMsg(seq)
|
||||
}
|
||||
|
||||
@@ -20,19 +20,19 @@ import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ObservableConfig struct {
|
||||
Delivery string `json:"delivery_subject"`
|
||||
Durable string `json:"durable_name,omitempty"`
|
||||
StartSeq uint64 `json:"start_seq,omitempty"`
|
||||
StartTime time.Time `json:"start_time,omitempty"`
|
||||
DeliverAll bool `json:"deliver_all,omitempty"`
|
||||
DeliverLast bool `json:"deliver_last,omitempty"`
|
||||
AckPolicy AckPolicy `json:"ack_policy"`
|
||||
Partition string `json:"partition"`
|
||||
Delivery string `json:"delivery_subject"`
|
||||
Durable string `json:"durable_name,omitempty"`
|
||||
StartSeq uint64 `json:"start_seq,omitempty"`
|
||||
StartTime time.Time `json:"start_time,omitempty"`
|
||||
DeliverAll bool `json:"deliver_all,omitempty"`
|
||||
DeliverLast bool `json:"deliver_last,omitempty"`
|
||||
AckPolicy AckPolicy `json:"ack_policy"`
|
||||
AckWait time.Duration `json:"ack_wait"`
|
||||
Partition string `json:"partition"`
|
||||
}
|
||||
|
||||
// AckPolicy determines how the observable should acknowledge delivered messages.
|
||||
@@ -49,24 +49,31 @@ const (
|
||||
|
||||
// Observable is a jetstream observable/subscriber.
|
||||
type Observable struct {
|
||||
mu sync.Mutex
|
||||
name string
|
||||
mset *MsgSet
|
||||
aseq uint64
|
||||
sseq uint64
|
||||
dseq uint64
|
||||
dsubj string
|
||||
reqSub *subscription
|
||||
ackSub *subscription
|
||||
ackReply string
|
||||
waiting []string
|
||||
config ObservableConfig
|
||||
mu sync.Mutex
|
||||
name string
|
||||
mset *MsgSet
|
||||
pseq uint64
|
||||
sseq uint64
|
||||
dseq uint64
|
||||
dsubj string
|
||||
reqSub *subscription
|
||||
ackSub *subscription
|
||||
ackReply string
|
||||
pending map[uint64]int64
|
||||
ptmr *time.Timer
|
||||
redeliver []uint64
|
||||
waiting []string
|
||||
config ObservableConfig
|
||||
}
|
||||
|
||||
// Default AckWait, only applicable on explicit ack policy observables.
|
||||
const JsAckWaitDefault = 30 * time.Second
|
||||
|
||||
func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error) {
|
||||
if config == nil {
|
||||
return nil, fmt.Errorf("observable config required")
|
||||
}
|
||||
|
||||
// For now expect a literal subject if its not empty. Empty means work queue mode (pull mode).
|
||||
if config.Delivery != _EMPTY_ {
|
||||
if !subjectIsLiteral(config.Delivery) {
|
||||
@@ -75,6 +82,14 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error)
|
||||
if mset.deliveryFormsCycle(config.Delivery) {
|
||||
return nil, fmt.Errorf("observable delivery subject forms a cycle")
|
||||
}
|
||||
} else {
|
||||
// Pull mode / work queue mode require explicit ack.
|
||||
if config.AckPolicy != AckExplicit {
|
||||
return nil, fmt.Errorf("observable in pull mode requires explicit ack policy")
|
||||
}
|
||||
if config.AckWait == time.Duration(0) {
|
||||
config.AckWait = JsAckWaitDefault
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure any partition subject is also a literal.
|
||||
@@ -105,6 +120,32 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error)
|
||||
}
|
||||
}
|
||||
|
||||
// Hold mset lock here/
|
||||
mset.mu.Lock()
|
||||
|
||||
// Check on msgset type conflicts.
|
||||
switch mset.config.Retention {
|
||||
case WorkQueuePolicy:
|
||||
if config.Delivery != "" {
|
||||
mset.mu.Unlock()
|
||||
return nil, fmt.Errorf("delivery subject not allowed on workqueue message set")
|
||||
}
|
||||
if len(mset.obs) > 0 {
|
||||
if config.Partition == _EMPTY_ {
|
||||
mset.mu.Unlock()
|
||||
return nil, fmt.Errorf("multiple non-partioned observables not allowed on workqueue message set")
|
||||
} else if !mset.partitionUnique(config.Partition) {
|
||||
// We have a partition but it is not unique amongst the others.
|
||||
mset.mu.Unlock()
|
||||
return nil, fmt.Errorf("partioned observable not unique on workqueue message set")
|
||||
}
|
||||
}
|
||||
if !config.DeliverAll {
|
||||
mset.mu.Unlock()
|
||||
return nil, fmt.Errorf("observable must be deliver all on workqueue message set")
|
||||
}
|
||||
}
|
||||
|
||||
// Set name, which will be durable name if set, otherwise we create one at random.
|
||||
o := &Observable{mset: mset, config: *config, dsubj: config.Delivery}
|
||||
if isDurableObservable(config) {
|
||||
@@ -117,7 +158,6 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error)
|
||||
o.selectStartingSeqNo()
|
||||
|
||||
// Now register with mset and create ack subscription.
|
||||
mset.mu.Lock()
|
||||
c := mset.client
|
||||
if c == nil {
|
||||
mset.mu.Unlock()
|
||||
@@ -163,20 +203,40 @@ func (o *Observable) msgSet() *MsgSet {
|
||||
}
|
||||
|
||||
func (o *Observable) processAck(_ *subscription, _ *client, subject, reply string, msg []byte) {
|
||||
// TODO(dlc) process the ack.
|
||||
if len(msg) > 1 {
|
||||
switch {
|
||||
case bytes.Equal(msg, AckNext):
|
||||
o.processNextMsgReq(nil, nil, subject, reply, nil)
|
||||
case bytes.Equal(msg, AckNak):
|
||||
if o.isPushMode() {
|
||||
// Reset our observable to this sequence number.
|
||||
o.resetToSeq(o.SeqFromReply(subject))
|
||||
}
|
||||
seq := o.SeqFromReply(subject)
|
||||
switch {
|
||||
case len(msg) == 0, bytes.Equal(msg, AckAck):
|
||||
o.ackMsg(seq)
|
||||
case bytes.Equal(msg, AckNext):
|
||||
o.ackMsg(seq)
|
||||
o.processNextMsgReq(nil, nil, subject, reply, nil)
|
||||
case bytes.Equal(msg, AckNak):
|
||||
if o.isPushMode() {
|
||||
// Reset our observable to this sequence number.
|
||||
o.resetToSeq(seq)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Process an ack for a message.
|
||||
func (o *Observable) ackMsg(seq uint64) {
|
||||
o.mu.Lock()
|
||||
switch o.config.AckPolicy {
|
||||
case AckNone, AckAll:
|
||||
o.pseq = seq
|
||||
case AckExplicit:
|
||||
delete(o.pending, seq)
|
||||
if seq == o.pseq+1 {
|
||||
o.pseq++
|
||||
}
|
||||
}
|
||||
mset := o.mset
|
||||
o.mu.Unlock()
|
||||
if mset != nil && mset.config.Retention == WorkQueuePolicy {
|
||||
mset.ackMsg(seq)
|
||||
}
|
||||
}
|
||||
|
||||
// resetToSeq is used when we receive a NAK to reset a push based observable. e.g. a replay.
|
||||
func (o *Observable) resetToSeq(seq uint64) {
|
||||
o.mu.Lock()
|
||||
@@ -208,8 +268,21 @@ func (o *Observable) processNextMsgReq(_ *subscription, _ *client, _, reply stri
|
||||
batchSize := batchSizeFromMsg(msg)
|
||||
|
||||
o.mu.Lock()
|
||||
mset := o.mset
|
||||
if mset == nil {
|
||||
o.mu.Unlock()
|
||||
return
|
||||
}
|
||||
for i := 0; i < batchSize; i++ {
|
||||
if subj, msg, err := o.getNextMsg(); err == nil {
|
||||
if len(o.redeliver) > 0 {
|
||||
seq := o.redeliver[0]
|
||||
o.redeliver = append(o.redeliver[:0], o.redeliver[1:]...)
|
||||
subj, msg, _, err := mset.store.Lookup(seq)
|
||||
if err == ErrStoreMsgNotFound {
|
||||
continue
|
||||
}
|
||||
o.reDeliverMsgRequest(mset, reply, subj, msg, seq)
|
||||
} else if subj, msg, err := o.getNextMsg(); err == nil {
|
||||
o.deliverMsgRequest(o.mset, reply, subj, msg, o.dseq)
|
||||
o.incSeqs()
|
||||
} else {
|
||||
@@ -252,7 +325,16 @@ func (o *Observable) loopAndDeliverMsgs(s *Server, a *Account) {
|
||||
// Deliver all the msgs we have now, once done or on a condition, we wait for new ones.
|
||||
for {
|
||||
o.mu.Lock()
|
||||
seq := o.sseq
|
||||
var seq uint64
|
||||
var redelivery bool
|
||||
|
||||
if len(o.redeliver) > 0 {
|
||||
seq = o.redeliver[0]
|
||||
redelivery = true
|
||||
} else {
|
||||
seq = o.sseq
|
||||
}
|
||||
|
||||
subj, msg, _, err := mset.store.Lookup(seq)
|
||||
|
||||
// On error either break or return.
|
||||
@@ -262,7 +344,12 @@ func (o *Observable) loopAndDeliverMsgs(s *Server, a *Account) {
|
||||
s.Warnf("Jetstream internal storage error on lookup: %v", err)
|
||||
return
|
||||
}
|
||||
break
|
||||
if !redelivery {
|
||||
break
|
||||
} else {
|
||||
// This was not found so can't be redelivered.
|
||||
o.redeliver = append(o.redeliver[:0], o.redeliver[1:]...)
|
||||
}
|
||||
}
|
||||
|
||||
// We have the message. We need to check if we are in push mode or pull mode.
|
||||
@@ -274,13 +361,23 @@ func (o *Observable) loopAndDeliverMsgs(s *Server, a *Account) {
|
||||
}
|
||||
|
||||
if o.isPushMode() {
|
||||
o.deliverMsg(mset, subj, msg, o.dseq)
|
||||
o.incSeqs()
|
||||
if !redelivery {
|
||||
o.deliverMsg(mset, subj, msg, o.dseq)
|
||||
o.incSeqs()
|
||||
} else {
|
||||
o.redeliver = append(o.redeliver[:0], o.redeliver[1:]...)
|
||||
o.deliverMsg(mset, subj, msg, seq)
|
||||
}
|
||||
} else if len(o.waiting) > 0 {
|
||||
reply := o.waiting[0]
|
||||
o.waiting = append(o.waiting[:0], o.waiting[1:]...)
|
||||
o.deliverMsgRequest(mset, reply, subj, msg, o.dseq)
|
||||
o.incSeqs()
|
||||
if !redelivery {
|
||||
o.deliverMsgRequest(mset, reply, subj, msg, o.dseq)
|
||||
o.incSeqs()
|
||||
} else {
|
||||
o.redeliver = append(o.redeliver[:0], o.redeliver[1:]...)
|
||||
o.reDeliverMsgRequest(mset, reply, subj, msg, seq)
|
||||
}
|
||||
} else {
|
||||
// No one waiting, let's break out and wait.
|
||||
o.mu.Unlock()
|
||||
@@ -308,6 +405,62 @@ func (o *Observable) deliverMsg(mset *MsgSet, subj string, msg []byte, seq uint6
|
||||
// Deliver a msg to the msg request subject.
|
||||
func (o *Observable) deliverMsgRequest(mset *MsgSet, dsubj, subj string, msg []byte, seq uint64) {
|
||||
mset.sendq <- &jsPubMsg{dsubj, subj, fmt.Sprintf(o.ackReply, seq), msg}
|
||||
if o.config.AckPolicy == AckExplicit {
|
||||
o.trackPending(seq)
|
||||
}
|
||||
}
|
||||
|
||||
// Redeliver a message.
|
||||
func (o *Observable) reDeliverMsgRequest(mset *MsgSet, dsubj, subj string, msg []byte, seq uint64) {
|
||||
mset.sendq <- &jsPubMsg{dsubj, subj, fmt.Sprintf(o.ackReply, seq), msg}
|
||||
}
|
||||
|
||||
// Tracks our outstanding pending acks. Only applicable to AckExplicit mode.
|
||||
// Lock should be held.
|
||||
func (o *Observable) trackPending(seq uint64) {
|
||||
if o.pending == nil {
|
||||
o.pending = make(map[uint64]int64)
|
||||
}
|
||||
if o.ptmr == nil {
|
||||
o.ptmr = time.AfterFunc(o.config.AckWait, o.checkPending)
|
||||
}
|
||||
o.pending[seq] = time.Now().UnixNano()
|
||||
}
|
||||
|
||||
func (o *Observable) checkPending() {
|
||||
now := time.Now().UnixNano()
|
||||
shouldSignal := false
|
||||
|
||||
o.mu.Lock()
|
||||
mset := o.mset
|
||||
if mset == nil {
|
||||
o.mu.Unlock()
|
||||
return
|
||||
}
|
||||
aw := int64(o.config.AckWait)
|
||||
for seq := o.pseq; seq < o.dseq; seq++ {
|
||||
if ts, ok := o.pending[seq]; ok {
|
||||
if now-ts > aw {
|
||||
// If we have waiting, go ahead and deliver here.
|
||||
// FIXME(dlc) - Not sure this is correct.
|
||||
o.redeliver = append(o.redeliver, seq)
|
||||
shouldSignal = true
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(o.pending) > 0 {
|
||||
o.ptmr.Reset(o.config.AckWait)
|
||||
} else {
|
||||
o.ptmr.Stop()
|
||||
o.ptmr = nil
|
||||
}
|
||||
o.mu.Unlock()
|
||||
|
||||
if shouldSignal {
|
||||
mset.signalObservers()
|
||||
}
|
||||
}
|
||||
|
||||
// SeqFromReply will extract a sequence number from a reply ack subject.
|
||||
@@ -321,7 +474,10 @@ func (o *Observable) SeqFromReply(reply string) (seq uint64) {
|
||||
|
||||
// NextSeq returns the next delivered sequence number for this observable.
|
||||
func (o *Observable) NextSeq() uint64 {
|
||||
return atomic.LoadUint64(&o.dseq)
|
||||
o.mu.Lock()
|
||||
dseq := o.dseq
|
||||
o.mu.Unlock()
|
||||
return dseq
|
||||
}
|
||||
|
||||
// Will select the starting sequence.
|
||||
@@ -352,8 +508,10 @@ func (o *Observable) selectStartingSeqNo() {
|
||||
} else if o.sseq > stats.LastSeq {
|
||||
o.sseq = stats.LastSeq + 1
|
||||
}
|
||||
// Set deliveryt sequence to be the same to start.
|
||||
// Set delivery sequence to be the same to start.
|
||||
o.dseq = o.sseq
|
||||
// Set pending sequence to delivery - 1
|
||||
o.pseq = o.dseq - 1
|
||||
}
|
||||
|
||||
// Test whether a config represents a durable subscriber.
|
||||
@@ -404,6 +562,10 @@ func (o *Observable) Delete() error {
|
||||
reqSub := o.reqSub
|
||||
o.ackSub = nil
|
||||
o.reqSub = nil
|
||||
if o.ptmr != nil {
|
||||
o.ptmr.Stop()
|
||||
o.ptmr = nil
|
||||
}
|
||||
o.mu.Unlock()
|
||||
|
||||
if mset == nil {
|
||||
|
||||
@@ -31,6 +31,7 @@ const (
|
||||
type MsgSetStore interface {
|
||||
StoreMsg(subj string, msg []byte) (uint64, error)
|
||||
Lookup(seq uint64) (subj string, msg []byte, ts int64, err error)
|
||||
RemoveMsg(seq uint64) bool
|
||||
GetSeqFromTime(t time.Time) uint64
|
||||
Stats() MsgSetStats
|
||||
}
|
||||
|
||||
@@ -228,6 +228,12 @@ func TestJetStreamCreateObservable(t *testing.T) {
|
||||
t.Fatalf("Expected an error for no config")
|
||||
}
|
||||
|
||||
// No deliver subject, meaning its in pull mode, work queue mode means it is required to
|
||||
// do explicit ack.
|
||||
if _, err := mset.AddObservable(&server.ObservableConfig{}); err == nil {
|
||||
t.Fatalf("Expected an error on work queue / pull mode without explicit ack mode")
|
||||
}
|
||||
|
||||
// Check for delivery subject errors.
|
||||
|
||||
// Literal delivery subject required.
|
||||
@@ -420,6 +426,10 @@ func TestJetStreamBasicDelivery(t *testing.T) {
|
||||
checkMsgs(101)
|
||||
}
|
||||
|
||||
func workerModeConfig(name string) *server.ObservableConfig {
|
||||
return &server.ObservableConfig{Durable: name, DeliverAll: true, AckPolicy: server.AckExplicit}
|
||||
}
|
||||
|
||||
func TestJetStreamBasicWorkQueue(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
@@ -433,7 +443,7 @@ func TestJetStreamBasicWorkQueue(t *testing.T) {
|
||||
|
||||
// Create basic work queue mode observable.
|
||||
oname := "WQ"
|
||||
o, err := mset.AddObservable(&server.ObservableConfig{Durable: oname, DeliverAll: true})
|
||||
o, err := mset.AddObservable(workerModeConfig(oname))
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error with registered interest, got %v", err)
|
||||
}
|
||||
@@ -602,7 +612,7 @@ func TestJetStreamWorkQueuePartitioning(t *testing.T) {
|
||||
}
|
||||
|
||||
oname := "WQ"
|
||||
o, err := mset.AddObservable(&server.ObservableConfig{Durable: oname, Partition: subjA, DeliverAll: true})
|
||||
o, err := mset.AddObservable(&server.ObservableConfig{Durable: oname, Partition: subjA, DeliverAll: true, AckPolicy: server.AckExplicit})
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error with registered interest, got %v", err)
|
||||
}
|
||||
@@ -649,7 +659,7 @@ func TestJetStreamWorkQueueAckAndNext(t *testing.T) {
|
||||
|
||||
// Create basic work queue mode observable.
|
||||
oname := "WQ"
|
||||
o, err := mset.AddObservable(&server.ObservableConfig{Durable: oname, DeliverAll: true})
|
||||
o, err := mset.AddObservable(workerModeConfig(oname))
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error with registered interest, got %v", err)
|
||||
}
|
||||
@@ -707,7 +717,7 @@ func TestJetStreamWorkQueueRequestBatch(t *testing.T) {
|
||||
|
||||
// Create basic work queue mode observable.
|
||||
oname := "WQ"
|
||||
o, err := mset.AddObservable(&server.ObservableConfig{Durable: oname, DeliverAll: true})
|
||||
o, err := mset.AddObservable(workerModeConfig(oname))
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error with registered interest, got %v", err)
|
||||
}
|
||||
@@ -841,3 +851,170 @@ func TestJetStreamBasicPushNak(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamWorkQueueMsgSet(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
mname := "MY_WORK_QUEUE.*"
|
||||
mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: mname, Retention: server.WorkQueuePolicy})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error adding message set: %v", err)
|
||||
}
|
||||
defer s.JetStreamDeleteMsgSet(mset)
|
||||
|
||||
nc := clientConnectToServer(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
sub, _ := nc.SubscribeSync(nats.NewInbox())
|
||||
defer sub.Unsubscribe()
|
||||
nc.Flush()
|
||||
|
||||
// This type of message set has restrictions which we will test here.
|
||||
|
||||
// Push based not allowed.
|
||||
if _, err := mset.AddObservable(&server.ObservableConfig{Delivery: sub.Subject}); err == nil {
|
||||
t.Fatalf("Expected an error on delivery subject")
|
||||
}
|
||||
|
||||
// DeliverAll is only start mode allowed.
|
||||
if _, err := mset.AddObservable(&server.ObservableConfig{DeliverLast: true}); err == nil {
|
||||
t.Fatalf("Expected an error with anything but DeliverAll")
|
||||
}
|
||||
|
||||
// We will create a non-partitioned observable. This should succeed.
|
||||
o, err := mset.AddObservable(&server.ObservableConfig{DeliverAll: true, AckPolicy: server.AckExplicit})
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error, got %v", err)
|
||||
}
|
||||
defer o.Delete()
|
||||
|
||||
// Now if we create another this should fail, only can have one non-partitioned.
|
||||
if _, err := mset.AddObservable(&server.ObservableConfig{DeliverAll: true}); err == nil {
|
||||
t.Fatalf("Expected an error on attempt for second observable for a workqueue")
|
||||
}
|
||||
|
||||
o.Delete()
|
||||
|
||||
if numo := mset.NumObservables(); numo != 0 {
|
||||
t.Fatalf("Expected to have zero observables, got %d", numo)
|
||||
}
|
||||
|
||||
// Now add in an observable that has a partition.
|
||||
|
||||
pConfig := func(pname string) *server.ObservableConfig {
|
||||
return &server.ObservableConfig{DeliverAll: true, Partition: pname, AckPolicy: server.AckExplicit}
|
||||
}
|
||||
o, err = mset.AddObservable(pConfig("MY_WORK_QUEUE.A"))
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error, got %v", err)
|
||||
}
|
||||
defer o.Delete()
|
||||
|
||||
// Now creating another with separate partition should work.
|
||||
o2, err := mset.AddObservable(pConfig("MY_WORK_QUEUE.B"))
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error, got %v", err)
|
||||
}
|
||||
defer o2.Delete()
|
||||
|
||||
// Anything that would overlap should fail though.
|
||||
if _, err := mset.AddObservable(pConfig(">")); err == nil {
|
||||
t.Fatalf("Expected an error on attempt for partitioned observable for a workqueue")
|
||||
}
|
||||
if _, err := mset.AddObservable(pConfig("MY_WORK_QUEUE.A")); err == nil {
|
||||
t.Fatalf("Expected an error on attempt for partitioned observable for a workqueue")
|
||||
}
|
||||
if _, err := mset.AddObservable(pConfig("MY_WORK_QUEUE.A")); err == nil {
|
||||
t.Fatalf("Expected an error on attempt for partitioned observable for a workqueue")
|
||||
}
|
||||
|
||||
o3, err := mset.AddObservable(pConfig("MY_WORK_QUEUE.C"))
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error, got %v", err)
|
||||
}
|
||||
defer o3.Delete()
|
||||
}
|
||||
|
||||
func TestJetStreamWorkQueueAckWaitRedelivery(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
mname := "MY_WQ"
|
||||
mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: mname, Retention: server.WorkQueuePolicy})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error adding message set: %v", err)
|
||||
}
|
||||
defer s.JetStreamDeleteMsgSet(mset)
|
||||
|
||||
nc := clientConnectToServer(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
// Now load up some messages.
|
||||
toSend := 100
|
||||
for i := 0; i < toSend; i++ {
|
||||
resp, _ := nc.Request(mname, []byte("Hello World!"), 50*time.Millisecond)
|
||||
expectOKResponse(t, resp)
|
||||
}
|
||||
stats := mset.Stats()
|
||||
if stats.Msgs != uint64(toSend) {
|
||||
t.Fatalf("Expected %d messages, got %d", toSend, stats.Msgs)
|
||||
}
|
||||
|
||||
ackWait := 50 * time.Millisecond
|
||||
|
||||
o, err := mset.AddObservable(&server.ObservableConfig{DeliverAll: true, AckPolicy: server.AckExplicit, AckWait: ackWait})
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error, got %v", err)
|
||||
}
|
||||
defer o.Delete()
|
||||
|
||||
sub, _ := nc.SubscribeSync(nats.NewInbox())
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
reqNextMsgSubj := fmt.Sprintf("%s.%s.%s", server.JsReqPre, mname, o.Name())
|
||||
|
||||
// Consume all the messages. But do not ack.
|
||||
for i := 0; i < toSend; i++ {
|
||||
nc.PublishRequest(reqNextMsgSubj, sub.Subject, nil)
|
||||
if _, err := sub.NextMsg(time.Second); err != nil {
|
||||
t.Fatalf("Unexpected error waiting for messages: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != 0 {
|
||||
t.Fatalf("Did not consume all messages, still have %d", nmsgs)
|
||||
}
|
||||
|
||||
// All messages should still be there.
|
||||
stats = mset.Stats()
|
||||
if int(stats.Msgs) != toSend {
|
||||
t.Fatalf("Expected %d messages, got %d", toSend, stats.Msgs)
|
||||
}
|
||||
|
||||
// Now consume and ack.
|
||||
for i := 1; i <= toSend; i++ {
|
||||
nc.PublishRequest(reqNextMsgSubj, sub.Subject, nil)
|
||||
m, err := sub.NextMsg(time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error waiting for messages: %v", err)
|
||||
}
|
||||
if seq := o.SeqFromReply(m.Reply); seq != uint64(i) {
|
||||
t.Fatalf("Expected sequence of %d , got %d", i, seq)
|
||||
}
|
||||
// Ack the message here.
|
||||
m.Respond(nil)
|
||||
}
|
||||
|
||||
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != 0 {
|
||||
t.Fatalf("Did not consume all messages, still have %d", nmsgs)
|
||||
}
|
||||
|
||||
// Flush acks
|
||||
nc.Flush()
|
||||
|
||||
// Now check the mset as well, since we have a WorkQueue retention policy this should be empty.
|
||||
if stats := mset.Stats(); stats.Msgs != 0 {
|
||||
t.Fatalf("Expected no messages, got %d", stats.Msgs)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -549,7 +549,7 @@ func TestJetStreamWorkQueueLoadBalance(t *testing.T) {
|
||||
|
||||
// Create basic work queue mode observable.
|
||||
oname := "WQ"
|
||||
o, err := mset.AddObservable(&server.ObservableConfig{Durable: oname, DeliverAll: true})
|
||||
o, err := mset.AddObservable(&server.ObservableConfig{Durable: oname, DeliverAll: true, AckPolicy: server.AckExplicit})
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error with registered interest, got %v", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user