diff --git a/server/jetstream.go b/server/jetstream.go index 5d476cc5..987aa216 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -515,6 +515,11 @@ func (s *Server) migrateEphemerals() { } } } + + // Gove time for migration information to make it out of our server. + if len(consumers) > 0 { + time.Sleep(50 * time.Millisecond) + } } // Shutdown jetstream for this server. diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 3b1e2f67..3dcb7282 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -2098,6 +2098,7 @@ func TestJetStreamClusterEphemeralConsumersNotReplicated(t *testing.T) { t.Fatalf("Unexpected publish error: %v", err) } checkSubsPending(t, sub, 1) + sub.NextMsg(0) if ci.Cluster == nil || len(ci.Cluster.Replicas) != 0 { t.Fatalf("Expected ephemeral to be R=1, got %+v", ci.Cluster) @@ -2115,13 +2116,13 @@ func TestJetStreamClusterEphemeralConsumersNotReplicated(t *testing.T) { scl.Shutdown() c.waitOnStreamLeader("$G", "foo") - // Let the consumer migrate and spin up. - time.Sleep(250 * time.Millisecond) - if _, err = js.Publish("foo", []byte("OK")); err != nil { t.Fatalf("Unexpected publish error: %v", err) } - checkSubsPending(t, sub, 2) + + if _, err := sub.NextMsg(500 * time.Millisecond); err != nil { + t.Logf("Expected to see another message, but behavior is optimistic so can fail") + } } func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) {