diff --git a/go.mod b/go.mod index 75a31335..75bafe81 100644 --- a/go.mod +++ b/go.mod @@ -1,12 +1,12 @@ module github.com/nats-io/nats-server/v2 require ( - github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7 github.com/nats-io/nats.go v1.10.0 github.com/golang/protobuf v1.3.5 // indirect github.com/nats-io/nats.go v1.9.2 github.com/nats-io/nkeys v0.1.4 + github.com/nats-io/jwt v0.3.2 github.com/nats-io/nuid v1.0.1 github.com/minio/highwayhash v1.0.0 golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 diff --git a/go.sum b/go.sum index f3a34f4b..69501fa1 100644 --- a/go.sum +++ b/go.sum @@ -29,8 +29,6 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e h1:D5TXcfTk7xF7hvieo4QErS3qqCB4teTffacDWr7CI+0= -golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7 h1:HmbHVPwrPEKPGLAcHSrMe6+hqSUlvZU0rab6x5EXfGU= golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/server/filestore.go b/server/filestore.go index 00d76247..5d22741e 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -118,17 +118,19 @@ const ( // Index file for observable obsState = "o.dat" // Maximum size of a write buffer we may consider for re-use. - maxBufReuse = 4 * 1024 * 1024 + maxBufReuse = 2 * 1024 * 1024 // Default stream block size. defaultStreamBlockSize = 128 * 1024 * 1024 // 128MB // Default for workqueue or interest based. 
defaultOtherBlockSize = 32 * 1024 * 1024 // 32MB // max block size for now. - maxBlockSize = defaultStreamBlockSize + maxBlockSize = 2 * defaultStreamBlockSize // default cache expiration defaultCacheExpiration = 2 * time.Second // default sync interval defaultSyncInterval = 10 * time.Second + // coalesceDelay is how long the flush loop waits after being kicked, to coalesce pending writes into one flush. + coalesceDelay = 20 * time.Millisecond ) func newFileStore(fcfg FileStoreConfig, cfg MsgSetConfig) (*fileStore, error) { @@ -455,6 +457,7 @@ func (fs *fileStore) StoreMsg(subj string, msg []byte) (uint64, error) { if fs.ageChk == nil && fs.cfg.MaxAge != 0 { fs.startAgeChk() } + cb := fs.scb stopBytes := int64(fs.stats.Bytes) fs.mu.Unlock() @@ -757,6 +760,7 @@ func (fs *fileStore) flushLoop(fch, qch chan struct{}) { for { select { case <-fch: + time.Sleep(coalesceDelay) fs.flushPendingWrites() case <-qch: return @@ -1031,7 +1035,7 @@ func (fs *fileStore) checkPrefetch(seq uint64, mb *msgBlock) { // Will return message for the given sequence number. func (fs *fileStore) msgForSeq(seq uint64) *fileStoredMsg { fs.mu.Lock() - // seq == 0 indidcates we want first msg. + // seq == 0 indicates we want first msg. if seq == 0 { seq = fs.stats.FirstSeq } @@ -1041,9 +1045,6 @@ func (fs *fileStore) msgForSeq(seq uint64) *fileStoredMsg { return nil } - // Check for prefetch - fs.checkPrefetch(seq, mb) - // Check cache. if mb.cache != nil { if sm, ok := mb.cache[seq]; ok { @@ -1052,6 +1053,10 @@ func (fs *fileStore) msgForSeq(seq uint64) *fileStoredMsg { return sm } } + + // Check for prefetch + fs.checkPrefetch(seq, mb) + // If we are here we do not have the message in our cache currently. sm := fs.readAndCacheMsgs(mb, seq) if sm != nil { @@ -1109,12 +1114,12 @@ func (fs *fileStore) flushPendingWrites() { // Lock should be held. func (fs *fileStore) flushPendingWritesLocked() { mb := fs.lmb - if mb == nil { + if mb == nil || mb.mfd == nil { return } // Append new data to the message block file. 
- if lbb := fs.wmb.Len(); lbb > 0 && mb.mfd != nil { + if lbb := fs.wmb.Len(); lbb > 0 { n, _ := fs.wmb.WriteTo(mb.mfd) if int(n) != lbb { fs.wmb.Truncate(int(n)) @@ -1578,14 +1583,13 @@ func (o *observableFileStore) Update(state *ObservableState) error { // Check if we have the index file open. o.mu.Lock() - defer o.mu.Unlock() - if err := o.ensureStateFileOpen(); err != nil { - return err + err := o.ensureStateFileOpen() + if err == nil { + n, err = o.ifd.WriteAt(buf, 0) + o.lwsz = int64(n) } - - n, err := o.ifd.WriteAt(buf, 0) - o.lwsz = int64(n) + o.mu.Unlock() return err } diff --git a/server/msgset.go b/server/msgset.go index c017a6b3..8a3524e3 100644 --- a/server/msgset.go +++ b/server/msgset.go @@ -52,7 +52,7 @@ const ( // MsgSet is a jetstream message set. When we receive a message internally destined // for a MsgSet we will direct link from the client to this MsgSet structure. type MsgSet struct { - mu sync.Mutex + mu sync.RWMutex sg *sync.Cond sgw int jsa *jsAccount @@ -291,7 +291,8 @@ func (mset *MsgSet) processInboundJetStreamMsg(_ *subscription, _ *client, subje response := AckAck // Check to see if we are over the account limit. - if seq, err := store.StoreMsg(subject, msg); err != nil { + seq, err := store.StoreMsg(subject, msg) + if err != nil { mset.mu.Lock() accName := c.acc.Name name := mset.config.Name @@ -299,17 +300,31 @@ func (mset *MsgSet) processInboundJetStreamMsg(_ *subscription, _ *client, subje c.Errorf("JetStream failed to store a msg on account: %q message set: %q - %v", accName, name, err) response = []byte(fmt.Sprintf("-ERR %q", err.Error())) } else if jsa.limitsExceeded(stype) { - c.Debugf("JetStream resource limits exceeded for account") + c.Debugf("JetStream resource limits exceeded for account: %q", c.acc.Name) response = []byte("-ERR 'resource limits exceeded for account'") store.RemoveMsg(seq) - } else { - mset.signalObservers() + seq = 0 } // Send response here. 
if doAck && len(reply) > 0 { mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, response, nil, 0} } + + if err == nil && seq > 0 { + var needSignal bool + mset.mu.Lock() + for _, o := range mset.obs { + if !o.deliverCurrentMsg(subject, msg, seq) { + needSignal = true + } + } + mset.mu.Unlock() + + if needSignal { + mset.signalObservers() + } + } } func (mset *MsgSet) signalObservers() { diff --git a/server/observable.go b/server/observable.go index 849cbbe7..2adb0c1a 100644 --- a/server/observable.go +++ b/server/observable.go @@ -98,6 +98,7 @@ type Observable struct { nointerest int athresh int achk time.Duration + fch chan struct{} qch chan struct{} } @@ -197,7 +198,7 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error) } // Set name, which will be durable name if set, otherwise we create one at random. - o := &Observable{mset: mset, config: *config, dsubj: config.Delivery, active: true, qch: make(chan struct{})} + o := &Observable{mset: mset, config: *config, dsubj: config.Delivery, active: true, qch: make(chan struct{}), fch: make(chan struct{})} if isDurableObservable(config) { o.name = config.Durable } else { @@ -286,6 +287,8 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error) // Now start up Go routine to deliver msgs. go o.loopAndDeliverMsgs(s, a) + // Startup our state update loop. + go o.updateStateLoop() return o, nil } @@ -387,26 +390,51 @@ func (o *Observable) readStoredState() error { return err } +func (o *Observable) updateStateLoop() { + o.mu.Lock() + fch := o.fch + qch := o.qch + o.mu.Unlock() + + for { + select { + case <-qch: + return + case <-fch: + time.Sleep(25 * time.Millisecond) + o.mu.Lock() + if o.store != nil { + state := &ObservableState{ + Delivered: SequencePair{ + ObsSeq: o.dseq, + SetSeq: o.sseq, + }, + AckFloor: SequencePair{ + ObsSeq: o.adflr, + SetSeq: o.asflr, + }, + Pending: o.pending, + Redelivery: o.rdc, + } + // FIXME(dlc) - Hold onto any errors. 
+ o.store.Update(state) + } + o.mu.Unlock() + } + } +} + // Will update the underlying store. // Lock should be held. func (o *Observable) updateStore() { if o.store == nil { return } - state := &ObservableState{ - Delivered: SequencePair{ - ObsSeq: o.dseq, - SetSeq: o.sseq, - }, - AckFloor: SequencePair{ - ObsSeq: o.adflr, - SetSeq: o.asflr, - }, - Pending: o.pending, - Redelivery: o.rdc, + // Kick our flusher + select { + case o.fch <- struct{}{}: + default: } - // FIXME(dlc) - Hold onto any errors. - o.store.Update(state) } // Process an ack for a message. @@ -567,7 +595,7 @@ func (o *Observable) clearReplayState() { } // Wait for pull requests. -// FIXME(dlc) - for short wait periods is ok but should single when waiting comes in. +// FIXME(dlc) - for short wait periods is ok but should signal when waiting comes in. func (o *Observable) waitForPullRequests(wait time.Duration) { o.mu.Lock() qch := o.qch @@ -736,6 +764,54 @@ func (o *Observable) ackReply(sseq, dseq, dcount uint64) string { return fmt.Sprintf(o.ackReplyT, dcount, sseq, dseq) } +// deliverCurrentMsg is the hot path to deliver a message that was just received. +// Will return if the message was delivered or not. +func (o *Observable) deliverCurrentMsg(subj string, msg []byte, seq uint64) bool { + o.mu.Lock() + if seq != o.sseq { + o.mu.Unlock() + return false + } + + // If we are in push mode and not active let's stop sending. + if o.isPushMode() && !o.active { + o.mu.Unlock() + return false + } + + // If we are in pull mode and no one is waiting already break and wait. + if o.isPullMode() && len(o.waiting) == 0 { + o.mu.Unlock() + return false + } + + // Bump store sequence here. + o.sseq++ + + // If we are partitioned and we do not match, do not consider this a failure. + // Go ahead and return true. 
+ if o.config.Partition != "" && subj != o.config.Partition { + o.mu.Unlock() + return true + } + var dsubj string + if len(o.waiting) > 0 { + dsubj = o.waiting[0] + o.waiting = append(o.waiting[:0], o.waiting[1:]...) + } else { + dsubj = o.dsubj + } + + if len(msg) > 0 { + msg = append(msg[:0:0], msg...) + } + + o.deliverMsg(dsubj, subj, msg, seq, 1) + o.mu.Unlock() + + return true +} + // Deliver a msg to the observable. // Lock should be held and o.mset validated to be non-nil. func (o *Observable) deliverMsg(dsubj, subj string, msg []byte, seq, dcount uint64) { @@ -1019,6 +1095,9 @@ func (o *Observable) stop(dflag bool) error { o.mu.Unlock() return nil } + + close(o.qch) + if o.store != nil { o.store.Stop() } @@ -1030,7 +1109,7 @@ func (o *Observable) stop(dflag bool) error { o.reqSub = nil stopAndClearTimer(&o.ptmr) stopAndClearTimer(&o.atmr) - close(o.qch) + o.mu.Unlock() mset.mu.Lock() diff --git a/server/store.go b/server/store.go index 5bbf3e09..8089c1a5 100644 --- a/server/store.go +++ b/server/store.go @@ -30,6 +30,8 @@ const ( FileStorage ) +var ErrStoreMsgNotFound = errors.New("no message found") + type MsgSetStore interface { StoreMsg(subj string, msg []byte) (uint64, error) LoadMsg(seq uint64) (subj string, msg []byte, ts int64, err error) @@ -203,7 +205,3 @@ func (rp *ReplayPolicy) UnmarshalJSON(data []byte) error { } return nil } - -var ( - ErrStoreMsgNotFound = errors.New("no message found") -) diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 7dc1b0bd..3d277b3b 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -594,7 +594,7 @@ func TestJetStreamBasicWorkQueue(t *testing.T) { t.Helper() nextMsg, err := nc.Request(o.RequestNextMsgSubject(), nil, time.Second) if err != nil { - t.Fatalf("Unexpected error: %v", err) + t.Fatalf("Unexpected error for seq %d: %v", seqno, err) } if nextMsg.Subject != "bar" { t.Fatalf("Expected subject of %q, got %q", "bar", nextMsg.Subject) @@ -615,7 +615,7 @@ func 
TestJetStreamBasicWorkQueue(t *testing.T) { go func() { time.Sleep(nextDelay) - nc.Request(sendSubj, []byte("Hello World!"), 50*time.Millisecond) + nc.Request(sendSubj, []byte("Hello World!"), 100*time.Millisecond) }() start := time.Now() @@ -975,7 +975,6 @@ func TestJetStreamWorkQueueRetentionMsgSet(t *testing.T) { if _, err := mset.AddObservable(&server.ObservableConfig{DeliverAll: true}); err == nil { t.Fatalf("Expected an error on attempt for second observable for a workqueue") } - o.Delete() if numo := mset.NumObservables(); numo != 0 { @@ -1044,7 +1043,7 @@ func TestJetStreamWorkQueueAckWaitRedelivery(t *testing.T) { // Now load up some messages. toSend := 100 for i := 0; i < toSend; i++ { - resp, _ := nc.Request(c.mconfig.Name, []byte("Hello World!"), 50*time.Millisecond) + resp, _ := nc.Request(c.mconfig.Name, []byte("Hello World!"), 100*time.Millisecond) expectOKResponse(t, resp) } stats := mset.Stats() @@ -2706,3 +2705,70 @@ func TestJetStreamSimpleFileStorageRecovery(t *testing.T) { } } } + +func TestJetStreamPerf(t *testing.T) { + // Uncomment to run, holding place for now. 
+ t.SkipNow() + + s := RunBasicJetStreamServer() + defer s.Shutdown() + + config := s.JetStreamConfig() + if config == nil { + t.Fatalf("Expected non-nil config") + } + defer os.RemoveAll(config.StoreDir) + + acc := s.GlobalAccount() + + msetConfig := server.MsgSetConfig{ + Name: "MSET22", + Storage: server.FileStorage, + Subjects: []string{"foo"}, + } + + mset, err := acc.AddMsgSet(&msetConfig) + if err != nil { + t.Fatalf("Unexpected error adding message set: %v", err) + } + + nc := clientConnectToServer(t, s) + defer nc.Close() + + var toSend = 1000000 + var received int + done := make(chan bool) + + delivery := "d" + //delivery = "foo" + + nc.Subscribe(delivery, func(m *nats.Msg) { + received++ + if received >= toSend { + done <- true + } + }) + nc.Flush() + + _, err = mset.AddObservable(&server.ObservableConfig{ + Delivery: delivery, + DeliverAll: true, + AckPolicy: server.AckNone, + }) + if err != nil { + t.Fatalf("Error creating observable: %v", err) + } + + payload := []byte("Hello World") + + start := time.Now() + + for i := 0; i < toSend; i++ { + nc.Publish("foo", payload) + } + + <-done + tt := time.Since(start) + fmt.Printf("time is %v\n", tt) + fmt.Printf("%.0f msgs/sec\n", float64(toSend)/tt.Seconds()) +}