mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Make sure to wait properly until we believe we are caught up to enable direct gets.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -2284,27 +2284,19 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
|
||||
// Here we are checking if we are not the leader but we have been asked to allow
|
||||
// direct access. We now allow non-leaders to participate in the queue group.
|
||||
if !isLeader && mset != nil {
|
||||
mset.mu.Lock()
|
||||
// Check direct gets first.
|
||||
if mset.cfg.AllowDirect {
|
||||
if mset.directSub == nil && mset.isCurrent() {
|
||||
mset.subscribeToDirect()
|
||||
} else {
|
||||
startDirectAccessMonitoring()
|
||||
}
|
||||
}
|
||||
// Now check for mirror directs as well.
|
||||
if mset.cfg.MirrorDirect {
|
||||
if mset.mirror != nil && mset.mirror.dsub == nil && mset.isCurrent() {
|
||||
mset.subscribeToMirrorDirect()
|
||||
} else {
|
||||
startDirectAccessMonitoring()
|
||||
}
|
||||
}
|
||||
mset.mu.Unlock()
|
||||
startDirectAccessMonitoring()
|
||||
}
|
||||
|
||||
case <-datc:
|
||||
if mset == nil || isRecovering {
|
||||
return
|
||||
}
|
||||
// If we are leader we can stop, we know this is setup now.
|
||||
if isLeader {
|
||||
stopDirectMonitoring()
|
||||
return
|
||||
}
|
||||
|
||||
mset.mu.Lock()
|
||||
ad, md, current := mset.cfg.AllowDirect, mset.cfg.MirrorDirect, mset.isCurrent()
|
||||
if !current {
|
||||
@@ -2328,7 +2320,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
|
||||
mset.subscribeToMirrorDirect()
|
||||
}
|
||||
mset.mu.Unlock()
|
||||
// Stop monitoring.
|
||||
// Stop direct monitoring.
|
||||
stopDirectMonitoring()
|
||||
|
||||
case <-t.C:
|
||||
|
||||
@@ -4046,3 +4046,79 @@ func TestJetStreamClusterStreamScaleUpNoGroupCluster(t *testing.T) {
|
||||
})
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
// https://github.com/nats-io/nats-server/issues/4162
|
||||
func TestJetStreamClusterStaleDirectGetOnRestart(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "NATS", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
|
||||
Bucket: "TEST",
|
||||
Replicas: 3,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
_, err = kv.PutString("foo", "bar")
|
||||
require_NoError(t, err)
|
||||
|
||||
// Close client in case we were connected to server below.
|
||||
// We will recreate.
|
||||
nc.Close()
|
||||
|
||||
// Shutdown a non-leader.
|
||||
s := c.randomNonStreamLeader(globalAccountName, "KV_TEST")
|
||||
s.Shutdown()
|
||||
|
||||
nc, js = jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
kv, err = js.KeyValue("TEST")
|
||||
require_NoError(t, err)
|
||||
|
||||
_, err = kv.PutString("foo", "baz")
|
||||
require_NoError(t, err)
|
||||
|
||||
errCh := make(chan error, 100)
|
||||
done := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
kv, err := js.KeyValue("TEST")
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
default:
|
||||
entry, err := kv.Get("foo")
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
if v := string(entry.Value()); v != "baz" {
|
||||
errCh <- fmt.Errorf("Got wrong value: %q", v)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Restart
|
||||
c.restartServer(s)
|
||||
// Wait for a bit to make sure as this server participates in direct gets
|
||||
// it does not server stale reads.
|
||||
time.Sleep(2 * time.Second)
|
||||
close(done)
|
||||
|
||||
if len(errCh) > 0 {
|
||||
t.Fatalf("Expected no errors but got %v", <-errCh)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user