From b0340ce598f57b3ee834456bd92c27ce32e4c6fc Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 16 May 2023 11:02:06 -0700 Subject: [PATCH] Make sure to wait properly until we believe we are caught up to enable direct gets. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 30 +++++------- server/jetstream_cluster_3_test.go | 76 ++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 19 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index baed19e4..c260542e 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2284,27 +2284,19 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // Here we are checking if we are not the leader but we have been asked to allow // direct access. We now allow non-leaders to participate in the queue group. if !isLeader && mset != nil { - mset.mu.Lock() - // Check direct gets first. - if mset.cfg.AllowDirect { - if mset.directSub == nil && mset.isCurrent() { - mset.subscribeToDirect() - } else { - startDirectAccessMonitoring() - } - } - // Now check for mirror directs as well. - if mset.cfg.MirrorDirect { - if mset.mirror != nil && mset.mirror.dsub == nil && mset.isCurrent() { - mset.subscribeToMirrorDirect() - } else { - startDirectAccessMonitoring() - } - } - mset.mu.Unlock() + startDirectAccessMonitoring() } case <-datc: + if mset == nil || isRecovering { + return + } + // If we are leader we can stop, we know this is setup now. + if isLeader { + stopDirectMonitoring() + return + } + mset.mu.Lock() ad, md, current := mset.cfg.AllowDirect, mset.cfg.MirrorDirect, mset.isCurrent() if !current { @@ -2328,7 +2320,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps mset.subscribeToMirrorDirect() } mset.mu.Unlock() - // Stop monitoring. + // Stop direct monitoring. stopDirectMonitoring() case <-t.C: diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 82faceaa..700f5810 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -4046,3 +4046,79 @@ func TestJetStreamClusterStreamScaleUpNoGroupCluster(t *testing.T) { }) require_NoError(t, err) } + +// https://github.com/nats-io/nats-server/issues/4162 +func TestJetStreamClusterStaleDirectGetOnRestart(t *testing.T) { + c := createJetStreamClusterExplicit(t, "NATS", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{ + Bucket: "TEST", + Replicas: 3, + }) + require_NoError(t, err) + + _, err = kv.PutString("foo", "bar") + require_NoError(t, err) + + // Close client in case we were connected to server below. + // We will recreate. + nc.Close() + + // Shutdown a non-leader. + s := c.randomNonStreamLeader(globalAccountName, "KV_TEST") + s.Shutdown() + + nc, js = jsClientConnect(t, c.randomServer()) + defer nc.Close() + + kv, err = js.KeyValue("TEST") + require_NoError(t, err) + + _, err = kv.PutString("foo", "baz") + require_NoError(t, err) + + errCh := make(chan error, 100) + done := make(chan struct{}) + + go func() { + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + kv, err := js.KeyValue("TEST") + if err != nil { + errCh <- err + return + } + + for { + select { + case <-done: + return + default: + entry, err := kv.Get("foo") + if err != nil { + errCh <- err + return + } + if v := string(entry.Value()); v != "baz" { + errCh <- fmt.Errorf("Got wrong value: %q", v) + } + } + } + }() + + // Restart + c.restartServer(s) + // Wait for a bit to make sure as this server participates in direct gets + // it does not server stale reads. + time.Sleep(2 * time.Second) + close(done) + + if len(errCh) > 0 { + t.Fatalf("Expected no errors but got %v", <-errCh) + } +}