From 47bef915ed20ad183b7c1a4f973185cd6181368c Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 3 Jul 2022 11:08:24 -0700 Subject: [PATCH] Allow all members of a replicated stream to participate in direct access. We will wait until a non-leader replica is current to subscribe. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 77 +++++++++++- server/jetstream_helpers_test.go | 97 +++++++++++++++ server/jetstream_super_cluster_test.go | 11 +- server/norace_test.go | 160 +++++++++++++++---------- 4 files changed, 277 insertions(+), 68 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index ca48af1d..3819258e 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1707,6 +1707,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // For migration tracking. var migrating bool var peerGroup peerMigrateType + var mmt *time.Ticker var mmtc <-chan time.Time @@ -1720,11 +1721,31 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps stopMigrationMonitoring := func() { if mmt != nil { mmt.Stop() - mmtc = nil + mmt, mmtc = nil, nil } } defer stopMigrationMonitoring() + // This is to optionally track when we are ready as a non-leader for direct access participation. + // Either direct or if we are a direct mirror, or both. + var dat *time.Ticker + var datc <-chan time.Time + + startDirectAccessMonitoring := func() { + if dat == nil { + dat = time.NewTicker(1 * time.Second) + datc = dat.C + } + } + + stopDirectMonitoring := func() { + if dat != nil { + dat.Stop() + dat, datc = nil, nil + } + } + defer stopDirectMonitoring() + // This is triggered during a scale up from 1 to clustered mode. We need the new followers to catchup, // similar to how we trigger the catchup mechanism post a backup/restore. It's ok to do here and preferred // over waiting to be elected, this just queues it up for the new members to see first and trigger the above @@ -1787,6 +1808,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps } else if n.NeedSnapshot() { doSnapshot() } + // Always cancel if this was running. + stopDirectMonitoring() + } else if n.GroupLeader() != noLeader { js.setStreamAssignmentRecovering(sa) } @@ -1806,6 +1830,48 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps stopMigrationMonitoring() } } + + // Here we are checking if we are not the leader but we have been asked to allow + // direct access. We now allow non-leaders to participate in the queue group. + if !isLeader && mset != nil { + mset.mu.Lock() + // Check direct gets first. + if mset.cfg.AllowDirect { + if mset.directSub == nil && mset.isCurrent() { + mset.subscribeToDirect() + } else { + startDirectAccessMonitoring() + } + } + // Now check for mirror directs as well. + if mset.cfg.MirrorDirect { + if mset.mirror != nil && mset.mirror.dsub == nil && mset.isCurrent() { + mset.subscribeToMirrorDirect() + } else { + startDirectAccessMonitoring() + } + } + mset.mu.Unlock() + } + + case <-datc: + mset.mu.Lock() + ad, md, current := mset.cfg.AllowDirect, mset.cfg.MirrorDirect, mset.isCurrent() + if !current { + mset.mu.Unlock() + continue + } + // We are current, cancel monitoring and create the direct subs as needed. + if ad { + mset.subscribeToDirect() + } + if md { + mset.subscribeToMirrorDirect() + } + mset.mu.Unlock() + // Stop monitoring. 
+ stopDirectMonitoring() + case <-t.C: doSnapshot() case <-uch: @@ -5984,6 +6050,15 @@ func (mset *stream) isCatchingUp() bool { return mset.catchup } +// Determine if a non-leader is current. +// Lock should be held. +func (mset *stream) isCurrent() bool { + if mset.node == nil { + return true + } + return mset.node.Current() && !mset.catchup +} + // Maximum requests for the whole server that can be in flight. const maxConcurrentSyncRequests = 8 diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index fd87e08b..b8c2ff25 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -17,15 +17,18 @@ package server import ( + "context" "encoding/json" "fmt" "io/ioutil" "math/rand" + "net" "strings" "testing" "time" "github.com/nats-io/nats.go" + "golang.org/x/time/rate" ) // Support functions @@ -1365,6 +1368,24 @@ func addStream(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *StreamInfo { return resp.StreamInfo } +func updateStream(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *StreamInfo { + t.Helper() + req, err := json.Marshal(cfg) + require_NoError(t, err) + rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamUpdateT, cfg.Name), req, time.Second) + require_NoError(t, err) + var resp JSApiStreamCreateResponse + err = json.Unmarshal(rmsg.Data, &resp) + require_NoError(t, err) + if resp.Type != JSApiStreamUpdateResponseType { + t.Fatalf("Invalid response type %s expected %s", resp.Type, JSApiStreamUpdateResponseType) + } + if resp.Error != nil { + t.Fatalf("Unexpected error: %+v", resp.Error) + } + return resp.StreamInfo +} + // setInActiveDeleteThreshold sets the delete threshold for how long to wait // before deleting an inactive consumer. func (o *consumer) setInActiveDeleteThreshold(dthresh time.Duration) error { @@ -1380,3 +1401,79 @@ func (o *consumer) setInActiveDeleteThreshold(dthresh time.Duration) error { } return nil } + +// Net Proxy - For introducing RTT and BW constraints. 
+type netProxy struct { + listener net.Listener + conns []net.Conn + url string +} + +func newNetProxy(rtt time.Duration, upRate, downRate int, serverURL string) *netProxy { + hp := net.JoinHostPort("127.0.0.1", "0") + l, e := net.Listen("tcp", hp) + if e != nil { + panic(fmt.Sprintf("Error listening on port: %s, %q", hp, e)) + } + port := l.Addr().(*net.TCPAddr).Port + proxy := &netProxy{listener: l} + go func() { + client, err := l.Accept() + if err != nil { + return + } + server, err := net.DialTimeout("tcp", serverURL[7:], time.Second) + if err != nil { + panic("Can't connect to NATS server") + } + proxy.conns = append(proxy.conns, client, server) + go proxy.loop(rtt, upRate, client, server) + go proxy.loop(rtt, downRate, server, client) + }() + proxy.url = fmt.Sprintf("nats://127.0.0.1:%d", port) + return proxy +} + +func (np *netProxy) clientURL() string { + return np.url +} + +func (np *netProxy) loop(rtt time.Duration, tbw int, r, w net.Conn) { + delay := rtt / 2 + const rbl = 8192 + var buf [rbl]byte + ctx := context.Background() + + rl := rate.NewLimiter(rate.Limit(tbw), rbl) + + for fr := true; ; { + sr := time.Now() + n, err := r.Read(buf[:]) + if err != nil { + return + } + // RTT delays + if fr || time.Since(sr) > 2*time.Millisecond { + fr = false + if delay > 0 { + time.Sleep(delay) + } + } + if err := rl.WaitN(ctx, n); err != nil { + return + } + if _, err = w.Write(buf[:n]); err != nil { + return + } + } +} + +func (np *netProxy) stop() { + if np.listener != nil { + np.listener.Close() + np.listener = nil + for _, c := range np.conns { + c.Close() + } + } +} diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 4cc17c69..64ed1580 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -2536,7 +2536,16 @@ func TestJetStreamSuperClusterStreamDirectGetMirrorQueueGroup(t *testing.T) { } addStream(t, nc, cfg) - // Since last one was an R3, check and wait for subscription. + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + si, err := js.StreamInfo("M2") + require_NoError(t, err) + if si.State.Msgs != uint64(num) { + return fmt.Errorf("Expected %d msgs, got state: %d", num, si.State.Msgs) + } + return nil + }) + + // Since last one was an R3, check and wait for the direct subscription. checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { sl := sc.clusterForName("C3").streamLeader("$G", "M2") if mset, err := sl.GlobalAccount().lookupStream("M2"); err == nil { diff --git a/server/norace_test.go b/server/norace_test.go index 3a642d84..9ab726d8 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -20,7 +20,6 @@ import ( "bufio" "bytes" "compress/gzip" - "context" "encoding/binary" "encoding/json" "fmt" @@ -48,7 +47,6 @@ import ( "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" - "golang.org/x/time/rate" ) // IMPORTANT: Tests in this file are not executed when running with the -race flag. @@ -5171,6 +5169,7 @@ func TestNoRaceJetStreamClusterInterestPullConsumerStreamLimitBug(t *testing.T) return } case <-qch: + pt.Stop() return } } @@ -5230,78 +5229,107 @@ func TestNoRaceJetStreamClusterInterestPullConsumerStreamLimitBug(t *testing.T) } } -// Net Proxy - For introducing RTT and BW constraints. -type netProxy struct { - listener net.Listener - conns []net.Conn - url string -} +// Test that all peers have the direct access subs that participate in a queue group, +// but only when they are current and ready. 
+// So we will start with R1, add in messages
+// then scale up while also still adding messages.
+func TestNoRaceJetStreamClusterDirectAccessAllPeersSubs(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "JSC", 3)
+	defer c.shutdown()
 
-func newNetProxy(rtt time.Duration, upRate, downRate int, serverURL string) *netProxy {
-	hp := net.JoinHostPort("127.0.0.1", "0")
-	l, e := net.Listen("tcp", hp)
-	if e != nil {
-		panic(fmt.Sprintf("Error listening on port: %s, %q", hp, e))
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	// Start as R1
+	cfg := &StreamConfig{
+		Name:        "TEST",
+		Subjects:    []string{"kv.>"},
+		MaxMsgsPer:  10,
+		AllowDirect: true,
+		Replicas:    1,
+		Storage:     FileStorage,
 	}
-	port := l.Addr().(*net.TCPAddr).Port
-	proxy := &netProxy{listener: l}
-	go func() {
-		client, err := l.Accept()
-		if err != nil {
-			return
-		}
-		server, err := net.DialTimeout("tcp", serverURL[7:], time.Second)
-		if err != nil {
-			panic("Can't connect to NATS server")
-		}
-		proxy.conns = append(proxy.conns, client, server)
-		go proxy.loop(rtt, upRate, client, server)
-		go proxy.loop(rtt, downRate, server, client)
-	}()
-	proxy.url = fmt.Sprintf("nats://127.0.0.1:%d", port)
-	return proxy
-}
+	addStream(t, nc, cfg)
 
-func (np *netProxy) clientURL() string {
-	return np.url
-}
+	// Seed with enough messages to start, then we will scale up while still adding more messages.
+	num, msg := 1000, bytes.Repeat([]byte("XYZ"), 64)
+	for i := 0; i < num; i++ {
+		js.PublishAsync(fmt.Sprintf("kv.%d", i), msg)
+	}
+	select {
+	case <-js.PublishAsyncComplete():
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Did not receive completion signal")
+	}
 
-func (np *netProxy) loop(rtt time.Duration, tbw int, r, w net.Conn) {
-	delay := rtt / 2
-	const rbl = 8192
-	var buf [rbl]byte
-	ctx := context.Background()
+	getSubj := fmt.Sprintf(JSDirectMsgGetT, "TEST")
+	getMsg := func(key string) *nats.Msg {
+		req := []byte(fmt.Sprintf(`{"last_by_subj":%q}`, key))
+		m, err := nc.Request(getSubj, req, time.Second)
+		require_NoError(t, err)
+		require_True(t, m.Header.Get(JSSubject) == key)
+		return m
+	}
 
-	rl := rate.NewLimiter(rate.Limit(tbw), rbl)
+	// Just make sure we can succeed here.
+	getMsg("kv.22")
 
-	for fr := true; ; {
-		sr := time.Now()
-		n, err := r.Read(buf[:])
-		if err != nil {
-			return
-		}
-		// RTT delays
-		if fr || time.Since(sr) > 2*time.Millisecond {
-			fr = false
-			if delay > 0 {
-				time.Sleep(delay)
+	// Now crank up go routines to continue sending more messages.
+	qch := make(chan bool)
+	var wg sync.WaitGroup
+
+	for i := 0; i < 5; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			nc, _ := jsClientConnect(t, c.randomServer())
+			js, _ := nc.JetStream(nats.MaxWait(500 * time.Millisecond))
+			defer nc.Close()
+			for {
+				pt := time.NewTimer(time.Duration(time.Millisecond))
+				select {
+				case <-pt.C:
+					js.Publish(fmt.Sprintf("kv.%d", rand.Intn(1000)), msg)
+				case <-qch:
+					pt.Stop()
+					return
+				}
 			}
-		}
-		if err := rl.WaitN(ctx, n); err != nil {
-			return
-		}
-		if _, err = w.Write(buf[:n]); err != nil {
-			return
-		}
+		}()
 	}
-}
 
-func (np *netProxy) stop() {
-	if np.listener != nil {
-		np.listener.Close()
-		np.listener = nil
-		for _, c := range np.conns {
-			c.Close()
-		}
+	time.Sleep(100 * time.Millisecond)
+
+	// Now let's scale up to an R3.
+	cfg.Replicas = 3
+	updateStream(t, nc, cfg)
+	c.waitOnStreamLeader("$G", "TEST")
+
+	// For each non-leader check that the direct sub fires up.
+	// We just test all, the leader will already have a directSub.
+ for _, s := range c.servers { + mset, err := s.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + checkFor(t, 5*time.Second, 500*time.Millisecond, func() error { + mset.mu.RLock() + ok := mset.directSub != nil + mset.mu.RUnlock() + if ok { + return nil + } + return fmt.Errorf("No directSub yet") + }) + } + + close(qch) + wg.Wait() + + // Just make sure we can succeed here. + getMsg("kv.22") + + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + + if si.State.Msgs == uint64(num) { + t.Fatalf("Expected to see messages increase, got %d", si.State.Msgs) } }
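For reference, below is a minimal client-side sketch of the direct get flow that this patch extends to every current replica. It is not part of the patch: the stream name "KV" and key "kv.22" are illustrative (the test above uses a stream named "TEST" with subjects "kv.>"), and it assumes a stream created with allow_direct enabled on a server running this change.

package main

import (
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	// Direct gets are plain requests on $JS.API.DIRECT.GET.<stream>.
	// The responders form a queue group, so with this change any replica
	// that is current (leader or not) may answer, not just the leader.
	req := []byte(`{"last_by_subj":"kv.22"}`)
	m, err := nc.Request("$JS.API.DIRECT.GET.KV", req, time.Second)
	if err != nil {
		panic(err)
	}

	// The stored message body comes back as the reply payload, with
	// metadata in headers such as Nats-Subject and Nats-Stream.
	fmt.Printf("stream=%s subject=%s data=%q\n",
		m.Header.Get("Nats-Stream"), m.Header.Get("Nats-Subject"), m.Data)
}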