mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Merge pull request #212 from nats-io/cluster_auth_bcrypt
Support for bcrypt in cluster auth
This commit is contained in:
4
TODO.md
4
TODO.md
@@ -3,8 +3,10 @@
|
||||
|
||||
- [ ] Multiple Auth
|
||||
- [ ] Authorization / Access
|
||||
- [ ] T series reservations
|
||||
= [ ] _SYS. server events?
|
||||
- [ ] No downtime restart
|
||||
- [ ] Signal based reload of configuration.
|
||||
- [ ] Signal based reload of configuration
|
||||
- [ ] Dynamic socket buffer sizes
|
||||
- [ ] Switch to 1.4/1.5 and use maps vs hashmaps in sublist
|
||||
- [ ] brew, apt-get, rpm, chocately (windows)
|
||||
|
||||
13
gnatsd.go
13
gnatsd.go
@@ -139,17 +139,26 @@ func main() {
|
||||
}
|
||||
|
||||
func configureAuth(s *server.Server, opts *server.Options) {
|
||||
// Client
|
||||
if opts.Username != "" {
|
||||
auth := &auth.Plain{
|
||||
Username: opts.Username,
|
||||
Password: opts.Password,
|
||||
}
|
||||
s.SetAuthMethod(auth)
|
||||
s.SetClientAuthMethod(auth)
|
||||
} else if opts.Authorization != "" {
|
||||
auth := &auth.Token{
|
||||
Token: opts.Authorization,
|
||||
}
|
||||
s.SetAuthMethod(auth)
|
||||
s.SetClientAuthMethod(auth)
|
||||
}
|
||||
// Routes
|
||||
if opts.ClusterUsername != "" {
|
||||
auth := &auth.Plain{
|
||||
Username: opts.ClusterUsername,
|
||||
Password: opts.ClusterPassword,
|
||||
}
|
||||
s.SetRouteAuthMethod(auth)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -247,7 +247,12 @@ func (c *client) processInfo(arg []byte) error {
|
||||
}
|
||||
|
||||
func (c *client) processErr(errStr string) {
|
||||
c.Errorf("Client error %s", errStr)
|
||||
switch c.typ {
|
||||
case CLIENT:
|
||||
c.Errorf("Client Error %s", errStr)
|
||||
case ROUTER:
|
||||
c.Errorf("Route Error %s", errStr)
|
||||
}
|
||||
c.closeConnection()
|
||||
}
|
||||
|
||||
@@ -285,7 +290,7 @@ func (c *client) processConnect(arg []byte) error {
|
||||
}
|
||||
|
||||
func (c *client) authTimeout() {
|
||||
c.sendErr("Authorization Timeout")
|
||||
c.sendErr(ErrAuthTimeout.Error())
|
||||
c.Debugf("Authorization Timeout")
|
||||
c.closeConnection()
|
||||
}
|
||||
|
||||
@@ -53,7 +53,7 @@ func rawSetup(serverOptions Options) (*Server, *client, *bufio.Reader, string) {
|
||||
cr := bufio.NewReaderSize(cli, DEFAULT_BUF_SIZE)
|
||||
s := New(&serverOptions)
|
||||
if serverOptions.Authorization != "" {
|
||||
s.SetAuthMethod(&mockAuth{})
|
||||
s.SetClientAuthMethod(&mockAuth{})
|
||||
}
|
||||
|
||||
ch := make(chan *client)
|
||||
@@ -535,7 +535,7 @@ func TestAuthorizationTimeout(t *testing.T) {
|
||||
serverOptions.Authorization = "my_token"
|
||||
serverOptions.AuthTimeout = 1
|
||||
s, _, cr, _ := rawSetup(serverOptions)
|
||||
s.SetAuthMethod(&mockAuth{})
|
||||
s.SetClientAuthMethod(&mockAuth{})
|
||||
|
||||
time.Sleep(secondsToDuration(serverOptions.AuthTimeout))
|
||||
l, err := cr.ReadString('\n')
|
||||
|
||||
32
server/configs/srv_a_bcrypt.conf
Normal file
32
server/configs/srv_a_bcrypt.conf
Normal file
@@ -0,0 +1,32 @@
|
||||
# Copyright 2012-2016 Apcera Inc. All rights reserved.
|
||||
|
||||
# Cluster Server A
|
||||
|
||||
host: '127.0.0.1'
|
||||
port: 4222
|
||||
|
||||
authorization {
|
||||
user: user
|
||||
password: foo
|
||||
timeout: 2
|
||||
}
|
||||
|
||||
cluster {
|
||||
host: '127.0.0.1'
|
||||
port: 4244
|
||||
|
||||
authorization {
|
||||
user: ruser
|
||||
# bcrypt version of 'bar'
|
||||
password: $2a$11$lnaSz3ya7RQ3QK9T9pBPyen1WRLz4QGLu6mI3kC701NUWcBO0bml6
|
||||
timeout: 2
|
||||
}
|
||||
|
||||
# Routes are actively solicited and connected to from this server.
|
||||
# Other servers can connect to us if they supply the correct credentials
|
||||
# in their routes definitions from above.
|
||||
|
||||
routes = [
|
||||
nats-route://ruser:bar@127.0.0.1:4246
|
||||
]
|
||||
}
|
||||
32
server/configs/srv_b_bcrypt.conf
Normal file
32
server/configs/srv_b_bcrypt.conf
Normal file
@@ -0,0 +1,32 @@
|
||||
# Copyright 2012-2016 Apcera Inc. All rights reserved.
|
||||
|
||||
# Cluster Server B
|
||||
|
||||
host: '127.0.0.1'
|
||||
port: 4224
|
||||
|
||||
authorization {
|
||||
user: user
|
||||
password: foo
|
||||
timeout: 2
|
||||
}
|
||||
|
||||
cluster {
|
||||
host: '127.0.0.1'
|
||||
port: 4246
|
||||
|
||||
authorization {
|
||||
user: ruser
|
||||
# bcrypt version of 'bar'
|
||||
password: $2a$11$lnaSz3ya7RQ3QK9T9pBPyen1WRLz4QGLu6mI3kC701NUWcBO0bml6
|
||||
timeout: 2
|
||||
}
|
||||
|
||||
# Routes are actively solicited and connected to from this server.
|
||||
# Other servers can connect to us if they supply the correct credentials
|
||||
# in their routes definitions from above.
|
||||
|
||||
routes = [
|
||||
nats-route://ruser:bar@127.0.0.1:4244
|
||||
]
|
||||
}
|
||||
@@ -11,6 +11,9 @@ var (
|
||||
// ErrAuthorization represents error condition on failed authorization.
|
||||
ErrAuthorization = errors.New("Authorization Error")
|
||||
|
||||
// ErrAuthTimeout represents error condition on failed authorization due to timeout.
|
||||
ErrAuthTimeout = errors.New("Authorization Timeout")
|
||||
|
||||
// ErrMaxPayload represents error condition when the payload is too big.
|
||||
ErrMaxPayload = errors.New("Maximum Payload Exceeded")
|
||||
)
|
||||
|
||||
@@ -75,6 +75,7 @@ func TestServerRoutesWithClients(t *testing.T) {
|
||||
srvB := RunServer(optsB)
|
||||
defer srvB.Shutdown()
|
||||
|
||||
// Wait for route to form.
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
|
||||
nc2, err := nats.Connect(urlB)
|
||||
@@ -85,3 +86,49 @@ func TestServerRoutesWithClients(t *testing.T) {
|
||||
nc2.Publish("foo", []byte("Hello"))
|
||||
nc2.Flush()
|
||||
}
|
||||
|
||||
func TestServerRoutesWithAuthAndBCrypt(t *testing.T) {
|
||||
optsA, _ := ProcessConfigFile("./configs/srv_a_bcrypt.conf")
|
||||
optsB, _ := ProcessConfigFile("./configs/srv_b_bcrypt.conf")
|
||||
|
||||
optsA.NoSigs, optsA.NoLog = true, true
|
||||
optsB.NoSigs, optsB.NoLog = true, true
|
||||
|
||||
srvA := RunServer(optsA)
|
||||
defer srvA.Shutdown()
|
||||
|
||||
srvB := RunServer(optsB)
|
||||
defer srvB.Shutdown()
|
||||
|
||||
urlA := fmt.Sprintf("nats://%s:%d/", optsA.Host, optsA.Port)
|
||||
urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, optsB.Port)
|
||||
|
||||
// Wait for route to form.
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
|
||||
nc1, err := nats.Connect(urlA)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v\n", err)
|
||||
}
|
||||
defer nc1.Close()
|
||||
|
||||
// Test that we are connected.
|
||||
ch := make(chan bool)
|
||||
sub, _ := nc1.Subscribe("foo", func(m *nats.Msg) { ch <- true })
|
||||
nc1.Flush()
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
nc2, err := nats.Connect(urlB)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v\n", err)
|
||||
}
|
||||
defer nc2.Close()
|
||||
nc2.Publish("foo", []byte("Hello"))
|
||||
|
||||
// Wait for message
|
||||
select {
|
||||
case <-ch:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Timeout waiting for message across route")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,7 +49,8 @@ type Server struct {
|
||||
infoJSON []byte
|
||||
sl *sublist.Sublist
|
||||
opts *Options
|
||||
auth Auth
|
||||
cAuth Auth
|
||||
rAuth Auth
|
||||
trace bool
|
||||
debug bool
|
||||
running bool
|
||||
@@ -126,17 +127,24 @@ func New(opts *Options) *Server {
|
||||
return s
|
||||
}
|
||||
|
||||
// Sets the authentication method
|
||||
func (s *Server) SetAuthMethod(authMethod Auth) {
|
||||
// Sets the authentication method for clients.
|
||||
func (s *Server) SetClientAuthMethod(authMethod Auth) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
s.info.AuthRequired = true
|
||||
s.auth = authMethod
|
||||
s.cAuth = authMethod
|
||||
|
||||
s.generateServerInfoJSON()
|
||||
}
|
||||
|
||||
// Sets the authentication method for routes.
|
||||
func (s *Server) SetRouteAuthMethod(authMethod Auth) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.rAuth = authMethod
|
||||
}
|
||||
|
||||
func (s *Server) generateServerInfoJSON() {
|
||||
// Generate the info json
|
||||
b, err := json.Marshal(s.info)
|
||||
@@ -612,22 +620,17 @@ func (s *Server) sendInfo(c *client, info []byte) {
|
||||
}
|
||||
|
||||
func (s *Server) checkClientAuth(c *client) bool {
|
||||
if s.auth == nil {
|
||||
if s.cAuth == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
return s.auth.Check(c)
|
||||
return s.cAuth.Check(c)
|
||||
}
|
||||
|
||||
func (s *Server) checkRouterAuth(c *client) bool {
|
||||
if !s.routeInfo.AuthRequired {
|
||||
if s.rAuth == nil {
|
||||
return true
|
||||
}
|
||||
if s.opts.ClusterUsername != c.opts.Username ||
|
||||
s.opts.ClusterPassword != c.opts.Password {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
return s.rAuth.Check(c)
|
||||
}
|
||||
|
||||
// Check auth and return boolean indicating if client is ok
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2012-2015 Apcera Inc. All rights reserved.
|
||||
// Copyright 2012-2016 Apcera Inc. All rights reserved.
|
||||
|
||||
package test
|
||||
|
||||
@@ -83,7 +83,7 @@ func RunServerWithAuth(opts *server.Options, auth server.Auth) *server.Server {
|
||||
}
|
||||
|
||||
if auth != nil {
|
||||
s.SetAuthMethod(auth)
|
||||
s.SetClientAuthMethod(auth)
|
||||
}
|
||||
|
||||
// Run server in Go routine.
|
||||
|
||||
Reference in New Issue
Block a user