Allow all members of a replicated stream to participate in direct access.

We will wait until a non-leader replica is current before subscribing.

Signed-off-by: Derek Collison <derek@nats.io>
Derek Collison
2022-07-03 11:08:24 -07:00
parent 4075721651
commit 47bef915ed
4 changed files with 277 additions and 68 deletions
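
For context, a direct get is a request to the $JS.API.DIRECT.GET.<stream> API subject, and with this change any replica that is current (and, with MirrorDirect, a mirror) joins the queue group answering those requests instead of only the stream leader. A minimal client-side sketch, assuming an established *nats.Conn named nc; the JSDirectMsgGetT subject template and the JSSubject header constant are the ones used by the tests below:

    // Request the last message for a subject via direct get.
    req := []byte(`{"last_by_subj":"kv.22"}`)
    m, err := nc.Request(fmt.Sprintf(JSDirectMsgGetT, "TEST"), req, time.Second)
    if err == nil {
        // The responder places the stored subject in the Nats-Subject (JSSubject) header.
        fmt.Println(m.Header.Get(JSSubject), string(m.Data))
    }

Whichever current peer's queue-group member receives the request answers it; the client sees the same reply either way.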

View File

@@ -1707,6 +1707,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// For migration tracking.
var migrating bool
var peerGroup peerMigrateType
var mmt *time.Ticker
var mmtc <-chan time.Time
@@ -1720,11 +1721,31 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
stopMigrationMonitoring := func() {
if mmt != nil {
mmt.Stop()
mmt, mmtc = nil, nil
}
}
defer stopMigrationMonitoring()
// This optionally tracks when we are ready, as a non-leader, to participate in direct access,
// either for direct gets, for mirror directs, or both.
var dat *time.Ticker
var datc <-chan time.Time
startDirectAccessMonitoring := func() {
if dat == nil {
dat = time.NewTicker(1 * time.Second)
datc = dat.C
}
}
stopDirectMonitoring := func() {
if dat != nil {
dat.Stop()
dat, datc = nil, nil
}
}
defer stopDirectMonitoring()
// This is triggered during a scale up from 1 to clustered mode. We need the new followers to catch up,
// similar to how we trigger the catchup mechanism post a backup/restore. It's ok to do here and preferred
// over waiting to be elected; this just queues it up for the new members to see first and trigger the above
@@ -1787,6 +1808,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
} else if n.NeedSnapshot() {
doSnapshot()
}
// Always cancel if this was running.
stopDirectMonitoring()
} else if n.GroupLeader() != noLeader {
js.setStreamAssignmentRecovering(sa)
}
@@ -1806,6 +1830,48 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
stopMigrationMonitoring()
}
}
// Here we check whether we are not the leader but have been asked to allow
// direct access. We now allow non-leaders to participate in the queue group.
if !isLeader && mset != nil {
mset.mu.Lock()
// Check direct gets first.
if mset.cfg.AllowDirect {
if mset.directSub == nil && mset.isCurrent() {
mset.subscribeToDirect()
} else {
startDirectAccessMonitoring()
}
}
// Now check for mirror directs as well.
if mset.cfg.MirrorDirect {
if mset.mirror != nil && mset.mirror.dsub == nil && mset.isCurrent() {
mset.subscribeToMirrorDirect()
} else {
startDirectAccessMonitoring()
}
}
mset.mu.Unlock()
}
case <-datc:
mset.mu.Lock()
ad, md, current := mset.cfg.AllowDirect, mset.cfg.MirrorDirect, mset.isCurrent()
if !current {
mset.mu.Unlock()
continue
}
// We are current; cancel monitoring and create the direct subs as needed.
if ad {
mset.subscribeToDirect()
}
if md {
mset.subscribeToMirrorDirect()
}
mset.mu.Unlock()
// Stop monitoring.
stopDirectMonitoring()
case <-t.C:
doSnapshot()
case <-uch:
@@ -5984,6 +6050,15 @@ func (mset *stream) isCatchingUp() bool {
return mset.catchup
}
// Determine if a non-leader is current.
// Lock should be held.
func (mset *stream) isCurrent() bool {
if mset.node == nil {
return true
}
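// Otherwise require both that the Raft node reports us current and that we are not still catching up.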
return mset.node.Current() && !mset.catchup
}
// Maximum requests for the whole server that can be in flight.
const maxConcurrentSyncRequests = 8

View File

@@ -17,15 +17,18 @@
package server
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"net"
"strings"
"testing"
"time"
"github.com/nats-io/nats.go"
"golang.org/x/time/rate"
)
// Support functions
@@ -1365,6 +1368,24 @@ func addStream(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *StreamInfo {
return resp.StreamInfo
}
func updateStream(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *StreamInfo {
t.Helper()
req, err := json.Marshal(cfg)
require_NoError(t, err)
rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamUpdateT, cfg.Name), req, time.Second)
require_NoError(t, err)
var resp JSApiStreamCreateResponse
err = json.Unmarshal(rmsg.Data, &resp)
require_NoError(t, err)
if resp.Type != JSApiStreamUpdateResponseType {
t.Fatalf("Invalid response type %s expected %s", resp.Type, JSApiStreamUpdateResponseType)
}
if resp.Error != nil {
t.Fatalf("Unexpected error: %+v", resp.Error)
}
return resp.StreamInfo
}
// setInActiveDeleteThreshold sets the delete threshold for how long to wait
// before deleting an inactive consumer.
func (o *consumer) setInActiveDeleteThreshold(dthresh time.Duration) error {
@@ -1380,3 +1401,79 @@ func (o *consumer) setInActiveDeleteThreshold(dthresh time.Duration) error {
}
return nil
}
// Net Proxy - For introducing RTT and BW constraints.
type netProxy struct {
listener net.Listener
conns []net.Conn
url string
}
func newNetProxy(rtt time.Duration, upRate, downRate int, serverURL string) *netProxy {
hp := net.JoinHostPort("127.0.0.1", "0")
l, e := net.Listen("tcp", hp)
if e != nil {
panic(fmt.Sprintf("Error listening on port: %s, %q", hp, e))
}
port := l.Addr().(*net.TCPAddr).Port
proxy := &netProxy{listener: l}
go func() {
client, err := l.Accept()
if err != nil {
return
}
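// serverURL has the form "nats://host:port"; strip the 7-byte scheme prefix before dialing.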
server, err := net.DialTimeout("tcp", serverURL[7:], time.Second)
if err != nil {
panic("Can't connect to NATS server")
}
proxy.conns = append(proxy.conns, client, server)
go proxy.loop(rtt, upRate, client, server)
go proxy.loop(rtt, downRate, server, client)
}()
proxy.url = fmt.Sprintf("nats://127.0.0.1:%d", port)
return proxy
}
func (np *netProxy) clientURL() string {
return np.url
}
func (np *netProxy) loop(rtt time.Duration, tbw int, r, w net.Conn) {
delay := rtt / 2
const rbl = 8192
var buf [rbl]byte
ctx := context.Background()
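// Token bucket for bandwidth: the burst matches the read buffer size, so a full read can always be admitted by a single WaitN call.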
rl := rate.NewLimiter(rate.Limit(tbw), rbl)
for fr := true; ; {
sr := time.Now()
n, err := r.Read(buf[:])
if err != nil {
return
}
// RTT delay: applied on the first read and whenever the read had to wait for data, so a burst of back-to-back reads is only delayed once.
if fr || time.Since(sr) > 2*time.Millisecond {
fr = false
if delay > 0 {
time.Sleep(delay)
}
}
if err := rl.WaitN(ctx, n); err != nil {
return
}
if _, err = w.Write(buf[:n]); err != nil {
return
}
}
}
func (np *netProxy) stop() {
if np.listener != nil {
np.listener.Close()
np.listener = nil
for _, c := range np.conns {
c.Close()
}
}
}
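
For illustration, a test dials through the proxy's client URL; a minimal usage sketch (the server s, the RTT, and the rates here are assumptions, not taken from this commit):

    // Impose ~25ms RTT and 1MB/s in each direction between the client and server s.
    np := newNetProxy(25*time.Millisecond, 1024*1024, 1024*1024, s.ClientURL())
    defer np.stop()
    nc, err := nats.Connect(np.clientURL())
    require_NoError(t, err)
    defer nc.Close()

Note that newNetProxy services a single Accept, so one proxy instance supports one client connection.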

View File

@@ -2536,7 +2536,16 @@ func TestJetStreamSuperClusterStreamDirectGetMirrorQueueGroup(t *testing.T) {
}
addStream(t, nc, cfg)
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("M2")
require_NoError(t, err)
if si.State.Msgs != uint64(num) {
return fmt.Errorf("Expected %d msgs, got state: %d", num, si.State.Msgs)
}
return nil
})
// Since last one was an R3, check and wait for the direct subscription.
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
sl := sc.clusterForName("C3").streamLeader("$G", "M2")
if mset, err := sl.GlobalAccount().lookupStream("M2"); err == nil {

View File

@@ -20,7 +20,6 @@ import (
"bufio"
"bytes"
"compress/gzip"
"context"
"encoding/binary"
"encoding/json"
"fmt"
@@ -48,7 +47,6 @@ import (
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
"golang.org/x/time/rate"
)
// IMPORTANT: Tests in this file are not executed when running with the -race flag.
@@ -5171,6 +5169,7 @@ func TestNoRaceJetStreamClusterInterestPullConsumerStreamLimitBug(t *testing.T)
return
}
case <-qch:
pt.Stop()
return
}
}
@@ -5230,78 +5229,107 @@ func TestNoRaceJetStreamClusterInterestPullConsumerStreamLimitBug(t *testing.T)
}
}
// Test that all peers have the direct access subs that participate in a queue group,
// but only when they are current and ready. So we will start with R1, add in messages
// then scale up while also still adding messages.
func TestNoRaceJetStreamClusterDirectAccessAllPeersSubs(t *testing.T) {
    c := createJetStreamClusterExplicit(t, "JSC", 3)
    defer c.shutdown()
    nc, js := jsClientConnect(t, c.randomServer())
    defer nc.Close()
    // Start as R1
    cfg := &StreamConfig{
        Name:        "TEST",
        Subjects:    []string{"kv.>"},
        MaxMsgsPer:  10,
        AllowDirect: true,
        Replicas:    1,
        Storage:     FileStorage,
    }
    addStream(t, nc, cfg)
    // Seed with enough messages to start then we will scale up while still adding more messages.
    num, msg := 1000, bytes.Repeat([]byte("XYZ"), 64)
    for i := 0; i < num; i++ {
        js.PublishAsync(fmt.Sprintf("kv.%d", i), msg)
    }
    select {
    case <-js.PublishAsyncComplete():
    case <-time.After(5 * time.Second):
        t.Fatalf("Did not receive completion signal")
    }
    getSubj := fmt.Sprintf(JSDirectMsgGetT, "TEST")
    getMsg := func(key string) *nats.Msg {
        req := []byte(fmt.Sprintf(`{"last_by_subj":%q}`, key))
        m, err := nc.Request(getSubj, req, time.Second)
        require_NoError(t, err)
        require_True(t, m.Header.Get(JSSubject) == key)
        return m
    }
    // Just make sure we can succeed here.
    getMsg("kv.22")
    // Now crank up a go routine to continue sending more messages.
    qch := make(chan bool)
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            nc, _ := jsClientConnect(t, c.randomServer())
            js, _ := nc.JetStream(nats.MaxWait(500 * time.Millisecond))
            defer nc.Close()
            for {
                pt := time.NewTimer(time.Duration(time.Millisecond))
                select {
                case <-pt.C:
                    js.Publish(fmt.Sprintf("kv.%d", rand.Intn(1000)), msg)
                case <-qch:
                    pt.Stop()
                    return
                }
            }
        }()
    }
    time.Sleep(100 * time.Millisecond)
    // Now let's scale up to an R3.
    cfg.Replicas = 3
    updateStream(t, nc, cfg)
    c.waitOnStreamLeader("$G", "TEST")
    // For each non-leader check that the direct sub fires up.
    // We just test all, the leader will already have a directSub.
    for _, s := range c.servers {
        mset, err := s.GlobalAccount().lookupStream("TEST")
        require_NoError(t, err)
        checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
            mset.mu.RLock()
            ok := mset.directSub != nil
            mset.mu.RUnlock()
            if ok {
                return nil
            }
            return fmt.Errorf("No directSub yet")
        })
    }
    close(qch)
    wg.Wait()
    // Just make sure we can succeed here.
    getMsg("kv.22")
    si, err := js.StreamInfo("TEST")
    require_NoError(t, err)
    if si.State.Msgs == uint64(num) {
        t.Fatalf("Expected to see messages increase, got %d", si.State.Msgs)
    }
}