Merge pull request #2098 from nats-io/mixedmode

Add mixedmode test back in
This commit is contained in:
Derek Collison
2021-04-12 08:48:43 -07:00
committed by GitHub

View File

@@ -5422,6 +5422,51 @@ func TestJetStreamClusterAckPendingWithMaxRedelivered(t *testing.T) {
})
}
func TestJetStreamClusterMixedMode(t *testing.T) {
c := createMixedModeCluster(t, "MM5", 3, 2)
defer c.shutdown()
// Client based API - Non-JS server.
nc, js := jsClientConnect(t, c.serverByName("S-5"))
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
Replicas: 3,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
ml := c.leader()
if ml == nil {
t.Fatalf("No metaleader")
}
// Make sure we are tracking only the JS peers.
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
peers := ml.JetStreamClusterPeers()
if len(peers) == 3 {
return nil
}
return fmt.Errorf("Not correct number of peers, expected %d, got %d", 3, len(peers))
})
// Grab the underlying raft structure and make sure the system adjusts its cluster set size.
meta := ml.getJetStream().getMetaGroup().(*raft)
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
ps := meta.currentPeerState()
if len(ps.knownPeers) != 3 {
return fmt.Errorf("Expected known peers to be 3, but got %+v", ps.knownPeers)
}
if ps.clusterSize != 3 {
return fmt.Errorf("Expected cluster size to be 3, but got %+v", ps)
}
return nil
})
}
// Support functions
// Used to setup superclusters for tests.
@@ -5695,6 +5740,47 @@ var jsClusterImportsTempl = `
}
`
func createMixedModeCluster(t *testing.T, clusterName string, numJsServers, numNonServers int) *cluster {
if clusterName == "" || numJsServers < 1 || numNonServers < 1 {
t.Fatalf("Bad params")
}
numServers := numJsServers + numNonServers
const startClusterPort = 23232
// Build out the routes that will be shared with all configs.
var routes []string
for cp := startClusterPort; cp < startClusterPort+numServers; cp++ {
routes = append(routes, fmt.Sprintf("nats-route://127.0.0.1:%d", cp))
}
routeConfig := strings.Join(routes, ",")
// Go ahead and build configurations and start servers.
c := &cluster{servers: make([]*Server, 0, numServers), opts: make([]*Options, 0, numServers), name: clusterName}
for cp := startClusterPort; cp < startClusterPort+numServers; cp++ {
storeDir := createDir(t, JetStreamStoreDir)
sn := fmt.Sprintf("S-%d", cp-startClusterPort+1)
conf := fmt.Sprintf(jsClusterLimitsTempl, sn, storeDir, clusterName, cp, routeConfig)
// Disable JS by commmenting it out.
if cp-startClusterPort >= numJsServers {
conf = strings.Replace(conf, "jetstream: ", "#jetstream: ", 1)
}
s, o := RunServerWithConfig(createConfFile(t, []byte(conf)))
c.servers = append(c.servers, s)
c.opts = append(c.opts, o)
}
c.t = t
// Wait til we are formed and have a leader.
c.checkClusterFormed()
c.waitOnPeerCount(numJsServers)
return c
}
// This will create a cluster that is explicitly configured for the routes, etc.
// and also has a defined clustername. All configs for routes and cluster name will be the same.
func createJetStreamClusterExplicit(t *testing.T, clusterName string, numServers int) *cluster {