From 98b78d06c44f0fc0d654a8dd7a2e272029ec89db Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 8 Oct 2019 14:50:23 -0700 Subject: [PATCH] First pass pull mode, e.g. worker Signed-off-by: Derek Collison --- server/const.go | 2 +- server/jetstream.go | 2 + server/observable.go | 117 +++++++++++++++++++------ test/jetstream_test.go | 188 +++++++++++++++++++++++++++++++++++++++-- 4 files changed, 278 insertions(+), 31 deletions(-) diff --git a/server/const.go b/server/const.go index 9e35f382..dcc687ea 100644 --- a/server/const.go +++ b/server/const.go @@ -40,7 +40,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.1.7" + VERSION = "2.2.0-beta" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/jetstream.go b/server/jetstream.go index 5b27d1b4..52fe8f21 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -84,6 +84,8 @@ const ( // JsAckPre is the prefix for the ack stream coming back to observable. JsAckPre = "$JS.A" + + JsReqPre = "$JS.M" ) // For easier handling of exports and imports. diff --git a/server/observable.go b/server/observable.go index c8c2e456..db764fbc 100644 --- a/server/observable.go +++ b/server/observable.go @@ -36,12 +36,12 @@ type ObservableConfig struct { type AckPolicy int const ( + // AckNone requires no acks for delivered messages. + AckNone AckPolicy = iota // AckExplicit requires ack or nack for all messages. - AckExplicit AckPolicy = iota + AckExplicit // When acking a sequence number, this implicitly acks all sequences below this one as well. AckAll - // AckNone requires no acks for delivered messages. - AckNone ) // Observable is a jetstream observable/subscriber. @@ -51,8 +51,10 @@ type Observable struct { mset *MsgSet seq uint64 dsubj string + reqSub *subscription ackSub *subscription ackReply string + waiting []string config ObservableConfig } @@ -60,16 +62,14 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error) if config == nil { return nil, fmt.Errorf("observable config required") } - // For now expect a literal subject that is not empty. - // FIXME(dlc) - Empty == Worker mode - if config.Delivery == "" { - return nil, fmt.Errorf("observable delivery subject is empty") - } - if !subjectIsLiteral(config.Delivery) { - return nil, fmt.Errorf("observable delivery subject has wildcards") - } - if mset.deliveryFormsCycle(config.Delivery) { - return nil, fmt.Errorf("observable delivery subject forms a cycle") + // For now expect a literal subject if its not empty. Empty means work queue mode (pull mode). + if config.Delivery != _EMPTY_ { + if !subjectIsLiteral(config.Delivery) { + return nil, fmt.Errorf("observable delivery subject has wildcards") + } + if mset.deliveryFormsCycle(config.Delivery) { + return nil, fmt.Errorf("observable delivery subject forms a cycle") + } } // Check on start position conflicts. @@ -83,7 +83,7 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error) } // Check if we are not durable that the delivery subject has interest. - if config.Durable == "" { + if config.Durable == _EMPTY_ && config.Delivery != _EMPTY_ { if mset.noInterest(config.Delivery) { return nil, fmt.Errorf("observable requires interest for delivery subject when ephemeral") } @@ -123,6 +123,13 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error) } else { o.ackSub = sub } + // Setup the internal sub for individual message requests. + reqSubj := fmt.Sprintf("%s.%s.%s", JsReqPre, cn, o.name) + if sub, err := mset.subscribeInternal(reqSubj, o.processObservableMsgRequest); err != nil { + return nil, err + } else { + o.reqSub = sub + } mset.obs[o.name] = o mset.mu.Unlock() @@ -143,6 +150,34 @@ func (o *Observable) processObservableAck(_ *subscription, _ *client, subject, _ // No-op for now. } +func (o *Observable) processObservableMsgRequest(_ *subscription, _ *client, subject, reply string, msg []byte) { + o.mu.Lock() + mset := o.mset + if mset == nil { + o.mu.Unlock() + // FIXME(dlc) - send err? + return + } + // Determine which sequence number they are looking for. nil request means next message. + wantNextMsg := len(msg) == 0 + var seq uint64 + + if wantNextMsg { + seq = o.seq + } + // FIXME(dlc) - do actual sequence numbers. + subj, msg, _, err := mset.store.Lookup(seq) + if err == nil { + o.deliverMsgRequest(mset, reply, subj, msg, seq) + if seq == o.seq { + o.seq++ + } + } else if wantNextMsg { + o.waiting = append(o.waiting, reply) + } + o.mu.Unlock() +} + func (o *Observable) loopAndDeliverMsgs(s *Server, a *Account) { var mset *MsgSet for { @@ -150,29 +185,53 @@ func (o *Observable) loopAndDeliverMsgs(s *Server, a *Account) { if mset = o.msgSet(); mset == nil { return } + // Deliver all the msgs we have now, once done or on a condition, we wait. for { - seq := atomic.LoadUint64(&o.seq) + o.mu.Lock() + seq := o.seq subj, msg, _, err := mset.store.Lookup(seq) - if err == nil { - atomic.AddUint64(&o.seq, 1) - o.deliverMsg(mset, subj, msg, seq) - } else if err != ErrStoreMsgNotFound { - s.Warnf("Jetstream internal storage error on lookup: %v", err) - return - } else { + + if err != nil { + o.mu.Unlock() + if err != ErrStoreMsgNotFound { + s.Warnf("Jetstream internal storage error on lookup: %v", err) + return + } break } + + // We have the message. We need to check if we are in push mode or pull mode. + if o.config.Delivery != "" { + o.deliverMsg(mset, subj, msg, seq) + o.seq++ + } else if len(o.waiting) > 0 { + reply := o.waiting[0] + o.waiting = append(o.waiting[:0], o.waiting[1:]...) + o.deliverMsgRequest(mset, reply, subj, msg, seq) + o.seq++ + } else { + // No one waiting, let's break out and wait. + o.mu.Unlock() + break + } + o.mu.Unlock() } + // We will wait here for new messages to arrive. mset.waitForMsgs() } } -// Deliver a msg to the observable. +// Deliver a msg to the observable push delivery subject. func (o *Observable) deliverMsg(mset *MsgSet, subj string, msg []byte, seq uint64) { mset.sendq <- &jsPubMsg{o.dsubj, subj, fmt.Sprintf(o.ackReply, seq), msg} } +// Deliver a msg to the msg request subject. +func (o *Observable) deliverMsgRequest(mset *MsgSet, dsubj, subj string, msg []byte, seq uint64) { + mset.sendq <- &jsPubMsg{dsubj, subj, fmt.Sprintf(o.ackReply, seq), msg} +} + // SeqFromReply will extract a sequence number from a reply ack subject. func (o *Observable) SeqFromReply(reply string) (seq uint64) { n, err := fmt.Sscanf(reply, o.ackReply, &seq) @@ -182,6 +241,11 @@ func (o *Observable) SeqFromReply(reply string) (seq uint64) { return } +// NextSeq returns the next delivered sequence number for this observable. +func (o *Observable) NextSeq() uint64 { + return atomic.LoadUint64(&o.seq) +} + // Will select the starting sequence. func (o *Observable) selectStartingSeqNo() { stats := o.mset.Stats() @@ -203,7 +267,9 @@ func (o *Observable) selectStartingSeqNo() { o.seq = o.config.StartSeq } - if o.seq < stats.FirstSeq { + if stats.FirstSeq == 0 { + o.seq = 1 + } else if o.seq < stats.FirstSeq { o.seq = stats.FirstSeq } else if o.seq > stats.LastSeq { o.seq = stats.LastSeq + 1 @@ -237,7 +303,9 @@ func (o *Observable) Delete() error { mset := o.mset o.mset = nil ackSub := o.ackSub + reqSub := o.reqSub o.ackSub = nil + o.reqSub = nil o.mu.Unlock() if mset == nil { @@ -251,6 +319,7 @@ func (o *Observable) Delete() error { // performance wise. mset.sg.Broadcast() mset.unsubscribe(ackSub) + mset.unsubscribe(reqSub) delete(mset.obs, o.name) mset.mu.Unlock() diff --git a/test/jetstream_test.go b/test/jetstream_test.go index fe164cfa..4369e580 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -17,6 +17,8 @@ import ( "fmt" "os" "path/filepath" + "sync" + "sync/atomic" "testing" "time" @@ -227,12 +229,9 @@ func TestJetStreamCreateObservable(t *testing.T) { t.Fatalf("Expected an error for no config") } - // Check for delivery subject errors. - // Empty delivery subject - if _, err := mset.AddObservable(&server.ObservableConfig{Delivery: ""}); err == nil { - t.Fatalf("Expected an error on empty delivery subject") - } - // No literal delivery subject allowed. + // Check for delivery subject errors. + + // Literal delivery subject required. if _, err := mset.AddObservable(&server.ObservableConfig{Delivery: "foo.*"}); err == nil { t.Fatalf("Expected an error on bad delivery subject") } @@ -424,3 +423,180 @@ func TestJetStreamBasicDelivery(t *testing.T) { checkMsgs(101) } + +func TestJetStreamBasicWorkQueue(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + mname := "MY_MSG_SET" + mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: mname, Subjects: []string{"foo", "bar"}}) + if err != nil { + t.Fatalf("Unexpected error adding message set: %v", err) + } + defer s.JetStreamDeleteMsgSet(mset) + + // Create basic work queue mode observable. + oname := "WQ" + o, err := mset.AddObservable(&server.ObservableConfig{Durable: oname, DeliverAll: true}) + if err != nil { + t.Fatalf("Expected no error with registered interest, got %v", err) + } + defer o.Delete() + + if o.NextSeq() != 1 { + t.Fatalf("Expected to be starting at sequence 1") + } + + nc := clientConnectToServer(t, s) + defer nc.Close() + + // Now load up some messages. + toSend := 100 + sendSubj := "bar" + for i := 0; i < toSend; i++ { + resp, _ := nc.Request(sendSubj, []byte("Hello World!"), 50*time.Millisecond) + expectOKResponse(t, resp) + } + stats := mset.Stats() + if stats.Msgs != uint64(toSend) { + t.Fatalf("Expected %d messages, got %d", toSend, stats.Msgs) + } + + // For normal work queue semantics, you send requests to the subject with message set and observable name. + reqMsgSubj := fmt.Sprintf("%s.%s.%s", server.JsReqPre, mname, oname) + + // FIXME(dlc) - Right now this will *not* work with new style nc.Request(). + // The new Request() mux in client needs the original subject to de-mux. Will panic. + // Working on a fix, but for now revert back to old style. + nc.Opts.UseOldRequestStyle = true + + getNext := func(seqno int) { + t.Helper() + nextMsg, err := nc.Request(reqMsgSubj, nil, time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if nextMsg.Subject != "bar" { + t.Fatalf("Expected subject of %q, got %q", "bar", nextMsg.Subject) + } + if seq := o.SeqFromReply(nextMsg.Reply); seq != uint64(seqno) { + t.Fatalf("Expected sequence of %d , got %d", seqno, seq) + } + } + + // Make sure we can get the messages already there. + for i := 1; i <= toSend; i++ { + getNext(i) + } + + // Now we want to make sure we can get a message that is published to the message + // set as we are waiting for it. + nextDelay := 100 * time.Millisecond + + go func() { + time.Sleep(nextDelay) + nc.Request(sendSubj, []byte("Hello World!"), 50*time.Millisecond) + }() + + start := time.Now() + getNext(toSend + 1) + if time.Since(start) < nextDelay { + t.Fatalf("Received message too quickly") + } + + // Now do same thing but combine waiting for new ones with sending. + go func() { + time.Sleep(nextDelay) + for i := 0; i < toSend; i++ { + nc.Request(sendSubj, []byte("Hello World!"), 50*time.Millisecond) + time.Sleep(5 * time.Millisecond) + } + }() + + for i := toSend + 2; i < toSend*2+2; i++ { + getNext(i) + } +} + +func TestJetStreamWorkQueueLoadBalance(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + mname := "MY_MSG_SET" + mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: mname, Subjects: []string{"foo", "bar"}}) + if err != nil { + t.Fatalf("Unexpected error adding message set: %v", err) + } + defer s.JetStreamDeleteMsgSet(mset) + + // Create basic work queue mode observable. + oname := "WQ" + o, err := mset.AddObservable(&server.ObservableConfig{Durable: oname, DeliverAll: true}) + if err != nil { + t.Fatalf("Expected no error with registered interest, got %v", err) + } + defer o.Delete() + + // For normal work queue semantics, you send requests to the subject with message set and observable name. + reqMsgSubj := fmt.Sprintf("%s.%s.%s", server.JsReqPre, mname, oname) + + numWorkers := 25 + counts := make([]int32, numWorkers) + var received int32 + + wg := &sync.WaitGroup{} + wg.Add(numWorkers) + + dwg := &sync.WaitGroup{} + dwg.Add(numWorkers) + + toSend := 1000 + + for i := 0; i < numWorkers; i++ { + nc := clientConnectToServer(t, s) + defer nc.Close() + + go func(index int32) { + counter := &counts[index] + // Signal we are ready + wg.Done() + defer dwg.Done() + + for { + if _, err := nc.Request(reqMsgSubj, nil, 50*time.Millisecond); err != nil { + return + } + atomic.AddInt32(counter, 1) + if total := atomic.AddInt32(&received, 1); total >= int32(toSend) { + return + } + } + }(int32(i)) + } + + // To send messages. + nc := clientConnectToServer(t, s) + defer nc.Close() + + // Wait for requestors. + wg.Wait() + + sendSubj := "bar" + for i := 0; i < toSend; i++ { + resp, _ := nc.Request(sendSubj, []byte("Hello World!"), 50*time.Millisecond) + expectOKResponse(t, resp) + } + + // Wait for test to complete. + dwg.Wait() + + target := toSend / numWorkers + delta := target / 3 + low, high := int32(target-delta), int32(target+delta) + + for i := 0; i < numWorkers; i++ { + if msgs := atomic.LoadInt32(&counts[i]); msgs < low || msgs > high { + t.Fatalf("Messages received for worker too far off from target of %d, got %d", target, msgs) + } + } +}