Merge pull request #2421 from nats-io/subs-leak

[FIXED] Subscription leaks when auto-unsub used after delivered > max.
This commit is contained in:
Derek Collison
2021-08-08 08:00:06 -07:00
committed by GitHub
9 changed files with 171 additions and 62 deletions

2
go.mod
View File

@@ -7,7 +7,7 @@ require (
github.com/klauspost/compress v1.11.12
github.com/minio/highwayhash v1.0.1
github.com/nats-io/jwt/v2 v2.0.3
github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30
github.com/nats-io/nats.go v1.11.1-0.20210803204434-91bdffe39f41
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e

2
go.sum
View File

@@ -18,6 +18,8 @@ github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30 h1:9GqilBhZaR3xYis0JgMlJjNw933WIobdjKhilXm+Vls=
github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.11.1-0.20210803204434-91bdffe39f41 h1:GUUkiOgD00OMr4foruBN6YG1be3lFnHl0LJIoEs8cQg=
github.com/nats-io/nats.go v1.11.1-0.20210803204434-91bdffe39f41/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=

View File

@@ -2799,14 +2799,14 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool
func (c *client) processUnsub(arg []byte) error {
args := splitArg(arg)
var sid []byte
max := -1
max := int64(-1)
switch len(args) {
case 1:
sid = args[0]
case 2:
sid = args[0]
max = parseSize(args[1])
max = int64(parseSize(args[1]))
default:
return fmt.Errorf("processUnsub Parse Error: '%s'", arg)
}
@@ -2827,8 +2827,8 @@ func (c *client) processUnsub(arg []byte) error {
updateGWs := false
if sub, ok = c.subs[string(sid)]; ok {
acc = c.acc
if max > 0 {
sub.max = int64(max)
if max > 0 && max > sub.nm {
sub.max = max
} else {
// Clear it here to override
sub.max = 0
@@ -3066,6 +3066,7 @@ func (c *client) deliverMsg(sub *subscription, acc *Account, subject, reply, mh,
srv := client.srv
sub.nm++
// Check if we should auto-unsubscribe.
if sub.max > 0 {
if client.kind == ROUTER && sub.nm >= sub.max {

View File

@@ -7844,6 +7844,68 @@ func TestJetStreamPanicDecodingConsumerState(t *testing.T) {
}
}
// Had a report of leaked subs with pull subscribers.
func TestJetStreamPullConsumerLeakedSubs(t *testing.T) {
c := createJetStreamClusterExplicit(t, "JSC", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
if _, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"Domains.*"},
Replicas: 1,
Retention: nats.InterestPolicy,
}); err != nil {
t.Fatalf("Error creating stream: %v", err)
}
sub, err := js.PullSubscribe("Domains.Domain", "Domains-Api", nats.MaxAckPending(20_000))
if err != nil {
t.Fatalf("Error creating pull subscriber: %v", err)
}
defer sub.Unsubscribe()
// Load up a bunch of requests.
numRequests := 20 //100_000
for i := 0; i < numRequests; i++ {
js.PublishAsync("Domains.Domain", []byte("QUESTION"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
numSubs := c.stableTotalSubs()
// With batch of 1 we do not see any issues, so set to 10.
// Currently Go client uses auto unsub based on the batch size.
for i := 0; i < numRequests/10; i++ {
msgs, err := sub.Fetch(10)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
for _, m := range msgs {
m.Ack()
}
}
// Make sure the stream is empty..
si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != 0 {
t.Fatalf("Stream should be empty, got %+v", si)
}
// Make sure we did not leak any subs.
if numSubsAfter := c.stableTotalSubs(); numSubsAfter != numSubs {
t.Fatalf("Subs leaked: %d before, %d after", numSubs, numSubsAfter)
}
}
// Support functions
// Used to setup superclusters for tests.

View File

@@ -92,7 +92,7 @@ func (nc *Conn) requestWithContext(ctx context.Context, subj string, hdr, data [
// oldRequestWithContext utilizes inbox and subscription per request.
func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, hdr, data []byte) (*Msg, error) {
inbox := NewInbox()
inbox := nc.newInbox()
ch := make(chan *Msg, RequestChanLen)
s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil)

View File

@@ -695,7 +695,7 @@ func ExpectLastSequence(seq uint64) PubOpt {
})
}
// ExpectLastMsgId sets the expected sequence in the response from the publish.
// ExpectLastMsgId sets the expected last msgId in the response from the publish.
func ExpectLastMsgId(id string) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
opts.lid = id
@@ -1048,7 +1048,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
// to which it should be attaching to.
if consumer != _EMPTY_ {
info, err = js.ConsumerInfo(stream, consumer)
notFoundErr = err != nil && strings.Contains(err.Error(), "consumer not found")
notFoundErr = errors.Is(err, ErrConsumerNotFound)
lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded
}
@@ -1194,6 +1194,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
}
attached = true
} else {
if cinfo.Error.Code == 404 {
return nil, ErrStreamNotFound
}
return nil, fmt.Errorf("nats: %s", cinfo.Error.Description)
}
}
@@ -1355,6 +1358,7 @@ func (js *js) lookupStreamBySubject(subj string) (string, error) {
if err := json.Unmarshal(resp.Data, &slr); err != nil {
return _EMPTY_, err
}
if slr.Error != nil || len(slr.Streams) != 1 {
return _EMPTY_, ErrNoMatchingStream
}
@@ -1889,6 +1893,9 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin
return nil, err
}
if info.Error != nil {
if info.Error.Code == 404 {
return nil, ErrConsumerNotFound
}
return nil, fmt.Errorf("nats: %s", info.Error.Description)
}
return info.ConsumerInfo, nil

View File

@@ -74,23 +74,24 @@ type JetStreamManager interface {
// There are sensible defaults for most. If no subjects are
// given the name will be used as the only subject.
type StreamConfig struct {
Name string `json:"name"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
Discard DiscardPolicy `json:"discard"`
MaxAge time.Duration `json:"max_age"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
Name string `json:"name"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
Discard DiscardPolicy `json:"discard"`
MaxAge time.Duration `json:"max_age"`
MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
}
// Placement is used to guide placement of streams in clustered JetStream.
@@ -258,6 +259,9 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C
return nil, err
}
if info.Error != nil {
if info.Error.Code == 404 {
return nil, ErrConsumerNotFound
}
return nil, errors.New(info.Error.Description)
}
return info.ConsumerInfo, nil
@@ -292,7 +296,11 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error {
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
}
if resp.Error != nil {
if resp.Error.Code == 404 {
return ErrConsumerNotFound
}
return errors.New(resp.Error.Description)
}
return nil
@@ -559,6 +567,7 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
if resp.Error != nil {
return nil, errors.New(resp.Error.Description)
}
return resp.StreamInfo, nil
}
@@ -587,8 +596,12 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
return nil, err
}
if resp.Error != nil {
if resp.Error.Code == 404 {
return nil, ErrStreamNotFound
}
return nil, errors.New(resp.Error.Description)
}
return resp.StreamInfo, nil
}
@@ -701,7 +714,11 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error {
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
}
if resp.Error != nil {
if resp.Error.Code == 404 {
return ErrStreamNotFound
}
return errors.New(resp.Error.Description)
}
return nil

View File

@@ -146,6 +146,8 @@ var (
ErrInvalidJSAck = errors.New("nats: invalid jetstream publish response")
ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported")
ErrStreamNameRequired = errors.New("nats: stream name is required")
ErrStreamNotFound = errors.New("nats: stream not found")
ErrConsumerNotFound = errors.New("nats: consumer not found")
ErrConsumerNameRequired = errors.New("nats: consumer name is required")
ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required")
ErrStreamSnapshotConfigRequired = errors.New("nats: stream snapshot configuration is required")
@@ -433,6 +435,9 @@ type Options struct {
// For websocket connections, indicates to the server that the connection
// supports compression. If the server does too, then data will be compressed.
Compression bool
// InboxPrefix allows the default _INBOX prefix to be customized
InboxPrefix string
}
const (
@@ -494,11 +499,13 @@ type Conn struct {
ws bool // true if a websocket connection
// New style response handler
respSub string // The wildcard subject
respScanf string // The scanf template to extract mux token
respMux *Subscription // A single response subscription
respMap map[string]chan *Msg // Request map for the response msg channels
respRand *rand.Rand // Used for generating suffix
respSub string // The wildcard subject
respSubPrefix string // the wildcard prefix including trailing .
respSubLen int // the length of the wildcard prefix excluding trailing .
respScanf string // The scanf template to extract mux token
respMux *Subscription // A single response subscription
respMap map[string]chan *Msg // Request map for the response msg channels
respRand *rand.Rand // Used for generating suffix
}
type natsReader struct {
@@ -1101,6 +1108,17 @@ func Compression(enabled bool) Option {
}
}
// CustomInboxPrefix configures the request + reply inbox prefix
func CustomInboxPrefix(p string) Option {
return func(o *Options) error {
if p == "" || strings.Contains(p, ">") || strings.Contains(p, "*") || strings.HasSuffix(p, ".") {
return fmt.Errorf("nats: invald custom prefix")
}
o.InboxPrefix = p
return nil
}
}
// Handler processing
// SetDisconnectHandler will set the disconnect event handler.
@@ -3120,10 +3138,7 @@ func decodeHeadersMsg(data []byte) (Header, error) {
//
// https://golang.org/pkg/net/textproto/#Reader.ReadMIMEHeader
func readMIMEHeader(tp *textproto.Reader) (textproto.MIMEHeader, error) {
var (
m = make(textproto.MIMEHeader)
strs []string
)
m := make(textproto.MIMEHeader)
for {
kv, err := tp.ReadLine()
if len(kv) == 0 {
@@ -3145,16 +3160,7 @@ func readMIMEHeader(tp *textproto.Reader) (textproto.MIMEHeader, error) {
i++
}
value := string(kv[i:])
vv := m[key]
if vv == nil && len(strs) > 0 {
// Single value header.
vv, strs = strs[:1:1], strs[1:]
vv[0] = value
m[key] = vv
} else {
// Multi value header.
m[key] = append(vv, value)
}
m[key] = append(m[key], value)
if err != nil {
return m, err
}
@@ -3350,7 +3356,8 @@ func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Ms
// Create new literal Inbox and map to a chan msg.
mch := make(chan *Msg, RequestChanLen)
respInbox := nc.newRespInbox()
token := respInbox[respInboxPrefixLen:]
token := respInbox[nc.respSubLen:]
nc.respMap[token] = mch
if nc.respMux == nil {
// Create the response subscription we will use for all new style responses.
@@ -3457,7 +3464,7 @@ func (nc *Conn) newRequest(subj string, hdr, data []byte, timeout time.Duration)
// with the Inbox reply and return the first reply received.
// This is optimized for the case of multiple responses.
func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) {
inbox := NewInbox()
inbox := nc.newInbox()
ch := make(chan *Msg, RequestChanLen)
s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil)
@@ -3477,12 +3484,10 @@ func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration)
// InboxPrefix is the prefix for all inbox subjects.
const (
InboxPrefix = "_INBOX."
inboxPrefixLen = len(InboxPrefix)
respInboxPrefixLen = inboxPrefixLen + nuidSize + 1
replySuffixLen = 8 // Gives us 62^8
rdigits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
base = 62
InboxPrefix = "_INBOX."
inboxPrefixLen = len(InboxPrefix)
rdigits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
base = 62
)
// NewInbox will return an inbox string which can be used for directed replies from
@@ -3497,10 +3502,23 @@ func NewInbox() string {
return string(b[:])
}
func (nc *Conn) newInbox() string {
if nc.Opts.InboxPrefix == _EMPTY_ {
return NewInbox()
}
var sb strings.Builder
sb.WriteString(nc.Opts.InboxPrefix)
sb.WriteByte('.')
sb.WriteString(nuid.Next())
return sb.String()
}
// Function to init new response structures.
func (nc *Conn) initNewResp() {
// _INBOX wildcard
nc.respSub = fmt.Sprintf("%s.*", NewInbox())
nc.respSubPrefix = fmt.Sprintf("%s.", nc.newInbox())
nc.respSubLen = len(nc.respSubPrefix)
nc.respSub = fmt.Sprintf("%s*", nc.respSubPrefix)
nc.respMap = make(map[string]chan *Msg)
nc.respRand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
@@ -3512,15 +3530,17 @@ func (nc *Conn) newRespInbox() string {
if nc.respMap == nil {
nc.initNewResp()
}
var b [respInboxPrefixLen + replySuffixLen]byte
pres := b[:respInboxPrefixLen]
copy(pres, nc.respSub)
var sb strings.Builder
sb.WriteString(nc.respSubPrefix)
rn := nc.respRand.Int63()
for i, l := respInboxPrefixLen, rn; i < len(b); i++ {
b[i] = rdigits[l%base]
l /= base
for i := 0; i < nuidSize; i++ {
sb.WriteByte(rdigits[rn%base])
rn /= base
}
return string(b[:])
return sb.String()
}
// NewRespInbox is the new format used for _INBOX.

2
vendor/modules.txt vendored
View File

@@ -9,7 +9,7 @@ github.com/minio/highwayhash
# github.com/nats-io/jwt/v2 v2.0.3
## explicit
github.com/nats-io/jwt/v2
# github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30
# github.com/nats-io/nats.go v1.11.1-0.20210803204434-91bdffe39f41
## explicit
github.com/nats-io/nats.go
github.com/nats-io/nats.go/encoders/builtin