First pass at supercluster enablement.

This allows the metacontroller to span superclusters. Also includes placement directives for streams; by default, streams are placed in the request origin cluster.
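For reference, a placement directive travels inside the stream create request. A minimal sketch of pinning a stream to a specific cluster over the raw API, mirroring the new test below (the connection nc and the cluster name "C3" are assumptions):

	cfg := server.StreamConfig{
		Name:      "TEST2",
		Storage:   server.FileStorage,
		Placement: &server.Placement{Cluster: "C3"}, // pin this stream to cluster C3
	}
	req, _ := json.Marshal(cfg)
	resp, _ := nc.Request(fmt.Sprintf(server.JSApiStreamCreateT, cfg.Name), req, time.Second)

Omitting Placement (or its Cluster field) falls back to the cluster the request arrived in.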

Signed-off-by: Derek Collison <derek@nats.io>
Derek Collison
2021-02-03 17:28:13 -08:00
parent 0dfac6b52f
commit a1e0f7dc1a
9 changed files with 305 additions and 44 deletions

View File

@@ -4662,9 +4662,12 @@ func (c *client) getClientInfo(detailed bool) *ClientInfo {
return nil
}
// Server name. Defaults to server ID if not set explicitly.
var sn string
if detailed && c.kind != LEAF {
sn = c.srv.Name()
var cn, sn string
if detailed {
if c.kind != LEAF {
sn = c.srv.Name()
}
cn = c.srv.cachedClusterName()
}
c.mu.Lock()
@@ -4685,6 +4688,7 @@ func (c *client) getClientInfo(detailed bool) *ClientInfo {
ci.Lang = c.opts.Lang
ci.Version = c.opts.Version
ci.Server = sn
ci.Cluster = cn
ci.Jwt = c.opts.JWT
ci.IssuerKey = issuerForClient(c)
ci.NameTag = c.nameTag

View File

@@ -173,6 +173,7 @@ type ClientInfo struct {
Version string `json:"ver,omitempty"`
RTT time.Duration `json:"rtt,omitempty"`
Server string `json:"server,omitempty"`
Cluster string `json:"cluster,omitempty"`
Stop *time.Time `json:"stop,omitempty"`
Jwt string `json:"jwt,omitempty"`
IssuerKey string `json:"issuer_key,omitempty"`
@@ -263,7 +264,7 @@ RESET:
host := s.info.Host
servername := s.info.Name
seqp := &s.sys.seq
js := s.js != nil
js := s.info.JetStream
cluster := s.info.Cluster
if s.gateway.enabled {
cluster = s.getGatewayName()
@@ -826,6 +827,7 @@ func (s *Server) remoteServerShutdown(sub *subscription, _ *client, subject, rep
s.Debugf("Received remote server shutdown on bad subject %q", subject)
return
}
sid := toks[serverSubjectIndex]
su := s.sys.servers[sid]
if su != nil {
@@ -860,7 +862,9 @@ func (s *Server) processNewServer(ms *ServerInfo) {
// connect update to make sure they switch this account to interest only mode.
s.ensureGWsInterestOnlyForLeafNodes()
// Add to our nodeToName
s.nodeToName[string(getHash(ms.Name))] = ms.Name
node := string(getHash(ms.Name))
s.nodeToName.Store(node, ms.Name)
s.nodeToCluster.Store(node, ms.Cluster)
}
// If GW is enabled on this server and there are any leaf node connections,

View File

@@ -46,6 +46,12 @@ type jetStreamCluster struct {
consumerResults *subscription
}
// Used to guide placement of streams in clustered JetStream.
type Placement struct {
Cluster string `json:"cluster"`
Tags []string `json:"tags,omitempty"`
}
// Define types of the entry.
type entryOp uint8
@@ -2582,22 +2588,32 @@ func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePe
// selectPeerGroup will select a group of peers to start a raft group.
// TODO(dlc) - For now randomly select. Can be way smarter.
func (cc *jetStreamCluster) selectPeerGroup(r int) []string {
func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string) []string {
var nodes []string
peers := cc.meta.Peers()
// Make sure they are active
s := cc.s
ourID := cc.meta.ID()
s, ourID := cc.s, cc.meta.ID()
now := time.Now()
for _, p := range peers {
// FIXME(dlc) - cluster scoped.
if p.ID == ourID || s.getRouteByHash([]byte(p.ID)) != nil {
nodes = append(nodes, p.ID)
// Make sure they are active and current.
current, lastSeen := p.Current, now.Sub(p.Last)
if current && lastSeen > lostQuorumInterval {
current = false
}
if p.ID == ourID || current {
if cluster != _EMPTY_ {
if s.clusterNameForNode(p.ID) == cluster {
nodes = append(nodes, p.ID)
}
} else {
nodes = append(nodes, p.ID)
}
}
}
if len(nodes) < r {
return nil
}
// Don't depend on range.
// Don't depend on range to randomize.
rand.Shuffle(len(nodes), func(i, j int) { nodes[i], nodes[j] = nodes[j], nodes[i] })
return nodes[:r]
}
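Distilled, the cluster-scoped selection is: keep meta peers that are current (seen within the lost-quorum interval; the local node is always treated as current), keep only peers in the requested cluster when one is given, fail if fewer than r survive, then shuffle and take r. An illustrative reduction with throwaway types, assuming math/rand is imported:

	type peer struct {
		id      string
		current bool
		cluster string
	}

	func selectN(peers []peer, r int, cluster string) []string {
		var nodes []string
		for _, p := range peers {
			if !p.current {
				continue // skip peers that are lagging or unseen too long
			}
			if cluster != "" && p.cluster != cluster {
				continue // honor the placement directive
			}
			nodes = append(nodes, p.id)
		}
		if len(nodes) < r {
			return nil // not enough eligible peers to form the raft group
		}
		rand.Shuffle(len(nodes), func(i, j int) { nodes[i], nodes[j] = nodes[j], nodes[i] })
		return nodes[:r]
	}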
@@ -2622,15 +2638,18 @@ func groupName(prefix string, peers []string, storage StorageType) string {
// createGroupForStream will create a group for assignment for the stream.
// Lock should be held.
func (cc *jetStreamCluster) createGroupForStream(cfg *StreamConfig) *raftGroup {
func (cc *jetStreamCluster) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) *raftGroup {
replicas := cfg.Replicas
if replicas == 0 {
replicas = 1
}
cluster := ci.Cluster
if cfg.Placement != nil && cfg.Placement.Cluster != _EMPTY_ {
cluster = cfg.Placement.Cluster
}
// Need to create a group here.
// TODO(dlc) - Can be way smarter here.
peers := cc.selectPeerGroup(replicas)
peers := cc.selectPeerGroup(replicas, cluster)
if len(peers) == 0 {
return nil
}
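The precedence here is simple: an explicit Placement.Cluster on the stream config wins, otherwise the request origin cluster carried in the ClientInfo is used. As a hypothetical helper (not in the commit itself), the resolution is just:

	func placementCluster(ci *ClientInfo, cfg *StreamConfig) string {
		if cfg.Placement != nil && cfg.Placement.Cluster != "" {
			return cfg.Placement.Cluster // explicit directive wins
		}
		return ci.Cluster // default: where the request originated
	}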
@@ -2682,7 +2701,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, subject, reply string,
}
// Raft group selection and placement.
rg := cc.createGroupForStream(cfg)
rg := cc.createGroupForStream(ci, cfg)
if rg == nil {
resp.Error = jsInsufficientErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
@@ -2770,7 +2789,7 @@ func (s *Server) jsClusteredStreamRestoreRequest(ci *ClientInfo, acc *Account, r
}
// Raft group selection and placement.
rg := cc.createGroupForStream(cfg)
rg := cc.createGroupForStream(ci, cfg)
if rg == nil {
resp.Error = jsInsufficientErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))

View File

@@ -19,6 +19,7 @@ import (
"fmt"
"io/ioutil"
"math/rand"
"net"
"os"
"path"
"sync"
@@ -228,11 +229,35 @@ func (s *Server) bootstrapRaftNode(cfg *RaftConfig, knownPeers []string, allPeer
expected := len(knownPeers)
// We need to adjust this if all peers are not known.
if !allPeersKnown {
s.Debugf("Determining expected peer size for JetStream metacontroller")
if expected < 2 {
expected = 2
}
if ncr := s.configuredRoutes(); expected < ncr {
expected = ncr
opts := s.getOpts()
nrs := len(opts.Routes)
cn := s.ClusterName()
ngwps := 0
for _, gw := range opts.Gateway.Gateways {
// Ignore our own cluster if specified.
if gw.Name == cn {
continue
}
for _, u := range gw.URLs {
host := u.Hostname()
// If this is an IP just add one.
if net.ParseIP(host) != nil {
ngwps++
} else {
addrs, _ := net.LookupHost(host)
ngwps += len(addrs)
}
}
}
if expected < nrs+ngwps {
expected = nrs + ngwps
s.Debugf("Adjusting expected peer set size to %d with %d known", expected, len(knownPeers))
}
}
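The expected meta group size is now derived from configured routes plus gateway endpoints: a gateway URL with a literal IP counts as one server, while a hostname is resolved so a DNS name fronting several servers counts once per address. An illustrative reduction of that bookkeeping:

	import "net"

	func expectedPeers(routeCount int, gwHosts []string) int {
		n := routeCount
		for _, host := range gwHosts {
			if net.ParseIP(host) != nil {
				n++ // literal IP: one server
			} else {
				addrs, _ := net.LookupHost(host) // DNS may front several servers
				n += len(addrs)
			}
		}
		if n < 2 {
			n = 2 // never bootstrap expecting fewer than two peers
		}
		return n
	}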
@@ -261,6 +286,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
if ps == nil || ps.clusterSize < 2 {
return nil, errors.New("raft: cluster too small")
}
n := &raft{
created: time.Now(),
id: hash[:idLen],
@@ -353,10 +379,18 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
// Maps node names back to server names.
func (s *Server) serverNameForNode(node string) string {
s.mu.Lock()
sn := s.nodeToName[node]
s.mu.Unlock()
return sn
if sn, ok := s.nodeToName.Load(node); ok {
return sn.(string)
}
return _EMPTY_
}
// Maps node names back to cluster names.
func (s *Server) clusterNameForNode(node string) string {
if cn, ok := s.nodeToCluster.Load(node); ok {
return cn.(string)
}
return _EMPTY_
}
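Design note: with sync.Map, lookups like the per-peer clusterNameForNode call in selectPeerGroup avoid taking the main server lock, while writes stay rare (route setup in addRoute and remote server announcements in processNewServer).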
// Server will track all raft nodes.
@@ -872,7 +906,7 @@ func (n *raft) shutdown(shouldDelete bool) {
}
}
func (n *raft) newInbox(cn string) string {
func (n *raft) newInbox() string {
var b [replySuffixLen]byte
rn := rand.Int63()
for i, l := 0, rn; i < len(b); i++ {
@@ -883,8 +917,8 @@ func (n *raft) newInbox(cn string) string {
}
const (
raftVoteSubj = "$NRG.V.%s.%s"
raftAppendSubj = "$NRG.E.%s.%s"
raftVoteSubj = "$NRG.V.%s"
raftAppendSubj = "$NRG.AE.%s"
raftPropSubj = "$NRG.P.%s"
raftRemovePeerSubj = "$NRG.RP.%s"
raftReplySubj = "$NRG.R.%s"
@@ -897,9 +931,8 @@ func (n *raft) subscribe(subject string, cb msgHandler) (*subscription, error) {
}
func (n *raft) createInternalSubs() error {
cn := n.s.ClusterName()
n.vsubj, n.vreply = fmt.Sprintf(raftVoteSubj, cn, n.group), n.newInbox(cn)
n.asubj, n.areply = fmt.Sprintf(raftAppendSubj, cn, n.group), n.newInbox(cn)
n.vsubj, n.vreply = fmt.Sprintf(raftVoteSubj, n.group), n.newInbox()
n.asubj, n.areply = fmt.Sprintf(raftAppendSubj, n.group), n.newInbox()
n.psubj = fmt.Sprintf(raftPropSubj, n.group)
n.rpsubj = fmt.Sprintf(raftRemovePeerSubj, n.group)
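Dropping the cluster token from the vote and append-entry subjects means a raft group is addressed by group name alone, so a node no longer needs to know which cluster a peer lives in to reach it — presumably what lets NRG traffic traverse gateways and span the supercluster. Roughly:

	// before: cluster-scoped, so addressing required the peer's cluster name
	vsubj := fmt.Sprintf("$NRG.V.%s.%s", clusterName, group)
	// after: group-scoped, valid supercluster-wide; note $NRG.E became $NRG.AE
	vsubj = fmt.Sprintf("$NRG.V.%s", group)
	asubj := fmt.Sprintf("$NRG.AE.%s", group)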
@@ -1732,7 +1765,7 @@ func (n *raft) createCatchup(ae *appendEntry) string {
pterm: n.pterm,
pindex: n.pindex,
}
inbox := n.newInbox(n.s.ClusterName())
inbox := n.newInbox()
sub, _ := n.subscribe(inbox, n.handleAppendEntry)
n.catchup.sub = sub
return inbox

View File

@@ -1416,7 +1416,8 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
if !exists {
s.routes[c.cid] = c
s.remotes[id] = c
s.nodeToName[c.route.hash] = c.route.remoteName
s.nodeToName.Store(c.route.hash, c.route.remoteName)
s.nodeToCluster.Store(c.route.hash, s.info.Cluster)
c.mu.Lock()
c.route.connectURLs = info.ClientConnectURLs
c.route.wsConnURLs = info.WSConnectURLs

View File

@@ -238,9 +238,9 @@ type Server struct {
rnMu sync.RWMutex
raftNodes map[string]RaftNode
// For mapping from a node name back to a server name.
// Normal server lock here.
nodeToName map[string]string
// For mapping from a node name back to a server name and cluster.
nodeToName sync.Map
nodeToCluster sync.Map
}
// Make sure all are 64bits for atomic use
@@ -324,7 +324,6 @@ func NewServer(opts *Options) (*Server, error) {
httpBasePath: httpBasePath,
eventIds: nuid.New(),
routesToSelf: make(map[string]struct{}),
nodeToName: make(map[string]string),
}
// Trusted root operator keys.
@@ -332,11 +331,20 @@ func NewServer(opts *Options) (*Server, error) {
return nil, fmt.Errorf("Error processing trusted operator keys")
}
if opts.Cluster.Name != _EMPTY_ {
// Also cache the cluster name (cn) under the cnMu lock.
s.cnMu.Lock()
s.cn = opts.Cluster.Name
s.cnMu.Unlock()
}
s.mu.Lock()
defer s.mu.Unlock()
// Place ourselves.
s.nodeToName[string(getHash(serverName))] = serverName
// Place ourselves in some lookup maps.
ourId := string(getHash(serverName))
s.nodeToName.Store(ourId, serverName)
s.nodeToCluster.Store(ourId, opts.Cluster.Name)
s.routeResolver = opts.Cluster.resolver
if s.routeResolver == nil {
@@ -789,10 +797,11 @@ func (s *Server) activePeers() (peers []string) {
if s.sys == nil {
return nil
}
// FIXME(dlc) - When this spans supercluster need to adjust below.
for _, r := range s.routes {
peers = append(peers, r.route.hash)
}
s.nodeToName.Range(func(k, v interface{}) bool {
peers = append(peers, k.(string))
return true
})
return peers
}

View File

@@ -51,6 +51,7 @@ type StreamConfig struct {
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
// These are non public configuration options.
// If you add new options, check fileStreamInfoJSON in order for them to

View File

@@ -322,7 +322,6 @@ func TestJetStreamClusterCompaction(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
}
nc.Flush()
if _, err := js.StreamInfo("TEST"); err != nil {
t.Fatalf("Unexpected error: %v", err)
@@ -3097,6 +3096,84 @@ func TestJetStreamClusterRemoveServer(t *testing.T) {
})
}
func TestJetStreamClusterSuperClusterBasics(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 3)
defer sc.shutdown()
// Client based API
s := sc.randomCluster().randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 3})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Send in 10 messages.
msg, toSend := []byte("Hello JS Clustering"), 10
for i := 0; i < toSend; i++ {
if _, err = js.Publish("TEST", msg); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
// Now grab info for this stream.
si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si == nil || si.Config.Name != "TEST" {
t.Fatalf("StreamInfo is not correct %+v", si)
}
// Check active state as well, shows that the owner answered.
if si.State.Msgs != uint64(toSend) {
t.Fatalf("Expected %d msgs, got bad state: %+v", toSend, si.State)
}
// Check request origin placement.
if si.Cluster.Name != s.ClusterName() {
t.Fatalf("Expected stream to be placed in %q, but got %q", s.ClusterName(), si.Cluster.Name)
}
// Check consumers.
sub, err := js.SubscribeSync("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkSubsPending(t, sub, toSend)
ci, err := sub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ci.Delivered.Consumer != uint64(toSend) || ci.NumAckPending != toSend {
t.Fatalf("ConsumerInfo is not correct: %+v", ci)
}
// Now check we can place a stream.
// Need to do this by hand for now until Go client catches up.
pcn := "C3"
cfg := server.StreamConfig{
Name: "TEST2",
Storage: server.FileStorage,
Placement: &server.Placement{Cluster: pcn},
}
req, err := json.Marshal(cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
resp, _ := nc.Request(fmt.Sprintf(server.JSApiStreamCreateT, cfg.Name), req, time.Second)
var scResp server.JSApiStreamCreateResponse
if err := json.Unmarshal(resp.Data, &scResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if scResp.StreamInfo == nil || scResp.Error != nil {
t.Fatalf("Did not receive correct response: %+v", scResp.Error)
}
if scResp.StreamInfo.Cluster.Name != pcn {
t.Fatalf("Expected the stream to be placed in %q, got %q", pcn, scResp.StreamInfo.Cluster.Name)
}
}
func TestJetStreamClusterStreamPerf(t *testing.T) {
// Comment out to run, holding place for now.
skip(t)
@@ -3166,6 +3243,7 @@ var jsClusterTempl = `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"}
cluster {
name: %s
listen: 127.0.0.1:%d
@@ -3173,6 +3251,114 @@ var jsClusterTempl = `
}
`
var jsSuperClusterTempl = `
%s
gateway {
name: %s
listen: 127.0.0.1:%d
gateways = [%s
]
}
`
var jsGWTempl = `%s{name: %s, urls: [%s]}`
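Composed by createJetStreamSuperCluster below, these templates render a per-server config along these lines (names, ports, and paths are illustrative):

	listen: 127.0.0.1:-1
	server_name: C1-S1
	jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "/tmp/jetstream-xyz"}

	cluster {
		name: C1
		listen: 127.0.0.1:33222
		routes = [nats-route://127.0.0.1:33222, nats-route://127.0.0.1:33223]
	}

	gateway {
		name: C1
		listen: 127.0.0.1:11222
		gateways = [
			{name: C1, urls: [nats-route://127.0.0.1:11222]},
			{name: C2, urls: [nats-route://127.0.0.1:11225]},
			{name: C3, urls: [nats-route://127.0.0.1:11228]}
		]
	}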
func createJetStreamSuperCluster(t *testing.T, numServersPer, numClusters int) *supercluster {
t.Helper()
if numServersPer < 1 {
t.Fatalf("Number of servers must be >= 1")
}
if numClusters <= 1 {
t.Fatalf("Number of clusters must be > 1")
}
const (
startClusterPort = 33222
startGWPort = 11222
)
// Make the GWs form faster for the tests.
server.SetGatewaysSolicitDelay(10 * time.Millisecond)
defer server.ResetGatewaysSolicitDelay()
cp, gp := startClusterPort, startGWPort
var clusters []*cluster
var gws []string
// Build GWs first, will be same for all servers.
for i, port := 1, gp; i <= numClusters; i++ {
cn := fmt.Sprintf("C%d", i)
var urls []string
for n := 0; n < numServersPer; n++ {
urls = append(urls, fmt.Sprintf("nats-route://127.0.0.1:%d", port))
port++
}
gws = append(gws, fmt.Sprintf(jsGWTempl, "\n\t\t\t", cn, strings.Join(urls, ",")))
}
gwconf := strings.Join(gws, "")
for i := 1; i <= numClusters; i++ {
cn := fmt.Sprintf("C%d", i)
// Go ahead and build configurations.
c := &cluster{servers: make([]*server.Server, 0, numServersPer), opts: make([]*server.Options, 0, numServersPer), name: cn}
// Build out the routes that will be shared with all configs.
var routes []string
for port := cp; port < cp+numServersPer; port++ {
routes = append(routes, fmt.Sprintf("nats-route://127.0.0.1:%d", port))
}
routeConfig := strings.Join(routes, ",")
for i := 0; i < numServersPer; i++ {
storeDir, _ := ioutil.TempDir("", server.JetStreamStoreDir)
sn := fmt.Sprintf("%s-S%d", cn, i+1)
bconf := fmt.Sprintf(jsClusterTempl, sn, storeDir, cn, cp+i, routeConfig)
conf := fmt.Sprintf(jsSuperClusterTempl, bconf, cn, gp, gwconf)
gp++
s, o := RunServerWithConfig(createConfFile(t, []byte(conf)))
c.servers = append(c.servers, s)
c.opts = append(c.opts, o)
}
checkClusterFormed(t, c.servers...)
clusters = append(clusters, c)
cp += numServersPer
c.t = t
}
// Wait for the supercluster to be formed.
egws := numClusters - 1
for _, c := range clusters {
for _, s := range c.servers {
waitForOutboundGateways(t, s, egws, 2*time.Second)
}
}
sc := &supercluster{t, clusters}
sc.waitOnLeader()
return sc
}
func (sc *supercluster) waitOnLeader() {
expires := time.Now().Add(5 * time.Second)
for time.Now().Before(expires) {
for _, c := range sc.clusters {
if leader := c.leader(); leader != nil {
time.Sleep(200 * time.Millisecond)
return
}
}
time.Sleep(25 * time.Millisecond)
}
sc.t.Fatalf("Expected a cluster leader, got none")
}
func (sc *supercluster) randomCluster() *cluster {
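// Shuffle a copy so the supercluster's own ordering is preserved.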
clusters := append(sc.clusters[:0:0], sc.clusters...)
rand.Shuffle(len(clusters), func(i, j int) { clusters[i], clusters[j] = clusters[j], clusters[i] })
return clusters[0]
}
// This will create a cluster that is explicitly configured for the routes, etc.
// and also has a defined cluster name. All configs for routes and cluster name will be the same.
func createJetStreamClusterExplicit(t *testing.T, clusterName string, numServers int) *cluster {

View File

@@ -35,10 +35,14 @@ import (
// Used to setup superclusters for tests.
type supercluster struct {
t *testing.T
clusters []*cluster
}
func (sc *supercluster) shutdown() {
if sc == nil {
return
}
for _, c := range sc.clusters {
shutdownCluster(c)
}
@@ -66,7 +70,7 @@ func createSuperCluster(t *testing.T, numServersPer, numClusters int) *superclus
c := createClusterEx(t, true, 5*time.Millisecond, true, randClusterName(), numServersPer, clusters...)
clusters = append(clusters, c)
}
return &supercluster{clusters}
return &supercluster{t, clusters}
}
func (sc *supercluster) setResponseThreshold(t *testing.T, maxTime time.Duration) {