mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Allow mixed TLS and non-TLS on same port
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
2
go.mod
2
go.mod
@@ -3,7 +3,7 @@ module github.com/nats-io/nats-server/v2
|
||||
require (
|
||||
github.com/minio/highwayhash v1.0.0
|
||||
github.com/nats-io/jwt/v2 v2.0.0-20200602193336-473d698956ed
|
||||
github.com/nats-io/nats.go v1.10.1-0.20200601214746-e93e18d0ed6f
|
||||
github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a
|
||||
github.com/nats-io/nkeys v0.2.0
|
||||
github.com/nats-io/nuid v1.0.1
|
||||
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59
|
||||
|
||||
4
go.sum
4
go.sum
@@ -20,8 +20,8 @@ github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1
|
||||
github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71/go.mod h1:Nan/1L5Sa1JRW+Thm4HNYcIDcVRFc5zK9OpSZeI2kk4=
|
||||
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
|
||||
github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I=
|
||||
github.com/nats-io/nats.go v1.10.1-0.20200601214746-e93e18d0ed6f h1:ILS2coqvxP2B/pptNWB5xLu626mIiflILpKwEvma9eQ=
|
||||
github.com/nats-io/nats.go v1.10.1-0.20200601214746-e93e18d0ed6f/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4=
|
||||
github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a h1:gzSKZOBlu/DpbuPbt34paXCOvA6+E+lVfU2BmomQ9HA=
|
||||
github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4=
|
||||
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
|
||||
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
|
||||
github.com/nats-io/nkeys v0.2.0 h1:WXKF7diOaPU9cJdLD7nuzwasQy9vT1tBqzXZZf3AMJM=
|
||||
|
||||
@@ -868,7 +868,7 @@ func (c *client) flushClients(budget time.Duration) time.Time {
|
||||
|
||||
// readLoop is the main socket read functionality.
|
||||
// Runs in its own Go routine.
|
||||
func (c *client) readLoop() {
|
||||
func (c *client) readLoop(pre []byte) {
|
||||
// Grab the connection off the client, it will be cleared on a close.
|
||||
// We check for that after the loop, but want to avoid a nil dereference
|
||||
c.mu.Lock()
|
||||
@@ -918,6 +918,11 @@ func (c *client) readLoop() {
|
||||
wsr.init()
|
||||
}
|
||||
|
||||
// If we have a pre parse that first.
|
||||
if len(pre) > 0 {
|
||||
c.parse(pre)
|
||||
}
|
||||
|
||||
for {
|
||||
n, err := nc.Read(b)
|
||||
// If we have any data we will try to parse and exit at the end.
|
||||
|
||||
@@ -846,7 +846,7 @@ func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) {
|
||||
}
|
||||
|
||||
// Spin up the read loop.
|
||||
s.startGoRoutine(func() { c.readLoop() })
|
||||
s.startGoRoutine(func() { c.readLoop(nil) })
|
||||
|
||||
// Spin up the write loop.
|
||||
s.startGoRoutine(func() { c.writeLoop() })
|
||||
|
||||
@@ -813,7 +813,7 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
|
||||
}
|
||||
|
||||
// Spin up the read loop.
|
||||
s.startGoRoutine(func() { c.readLoop() })
|
||||
s.startGoRoutine(func() { c.readLoop(nil) })
|
||||
|
||||
// Spin up the write loop.
|
||||
s.startGoRoutine(func() { c.writeLoop() })
|
||||
|
||||
@@ -208,6 +208,7 @@ type Options struct {
|
||||
TLSKey string `json:"-"`
|
||||
TLSCaCert string `json:"-"`
|
||||
TLSConfig *tls.Config `json:"-"`
|
||||
AllowNonTLS bool `json:"-"`
|
||||
WriteDeadline time.Duration `json:"-"`
|
||||
MaxClosedClients int `json:"-"`
|
||||
LameDuckDuration time.Duration `json:"-"`
|
||||
@@ -714,6 +715,9 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error
|
||||
}
|
||||
o.TLSTimeout = tc.Timeout
|
||||
o.TLSMap = tc.Map
|
||||
|
||||
case "allow_non_tls":
|
||||
o.AllowNonTLS = v.(bool)
|
||||
case "write_deadline":
|
||||
o.WriteDeadline = parseDuration("write_deadline", tk, v, errors, warnings)
|
||||
case "lame_duck_duration":
|
||||
|
||||
@@ -1222,7 +1222,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
|
||||
}
|
||||
|
||||
// Spin up the read loop.
|
||||
s.startGoRoutine(func() { c.readLoop() })
|
||||
s.startGoRoutine(func() { c.readLoop(nil) })
|
||||
|
||||
// Spin up the write loop.
|
||||
s.startGoRoutine(func() { c.writeLoop() })
|
||||
|
||||
@@ -73,6 +73,7 @@ type Info struct {
|
||||
AuthRequired bool `json:"auth_required,omitempty"`
|
||||
TLSRequired bool `json:"tls_required,omitempty"`
|
||||
TLSVerify bool `json:"tls_verify,omitempty"`
|
||||
TLSAvailable bool `json:"tls_available,omitempty"`
|
||||
MaxPayload int32 `json:"max_payload"`
|
||||
JetStream bool `json:"jetstream,omitempty"`
|
||||
IP string `json:"ip,omitempty"`
|
||||
@@ -275,13 +276,17 @@ func NewServer(opts *Options) (*Server, error) {
|
||||
Host: opts.Host,
|
||||
Port: opts.Port,
|
||||
AuthRequired: false,
|
||||
TLSRequired: tlsReq,
|
||||
TLSRequired: tlsReq && !opts.AllowNonTLS,
|
||||
TLSVerify: verify,
|
||||
MaxPayload: opts.MaxPayload,
|
||||
JetStream: opts.JetStream,
|
||||
Headers: !opts.NoHeaderSupport,
|
||||
}
|
||||
|
||||
if tlsReq && !info.TLSRequired {
|
||||
info.TLSAvailable = true
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
|
||||
s := &Server{
|
||||
@@ -1856,6 +1861,24 @@ func (s *Server) copyInfo() Info {
|
||||
return info
|
||||
}
|
||||
|
||||
// tlsMixConn is used when we can receive both TLS and non-TLS connections on same port.
|
||||
type tlsMixConn struct {
|
||||
net.Conn
|
||||
pre *bytes.Buffer
|
||||
}
|
||||
|
||||
// Read for our mixed multi-reader.
|
||||
func (c *tlsMixConn) Read(b []byte) (int, error) {
|
||||
if c.pre != nil {
|
||||
n, err := c.pre.Read(b)
|
||||
if c.pre.Len() == 0 {
|
||||
c.pre = nil
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
return c.Conn.Read(b)
|
||||
}
|
||||
|
||||
func (s *Server) createClient(conn net.Conn, ws *websocket) *client {
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
@@ -1902,7 +1925,6 @@ func (s *Server) createClient(conn net.Conn, ws *websocket) *client {
|
||||
// TLS handshake is done (if applicable).
|
||||
c.sendProtoNow(c.generateClientInfoJSON(info))
|
||||
|
||||
tlsRequired := ws == nil && info.TLSRequired
|
||||
// Unlock to register
|
||||
c.mu.Unlock()
|
||||
|
||||
@@ -1930,9 +1952,34 @@ func (s *Server) createClient(conn net.Conn, ws *websocket) *client {
|
||||
// Re-Grab lock
|
||||
c.mu.Lock()
|
||||
|
||||
tlsRequired := ws == nil && info.TLSRequired
|
||||
var pre []byte
|
||||
// If we have both TLS and non-TLS allowed we need to see which
|
||||
// one the client wants.
|
||||
if opts.TLSConfig != nil && opts.AllowNonTLS {
|
||||
pre = make([]byte, 8)
|
||||
c.nc.SetReadDeadline(time.Now().Add(25 * time.Millisecond))
|
||||
n, err := c.nc.Read(pre[:])
|
||||
c.nc.SetReadDeadline(time.Time{})
|
||||
pre = pre[:n]
|
||||
// Assume TLS unless we see nothing or CONNECT.
|
||||
if err != nil || bytes.Contains(pre, []byte("CONNECT")) {
|
||||
tlsRequired = false
|
||||
} else {
|
||||
tlsRequired = true
|
||||
}
|
||||
}
|
||||
|
||||
// Check for TLS
|
||||
if tlsRequired {
|
||||
c.Debugf("Starting TLS client connection handshake")
|
||||
// If we have a prebuffer create a multi-reader.
|
||||
if len(pre) > 0 {
|
||||
c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
|
||||
// Clear pre so it is not parsed.
|
||||
pre = nil
|
||||
}
|
||||
|
||||
c.nc = tls.Server(c.nc, opts.TLSConfig)
|
||||
conn := c.nc.(*tls.Conn)
|
||||
|
||||
@@ -1983,7 +2030,7 @@ func (s *Server) createClient(conn net.Conn, ws *websocket) *client {
|
||||
c.setPingTimer()
|
||||
|
||||
// Spin up the read loop.
|
||||
s.startGoRoutine(func() { c.readLoop() })
|
||||
s.startGoRoutine(func() { c.readLoop(pre) })
|
||||
|
||||
// Spin up the write loop.
|
||||
s.startGoRoutine(func() { c.writeLoop() })
|
||||
|
||||
15
test/configs/tls_mixed.conf
Normal file
15
test/configs/tls_mixed.conf
Normal file
@@ -0,0 +1,15 @@
|
||||
# Allow TLS and non TLS on same port.
|
||||
|
||||
listen: 127.0.0.1:-1
|
||||
|
||||
tls {
|
||||
# Server cert
|
||||
cert_file: "./configs/certs/server-cert.pem"
|
||||
# Server private key
|
||||
key_file: "./configs/certs/server-key.pem"
|
||||
# Specified time for handshake to complete
|
||||
timeout: 2
|
||||
}
|
||||
|
||||
# This allows non tls traffic on same port.
|
||||
allow_non_tls: true
|
||||
@@ -859,6 +859,55 @@ func TestTLSAuthorizationShortTimeout(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientTLSAndNonTLSConnections(t *testing.T) {
|
||||
s, opts := RunServerWithConfig("./configs/tls_mixed.conf")
|
||||
defer s.Shutdown()
|
||||
|
||||
surl := fmt.Sprintf("tls://%s:%d", opts.Host, opts.Port)
|
||||
nc, err := nats.Connect(surl, nats.RootCAs("./configs/certs/ca.pem"))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to connect with TLS: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
// Now also make sure we can connect through plain text.
|
||||
nurl := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
nc2, err := nats.Connect(nurl)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to connect without TLS: %v", err)
|
||||
}
|
||||
defer nc2.Close()
|
||||
|
||||
// Make sure they can go back and forth.
|
||||
sub, err := nc.SubscribeSync("foo")
|
||||
if err != nil {
|
||||
t.Fatalf("Error subscribing: %v", err)
|
||||
}
|
||||
sub2, err := nc2.SubscribeSync("bar")
|
||||
if err != nil {
|
||||
t.Fatalf("Error subscribing: %v", err)
|
||||
}
|
||||
nc.Flush()
|
||||
nc2.Flush()
|
||||
|
||||
nmsgs := 100
|
||||
for i := 0; i < nmsgs; i++ {
|
||||
nc2.Publish("foo", []byte("HELLO FROM PLAINTEXT"))
|
||||
nc.Publish("bar", []byte("HELLO FROM TLS"))
|
||||
}
|
||||
// Now wait for the messages.
|
||||
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
||||
if msgs, _, err := sub.Pending(); err != nil || msgs != nmsgs {
|
||||
return fmt.Errorf("Did not receive the correct number of messages: %d", msgs)
|
||||
}
|
||||
if msgs, _, err := sub2.Pending(); err != nil || msgs != nmsgs {
|
||||
return fmt.Errorf("Did not receive the correct number of messages: %d", msgs)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func stressConnect(t *testing.T, wg *sync.WaitGroup, errCh chan error, url string, index int) {
|
||||
defer wg.Done()
|
||||
|
||||
|
||||
34
vendor/github.com/nats-io/nats.go/context.go
generated
vendored
34
vendor/github.com/nats-io/nats.go/context.go
generated
vendored
@@ -11,8 +11,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// +build go1.7
|
||||
|
||||
// A Go client for the NATS messaging system (https://nats.io).
|
||||
package nats
|
||||
|
||||
@@ -21,9 +19,33 @@ import (
|
||||
"reflect"
|
||||
)
|
||||
|
||||
// RequestMsgWithContext takes a context, a subject and payload
|
||||
// in bytes and request expecting a single response.
|
||||
func (nc *Conn) RequestMsgWithContext(ctx context.Context, msg *Msg) (*Msg, error) {
|
||||
var hdr []byte
|
||||
var err error
|
||||
|
||||
if len(msg.Header) > 0 {
|
||||
if !nc.info.Headers {
|
||||
return nil, ErrHeadersNotSupported
|
||||
}
|
||||
|
||||
hdr, err = msg.headerBytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return nc.requestWithContext(ctx, msg.Subject, hdr, msg.Data)
|
||||
}
|
||||
|
||||
// RequestWithContext takes a context, a subject and payload
|
||||
// in bytes and request expecting a single response.
|
||||
func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) {
|
||||
return nc.requestWithContext(ctx, subj, nil, data)
|
||||
}
|
||||
|
||||
func (nc *Conn) requestWithContext(ctx context.Context, subj string, hdr, data []byte) (*Msg, error) {
|
||||
if ctx == nil {
|
||||
return nil, ErrInvalidContext
|
||||
}
|
||||
@@ -40,10 +62,10 @@ func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte
|
||||
// If user wants the old style.
|
||||
if nc.Opts.UseOldRequestStyle {
|
||||
nc.mu.Unlock()
|
||||
return nc.oldRequestWithContext(ctx, subj, data)
|
||||
return nc.oldRequestWithContext(ctx, subj, hdr, data)
|
||||
}
|
||||
|
||||
mch, token, err := nc.createNewRequestAndSend(subj, data)
|
||||
mch, token, err := nc.createNewRequestAndSend(subj, hdr, data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -67,7 +89,7 @@ func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte
|
||||
}
|
||||
|
||||
// oldRequestWithContext utilizes inbox and subscription per request.
|
||||
func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) {
|
||||
func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, hdr, data []byte) (*Msg, error) {
|
||||
inbox := NewInbox()
|
||||
ch := make(chan *Msg, RequestChanLen)
|
||||
|
||||
@@ -78,7 +100,7 @@ func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, data []b
|
||||
s.AutoUnsubscribe(1)
|
||||
defer s.Unsubscribe()
|
||||
|
||||
err = nc.PublishRequest(subj, inbox, data)
|
||||
err = nc.publish(subj, inbox, hdr, data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
100
vendor/github.com/nats-io/nats.go/nats.go
generated
vendored
100
vendor/github.com/nats-io/nats.go/nats.go
generated
vendored
@@ -510,6 +510,31 @@ type Msg struct {
|
||||
barrier *barrierInfo
|
||||
}
|
||||
|
||||
func (m *Msg) headerBytes() ([]byte, error) {
|
||||
var hdr []byte
|
||||
if len(m.Header) == 0 {
|
||||
return hdr, nil
|
||||
}
|
||||
|
||||
var b bytes.Buffer
|
||||
_, err := b.WriteString(hdrLine)
|
||||
if err != nil {
|
||||
return nil, ErrBadHeaderMsg
|
||||
}
|
||||
|
||||
err = m.Header.Write(&b)
|
||||
if err != nil {
|
||||
return nil, ErrBadHeaderMsg
|
||||
}
|
||||
|
||||
_, err = b.WriteString(crlf)
|
||||
if err != nil {
|
||||
return nil, ErrBadHeaderMsg
|
||||
}
|
||||
|
||||
return b.Bytes(), nil
|
||||
}
|
||||
|
||||
type barrierInfo struct {
|
||||
refs int64
|
||||
f func()
|
||||
@@ -543,6 +568,7 @@ type serverInfo struct {
|
||||
Version string `json:"version"`
|
||||
AuthRequired bool `json:"auth_required"`
|
||||
TLSRequired bool `json:"tls_required"`
|
||||
TLSAvailable bool `json:"tls_available"`
|
||||
Headers bool `json:"headers"`
|
||||
MaxPayload int64 `json:"max_payload"`
|
||||
ConnectURLs []string `json:"connect_urls,omitempty"`
|
||||
@@ -1576,7 +1602,7 @@ func (nc *Conn) checkForSecure() error {
|
||||
o := nc.Opts
|
||||
|
||||
// Check for mismatch in setups
|
||||
if o.Secure && !nc.info.TLSRequired {
|
||||
if o.Secure && !nc.info.TLSRequired && !nc.info.TLSAvailable {
|
||||
return ErrSecureConnWanted
|
||||
} else if nc.info.TLSRequired && !o.Secure {
|
||||
// Switch to Secure since server needs TLS.
|
||||
@@ -2687,18 +2713,21 @@ func (nc *Conn) PublishMsg(m *Msg) error {
|
||||
if m == nil {
|
||||
return ErrInvalidMsg
|
||||
}
|
||||
|
||||
var hdr []byte
|
||||
var err error
|
||||
|
||||
if len(m.Header) > 0 {
|
||||
if !nc.info.Headers {
|
||||
return ErrHeadersNotSupported
|
||||
}
|
||||
// FIXME(dlc) - Optimize
|
||||
var b bytes.Buffer
|
||||
b.WriteString(hdrLine)
|
||||
m.Header.Write(&b)
|
||||
b.WriteString(crlf)
|
||||
hdr = b.Bytes()
|
||||
|
||||
hdr, err = m.headerBytes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nc.publish(m.Subject, m.Reply, hdr, m.Data)
|
||||
}
|
||||
|
||||
@@ -2874,7 +2903,7 @@ func (nc *Conn) respHandler(m *Msg) {
|
||||
}
|
||||
|
||||
// Helper to setup and send new request style requests. Return the chan to receive the response.
|
||||
func (nc *Conn) createNewRequestAndSend(subj string, data []byte) (chan *Msg, string, error) {
|
||||
func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Msg, string, error) {
|
||||
// Do setup for the new style if needed.
|
||||
if nc.respMap == nil {
|
||||
nc.initNewResp()
|
||||
@@ -2898,28 +2927,55 @@ func (nc *Conn) createNewRequestAndSend(subj string, data []byte) (chan *Msg, st
|
||||
}
|
||||
nc.mu.Unlock()
|
||||
|
||||
if err := nc.PublishRequest(subj, respInbox, data); err != nil {
|
||||
if err := nc.publish(subj, respInbox, hdr, data); err != nil {
|
||||
return nil, token, err
|
||||
}
|
||||
|
||||
return mch, token, nil
|
||||
}
|
||||
|
||||
// RequestMsg will send a request payload including optional headers and deliver
|
||||
// the response message, or an error, including a timeout if no message was received properly.
|
||||
func (nc *Conn) RequestMsg(msg *Msg, timeout time.Duration) (*Msg, error) {
|
||||
var hdr []byte
|
||||
var err error
|
||||
|
||||
if len(msg.Header) > 0 {
|
||||
if !nc.info.Headers {
|
||||
return nil, ErrHeadersNotSupported
|
||||
}
|
||||
|
||||
hdr, err = msg.headerBytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return nc.request(msg.Subject, hdr, msg.Data, timeout)
|
||||
}
|
||||
|
||||
// Request will send a request payload and deliver the response message,
|
||||
// or an error, including a timeout if no message was received properly.
|
||||
func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) {
|
||||
return nc.request(subj, nil, data, timeout)
|
||||
}
|
||||
|
||||
func (nc *Conn) request(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) {
|
||||
if nc == nil {
|
||||
return nil, ErrInvalidConnection
|
||||
}
|
||||
|
||||
nc.mu.Lock()
|
||||
// If user wants the old style.
|
||||
if nc.Opts.UseOldRequestStyle {
|
||||
nc.mu.Unlock()
|
||||
return nc.oldRequest(subj, data, timeout)
|
||||
return nc.oldRequest(subj, hdr, data, timeout)
|
||||
}
|
||||
|
||||
mch, token, err := nc.createNewRequestAndSend(subj, data)
|
||||
return nc.newRequest(subj, hdr, data, timeout)
|
||||
}
|
||||
|
||||
func (nc *Conn) newRequest(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) {
|
||||
mch, token, err := nc.createNewRequestAndSend(subj, hdr, data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -2948,7 +3004,7 @@ func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg,
|
||||
// oldRequest will create an Inbox and perform a Request() call
|
||||
// with the Inbox reply and return the first reply received.
|
||||
// This is optimized for the case of multiple responses.
|
||||
func (nc *Conn) oldRequest(subj string, data []byte, timeout time.Duration) (*Msg, error) {
|
||||
func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) {
|
||||
inbox := NewInbox()
|
||||
ch := make(chan *Msg, RequestChanLen)
|
||||
|
||||
@@ -2959,10 +3015,11 @@ func (nc *Conn) oldRequest(subj string, data []byte, timeout time.Duration) (*Ms
|
||||
s.AutoUnsubscribe(1)
|
||||
defer s.Unsubscribe()
|
||||
|
||||
err = nc.PublishRequest(subj, inbox, data)
|
||||
err = nc.publish(subj, inbox, hdr, data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s.NextMsg(timeout)
|
||||
}
|
||||
|
||||
@@ -3653,6 +3710,21 @@ func (m *Msg) Respond(data []byte) error {
|
||||
return nc.Publish(m.Reply, data)
|
||||
}
|
||||
|
||||
// RespondMsg allows a convenient way to respond to requests in service based subscriptions that might include headers
|
||||
func (m *Msg) RespondMsg(msg *Msg) error {
|
||||
if m == nil || m.Sub == nil {
|
||||
return ErrMsgNotBound
|
||||
}
|
||||
if m.Reply == "" {
|
||||
return ErrMsgNoReply
|
||||
}
|
||||
m.Sub.mu.Lock()
|
||||
nc := m.Sub.conn
|
||||
m.Sub.mu.Unlock()
|
||||
// No need to check the connection here since the call to publish will do all the checking.
|
||||
return nc.PublishMsg(msg)
|
||||
}
|
||||
|
||||
// FIXME: This is a hack
|
||||
// removeFlushEntry is needed when we need to discard queued up responses
|
||||
// for our pings as part of a flush call. This happens when we have a flush
|
||||
|
||||
2
vendor/modules.txt
vendored
2
vendor/modules.txt
vendored
@@ -2,7 +2,7 @@
|
||||
github.com/minio/highwayhash
|
||||
# github.com/nats-io/jwt/v2 v2.0.0-20200602193336-473d698956ed
|
||||
github.com/nats-io/jwt/v2
|
||||
# github.com/nats-io/nats.go v1.10.1-0.20200601214746-e93e18d0ed6f
|
||||
# github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a
|
||||
github.com/nats-io/nats.go
|
||||
github.com/nats-io/nats.go/encoders/builtin
|
||||
github.com/nats-io/nats.go/util
|
||||
|
||||
Reference in New Issue
Block a user