diff --git a/go.mod b/go.mod index 94faffc4..be6b70e9 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/klauspost/compress v1.13.4 github.com/minio/highwayhash v1.0.1 github.com/nats-io/jwt/v2 v2.1.0 - github.com/nats-io/nats.go v1.13.0 + github.com/nats-io/nats.go v1.13.1-0.20211018182449-f2416a8b1483 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e diff --git a/go.sum b/go.sum index dedb1acc..b7b2c150 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,8 @@ github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.1.0 h1:1UbfD5g1xTdWmSeRV8bh/7u+utTiBsRtWhLl1PixZp4= github.com/nats-io/jwt/v2 v2.1.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats.go v1.13.0 h1:LvYqRB5epIzZWQp6lmeltOOZNLqCvm4b+qfvzZO03HE= -github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.13.1-0.20211018182449-f2416a8b1483 h1:GMx3ZOcMEVM5qnUItQ4eJyQ6ycwmIEB/VC/UxvdevE0= +github.com/nats-io/nats.go v1.13.1-0.20211018182449-f2416a8b1483/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/vendor/github.com/nats-io/nats.go/go_test.mod b/vendor/github.com/nats-io/nats.go/go_test.mod index a868bac2..71e89652 100644 --- a/vendor/github.com/nats-io/nats.go/go_test.mod +++ b/vendor/github.com/nats-io/nats.go/go_test.mod @@ -4,7 +4,7 @@ go 1.16 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.6.2-0.20211007142333-41a9d082f8da + github.com/nats-io/nats-server/v2 v2.6.3-0.20211014173021-3fbf9ddcbc6a github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 diff --git a/vendor/github.com/nats-io/nats.go/go_test.sum b/vendor/github.com/nats-io/nats.go/go_test.sum index 5f7cda60..541e54e6 100644 --- a/vendor/github.com/nats-io/nats.go/go_test.sum +++ b/vendor/github.com/nats-io/nats.go/go_test.sum @@ -17,9 +17,9 @@ github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.1.0 h1:1UbfD5g1xTdWmSeRV8bh/7u+utTiBsRtWhLl1PixZp4= github.com/nats-io/jwt/v2 v2.1.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats-server/v2 v2.6.2-0.20211007142333-41a9d082f8da h1:0snsE4pD2VKIsFiRMRkHFY+SJZVbT7/eZJ1lOt5XuLA= -github.com/nats-io/nats-server/v2 v2.6.2-0.20211007142333-41a9d082f8da/go.mod h1:ubcDOPViqaQcNvJVzoX9FIDxAxyJDTItw07lqFCzC80= -github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats-server/v2 v2.6.3-0.20211014173021-3fbf9ddcbc6a h1:9cFe3XP6CAOBUIPFPV3/KmTPyL/PWU1INdsnPabQmrw= +github.com/nats-io/nats-server/v2 v2.6.3-0.20211014173021-3fbf9ddcbc6a/go.mod h1:CNi6dJQ5H+vWqaoWKjCGtqBt7ai/xOTLiocUqhK6ews= +github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/vendor/github.com/nats-io/nats.go/js.go b/vendor/github.com/nats-io/nats.go/js.go index 361c15a6..f2ae3571 100644 --- a/vendor/github.com/nats-io/nats.go/js.go +++ b/vendor/github.com/nats-io/nats.go/js.go @@ -214,6 +214,11 @@ type jsOpts struct { aecb MsgErrHandler // Maximum in flight. maxap int + // the domain that produced the pre + domain string + // enables protocol tracing + trace TraceCB + shouldTrace bool } const ( @@ -252,18 +257,55 @@ func (opt jsOptFn) configureJSContext(opts *jsOpts) error { return opt(opts) } +// TraceOperation indicates the direction of traffic flow to TraceCB +type TraceOperation int + +const ( + // TraceSent indicate the payload is being sent to subj + TraceSent TraceOperation = 0 + // TraceReceived indicate the payload is being received on subj + TraceReceived TraceOperation = 1 +) + +// TraceCB is called to trace API interactions for the JetStream Context +type TraceCB func(op TraceOperation, subj string, payload []byte, hdr Header) + +// TraceFunc enables tracing of JetStream API interactions +func TraceFunc(cb TraceCB) JSOpt { + return jsOptFn(func(js *jsOpts) error { + js.trace = cb + js.shouldTrace = true + return nil + }) +} + // Domain changes the domain part of JetSteam API prefix. func Domain(domain string) JSOpt { - return APIPrefix(fmt.Sprintf(jsDomainT, domain)) + if domain == _EMPTY_ { + return APIPrefix(_EMPTY_) + } + + return jsOptFn(func(js *jsOpts) error { + js.domain = domain + js.pre = fmt.Sprintf(jsDomainT, domain) + + return nil + }) + } // APIPrefix changes the default prefix used for the JetStream API. func APIPrefix(pre string) JSOpt { return jsOptFn(func(js *jsOpts) error { + if pre == _EMPTY_ { + return nil + } + js.pre = pre if !strings.HasSuffix(js.pre, ".") { js.pre = js.pre + "." } + return nil }) } @@ -1398,12 +1440,15 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, var ccSubj string if isDurable { - ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable) + ccSubj = js.apiSubj(fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)) } else { - ccSubj = fmt.Sprintf(apiConsumerCreateT, stream) + ccSubj = js.apiSubj(fmt.Sprintf(apiConsumerCreateT, stream)) } - resp, err := nc.Request(js.apiSubj(ccSubj), j, js.opts.wait) + if js.opts.shouldTrace { + js.opts.trace(TraceSent, ccSubj, j, nil) + } + resp, err := nc.Request(ccSubj, j, js.opts.wait) if err != nil { cleanUpSub() if err == ErrNoResponders { @@ -1411,6 +1456,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } return nil, err } + if js.opts.shouldTrace { + js.opts.trace(TraceReceived, ccSubj, resp.Data, resp.Header) + } + var cinfo consumerResponse err = json.Unmarshal(resp.Data, &cinfo) if err != nil { @@ -1627,6 +1676,26 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { return } + var maxStr string + // If there was an AUTO_UNSUB done, we need to adjust the new value + // to send after the SUB for the new sid. + if sub.max > 0 { + if sub.jsi.fciseq < sub.max { + adjustedMax := sub.max - sub.jsi.fciseq + maxStr = strconv.Itoa(int(adjustedMax)) + } else { + // We are already at the max, so we should just unsub the + // existing sub and be done + go func(sid int64) { + nc.mu.Lock() + nc.bw.appendString(fmt.Sprintf(unsubProto, sid, _EMPTY_)) + nc.kickFlusher() + nc.mu.Unlock() + }(sub.sid) + return + } + } + // Quick unsubscribe. Since we know this is a simple push subscriber we do in place. osid := sub.applyNewSID() @@ -1646,6 +1715,9 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { nc.mu.Lock() nc.bw.appendString(fmt.Sprintf(unsubProto, osid, _EMPTY_)) nc.bw.appendString(fmt.Sprintf(subProto, newDeliver, _EMPTY_, nsid)) + if maxStr != _EMPTY_ { + nc.bw.appendString(fmt.Sprintf(unsubProto, nsid, maxStr)) + } nc.kickFlusher() nc.mu.Unlock() @@ -2405,7 +2477,7 @@ func (js *js) getConsumerInfo(stream, consumer string) (*ConsumerInfo, error) { func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer string) (*ConsumerInfo, error) { ccInfoSubj := fmt.Sprintf(apiConsumerInfoT, stream, consumer) - resp, err := js.nc.RequestWithContext(ctx, js.apiSubj(ccInfoSubj), nil) + resp, err := js.apiRequestWithContext(ctx, js.apiSubj(ccInfoSubj), nil) if err != nil { if err == ErrNoResponders { err = ErrJetStreamNotEnabled @@ -2426,11 +2498,27 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin return info.ConsumerInfo, nil } +// a RequestWithContext with tracing via TraceCB +func (js *js) apiRequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) { + if js.opts.shouldTrace { + js.opts.trace(TraceSent, subj, data, nil) + } + resp, err := js.nc.RequestWithContext(ctx, subj, data) + if err != nil { + return nil, err + } + if js.opts.shouldTrace { + js.opts.trace(TraceReceived, subj, resp.Data, resp.Header) + } + + return resp, nil +} + func (m *Msg) checkReply() (*js, *jsSub, error) { if m == nil || m.Sub == nil { return nil, nil, ErrMsgNotBound } - if m.Reply == "" { + if m.Reply == _EMPTY_ { return nil, nil, ErrMsgNoReply } sub := m.Sub diff --git a/vendor/github.com/nats-io/nats.go/jsm.go b/vendor/github.com/nats-io/nats.go/jsm.go index 95f38b59..5c38ab67 100644 --- a/vendor/github.com/nats-io/nats.go/jsm.go +++ b/vendor/github.com/nats-io/nats.go/jsm.go @@ -188,7 +188,7 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) { defer cancel() } - resp, err := js.nc.RequestWithContext(o.ctx, js.apiSubj(apiAccountInfo), nil) + resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(apiAccountInfo), nil) if err != nil { // todo maybe nats server should never have no responder on this subject and always respond if they know there is no js to be had if err == ErrNoResponders { @@ -251,7 +251,7 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C ccSubj = fmt.Sprintf(apiConsumerCreateT, stream) } - resp, err := js.nc.RequestWithContext(o.ctx, js.apiSubj(ccSubj), req) + resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(ccSubj), req) if err != nil { if err == ErrNoResponders { err = ErrJetStreamNotEnabled @@ -293,7 +293,7 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error { } dcSubj := js.apiSubj(fmt.Sprintf(apiConsumerDeleteT, stream, consumer)) - r, err := js.nc.RequestWithContext(o.ctx, dcSubj, nil) + r, err := js.apiRequestWithContext(o.ctx, dcSubj, nil) if err != nil { return err } @@ -376,7 +376,7 @@ func (c *consumerLister) Next() bool { } clSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerListT, c.stream)) - r, err := c.js.nc.RequestWithContext(ctx, clSubj, req) + r, err := c.js.apiRequestWithContext(ctx, clSubj, req) if err != nil { c.err = err return false @@ -473,7 +473,7 @@ func (c *consumerNamesLister) Next() bool { } clSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerNamesT, c.stream)) - r, err := c.js.nc.RequestWithContext(ctx, clSubj, nil) + r, err := c.js.apiRequestWithContext(ctx, clSubj, nil) if err != nil { c.err = err return false @@ -561,7 +561,7 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { } csSubj := js.apiSubj(fmt.Sprintf(apiStreamCreateT, cfg.Name)) - r, err := js.nc.RequestWithContext(o.ctx, csSubj, req) + r, err := js.apiRequestWithContext(o.ctx, csSubj, req) if err != nil { return nil, err } @@ -592,7 +592,7 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) { } csSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream)) - r, err := js.nc.RequestWithContext(o.ctx, csSubj, nil) + r, err := js.apiRequestWithContext(o.ctx, csSubj, nil) if err != nil { return nil, err } @@ -676,7 +676,7 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error } usSubj := js.apiSubj(fmt.Sprintf(apiStreamUpdateT, cfg.Name)) - r, err := js.nc.RequestWithContext(o.ctx, usSubj, req) + r, err := js.apiRequestWithContext(o.ctx, usSubj, req) if err != nil { return nil, err } @@ -711,7 +711,7 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error { } dsSubj := js.apiSubj(fmt.Sprintf(apiStreamDeleteT, name)) - r, err := js.nc.RequestWithContext(o.ctx, dsSubj, nil) + r, err := js.apiRequestWithContext(o.ctx, dsSubj, nil) if err != nil { return err } @@ -788,7 +788,7 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt } dsSubj := js.apiSubj(fmt.Sprintf(apiMsgGetT, name)) - r, err := js.nc.RequestWithContext(o.ctx, dsSubj, req) + r, err := js.apiRequestWithContext(o.ctx, dsSubj, req) if err != nil { return nil, err } @@ -853,7 +853,7 @@ func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error { } dsSubj := js.apiSubj(fmt.Sprintf(apiMsgDeleteT, name)) - r, err := js.nc.RequestWithContext(o.ctx, dsSubj, req) + r, err := js.apiRequestWithContext(o.ctx, dsSubj, req) if err != nil { return err } @@ -905,7 +905,7 @@ func (js *js) purgeStream(stream string, req *streamPurgeRequest, opts ...JSOpt) } psSubj := js.apiSubj(fmt.Sprintf(apiStreamPurgeT, stream)) - r, err := js.nc.RequestWithContext(o.ctx, psSubj, b) + r, err := js.apiRequestWithContext(o.ctx, psSubj, b) if err != nil { return err } @@ -970,7 +970,7 @@ func (s *streamLister) Next() bool { } slSubj := s.js.apiSubj(apiStreamList) - r, err := s.js.nc.RequestWithContext(ctx, slSubj, req) + r, err := s.js.apiRequestWithContext(ctx, slSubj, req) if err != nil { s.err = err return false @@ -1054,7 +1054,7 @@ func (l *streamNamesLister) Next() bool { defer cancel() } - r, err := l.js.nc.RequestWithContext(ctx, l.js.apiSubj(apiStreams), nil) + r, err := l.js.apiRequestWithContext(ctx, l.js.apiSubj(apiStreams), nil) if err != nil { l.err = err return false @@ -1132,7 +1132,7 @@ func getJSContextOpts(defs *jsOpts, opts ...JSOpt) (*jsOpts, context.CancelFunc, if o.ctx == nil && o.wait > 0 { o.ctx, cancel = context.WithTimeout(context.Background(), o.wait) } - if o.pre == "" { + if o.pre == _EMPTY_ { o.pre = defs.pre } diff --git a/vendor/github.com/nats-io/nats.go/kv.go b/vendor/github.com/nats-io/nats.go/kv.go index cf5468bc..58f4b4b9 100644 --- a/vendor/github.com/nats-io/nats.go/kv.go +++ b/vendor/github.com/nats-io/nats.go/kv.go @@ -66,13 +66,33 @@ type KeyValue interface { Bucket() string // PurgeDeletes will remove all current delete markers. PurgeDeletes(opts ...WatchOpt) error + // Status retrieves the status and configuration of a bucket + Status() (KeyValueStatus, error) +} + +// KeyValueStatus is run-time status about a Key-Value bucket +type KeyValueStatus interface { + // Bucket the name of the bucket + Bucket() string + + // Values is how many messages are in the bucket, including historical values + Values() uint64 + + // History returns the configured history kept per key + History() int64 + + // TTL is how long the bucket keeps values for + TTL() time.Duration + + // BackingStore indicates what technology is used for storage of the bucket + BackingStore() string } // KeyWatcher is what is returned when doing a watch. type KeyWatcher interface { // Updates returns a channel to read any updates to entries. Updates() <-chan KeyValueEntry - // Stop() will stop this watcher. + // Stop will stop this watcher. Stop() error } @@ -333,6 +353,18 @@ func keyValid(key string) bool { // Get returns the latest value for the key. func (kv *kvs) Get(key string) (KeyValueEntry, error) { + e, err := kv.get(key) + if err == ErrKeyDeleted { + return nil, ErrKeyNotFound + } + if err != nil { + return nil, err + } + + return e, nil +} + +func (kv *kvs) get(key string) (KeyValueEntry, error) { if !keyValid(key) { return nil, ErrInvalidKey } @@ -367,6 +399,7 @@ func (kv *kvs) Get(key string) (KeyValueEntry, error) { entry.op = KeyValuePurge return entry, ErrKeyDeleted } + } return entry, nil @@ -400,11 +433,13 @@ func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) { if err == nil { return v, nil } + // TODO(dlc) - Since we have tombstones for DEL ops for watchers, this could be from that // so we need to double check. - if e, err := kv.Get(key); err == ErrKeyDeleted { + if e, err := kv.get(key); err == ErrKeyDeleted { return kv.Update(key, value, e.Revision()) } + return 0, err } @@ -469,20 +504,31 @@ func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error { } defer watcher.Stop() + var deleteMarkers []KeyValueEntry for entry := range watcher.Updates() { if entry == nil { break } if op := entry.Operation(); op == KeyValueDelete || op == KeyValuePurge { - var b strings.Builder - b.WriteString(kv.pre) - b.WriteString(entry.Key()) - err := kv.js.purgeStream(kv.stream, &streamPurgeRequest{Subject: b.String()}) - if err != nil { - return err - } + deleteMarkers = append(deleteMarkers, entry) } } + + var ( + pr streamPurgeRequest + b strings.Builder + ) + // Do actual purges here. + for _, entry := range deleteMarkers { + b.WriteString(kv.pre) + b.WriteString(entry.Key()) + pr.Subject = b.String() + err := kv.js.purgeStream(kv.stream, &pr) + if err != nil { + return err + } + b.Reset() + } return nil } @@ -642,3 +688,37 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { func (kv *kvs) Bucket() string { return kv.name } + +// KeyValueBucketStatus represents status of a Bucket, implements KeyValueStatus +type KeyValueBucketStatus struct { + nfo *StreamInfo + bucket string +} + +// Bucket the name of the bucket +func (s *KeyValueBucketStatus) Bucket() string { return s.bucket } + +// Values is how many messages are in the bucket, including historical values +func (s *KeyValueBucketStatus) Values() uint64 { return s.nfo.State.Msgs } + +// History returns the configured history kept per key +func (s *KeyValueBucketStatus) History() int64 { return s.nfo.Config.MaxMsgsPerSubject } + +// TTL is how long the bucket keeps values for +func (s *KeyValueBucketStatus) TTL() time.Duration { return s.nfo.Config.MaxAge } + +// BackingStore indicates what technology is used for storage of the bucket +func (s *KeyValueBucketStatus) BackingStore() string { return "JetStream" } + +// StreamInfo is the stream info retrieved to create the status +func (s *KeyValueBucketStatus) StreamInfo() *StreamInfo { return s.nfo } + +// Status retrieves the status and configuration of a bucket +func (kv *kvs) Status() (KeyValueStatus, error) { + nfo, err := kv.js.StreamInfo(kv.stream) + if err != nil { + return nil, err + } + + return &KeyValueBucketStatus{nfo: nfo, bucket: kv.name}, nil +} diff --git a/vendor/github.com/nats-io/nats.go/object.go b/vendor/github.com/nats-io/nats.go/object.go index 13dd7b28..24953562 100644 --- a/vendor/github.com/nats-io/nats.go/object.go +++ b/vendor/github.com/nats-io/nats.go/object.go @@ -31,6 +31,8 @@ import ( "github.com/nats-io/nuid" ) +// ObjectStoreManager creates, loads and deletes Object Stores +// // Notice: Experimental Preview // // This functionality is EXPERIMENTAL and may be changed in later releases. @@ -43,6 +45,9 @@ type ObjectStoreManager interface { DeleteObjectStore(bucket string) error } +// ObjectStore is a blob store capable of storing large objects efficiently in +// JetStream streams +// // Notice: Experimental Preview // // This functionality is EXPERIMENTAL and may be changed in later releases. @@ -89,6 +94,9 @@ type ObjectStore interface { // List will list all the objects in this store. List(opts ...WatchOpt) ([]*ObjectInfo, error) + + // Status retrieves run-time status about the backing store of the bucket. + Status() (ObjectStoreStatus, error) } type ObjectOpt interface { @@ -109,7 +117,7 @@ func (ctx ContextOpt) configureObject(opts *objOpts) error { type ObjectWatcher interface { // Updates returns a channel to read any updates to entries. Updates() <-chan *ObjectInfo - // Stop() will stop this watcher. + // Stop will stop this watcher. Stop() error } @@ -132,6 +140,25 @@ type ObjectStoreConfig struct { Replicas int } +type ObjectStoreStatus interface { + // Bucket is the name of the bucket + Bucket() string + // Description is the description supplied when creating the bucket + Description() string + // TTL indicates how long objects are kept in the bucket + TTL() time.Duration + // Storage indicates the underlying JetStream storage technology used to store data + Storage() StorageType + // Replicas indicates how many storage replicas are kept for the data in the bucket + Replicas() int + // Sealed indicates the stream is sealed and cannot be modified in any way + Sealed() bool + // Size is the combined size of all data in the bucket including metadata, in bytes + Size() uint64 + // BackingStore provides details about the underlying storage + BackingStore() string +} + // ObjectMetaOptions type ObjectMetaOptions struct { Link *ObjectLink `json:"link,omitempty"` @@ -857,6 +884,54 @@ func (obs *obs) List(opts ...WatchOpt) ([]*ObjectInfo, error) { return objs, nil } +// ObjectBucketStatus represents status of a Bucket, implements ObjectStoreStatus +type ObjectBucketStatus struct { + nfo *StreamInfo + bucket string +} + +// Bucket is the name of the bucket +func (s *ObjectBucketStatus) Bucket() string { return s.bucket } + +// Description is the description supplied when creating the bucket +func (s *ObjectBucketStatus) Description() string { return s.nfo.Config.Description } + +// TTL indicates how long objects are kept in the bucket +func (s *ObjectBucketStatus) TTL() time.Duration { return s.nfo.Config.MaxAge } + +// Storage indicates the underlying JetStream storage technology used to store data +func (s *ObjectBucketStatus) Storage() StorageType { return s.nfo.Config.Storage } + +// Replicas indicates how many storage replicas are kept for the data in the bucket +func (s *ObjectBucketStatus) Replicas() int { return s.nfo.Config.Replicas } + +// Sealed indicates the stream is sealed and cannot be modified in any way +func (s *ObjectBucketStatus) Sealed() bool { return s.nfo.Config.Sealed } + +// Size is the combined size of all data in the bucket including metadata, in bytes +func (s *ObjectBucketStatus) Size() uint64 { return s.nfo.State.Bytes } + +// BackingStore indicates what technology is used for storage of the bucket +func (s *ObjectBucketStatus) BackingStore() string { return "JetStream" } + +// StreamInfo is the stream info retrieved to create the status +func (s *ObjectBucketStatus) StreamInfo() *StreamInfo { return s.nfo } + +// Status retrieves run-time status about a bucket +func (obs *obs) Status() (ObjectStoreStatus, error) { + nfo, err := obs.js.StreamInfo(obs.stream) + if err != nil { + return nil, err + } + + status := &ObjectBucketStatus{ + nfo: nfo, + bucket: obs.name, + } + + return status, nil +} + // Read impl. func (o *objResult) Read(p []byte) (n int, err error) { o.Lock() @@ -880,7 +955,7 @@ func (o *objResult) Read(p []byte) (n int, err error) { } r := o.r.(net.Conn) - r.SetReadDeadline(time.Now().Add(200 * time.Millisecond)) + r.SetReadDeadline(time.Now().Add(2 * time.Second)) n, err = r.Read(p) if err, ok := err.(net.Error); ok && err.Timeout() { if ctx := o.ctx; ctx != nil { diff --git a/vendor/modules.txt b/vendor/modules.txt index bd20d740..a71b7fe2 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -9,7 +9,7 @@ github.com/minio/highwayhash # github.com/nats-io/jwt/v2 v2.1.0 ## explicit github.com/nats-io/jwt/v2 -# github.com/nats-io/nats.go v1.13.0 +# github.com/nats-io/nats.go v1.13.1-0.20211018182449-f2416a8b1483 ## explicit github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin