diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index fad1ee02..777b8dbd 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -5422,6 +5422,51 @@ func TestJetStreamClusterAckPendingWithMaxRedelivered(t *testing.T) { }) } +func TestJetStreamClusterMixedMode(t *testing.T) { + c := createMixedModeCluster(t, "MM5", 3, 2) + defer c.shutdown() + + // Client based API - Non-JS server. + nc, js := jsClientConnect(t, c.serverByName("S-5")) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar"}, + Replicas: 3, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + ml := c.leader() + if ml == nil { + t.Fatalf("No metaleader") + } + + // Make sure we are tracking only the JS peers. + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + peers := ml.JetStreamClusterPeers() + if len(peers) == 3 { + return nil + } + return fmt.Errorf("Not correct number of peers, expected %d, got %d", 3, len(peers)) + }) + + // Grab the underlying raft structure and make sure the system adjusts its cluster set size. + meta := ml.getJetStream().getMetaGroup().(*raft) + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + ps := meta.currentPeerState() + if len(ps.knownPeers) != 3 { + return fmt.Errorf("Expected known peers to be 3, but got %+v", ps.knownPeers) + } + if ps.clusterSize != 3 { + return fmt.Errorf("Expected cluster size to be 3, but got %+v", ps) + } + return nil + }) +} + // Support functions // Used to setup superclusters for tests. @@ -5695,6 +5740,47 @@ var jsClusterImportsTempl = ` } ` +func createMixedModeCluster(t *testing.T, clusterName string, numJsServers, numNonServers int) *cluster { + if clusterName == "" || numJsServers < 1 || numNonServers < 1 { + t.Fatalf("Bad params") + } + + numServers := numJsServers + numNonServers + const startClusterPort = 23232 + + // Build out the routes that will be shared with all configs. + var routes []string + for cp := startClusterPort; cp < startClusterPort+numServers; cp++ { + routes = append(routes, fmt.Sprintf("nats-route://127.0.0.1:%d", cp)) + } + routeConfig := strings.Join(routes, ",") + + // Go ahead and build configurations and start servers. + c := &cluster{servers: make([]*Server, 0, numServers), opts: make([]*Options, 0, numServers), name: clusterName} + + for cp := startClusterPort; cp < startClusterPort+numServers; cp++ { + storeDir := createDir(t, JetStreamStoreDir) + sn := fmt.Sprintf("S-%d", cp-startClusterPort+1) + conf := fmt.Sprintf(jsClusterLimitsTempl, sn, storeDir, clusterName, cp, routeConfig) + + // Disable JS by commmenting it out. + if cp-startClusterPort >= numJsServers { + conf = strings.Replace(conf, "jetstream: ", "#jetstream: ", 1) + } + + s, o := RunServerWithConfig(createConfFile(t, []byte(conf))) + c.servers = append(c.servers, s) + c.opts = append(c.opts, o) + } + c.t = t + + // Wait til we are formed and have a leader. + c.checkClusterFormed() + c.waitOnPeerCount(numJsServers) + + return c +} + // This will create a cluster that is explicitly configured for the routes, etc. // and also has a defined clustername. All configs for routes and cluster name will be the same. func createJetStreamClusterExplicit(t *testing.T, clusterName string, numServers int) *cluster {