mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 10:10:42 -07:00
@@ -307,6 +307,165 @@ func TestSystemAccountNewConnection(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func runTrustedLeafServer(t *testing.T) (*Server, *Options) {
|
||||
t.Helper()
|
||||
opts := DefaultOptions()
|
||||
kp, _ := nkeys.FromSeed(oSeed)
|
||||
pub, _ := kp.PublicKey()
|
||||
opts.TrustedKeys = []string{pub}
|
||||
opts.AccountResolver = &MemAccResolver{}
|
||||
opts.LeafNode.Port = -1
|
||||
s := RunServer(opts)
|
||||
return s, opts
|
||||
}
|
||||
|
||||
func genCredsFile(t *testing.T, jwt string, seed []byte) string {
|
||||
creds := `
|
||||
-----BEGIN NATS USER JWT-----
|
||||
%s
|
||||
------END NATS USER JWT------
|
||||
|
||||
************************* IMPORTANT *************************
|
||||
NKEY Seed printed below can be used to sign and prove identity.
|
||||
NKEYs are sensitive and should be treated as secrets.
|
||||
|
||||
-----BEGIN USER NKEY SEED-----
|
||||
%s
|
||||
------END USER NKEY SEED------
|
||||
|
||||
*************************************************************
|
||||
`
|
||||
return createConfFile(t, []byte(strings.Replace(fmt.Sprintf(creds, jwt, seed), "\t\t", "", -1)))
|
||||
}
|
||||
|
||||
func runSolicitWithCredentials(t *testing.T, opts *Options, creds string) (*Server, *Options, string) {
|
||||
content := `
|
||||
port: -1
|
||||
leafnodes {
|
||||
remotes = [
|
||||
{
|
||||
url: nats-leaf://127.0.0.1:%d
|
||||
credentials: "%s"
|
||||
}
|
||||
]
|
||||
}
|
||||
`
|
||||
config := fmt.Sprintf(content, opts.LeafNode.Port, creds)
|
||||
conf := createConfFile(t, []byte(config))
|
||||
s, opts := RunServerWithConfig(conf)
|
||||
return s, opts, conf
|
||||
}
|
||||
|
||||
// Helper function to check that a leaf node has connected to our server.
|
||||
func checkLeafNodeConnected(t *testing.T, s *Server) {
|
||||
t.Helper()
|
||||
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
|
||||
if nln := s.NumLeafNodes(); nln != 1 {
|
||||
return fmt.Errorf("Expected a connected leafnode for server %q, got none", s.ID())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func TestSystemAccountingWithLeafNodes(t *testing.T) {
|
||||
s, opts := runTrustedLeafServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
acc, akp := createAccount(s)
|
||||
s.setSystemAccount(acc)
|
||||
|
||||
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
ncs, err := nats.Connect(url, createUserCreds(t, s, akp))
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer ncs.Close()
|
||||
|
||||
acc2, akp2 := createAccount(s)
|
||||
|
||||
// Be explicit to only receive the event for global account.
|
||||
sub, _ := ncs.SubscribeSync(fmt.Sprintf("$SYS.ACCOUNT.%s.DISCONNECT", acc2.Name))
|
||||
defer sub.Unsubscribe()
|
||||
ncs.Flush()
|
||||
|
||||
kp, _ := nkeys.CreateUser()
|
||||
pub, _ := kp.PublicKey()
|
||||
nuc := jwt.NewUserClaims(pub)
|
||||
ujwt, err := nuc.Encode(akp2)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating user JWT: %v", err)
|
||||
}
|
||||
seed, _ := kp.Seed()
|
||||
mycreds := genCredsFile(t, ujwt, seed)
|
||||
defer os.Remove(mycreds)
|
||||
|
||||
// Create a server that solicits a leafnode connection.
|
||||
sl, slopts, lnconf := runSolicitWithCredentials(t, opts, mycreds)
|
||||
defer os.Remove(lnconf)
|
||||
defer sl.Shutdown()
|
||||
|
||||
checkLeafNodeConnected(t, s)
|
||||
|
||||
nc, err := nats.Connect(url, createUserCreds(t, s, akp2), nats.Name("TEST LEAFNODE EVENTS"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
nc.SubscribeSync("foo")
|
||||
nc.Flush()
|
||||
|
||||
surl := fmt.Sprintf("nats://%s:%d", slopts.Host, slopts.Port)
|
||||
nc2, err := nats.Connect(surl, nats.Name("TEST LEAFNODE EVENTS"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer nc2.Close()
|
||||
|
||||
m := []byte("HELLO WORLD")
|
||||
|
||||
// Now generate some traffic
|
||||
nc2.SubscribeSync("*")
|
||||
for i := 0; i < 10; i++ {
|
||||
nc2.Publish("foo", m)
|
||||
nc2.Publish("bar", m)
|
||||
}
|
||||
nc2.Flush()
|
||||
|
||||
// Now send some from the cluster side too.
|
||||
for i := 0; i < 10; i++ {
|
||||
nc.Publish("foo", m)
|
||||
nc.Publish("bar", m)
|
||||
}
|
||||
|
||||
// Now shutdown the leafnode server since this is where the event tracking should
|
||||
// happen. Right now we do not track local clients to the leafnode server that
|
||||
// solicited to the cluster, but we should track usage once the leafnode connection stops.
|
||||
sl.Shutdown()
|
||||
|
||||
// Make sure we get disconnect event and that tracking is correct.
|
||||
msg, err := sub.NextMsg(time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error receiving msg: %v", err)
|
||||
}
|
||||
|
||||
dem := DisconnectEventMsg{}
|
||||
if err := json.Unmarshal(msg.Data, &dem); err != nil {
|
||||
t.Fatalf("Error unmarshalling disconnect event message: %v", err)
|
||||
}
|
||||
if dem.Sent.Msgs != 10 {
|
||||
t.Fatalf("Expected 10 msgs sent, got %d", dem.Sent.Msgs)
|
||||
}
|
||||
if dem.Sent.Bytes != 110 {
|
||||
t.Fatalf("Expected 110 bytes sent, got %d", dem.Sent.Bytes)
|
||||
}
|
||||
if dem.Received.Msgs != 20 {
|
||||
t.Fatalf("Expected 20 msgs received, got %d", dem.Sent.Msgs)
|
||||
}
|
||||
if dem.Received.Bytes != 220 {
|
||||
t.Fatalf("Expected 220 bytes sent, got %d", dem.Sent.Bytes)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSystemAccountDisconnectBadLogin(t *testing.T) {
|
||||
s, opts := runTrustedServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
Reference in New Issue
Block a user