diff --git a/server/client.go b/server/client.go index 07bb929d..6d1728a1 100644 --- a/server/client.go +++ b/server/client.go @@ -173,6 +173,7 @@ const ( Revocation InternalClient MsgHeaderViolation + NoRespondersRequiresHeaders ClusterNameConflict ) @@ -420,23 +421,24 @@ func (s *subscription) isClosed() bool { } type clientOpts struct { - Echo bool `json:"echo"` - Verbose bool `json:"verbose"` - Pedantic bool `json:"pedantic"` - TLSRequired bool `json:"tls_required"` - Nkey string `json:"nkey,omitempty"` - JWT string `json:"jwt,omitempty"` - Sig string `json:"sig,omitempty"` - Token string `json:"auth_token,omitempty"` - Username string `json:"user,omitempty"` - Password string `json:"pass,omitempty"` - Name string `json:"name"` - Lang string `json:"lang"` - Version string `json:"version"` - Protocol int `json:"protocol"` - Account string `json:"account,omitempty"` - AccountNew bool `json:"new_account,omitempty"` - Headers bool `json:"headers,omitempty"` + Echo bool `json:"echo"` + Verbose bool `json:"verbose"` + Pedantic bool `json:"pedantic"` + TLSRequired bool `json:"tls_required"` + Nkey string `json:"nkey,omitempty"` + JWT string `json:"jwt,omitempty"` + Sig string `json:"sig,omitempty"` + Token string `json:"auth_token,omitempty"` + Username string `json:"user,omitempty"` + Password string `json:"pass,omitempty"` + Name string `json:"name"` + Lang string `json:"lang"` + Version string `json:"version"` + Protocol int `json:"protocol"` + Account string `json:"account,omitempty"` + AccountNew bool `json:"new_account,omitempty"` + Headers bool `json:"headers,omitempty"` + NoResponders bool `json:"no_responders,omitempty"` // Routes only Import *SubjectPermission `json:"import,omitempty"` @@ -1564,6 +1566,17 @@ func (c *client) processConnect(arg []byte) error { c.closeConnection(BadClientProtocolVersion) return ErrBadClientProtocol } + // Check to see that if no_responders is requested + // they have header support on as well. + c.mu.Lock() + misMatch := c.opts.NoResponders && !c.headers + c.mu.Unlock() + if misMatch { + c.sendErr(ErrNoRespondersRequiresHeaders.Error()) + c.closeConnection(NoRespondersRequiresHeaders) + return ErrNoRespondersRequiresHeaders + + } if verbose { c.sendOK() } @@ -3165,9 +3178,33 @@ func (c *client) processInboundClientMsg(msg []byte) bool { didDeliver = c.sendMsgToGateways(c.acc, msg, c.pa.subject, c.pa.reply, qnames) || didDeliver } + // Check to see if we did not deliver to anyone and the client has a reply subject set + // and wants notification of no_responders. + if !didDeliver && len(c.pa.reply) > 0 { + c.mu.Lock() + if c.opts.NoResponders { + if sub := c.subForReply(c.pa.reply); sub != nil { + proto := fmt.Sprintf("HMSG %s %s 16 16\r\nNATS/1.0 503\r\n\r\n", c.pa.reply, sub.sid) + c.queueOutbound([]byte(proto)) + } + } + c.mu.Unlock() + } + return didDeliver } +// Return the subscription for this reply subject. Only look at normal subs for this client. +func (c *client) subForReply(reply []byte) *subscription { + r := c.acc.sl.Match(string(reply)) + for _, sub := range r.psubs { + if sub.client == c { + return sub + } + } + return nil +} + // This is invoked knowing that this client has some GW replies // in its map. It will check if one is find for the c.pa.subject // and if so will process it directly (send to GWs and LEAFs) and diff --git a/server/client_test.go b/server/client_test.go index 48294251..c1b20f6b 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -194,6 +194,36 @@ func TestClientCreateAndInfo(t *testing.T) { } } +func TestClientNoResponderSupport(t *testing.T) { + opts := defaultServerOptions + opts.Port = -1 + s := New(&opts) + + c, _, _ := newClientForServer(s) + defer c.close() + + // Force header support if you want to do no_responders. Make sure headers are set. + if err := c.parse([]byte("CONNECT {\"no_responders\":true}\r\n")); err == nil { + t.Fatalf("Expected error") + } + + c, cr, _ := newClientForServer(s) + defer c.close() + + c.parseAsync("CONNECT {\"headers\":true, \"no_responders\":true}\r\nSUB reply 1\r\nPUB foo reply 2\r\nok\r\n") + + l, err := cr.ReadString('\n') + if err != nil { + t.Fatalf("Error receiving msg from server: %v\n", err) + } + + am := hmsgPat.FindAllStringSubmatch(l, -1) + if len(am) == 0 { + t.Fatalf("Did not get a match for %q", l) + } + checkPayload(cr, []byte("NATS/1.0 503\r\n\r\n"), t) +} + func TestServerHeaderSupport(t *testing.T) { opts := defaultServerOptions opts.Port = -1 diff --git a/server/errors.go b/server/errors.go index f9d46777..e323ce06 100644 --- a/server/errors.go +++ b/server/errors.go @@ -145,6 +145,10 @@ var ( // but they are not supported on this server. ErrMsgHeadersNotSupported = errors.New("message headers not supported") + // ErrNoRespondersRequiresHeaders signals that a client needs to have headers + // on if they want no responders behavior. + ErrNoRespondersRequiresHeaders = errors.New("no responders requires headers support") + // ErrClusterNameConfigConflict signals that the options for cluster name in cluster and gateway are in conflict. ErrClusterNameConfigConflict = errors.New("cluster name conflicts between cluster and gateway definitions")