mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Add mixedmode test back in
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -5422,6 +5422,51 @@ func TestJetStreamClusterAckPendingWithMaxRedelivered(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestJetStreamClusterMixedMode(t *testing.T) {
|
||||
c := createMixedModeCluster(t, "MM5", 3, 2)
|
||||
defer c.shutdown()
|
||||
|
||||
// Client based API - Non-JS server.
|
||||
nc, js := jsClientConnect(t, c.serverByName("S-5"))
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"foo", "bar"},
|
||||
Replicas: 3,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
ml := c.leader()
|
||||
if ml == nil {
|
||||
t.Fatalf("No metaleader")
|
||||
}
|
||||
|
||||
// Make sure we are tracking only the JS peers.
|
||||
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
|
||||
peers := ml.JetStreamClusterPeers()
|
||||
if len(peers) == 3 {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Not correct number of peers, expected %d, got %d", 3, len(peers))
|
||||
})
|
||||
|
||||
// Grab the underlying raft structure and make sure the system adjusts its cluster set size.
|
||||
meta := ml.getJetStream().getMetaGroup().(*raft)
|
||||
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
|
||||
ps := meta.currentPeerState()
|
||||
if len(ps.knownPeers) != 3 {
|
||||
return fmt.Errorf("Expected known peers to be 3, but got %+v", ps.knownPeers)
|
||||
}
|
||||
if ps.clusterSize != 3 {
|
||||
return fmt.Errorf("Expected cluster size to be 3, but got %+v", ps)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Support functions
|
||||
|
||||
// Used to setup superclusters for tests.
|
||||
@@ -5695,6 +5740,47 @@ var jsClusterImportsTempl = `
|
||||
}
|
||||
`
|
||||
|
||||
func createMixedModeCluster(t *testing.T, clusterName string, numJsServers, numNonServers int) *cluster {
|
||||
if clusterName == "" || numJsServers < 1 || numNonServers < 1 {
|
||||
t.Fatalf("Bad params")
|
||||
}
|
||||
|
||||
numServers := numJsServers + numNonServers
|
||||
const startClusterPort = 23232
|
||||
|
||||
// Build out the routes that will be shared with all configs.
|
||||
var routes []string
|
||||
for cp := startClusterPort; cp < startClusterPort+numServers; cp++ {
|
||||
routes = append(routes, fmt.Sprintf("nats-route://127.0.0.1:%d", cp))
|
||||
}
|
||||
routeConfig := strings.Join(routes, ",")
|
||||
|
||||
// Go ahead and build configurations and start servers.
|
||||
c := &cluster{servers: make([]*Server, 0, numServers), opts: make([]*Options, 0, numServers), name: clusterName}
|
||||
|
||||
for cp := startClusterPort; cp < startClusterPort+numServers; cp++ {
|
||||
storeDir := createDir(t, JetStreamStoreDir)
|
||||
sn := fmt.Sprintf("S-%d", cp-startClusterPort+1)
|
||||
conf := fmt.Sprintf(jsClusterLimitsTempl, sn, storeDir, clusterName, cp, routeConfig)
|
||||
|
||||
// Disable JS by commmenting it out.
|
||||
if cp-startClusterPort >= numJsServers {
|
||||
conf = strings.Replace(conf, "jetstream: ", "#jetstream: ", 1)
|
||||
}
|
||||
|
||||
s, o := RunServerWithConfig(createConfFile(t, []byte(conf)))
|
||||
c.servers = append(c.servers, s)
|
||||
c.opts = append(c.opts, o)
|
||||
}
|
||||
c.t = t
|
||||
|
||||
// Wait til we are formed and have a leader.
|
||||
c.checkClusterFormed()
|
||||
c.waitOnPeerCount(numJsServers)
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
// This will create a cluster that is explicitly configured for the routes, etc.
|
||||
// and also has a defined clustername. All configs for routes and cluster name will be the same.
|
||||
func createJetStreamClusterExplicit(t *testing.T, clusterName string, numServers int) *cluster {
|
||||
|
||||
Reference in New Issue
Block a user