diff --git a/jetstream/jsm/jsm.go b/jetstream/jsm/jsm.go index b987fa71..8193cd02 100644 --- a/jetstream/jsm/jsm.go +++ b/jetstream/jsm/jsm.go @@ -397,17 +397,18 @@ func getMsgSetInfo(nc *nats.Conn, name string) { mstats := &msi.Stats cfg := &msi.Config log.Println() - log.Printf("Subjects: %+v", cfg.Subjects) - log.Printf("Retention: %s - %s", cfg.Storage, cfg.Retention) - log.Printf("TTL: %v", cfg.MaxAge) - log.Printf("Messages: %s of %s", + log.Printf("Subjects: %+v", cfg.Subjects) + log.Printf("Retention: %s - %s", cfg.Storage, cfg.Retention) + log.Printf("TTL: %v", cfg.MaxAge) + log.Printf("Messages: %s of %s", humanize.Comma(int64(mstats.Msgs)), unlimitedOrFriendly(int(cfg.MaxMsgs))) - log.Printf("Bytes: %s of %s", + log.Printf("Bytes: %s of %s", humanize.Bytes(mstats.Bytes), unlimitedOrFriendly(int(cfg.MaxBytes))) - log.Printf("FirstSeq: %s", humanize.Comma(int64(mstats.FirstSeq))) - log.Printf("LastSeq: %s", humanize.Comma(int64(mstats.LastSeq))) + log.Printf("FirstSeq: %s", humanize.Comma(int64(mstats.FirstSeq))) + log.Printf("LastSeq: %s", humanize.Comma(int64(mstats.LastSeq))) + log.Printf("Observables: %d", mstats.Observables) log.Println() } diff --git a/server/filestore.go b/server/filestore.go index 075a0d00..f3912a3d 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1111,7 +1111,9 @@ func (fs *fileStore) LoadMsg(seq uint64) (string, []byte, int64, error) { func (fs *fileStore) Stats() MsgSetStats { fs.mu.RLock() defer fs.mu.RUnlock() - return fs.stats + stats := fs.stats + stats.Observables = len(fs.obs) + return stats } func fileStoreMsgSize(subj string, msg []byte) uint64 { diff --git a/server/memstore.go b/server/memstore.go index 0db2ca07..d992d176 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -23,12 +23,13 @@ import ( // TODO(dlc) - This is a fairly simplistic approach but should do for now. type memStore struct { - mu sync.RWMutex - stats MsgSetStats - msgs map[uint64]*storedMsg - scb func(int64) - ageChk *time.Timer - config MsgSetConfig + mu sync.RWMutex + stats MsgSetStats + msgs map[uint64]*storedMsg + scb func(int64) + ageChk *time.Timer + config MsgSetConfig + obsCount int } type storedMsg struct { @@ -263,8 +264,11 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool { func (ms *memStore) Stats() MsgSetStats { ms.mu.RLock() - defer ms.mu.RUnlock() - return ms.stats + stats := ms.stats + stats.Observables = ms.obsCount + ms.mu.RUnlock() + + return stats } func memStoreMsgSize(subj string, msg []byte) uint64 { @@ -287,15 +291,40 @@ func (ms *memStore) Stop() { ms.mu.Unlock() } -type observableMemStore struct{} +func (ms *memStore) incObsCount() { + ms.mu.Lock() + ms.obsCount++ + ms.mu.Unlock() +} + +func (ms *memStore) decObsCount() { + ms.mu.Lock() + + if ms.obsCount == 0 { + ms.mu.RUnlock() + return + } + + ms.obsCount-- + ms.mu.Unlock() +} + +type observableMemStore struct { + ms *memStore +} func (ms *memStore) ObservableStore(_ string, _ *ObservableConfig) (ObservableStore, error) { - return &observableMemStore{}, nil + ms.incObsCount() + + return &observableMemStore{ms}, nil } // No-ops. func (os *observableMemStore) Update(_ *ObservableState) error { return nil } -func (os *observableMemStore) Stop() {} +func (os *observableMemStore) Stop() { + os.ms.decObsCount() +} + func (os *observableMemStore) State() (*ObservableState, error) { return nil, nil } diff --git a/server/msgset.go b/server/msgset.go index 3bd3b73c..a46d64d7 100644 --- a/server/msgset.go +++ b/server/msgset.go @@ -321,7 +321,7 @@ func (mset *MsgSet) processMsgBySeq(_ *subscription, _ *client, subject, reply s subj, msg, ts, err := store.LoadMsg(seq) if err != nil { c.Debugf("JetStream request for message: %q - %q - %d error %v", c.acc.Name, name, seq, err) - response = []byte("-ERR 'bad sequence argument'") + response = []byte("-ERR 'could not load message from storage'") mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, response, nil, 0} return } diff --git a/server/observable.go b/server/observable.go index b228aca9..2dd6ab74 100644 --- a/server/observable.go +++ b/server/observable.go @@ -24,6 +24,11 @@ import ( "time" ) +type ObservableInfo struct { + Config ObservableConfig `json:"configuration"` + State ObservableState `json:"state"` +} + type ObservableConfig struct { Delivery string `json:"delivery_subject"` Durable string `json:"durable_name,omitempty"` @@ -48,12 +53,23 @@ type AckPolicy int const ( // AckNone requires no acks for delivered messages. AckNone AckPolicy = iota - // When acking a sequence number, this implicitly acks all sequences below this one as well. + // AckAll when acking a sequence number, this implicitly acks all sequences below this one as well. AckAll // AckExplicit requires ack or nack for all messages. AckExplicit ) +func (a AckPolicy) String() string { + switch a { + case AckNone: + return "none" + case AckAll: + return "all" + default: + return "explicit" + } +} + // ReplayPolicy determines how the observable should replay messages it already has queued in the message set. type ReplayPolicy int @@ -64,6 +80,15 @@ const ( ReplayOriginal ) +func (r ReplayPolicy) String() string { + switch r { + case ReplayInstant: + return "instant" + default: + return "original" + } +} + // Ack responses. Note that a nil or no payload is same as AckAck var ( // Ack @@ -429,35 +454,41 @@ func (o *Observable) updateStateLoop() { } } -// Returns our current observable state. -func (o *Observable) Info() *ObservableState { +// Info returns our current observable state. +func (o *Observable) Info() *ObservableInfo { o.mu.Lock() defer o.mu.Unlock() - state := &ObservableState{ - Delivered: SequencePair{ - ObsSeq: o.dseq, - SetSeq: o.sseq, - }, - AckFloor: SequencePair{ - ObsSeq: o.adflr, - SetSeq: o.asflr, + info := &ObservableInfo{ + Config: o.config, + State: ObservableState{ + Delivered: SequencePair{ + ObsSeq: o.dseq, + SetSeq: o.sseq, + }, + AckFloor: SequencePair{ + ObsSeq: o.adflr, + SetSeq: o.asflr, + }, }, } + + info.Config.Durable = o.name if len(o.pending) > 0 { p := make(map[uint64]int64, len(o.pending)) for k, v := range o.pending { p[k] = v } - state.Pending = p + info.State.Pending = p } if len(o.rdc) > 0 { r := make(map[uint64]uint64, len(o.rdc)) for k, v := range o.rdc { r[k] = v } - state.Redelivery = r + info.State.Redelivery = r } - return state + + return info } // Will update the underlying store. diff --git a/server/store.go b/server/store.go index 931bf544..5ee70fde 100644 --- a/server/store.go +++ b/server/store.go @@ -54,10 +54,11 @@ type MsgSetStore interface { // MsgSetStats are stats about this given message set. type MsgSetStats struct { - Msgs uint64 - Bytes uint64 - FirstSeq uint64 - LastSeq uint64 + Msgs uint64 + Bytes uint64 + FirstSeq uint64 + LastSeq uint64 + Observables int } type ObservableStore interface { @@ -68,22 +69,22 @@ type ObservableStore interface { // SequencePair has both the observable and the message set sequence. This point to same message. type SequencePair struct { - ObsSeq uint64 - SetSeq uint64 + ObsSeq uint64 `json:"observable_sequence"` + SetSeq uint64 `json:"msg_set_sequence"` } // ObservableState represents a stored state for an observable. type ObservableState struct { // Delivered keep track of last delivered sequence numbers for both set and observable. - Delivered SequencePair + Delivered SequencePair `json:"delivered"` // AckFloor keeps track of the ack floors for both set and observable. - AckFloor SequencePair + AckFloor SequencePair `json:"ack_floor"` // These are both in set sequence context. // Pending is for all messages pending and the timestamp for the delivered time. // This will only be present when the AckPolicy is ExplicitAck. - Pending map[uint64]int64 + Pending map[uint64]int64 `json:"pending_count"` // This is for messages that have been redelivered, so count > 1. - Redelivery map[uint64]uint64 + Redelivery map[uint64]uint64 `json:"redelivered_count"` } func jsonString(s string) string {