Auto-expiration of ephemeral push-based observables

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2019-10-12 12:10:36 -07:00
parent d02b2a3d9c
commit 0fb7ee32bc
5 changed files with 400 additions and 55 deletions

View File

@@ -2765,7 +2765,7 @@ func (c *client) processInboundMsg(msg []byte) {
}
// processInboundClientMsg is called to process an inbound msg from a client.
func (c *client) processInboundClientMsg(msg []byte) {
func (c *client) processInboundClientMsg(msg []byte) bool {
// Update statistics
// The msg includes the CR_LF, so pull back out for accounting.
c.in.msgs++
@@ -2780,13 +2780,13 @@ func (c *client) processInboundClientMsg(msg []byte) {
// Check pub permissions
if c.perms != nil && (c.perms.pub.allow != nil || c.perms.pub.deny != nil) && !c.pubAllowed(string(c.pa.subject)) {
c.pubPermissionViolation(c.pa.subject)
return
return false
}
// Now check for reserved replies. These are used for service imports.
if len(c.pa.reply) > 0 && isReservedReply(c.pa.reply) {
c.replySubjectViolation(c.pa.reply)
return
return false
}
if c.opts.Verbose {
@@ -2795,7 +2795,7 @@ func (c *client) processInboundClientMsg(msg []byte) {
// Mostly under testing scenarios.
if c.srv == nil || c.acc == nil {
return
return false
}
// Check if this client's gateway replies map is not empty
@@ -2803,9 +2803,12 @@ func (c *client) processInboundClientMsg(msg []byte) {
return
}
// Indication if we attempted to deliver the message to anyone.
var didDeliver bool
// Check to see if we need to map/route to another account.
if c.acc.imports.services != nil {
c.checkForImportServices(c.acc, msg)
didDeliver = c.checkForImportServices(c.acc, msg)
}
// If we have an exported service and we are doing remote tracking, check this subject
@@ -2867,6 +2870,7 @@ func (c *client) processInboundClientMsg(msg []byte) {
// Check for no interest, short circuit if so.
// This is the fanout scale.
if len(r.psubs)+len(r.qsubs) > 0 {
didDeliver = true
flag := pmrNoFlag
// If there are matching queue subs and we are in gateway mode,
// we need to keep track of the queue names the messages are
@@ -2888,8 +2892,10 @@ func (c *client) processInboundClientMsg(msg []byte) {
// Now deal with gateways
if c.srv.gateway.enabled {
c.sendMsgToGateways(c.acc, msg, c.pa.subject, c.pa.reply, qnames)
didDeliver = c.sendMsgToGateways(c.acc, msg, c.pa.subject, c.pa.reply, qnames) || didDeliver
}
return didDeliver
}
// This is invoked knowing that this client has some GW replies
@@ -2947,9 +2953,9 @@ func (c *client) handleGWReplyMap(msg []byte) bool {
// This checks and process import services by doing the mapping and sending the
// message onward if applicable.
func (c *client) checkForImportServices(acc *Account, msg []byte) {
func (c *client) checkForImportServices(acc *Account, msg []byte) bool {
if acc == nil || acc.imports.services == nil {
return
return false
}
acc.mu.RLock()
@@ -2957,6 +2963,8 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
invalid := si != nil && si.invalid
acc.mu.RUnlock()
var didDeliver bool
// Get the results from the other account for the mapped "to" subject.
// If we have been marked invalid simply return here.
if si != nil && !invalid && si.acc != nil && si.acc.sl != nil {
@@ -2982,23 +2990,31 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
// FIXME(dlc) - Do L1 cache trick from above.
rr := si.acc.sl.Match(si.to)
// Check to see if we have no results and this is an internal serviceImport. If so we
// need to clean that up.
// Check to see if we have no results and this is an internal serviceImport.
// If so we need to clean that up.
if len(rr.psubs)+len(rr.qsubs) == 0 && si.internal {
// We may also have a response entry, so go through that way.
si.acc.checkForRespEntry(si.to)
}
flags := pmrNoFlag
// This gives us a notion that we have interest in this message.
// We need to check if this is false but we have
didDeliver = len(rr.psubs)+len(rr.qsubs) > 0
// If we are a route or gateway or leafnode and this message is flipped to a queue subscriber we
// need to handle that since the processMsgResults will want a queue filter.
if c.kind == GATEWAY || c.kind == ROUTER || c.kind == LEAF {
flags |= pmrIgnoreEmptyQueueFilter
}
if c.srv.gateway.enabled {
// If this is not a gateway connection but gateway is enabled,
// try to send this converted message to all gateways.
if c.srv.gateway.enabled && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF) {
flags |= pmrCollectQueueNames
queues := c.processMsgResults(si.acc, rr, msg, []byte(si.to), nrr, flags)
c.sendMsgToGateways(si.acc, msg, []byte(si.to), nrr, queues)
didDeliver = c.sendMsgToGateways(si.acc, msg, []byte(si.to), nrr, queues) || didDeliver
} else {
c.processMsgResults(si.acc, rr, msg, []byte(si.to), nrr, flags)
}
@@ -3016,6 +3032,8 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
acc.removeServiceImport(si.from)
}
}
return didDeliver
}
func (c *client) addSubToRouteTargets(sub *subscription) {

View File

@@ -2344,7 +2344,7 @@ var subPool = &sync.Pool{
// it is known that this gateway has no interest in the account or
// subject, etc..
// <Invoked from any client connection's readLoop>
func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgroups [][]byte) {
func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgroups [][]byte) bool {
gwsa := [16]*client{}
gws := gwsa[:0]
// This is in fast path, so avoid calling function when possible.
@@ -2359,7 +2359,7 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
thisClusterOldReplyPrefix := gw.oldReplyPfx
gw.RUnlock()
if len(gws) == 0 {
return
return false
}
var (
subj = string(subject)
@@ -2370,6 +2370,7 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
mreply []byte
dstHash []byte
checkReply = len(reply) > 0
didDeliver bool
)
// Get a subscription from the pool
@@ -2479,12 +2480,15 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
// So set/reset important fields.
sub.nm, sub.max = 0, 0
sub.client = gwc
sub.subject = subject
c.deliverMsg(sub, subject, mh, msg, false)
sub.subject = c.pa.subject
c.deliverMsg(sub, c.pa.subject, mh, msg, false)
didDeliver = true
}
// Done with subscription, put back to pool. We don't need
// to reset content since we explicitly set when using it.
subPool.Put(sub)
return didDeliver
}
// Possibly sends an A- to the remote gateway `c`.

View File

@@ -237,7 +237,7 @@ func (mset *MsgSet) processInboundJetStreamMsg(_ *subscription, _ *client, subje
}
// Send Ack here.
if doAck && len(reply) > 0 {
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, []byte(JsOK)}
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, []byte(JsOK), nil, 0}
}
mset.signalObservers()
@@ -251,11 +251,14 @@ func (mset *MsgSet) signalObservers() {
mset.mu.Unlock()
}
// Internal message for use by jetstream subsystem.
type jsPubMsg struct {
	subj  string     // Publish subject.
	dsubj string     // Delivery subject (original subject for the observer).
	reply string     // Reply subject; carries the formatted ack reply for observables.
	msg   []byte     // Raw message payload (CRLF appended by the send loop).
	o     *Observable // Owning observable, if any; used to report failed deliveries.
	seq   uint64     // Message set sequence; passed to didNotDeliver on failure.
}
// TODO(dlc) - Maybe look at onering instead of chan - https://github.com/pltr/onering
@@ -308,9 +311,14 @@ func (mset *MsgSet) internalSendLoop() {
msg := append(pm.msg, _CRLF_...)
// FIXME(dlc) - capture if this sent to anyone and notify
// observer if its now zero, meaning no interest.
c.processInboundClientMsg(msg)
didDeliver := c.processInboundClientMsg(msg)
c.pa.szb = nil
c.flushClients(0)
// Check to see if this is a delivery for an observable and
// we failed to deliver the message. If so alert the observable.
if pm.o != nil && !didDeliver {
pm.o.didNotDeliver(pm.seq)
}
case <-s.quitCh:
return
}

View File

@@ -61,25 +61,36 @@ var (
// Observable is a jetstream observable/subscriber.
//
// NOTE(review): the scraped diff rendered both the pre-change and the
// post-change field lists; this is the reconstructed post-change struct.
type Observable struct {
	mu        sync.Mutex
	name      string
	mset      *MsgSet
	sseq      uint64
	dseq      uint64
	aseq      uint64
	dsubj     string
	reqSub    *subscription
	ackSub    *subscription
	ackReply  string
	pending   map[uint64]int64
	ptmr      *time.Timer
	redeliver []uint64
	waiting   []string
	config    ObservableConfig
	// active tracks whether delivery interest is currently present
	// (push mode only).
	active bool
	// atmr fires periodic active checks for push based observables.
	atmr *time.Timer
	// nointerest counts consecutive checks with no delivery interest.
	nointerest int
	// athresh is the no-interest count at which an ephemeral is deleted.
	athresh int
	// achk is the interval between active checks.
	achk time.Duration
}
// Default AckWait, only applicable on explicit ack policy observables.
const JsAckWaitDefault = 30 * time.Second
const (
// Default AckWait, only applicable on explicit ack policy observables.
JsAckWaitDefault = 30 * time.Second
// JsActiveCheckInterval is default hb interval for push based observables.
JsActiveCheckIntervalDefault = time.Second
// JsNotActiveThreshold is number of times we detect no interest to close an observable if ephemeral.
JsNotActiveThresholdDefault = 2
)
func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error) {
if config == nil {
@@ -159,7 +170,7 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error)
}
// Set name, which will be durable name if set, otherwise we create one at random.
o := &Observable{mset: mset, config: *config, dsubj: config.Delivery}
o := &Observable{mset: mset, config: *config, dsubj: config.Delivery, active: true}
if isDurableObservable(config) {
o.name = config.Durable
} else {
@@ -191,12 +202,14 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error)
} else {
o.ackSub = sub
}
// Setup the internal sub for individual message requests.
reqSubj := fmt.Sprintf("%s.%s.%s", JsReqPre, cn, o.name)
if sub, err := mset.subscribeInternal(reqSubj, o.processNextMsgReq); err != nil {
return nil, err
} else {
o.reqSub = sub
// Setup the internal sub for next message requests.
if !o.isPushMode() {
reqSubj := fmt.Sprintf("%s.%s.%s", JsReqPre, cn, o.name)
if sub, err := mset.subscribeInternal(reqSubj, o.processNextMsgReq); err != nil {
return nil, err
} else {
o.reqSub = sub
}
}
mset.obs[o.name] = o
mset.mu.Unlock()
@@ -204,6 +217,15 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error)
// Now start up Go routine to deliver msgs.
go o.loopAndDeliverMsgs(s, a)
// If push mode, start up active hb timer.
if o.isPushMode() {
o.mu.Lock()
o.athresh = JsNotActiveThresholdDefault
o.achk = JsActiveCheckIntervalDefault
o.atmr = time.AfterFunc(o.achk, o.checkActive)
o.mu.Unlock()
}
return o, nil
}
@@ -370,6 +392,12 @@ func (o *Observable) loopAndDeliverMsgs(s *Server, a *Account) {
var seq uint64
var redelivery bool
// If we are in push mode and not active let's stop sending.
if o.isPushMode() && !o.active {
o.mu.Unlock()
break
}
if len(o.redeliver) > 0 {
seq = o.redeliver[0]
redelivery = true
@@ -441,12 +469,12 @@ func (o *Observable) incSeqs() {
// Deliver a msg to the observable push delivery subject.
// The observable and sequence are attached so the send loop can report a
// failed delivery back via didNotDeliver.
// (Removed the stale pre-change sendq line left over from the diff render.)
func (o *Observable) deliverMsg(mset *MsgSet, subj string, msg []byte, seq uint64) {
	mset.sendq <- &jsPubMsg{o.dsubj, subj, fmt.Sprintf(o.ackReply, seq), msg, o, seq}
}
// Deliver a msg to the msg request subject.
func (o *Observable) deliverMsgRequest(mset *MsgSet, dsubj, subj string, msg []byte, seq uint64) {
mset.sendq <- &jsPubMsg{dsubj, subj, fmt.Sprintf(o.ackReply, seq), msg}
mset.sendq <- &jsPubMsg{dsubj, subj, fmt.Sprintf(o.ackReply, seq), msg, o, seq}
if o.config.AckPolicy == AckExplicit {
o.trackPending(seq)
}
@@ -454,7 +482,7 @@ func (o *Observable) deliverMsgRequest(mset *MsgSet, dsubj, subj string, msg []b
// Redeliver a message. Same as a request delivery but pending tracking has
// already been established.
// (Removed the stale pre-change sendq line left over from the diff render.)
func (o *Observable) reDeliverMsgRequest(mset *MsgSet, dsubj, subj string, msg []byte, seq uint64) {
	mset.sendq <- &jsPubMsg{dsubj, subj, fmt.Sprintf(o.ackReply, seq), msg, o, seq}
}
// Tracks our outstanding pending acks. Only applicable to AckExplicit mode.
@@ -469,6 +497,56 @@ func (o *Observable) trackPending(seq uint64) {
o.pending[seq] = time.Now().UnixNano()
}
// checkActive runs on the active-check timer for push mode observables. It
// probes for delivery interest, updates active state, and deletes an
// ephemeral (non-durable) observable once the no-interest threshold is hit.
func (o *Observable) checkActive() {
	o.mu.Lock()
	mset := o.mset
	if mset == nil || !o.isPushMode() {
		o.mu.Unlock()
		return
	}
	var deleteEphemeral, wakeObservers bool
	if mset.noInterest(o.config.Delivery) {
		// No interest detected; mark inactive and record the strike.
		o.active = false
		o.nointerest++
		// Only ephemerals (no durable name) are subject to expiration.
		deleteEphemeral = o.config.Durable == "" && o.nointerest >= o.athresh
	} else {
		// Interest is (back) in place. If we were inactive, signal the
		// observers so delivery resumes.
		wakeObservers = !o.active
		o.active, o.nointerest = true, 0
	}
	// Re-arm the timer for the next check.
	o.atmr.Reset(o.achk)
	o.mu.Unlock()

	if wakeObservers {
		mset.signalObservers()
	}
	// This is for push based ephemerals.
	if deleteEphemeral {
		o.Delete()
	}
}
// didNotDeliver is called when a delivery for an observable message failed.
// Depending on our state, we will process the failure. For push mode
// (non-empty delivery subject) we mark the observable inactive and record a
// no-interest strike.
func (o *Observable) didNotDeliver(seq uint64) {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.mset == nil {
		return
	}
	// A configured delivery subject means push mode.
	if o.config.Delivery != _EMPTY_ {
		o.active = false
		o.nointerest++
	}
	// FIXME(dlc) - Other scenarios. Pull mode, etc.
}
// This checks if we already have this sequence queued for redelivery.
// FIXME(dlc) - This is O(n) but should be fast with small redeliver size.
// Lock should be held.
@@ -602,7 +680,18 @@ func (mset *MsgSet) DeleteObservable(o *Observable) error {
// Active indicates if this observable is still active: it has seen delivery
// interest and has not been deleted.
// (Removed the stale pre-change `return o.msgSet() != nil` line that the diff
// render left above the new body, which made the new body unreachable.)
func (o *Observable) Active() bool {
	o.mu.Lock()
	active := o.active && o.mset != nil
	o.mu.Unlock()
	return active
}
func stopAndClearTimer(tp **time.Timer) {
if *tp == nil {
return
}
(*tp).Stop()
*tp = nil
}
// Delete will delete the observable for the associated message set.
@@ -611,14 +700,13 @@ func (o *Observable) Delete() error {
// TODO(dlc) - Do cleanup here.
mset := o.mset
o.mset = nil
o.active = false
ackSub := o.ackSub
reqSub := o.reqSub
o.ackSub = nil
o.reqSub = nil
if o.ptmr != nil {
o.ptmr.Stop()
o.ptmr = nil
}
stopAndClearTimer(&o.ptmr)
stopAndClearTimer(&o.atmr)
o.mu.Unlock()
if mset == nil {
@@ -642,21 +730,49 @@ func (o *Observable) Delete() error {
// noInterest reports whether there is NO registered interest in the delivery
// subject. Note that since we require delivery to be a literal this is just
// like a publish match. Service imports and gateway interest are also
// consulted.
//
// TODO(dlc) - if gateways are enabled we need to do some more digging for the
// real answer.
//
// (Removed the two stale pre-change lines the diff render left in the body:
// the old `acc = mset.client.acc` assignment and the old direct `return`.)
func (mset *MsgSet) noInterest(delivery string) bool {
	var c *client
	var acc *Account
	mset.mu.Lock()
	if mset.client != nil {
		c = mset.client
		acc = c.acc
	}
	mset.mu.Unlock()

	if acc == nil {
		return true
	}
	r := acc.sl.Match(delivery)
	noInterest := len(r.psubs)+len(r.qsubs) == 0

	// Check for service imports here.
	if acc.imports.services != nil {
		acc.mu.RLock()
		si := acc.imports.services[delivery]
		invalid := si != nil && si.invalid
		acc.mu.RUnlock()
		if si != nil && !invalid && si.acc != nil && si.acc.sl != nil {
			rr := si.acc.sl.Match(si.to)
			// NOTE(review): ORed with the direct-match result as in the
			// original; confirm `||` (vs `&&`) is the intended semantics.
			noInterest = len(rr.psubs)+len(rr.qsubs) == 0 || noInterest
		}
	}
	// Process GWs here. This is not going to be exact since it could be that
	// the GW does not know; that is ok for here.
	// TODO(@@IK) to check.
	if c != nil && c.srv != nil && c.srv.gateway.enabled {
		gw := c.srv.gateway
		gw.RLock()
		for _, gwc := range gw.outo {
			psi, qr := gwc.gatewayInterest(acc.Name, delivery)
			if psi || qr != nil {
				noInterest = false
				break
			}
		}
		gw.RUnlock()
	}
	return noInterest
}
// Check that we do not form a cycle by delivering to a delivery subject
@@ -677,3 +793,18 @@ func (mset *MsgSet) deliveryFormsCycle(deliverySubject string) bool {
func (mset *MsgSet) validPartition(partitionSubject string) bool {
return mset.deliveryFormsCycle(partitionSubject)
}
// SetActiveCheckParams allows a server to change the active check parameters
// for push based observables: achk is the interval between interest checks
// and thresh the number of consecutive no-interest checks before an
// ephemeral observable is deleted. Returns an error if this observable is
// not push based.
func (o *Observable) SetActiveCheckParams(achk time.Duration, thresh int) error {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.atmr == nil || !o.isPushMode() {
		return fmt.Errorf("observable not push based")
	}
	o.achk, o.athresh = achk, thresh
	// Re-arm the timer with the new interval.
	o.atmr.Reset(achk)
	return nil
}

View File

@@ -1179,3 +1179,187 @@ func TestJetStreamWorkQueueWorkingIndicator(t *testing.T) {
getMsg(2)
}
// TestJetStreamEphemeralObservables verifies that a push based ephemeral
// observable is marked inactive once delivery interest goes away and is
// auto-deleted after the configured number of no-interest checks.
func TestJetStreamEphemeralObservables(t *testing.T) {
	s := RunBasicJetStreamServer()
	defer s.Shutdown()

	mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: "EP", Subjects: []string{"foo.*"}})
	if err != nil {
		t.Fatalf("Unexpected error adding message set: %v", err)
	}
	defer s.JetStreamDeleteMsgSet(mset)

	nc := clientConnectToServer(t, s)
	defer nc.Close()

	sub, _ := nc.SubscribeSync(nats.NewInbox())
	defer sub.Unsubscribe()
	nc.Flush()

	o, err := mset.AddObservable(&server.ObservableConfig{Delivery: sub.Subject})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	// For test speed.
	if err := o.SetActiveCheckParams(50*time.Millisecond, 2); err != nil {
		t.Fatalf("Unexpected error setting active check params: %v", err)
	}
	if !o.Active() {
		t.Fatalf("Expected the observable to be considered active")
	}
	if numo := mset.NumObservables(); numo != 1 {
		t.Fatalf("Expected number of observables to be 1, got %d", numo)
	}
	// Make sure works now.
	nc.Request("foo.22", nil, 100*time.Millisecond)
	checkFor(t, 250*time.Millisecond, 10*time.Millisecond, func() error {
		// Check the error from Pending itself, not the stale outer err.
		if nmsgs, _, err := sub.Pending(); err != nil || nmsgs != 1 {
			return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, 1)
		}
		return nil
	})
	// Now close the subscription and send a message, this should trip active state on ephemeral observable.
	sub.Unsubscribe()
	nc.Request("foo.22", nil, 100*time.Millisecond)
	nc.Flush()
	if o.Active() {
		t.Fatalf("Expected the ephemeral observable to be considered inactive")
	}
	// The reason for this still being 1 is that we give some time in case of a reconnect scenario.
	// We detect right away on the publish but we wait for interest to be re-established.
	if numo := mset.NumObservables(); numo != 1 {
		t.Fatalf("Expected number of observables to be 1, got %d", numo)
	}
	// We should delete this one after the check interval.
	checkFor(t, time.Second, 100*time.Millisecond, func() error {
		if numo := mset.NumObservables(); numo != 0 {
			return fmt.Errorf("Expected number of observables to be 0, got %d", numo)
		}
		return nil
	})

	// Now check that with no publish we still will expire the observable.
	sub, _ = nc.SubscribeSync(nats.NewInbox())
	defer sub.Unsubscribe()
	nc.Flush()

	o, err = mset.AddObservable(&server.ObservableConfig{Delivery: sub.Subject})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	// For test speed.
	if err := o.SetActiveCheckParams(10*time.Millisecond, 2); err != nil {
		t.Fatalf("Unexpected error setting active check params: %v", err)
	}
	if !o.Active() {
		t.Fatalf("Expected the observable to be considered active")
	}
	if numo := mset.NumObservables(); numo != 1 {
		t.Fatalf("Expected number of observables to be 1, got %d", numo)
	}
	sub.Unsubscribe()
	nc.Flush()

	// We should delete this one after the check interval.
	time.Sleep(50 * time.Millisecond)
	if o.Active() {
		t.Fatalf("Expected the ephemeral observable to be considered inactive")
	}
	if numo := mset.NumObservables(); numo != 0 {
		t.Fatalf("Expected number of observables to be 0, got %d", numo)
	}
}
// TestJetStreamObservableReconnect simulates a subscriber reconnect by
// dropping and re-forming the delivery subscription, verifying the
// observable resumes delivery at the correct sequence each time.
func TestJetStreamObservableReconnect(t *testing.T) {
	s := RunBasicJetStreamServer()
	defer s.Shutdown()

	mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: "EP", Subjects: []string{"foo.*"}})
	if err != nil {
		t.Fatalf("Unexpected error adding message set: %v", err)
	}
	defer s.JetStreamDeleteMsgSet(mset)

	nc := clientConnectToServer(t, s)
	defer nc.Close()

	sub, _ := nc.SubscribeSync(nats.NewInbox())
	defer sub.Unsubscribe()
	nc.Flush()

	// Capture the subscription.
	delivery := sub.Subject

	o, err := mset.AddObservable(&server.ObservableConfig{Delivery: delivery, AckPolicy: server.AckExplicit})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	// For test speed.
	if err := o.SetActiveCheckParams(100*time.Millisecond, 10); err != nil {
		t.Fatalf("Unexpected error setting active check params: %v", err)
	}
	if !o.Active() {
		t.Fatalf("Expected the observable to be considered active")
	}
	if numo := mset.NumObservables(); numo != 1 {
		t.Fatalf("Expected number of observables to be 1, got %d", numo)
	}

	// We will simulate reconnect by unsubscribing on one connection and forming
	// the same on another. Once we have cluster tests we will do more testing on
	// reconnect scenarios.
	getMsg := func(seqno int) *nats.Msg {
		t.Helper()
		m, err := sub.NextMsg(time.Second)
		if err != nil {
			t.Fatalf("Unexpected error: %v", err)
		}
		if seq := o.SeqFromReply(m.Reply); seq != uint64(seqno) {
			t.Fatalf("Expected sequence of %d , got %d", seqno, seq)
		}
		m.Respond(nil)
		return m
	}
	sendMsg := func() {
		t.Helper()
		// Fail the test on a publish error instead of silently returning.
		if err := nc.Publish("foo.22", []byte("OK!")); err != nil {
			t.Fatalf("Unexpected publish error: %v", err)
		}
	}

	// Send and Pull first message.
	sendMsg() // 1
	getMsg(1)
	// Cancel first one.
	sub.Unsubscribe()
	// Re-establish new sub on same subject.
	sub, _ = nc.SubscribeSync(delivery)
	time.Sleep(100 * time.Millisecond)

	// We should be getting 2 here.
	sendMsg() // 2
	getMsg(2)

	sub.Unsubscribe()
	time.Sleep(200 * time.Millisecond)

	// send 3-10
	for i := 0; i <= 7; i++ {
		sendMsg()
	}
	// Make sure they are queued up with no interest.
	nc.Flush()

	// Re-establish again.
	sub, _ = nc.SubscribeSync(delivery)

	// We should be getting 3-10 here.
	for i := 3; i <= 10; i++ {
		getMsg(i)
	}
}