mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
@@ -30,6 +30,7 @@ type ObservableConfig struct {
|
||||
DeliverAll bool `json:"deliver_all,omitempty"`
|
||||
DeliverLast bool `json:"deliver_last,omitempty"`
|
||||
AckPolicy AckPolicy `json:"ack_policy"`
|
||||
Partition string `json:"partition"`
|
||||
}
|
||||
|
||||
// AckPolicy determines how the observable shoulc acknowledge delivered messages.
|
||||
@@ -50,6 +51,7 @@ type Observable struct {
|
||||
name string
|
||||
mset *MsgSet
|
||||
seq uint64
|
||||
dseq uint64
|
||||
dsubj string
|
||||
reqSub *subscription
|
||||
ackSub *subscription
|
||||
@@ -72,6 +74,17 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error)
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure any partition subject is also a literal.
|
||||
if config.Partition != "" {
|
||||
if !subjectIsLiteral(config.Partition) {
|
||||
return nil, fmt.Errorf("observable partition subject has wildcards")
|
||||
}
|
||||
// Make sure this is a valid partition of the interest subjects.
|
||||
if !mset.validPartition(config.Partition) {
|
||||
return nil, fmt.Errorf("observable partition not a valid subset of the interest subjects")
|
||||
}
|
||||
}
|
||||
|
||||
// Check on start position conflicts.
|
||||
noTime := time.Time{}
|
||||
if config.StartSeq > 0 && (config.StartTime != noTime || config.DeliverAll || config.DeliverLast) {
|
||||
@@ -166,14 +179,24 @@ func (o *Observable) processObservableMsgRequest(_ *subscription, _ *client, sub
|
||||
seq = o.seq
|
||||
}
|
||||
// FIXME(dlc) - do actual sequence numbers.
|
||||
subj, msg, _, err := mset.store.Lookup(seq)
|
||||
if err == nil {
|
||||
o.deliverMsgRequest(mset, reply, subj, msg, seq)
|
||||
if seq == o.seq {
|
||||
o.seq++
|
||||
// We do loop here in case we are partitioned.
|
||||
for {
|
||||
subj, msg, _, err := mset.store.Lookup(seq)
|
||||
if err == nil {
|
||||
if o.config.Partition != "" && subj != o.config.Partition {
|
||||
o.seq++
|
||||
seq = o.seq
|
||||
continue
|
||||
}
|
||||
o.deliverMsgRequest(mset, reply, subj, msg, o.dseq)
|
||||
if wantNextMsg {
|
||||
o.incSeqs()
|
||||
}
|
||||
break
|
||||
} else if wantNextMsg {
|
||||
o.waiting = append(o.waiting, reply)
|
||||
break
|
||||
}
|
||||
} else if wantNextMsg {
|
||||
o.waiting = append(o.waiting, reply)
|
||||
}
|
||||
o.mu.Unlock()
|
||||
}
|
||||
@@ -186,12 +209,13 @@ func (o *Observable) loopAndDeliverMsgs(s *Server, a *Account) {
|
||||
return
|
||||
}
|
||||
|
||||
// Deliver all the msgs we have now, once done or on a condition, we wait.
|
||||
// Deliver all the msgs we have now, once done or on a condition, we wait for new ones.
|
||||
for {
|
||||
o.mu.Lock()
|
||||
seq := o.seq
|
||||
subj, msg, _, err := mset.store.Lookup(seq)
|
||||
|
||||
// On error either break or return.
|
||||
if err != nil {
|
||||
o.mu.Unlock()
|
||||
if err != ErrStoreMsgNotFound {
|
||||
@@ -202,14 +226,21 @@ func (o *Observable) loopAndDeliverMsgs(s *Server, a *Account) {
|
||||
}
|
||||
|
||||
// We have the message. We need to check if we are in push mode or pull mode.
|
||||
if o.config.Delivery != "" {
|
||||
o.deliverMsg(mset, subj, msg, seq)
|
||||
// Also need to check if we have a partition filter.
|
||||
if o.config.Partition != "" && subj != o.config.Partition {
|
||||
o.seq++
|
||||
o.mu.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
if o.config.Delivery != "" {
|
||||
o.deliverMsg(mset, subj, msg, o.dseq)
|
||||
o.incSeqs()
|
||||
} else if len(o.waiting) > 0 {
|
||||
reply := o.waiting[0]
|
||||
o.waiting = append(o.waiting[:0], o.waiting[1:]...)
|
||||
o.deliverMsgRequest(mset, reply, subj, msg, seq)
|
||||
o.seq++
|
||||
o.deliverMsgRequest(mset, reply, subj, msg, o.dseq)
|
||||
o.incSeqs()
|
||||
} else {
|
||||
// No one waiting, let's break out and wait.
|
||||
o.mu.Unlock()
|
||||
@@ -222,6 +253,13 @@ func (o *Observable) loopAndDeliverMsgs(s *Server, a *Account) {
|
||||
}
|
||||
}
|
||||
|
||||
// Advance the sequence numbers.
|
||||
// Lock should be held.
|
||||
func (o *Observable) incSeqs() {
|
||||
o.seq++
|
||||
o.dseq++
|
||||
}
|
||||
|
||||
// Deliver a msg to the observable push delivery subject.
|
||||
func (o *Observable) deliverMsg(mset *MsgSet, subj string, msg []byte, seq uint64) {
|
||||
mset.sendq <- &jsPubMsg{o.dsubj, subj, fmt.Sprintf(o.ackReply, seq), msg}
|
||||
@@ -274,6 +312,8 @@ func (o *Observable) selectStartingSeqNo() {
|
||||
} else if o.seq > stats.LastSeq {
|
||||
o.seq = stats.LastSeq + 1
|
||||
}
|
||||
// Set deliveryt sequence to be the same to start.
|
||||
o.dseq = o.seq
|
||||
}
|
||||
|
||||
// Test whether a config represents a durable subscriber.
|
||||
@@ -346,6 +386,8 @@ func (mset *MsgSet) noInterest(delivery string) bool {
|
||||
return len(r.psubs)+len(r.qsubs) == 0
|
||||
}
|
||||
|
||||
// Check that we do not form a cycle by delivering to a delivery subject
|
||||
// that is part of the interest group.
|
||||
func (mset *MsgSet) deliveryFormsCycle(deliverySubject string) bool {
|
||||
mset.mu.Lock()
|
||||
defer mset.mu.Unlock()
|
||||
@@ -357,3 +399,8 @@ func (mset *MsgSet) deliveryFormsCycle(deliverySubject string) bool {
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// This is same as check for delivery cycle.
|
||||
func (mset *MsgSet) validPartition(partitionSubject string) bool {
|
||||
return mset.deliveryFormsCycle(partitionSubject)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user