mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Fixed a condition where JetStream assets could be created in multiple leafnodes.
Also added in optional Domain to StreamInfo. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1670,6 +1670,7 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
|
||||
Created: mset.createdTime(),
|
||||
State: mset.stateWithDetail(details),
|
||||
Config: config,
|
||||
Domain: s.getOpts().JetStreamDomain,
|
||||
Cluster: js.clusterInfo(mset.raftGroup()),
|
||||
}
|
||||
if mset.isMirror() {
|
||||
|
||||
@@ -6673,6 +6673,80 @@ func TestJetStreamClusterDomains(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamClusterDomainsWithNoJSHub(t *testing.T) {
|
||||
// Create our hub cluster with no JetStream defined.
|
||||
c := createMixedModeCluster(t, jsClusterAccountsTempl, "NOJS5", _EMPTY_, 0, 5, false)
|
||||
defer c.shutdown()
|
||||
|
||||
ln := c.createSingleLeafNodeNoSystemAccountAndEnablesJetStream()
|
||||
defer ln.Shutdown()
|
||||
|
||||
lnd := c.createSingleLeafNodeNoSystemAccountAndEnablesJetStreamWithDomain("SPOKE", "nojs")
|
||||
defer lnd.Shutdown()
|
||||
|
||||
// Client based API - Connected to the core cluster with no JS but account has JS.
|
||||
s := c.randomServer()
|
||||
nc, _ := jsClientConnect(t, s, nats.UserInfo("nojs", "p"))
|
||||
defer nc.Close()
|
||||
|
||||
cfg := &nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"foo"},
|
||||
}
|
||||
req, err := json.Marshal(cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Do by hand to make sure we only get one response.
|
||||
sis := fmt.Sprintf(JSApiStreamCreateT, "TEST")
|
||||
rs := nats.NewInbox()
|
||||
sub, _ := nc.SubscribeSync(rs)
|
||||
nc.PublishRequest(sis, rs, req)
|
||||
// Wait a bit for responses.
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
if nr, _, err := sub.Pending(); err != nil || nr != 1 {
|
||||
t.Fatalf("Expected 1 response, got %d and %v", nr, err)
|
||||
}
|
||||
resp, _ := sub.NextMsg(time.Second)
|
||||
|
||||
// This StreamInfo should *not* have a domain set.
|
||||
// Do by hand until this makes it to the Go client.
|
||||
var si StreamInfo
|
||||
if err = json.Unmarshal(resp.Data, &si); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if si.Domain != _EMPTY_ {
|
||||
t.Fatalf("Expected to have NO domain set but got %q", si.Domain)
|
||||
}
|
||||
|
||||
// Now let's create a stream specifically on the SPOKE domain.
|
||||
js, err := nc.JetStream(nats.Domain("SPOKE"))
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error getting JetStream context: %v", err)
|
||||
}
|
||||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST22",
|
||||
Subjects: []string{"bar"},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Now lookup by hand to check domain.
|
||||
resp, err = nc.Request("$JS.SPOKE.API.STREAM.INFO.TEST22", nil, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if err = json.Unmarshal(resp.Data, &si); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if si.Domain != "SPOKE" {
|
||||
t.Fatalf("Expected to have domain set to %q but got %q", "SPOKE", si.Domain)
|
||||
}
|
||||
}
|
||||
|
||||
// Issue #2205
|
||||
func TestJetStreamClusterDomainsAndAPIResponses(t *testing.T) {
|
||||
// This adds in domain config option to template.
|
||||
@@ -8905,7 +8979,7 @@ var jsClusterImportsTempl = `
|
||||
func createMixedModeCluster(t *testing.T, tmpl string, clusterName, snPre string, numJsServers, numNonServers int, doJSConfig bool) *cluster {
|
||||
t.Helper()
|
||||
|
||||
if clusterName == _EMPTY_ || numJsServers < 1 || numNonServers < 1 {
|
||||
if clusterName == _EMPTY_ || numJsServers < 0 || numNonServers < 1 {
|
||||
t.Fatalf("Bad params")
|
||||
}
|
||||
|
||||
@@ -8947,7 +9021,9 @@ func createMixedModeCluster(t *testing.T, tmpl string, clusterName, snPre string
|
||||
|
||||
// Wait til we are formed and have a leader.
|
||||
c.checkClusterFormed()
|
||||
c.waitOnPeerCount(numJsServers)
|
||||
if numJsServers > 0 {
|
||||
c.waitOnPeerCount(numJsServers)
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
@@ -9030,10 +9106,19 @@ func (c *cluster) createSingleLeafNodeNoSystemAccount() *Server {
|
||||
|
||||
// This is tied to jsClusterAccountsTempl, so changes there to users needs to be reflected here.
|
||||
func (c *cluster) createSingleLeafNodeNoSystemAccountAndEnablesJetStream() *Server {
|
||||
return c.createSingleLeafNodeNoSystemAccountAndEnablesJetStreamWithDomain(_EMPTY_, "nojs")
|
||||
}
|
||||
|
||||
func (c *cluster) createSingleLeafNodeNoSystemAccountAndEnablesJetStreamWithDomain(domain, user string) *Server {
|
||||
tmpl := jsClusterSingleLeafNodeLikeNGSTempl
|
||||
if domain != _EMPTY_ {
|
||||
nsc := fmt.Sprintf("domain: %s, store_dir:", domain)
|
||||
tmpl = strings.Replace(jsClusterSingleLeafNodeLikeNGSTempl, "store_dir:", nsc, 1)
|
||||
}
|
||||
as := c.randomServer()
|
||||
lno := as.getOpts().LeafNode
|
||||
ln := fmt.Sprintf("nats://nojs:p@%s:%d", lno.Host, lno.Port)
|
||||
conf := fmt.Sprintf(jsClusterSingleLeafNodeLikeNGSTempl, createDir(c.t, JetStreamStoreDir), ln)
|
||||
ln := fmt.Sprintf("nats://%s:p@%s:%d", user, lno.Host, lno.Port)
|
||||
conf := fmt.Sprintf(tmpl, createDir(c.t, JetStreamStoreDir), ln)
|
||||
s, o := RunServerWithConfig(createConfFile(c.t, []byte(conf)))
|
||||
c.servers = append(c.servers, s)
|
||||
c.opts = append(c.opts, o)
|
||||
|
||||
@@ -1076,11 +1076,15 @@ func (c *client) processLeafnodeInfo(info *Info) {
|
||||
// account also has JetStream enabled.
|
||||
if accHasJS {
|
||||
s.addInJSDenyExport(remote)
|
||||
// If we specified a domain do not import by default.
|
||||
if hasJSDomain {
|
||||
s.addInJSDenyImport(remote)
|
||||
}
|
||||
}
|
||||
}
|
||||
// If we have a specified JetStream domain we will want to add a mapping to
|
||||
// allow access cross domain for each non-system account.
|
||||
if hasJSDomain && acc.jetStreamConfigured() {
|
||||
if hasJSDomain && accHasJS {
|
||||
src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
|
||||
if err := acc.AddMapping(src, jsAllAPI); err != nil {
|
||||
c.Debugf("Error adding JetStream domain mapping: %v", err)
|
||||
|
||||
@@ -88,6 +88,7 @@ type StreamInfo struct {
|
||||
Config StreamConfig `json:"config"`
|
||||
Created time.Time `json:"created"`
|
||||
State StreamState `json:"state"`
|
||||
Domain string `json:"domain,omitempty"`
|
||||
Cluster *ClusterInfo `json:"cluster,omitempty"`
|
||||
Mirror *StreamSourceInfo `json:"mirror,omitempty"`
|
||||
Sources []*StreamSourceInfo `json:"sources,omitempty"`
|
||||
|
||||
Reference in New Issue
Block a user