Don't immediately listen on the direct get subjects.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-08-17 16:34:39 -07:00
parent c61465b344
commit ce2d5fa173
2 changed files with 73 additions and 25 deletions

View File

@@ -12818,3 +12818,76 @@ func TestJetStreamClusterLeaderAbortsCatchupOnFollowerError(t *testing.T) {
t.Fatal("Expected err response from the remote")
}
}
func TestJetStreamClusterStreamDirectGetNotTooSoon(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3F", 3)
defer c.shutdown()
// Client based API
s := c.randomServer()
nc, _ := jsClientConnect(t, s)
defer nc.Close()
// Do by hand for now.
cfg := &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Subjects: []string{"foo"},
Replicas: 3,
MaxMsgsPer: 1,
AllowDirect: true,
}
addStream(t, nc, cfg)
sendStreamMsg(t, nc, "foo", "bar")
getSubj := fmt.Sprintf(JSDirectGetLastBySubjectT, "TEST", "foo")
// Make sure we get all direct subs.
checkForDirectSubs := func() {
t.Helper()
checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
for _, s := range c.servers {
mset, err := s.GlobalAccount().lookupStream("TEST")
if err != nil {
return err
}
mset.mu.RLock()
hasBoth := mset.directSub != nil && mset.lastBySub != nil
mset.mu.RUnlock()
if !hasBoth {
return fmt.Errorf("%v does not have both direct subs registered", s)
}
}
return nil
})
}
_, err := nc.Request(getSubj, nil, time.Second)
require_NoError(t, err)
checkForDirectSubs()
// We want to make sure that when starting up we do not listen until we have a leader.
nc.Close()
c.stopAll()
// Start just one..
s, opts := RunServerWithConfig(c.opts[0].ConfigFile)
c.servers[0] = s
c.opts[0] = opts
nc, _ = jsClientConnect(t, s)
defer nc.Close()
_, err = nc.Request(getSubj, nil, time.Second)
require_Error(t, err, nats.ErrTimeout)
// Now start all and make sure they all eventually have subs for direct access.
c.restartAll()
c.waitOnStreamLeader("$G", "TEST")
_, err = nc.Request(getSubj, nil, time.Second)
require_NoError(t, err)
checkForDirectSubs()
}

View File

@@ -1482,31 +1482,6 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
if mset.isLeader() && sendAdvisory {
mset.sendUpdateAdvisoryLocked()
}
// Check for AllowDirect
if mset.cfg.AllowDirect {
mset.subscribeToDirect()
} else if mset.directSub != nil {
mset.unsubscribe(mset.directSub)
mset.directSub = nil
}
// Check for mirror. If set but we are not a mirror just ignore for now.
if mset.cfg.MirrorDirect && mset.cfg.Mirror != nil {
if err := mset.subscribeToMirrorDirect(); err != nil {
// Disable since we had problems above.
mset.cfg.MirrorDirect = false
}
} else if !mset.cfg.MirrorDirect && mset.mirror != nil {
if mset.mirror.dsub != nil {
mset.unsubscribe(mset.mirror.dsub)
mset.mirror.dsub = nil
}
if mset.mirror.lbsub != nil {
mset.unsubscribe(mset.mirror.lbsub)
mset.mirror.lbsub = nil
}
}
mset.mu.Unlock()
if js != nil {