First pass pull mode, e.g. worker

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2019-10-08 14:50:23 -07:00
parent 9214fa2995
commit 98b78d06c4
4 changed files with 278 additions and 31 deletions

View File

@@ -40,7 +40,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.1.7"
VERSION = "2.2.0-beta"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -84,6 +84,8 @@ const (
// JsAckPre is the prefix for the ack stream coming back to observable.
JsAckPre = "$JS.A"
JsReqPre = "$JS.M"
)
// For easier handling of exports and imports.

View File

@@ -36,12 +36,12 @@ type ObservableConfig struct {
type AckPolicy int
const (
// AckNone requires no acks for delivered messages.
AckNone AckPolicy = iota
// AckExplicit requires ack or nack for all messages.
AckExplicit AckPolicy = iota
AckExplicit
// When acking a sequence number, this implicitly acks all sequences below this one as well.
AckAll
// AckNone requires no acks for delivered messages.
AckNone
)
// Observable is a jetstream observable/subscriber.
@@ -51,8 +51,10 @@ type Observable struct {
mset *MsgSet
seq uint64
dsubj string
reqSub *subscription
ackSub *subscription
ackReply string
waiting []string
config ObservableConfig
}
@@ -60,16 +62,14 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error)
if config == nil {
return nil, fmt.Errorf("observable config required")
}
// For now expect a literal subject that is not empty.
// FIXME(dlc) - Empty == Worker mode
if config.Delivery == "" {
return nil, fmt.Errorf("observable delivery subject is empty")
}
if !subjectIsLiteral(config.Delivery) {
return nil, fmt.Errorf("observable delivery subject has wildcards")
}
if mset.deliveryFormsCycle(config.Delivery) {
return nil, fmt.Errorf("observable delivery subject forms a cycle")
// For now expect a literal subject if its not empty. Empty means work queue mode (pull mode).
if config.Delivery != _EMPTY_ {
if !subjectIsLiteral(config.Delivery) {
return nil, fmt.Errorf("observable delivery subject has wildcards")
}
if mset.deliveryFormsCycle(config.Delivery) {
return nil, fmt.Errorf("observable delivery subject forms a cycle")
}
}
// Check on start position conflicts.
@@ -83,7 +83,7 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error)
}
// Check if we are not durable that the delivery subject has interest.
if config.Durable == "" {
if config.Durable == _EMPTY_ && config.Delivery != _EMPTY_ {
if mset.noInterest(config.Delivery) {
return nil, fmt.Errorf("observable requires interest for delivery subject when ephemeral")
}
@@ -123,6 +123,13 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error)
} else {
o.ackSub = sub
}
// Setup the internal sub for individual message requests.
reqSubj := fmt.Sprintf("%s.%s.%s", JsReqPre, cn, o.name)
if sub, err := mset.subscribeInternal(reqSubj, o.processObservableMsgRequest); err != nil {
return nil, err
} else {
o.reqSub = sub
}
mset.obs[o.name] = o
mset.mu.Unlock()
@@ -143,6 +150,34 @@ func (o *Observable) processObservableAck(_ *subscription, _ *client, subject, _
// No-op for now.
}
func (o *Observable) processObservableMsgRequest(_ *subscription, _ *client, subject, reply string, msg []byte) {
o.mu.Lock()
mset := o.mset
if mset == nil {
o.mu.Unlock()
// FIXME(dlc) - send err?
return
}
// Determine which sequence number they are looking for. nil request means next message.
wantNextMsg := len(msg) == 0
var seq uint64
if wantNextMsg {
seq = o.seq
}
// FIXME(dlc) - do actual sequence numbers.
subj, msg, _, err := mset.store.Lookup(seq)
if err == nil {
o.deliverMsgRequest(mset, reply, subj, msg, seq)
if seq == o.seq {
o.seq++
}
} else if wantNextMsg {
o.waiting = append(o.waiting, reply)
}
o.mu.Unlock()
}
func (o *Observable) loopAndDeliverMsgs(s *Server, a *Account) {
var mset *MsgSet
for {
@@ -150,29 +185,53 @@ func (o *Observable) loopAndDeliverMsgs(s *Server, a *Account) {
if mset = o.msgSet(); mset == nil {
return
}
// Deliver all the msgs we have now, once done or on a condition, we wait.
for {
seq := atomic.LoadUint64(&o.seq)
o.mu.Lock()
seq := o.seq
subj, msg, _, err := mset.store.Lookup(seq)
if err == nil {
atomic.AddUint64(&o.seq, 1)
o.deliverMsg(mset, subj, msg, seq)
} else if err != ErrStoreMsgNotFound {
s.Warnf("Jetstream internal storage error on lookup: %v", err)
return
} else {
if err != nil {
o.mu.Unlock()
if err != ErrStoreMsgNotFound {
s.Warnf("Jetstream internal storage error on lookup: %v", err)
return
}
break
}
// We have the message. We need to check if we are in push mode or pull mode.
if o.config.Delivery != "" {
o.deliverMsg(mset, subj, msg, seq)
o.seq++
} else if len(o.waiting) > 0 {
reply := o.waiting[0]
o.waiting = append(o.waiting[:0], o.waiting[1:]...)
o.deliverMsgRequest(mset, reply, subj, msg, seq)
o.seq++
} else {
// No one waiting, let's break out and wait.
o.mu.Unlock()
break
}
o.mu.Unlock()
}
// We will wait here for new messages to arrive.
mset.waitForMsgs()
}
}
// Deliver a msg to the observable.
// Deliver a msg to the observable push delivery subject.
func (o *Observable) deliverMsg(mset *MsgSet, subj string, msg []byte, seq uint64) {
mset.sendq <- &jsPubMsg{o.dsubj, subj, fmt.Sprintf(o.ackReply, seq), msg}
}
// Deliver a msg to the msg request subject.
func (o *Observable) deliverMsgRequest(mset *MsgSet, dsubj, subj string, msg []byte, seq uint64) {
mset.sendq <- &jsPubMsg{dsubj, subj, fmt.Sprintf(o.ackReply, seq), msg}
}
// SeqFromReply will extract a sequence number from a reply ack subject.
func (o *Observable) SeqFromReply(reply string) (seq uint64) {
n, err := fmt.Sscanf(reply, o.ackReply, &seq)
@@ -182,6 +241,11 @@ func (o *Observable) SeqFromReply(reply string) (seq uint64) {
return
}
// NextSeq returns the next delivered sequence number for this observable.
func (o *Observable) NextSeq() uint64 {
return atomic.LoadUint64(&o.seq)
}
// Will select the starting sequence.
func (o *Observable) selectStartingSeqNo() {
stats := o.mset.Stats()
@@ -203,7 +267,9 @@ func (o *Observable) selectStartingSeqNo() {
o.seq = o.config.StartSeq
}
if o.seq < stats.FirstSeq {
if stats.FirstSeq == 0 {
o.seq = 1
} else if o.seq < stats.FirstSeq {
o.seq = stats.FirstSeq
} else if o.seq > stats.LastSeq {
o.seq = stats.LastSeq + 1
@@ -237,7 +303,9 @@ func (o *Observable) Delete() error {
mset := o.mset
o.mset = nil
ackSub := o.ackSub
reqSub := o.reqSub
o.ackSub = nil
o.reqSub = nil
o.mu.Unlock()
if mset == nil {
@@ -251,6 +319,7 @@ func (o *Observable) Delete() error {
// performance wise.
mset.sg.Broadcast()
mset.unsubscribe(ackSub)
mset.unsubscribe(reqSub)
delete(mset.obs, o.name)
mset.mu.Unlock()

View File

@@ -17,6 +17,8 @@ import (
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
"testing"
"time"
@@ -227,12 +229,9 @@ func TestJetStreamCreateObservable(t *testing.T) {
t.Fatalf("Expected an error for no config")
}
// Check for delivery subject errors.
// Empty delivery subject
if _, err := mset.AddObservable(&server.ObservableConfig{Delivery: ""}); err == nil {
t.Fatalf("Expected an error on empty delivery subject")
}
// No literal delivery subject allowed.
// Check for delivery subject errors.
// Literal delivery subject required.
if _, err := mset.AddObservable(&server.ObservableConfig{Delivery: "foo.*"}); err == nil {
t.Fatalf("Expected an error on bad delivery subject")
}
@@ -424,3 +423,180 @@ func TestJetStreamBasicDelivery(t *testing.T) {
checkMsgs(101)
}
func TestJetStreamBasicWorkQueue(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
mname := "MY_MSG_SET"
mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: mname, Subjects: []string{"foo", "bar"}})
if err != nil {
t.Fatalf("Unexpected error adding message set: %v", err)
}
defer s.JetStreamDeleteMsgSet(mset)
// Create basic work queue mode observable.
oname := "WQ"
o, err := mset.AddObservable(&server.ObservableConfig{Durable: oname, DeliverAll: true})
if err != nil {
t.Fatalf("Expected no error with registered interest, got %v", err)
}
defer o.Delete()
if o.NextSeq() != 1 {
t.Fatalf("Expected to be starting at sequence 1")
}
nc := clientConnectToServer(t, s)
defer nc.Close()
// Now load up some messages.
toSend := 100
sendSubj := "bar"
for i := 0; i < toSend; i++ {
resp, _ := nc.Request(sendSubj, []byte("Hello World!"), 50*time.Millisecond)
expectOKResponse(t, resp)
}
stats := mset.Stats()
if stats.Msgs != uint64(toSend) {
t.Fatalf("Expected %d messages, got %d", toSend, stats.Msgs)
}
// For normal work queue semantics, you send requests to the subject with message set and observable name.
reqMsgSubj := fmt.Sprintf("%s.%s.%s", server.JsReqPre, mname, oname)
// FIXME(dlc) - Right now this will *not* work with new style nc.Request().
// The new Request() mux in client needs the original subject to de-mux. Will panic.
// Working on a fix, but for now revert back to old style.
nc.Opts.UseOldRequestStyle = true
getNext := func(seqno int) {
t.Helper()
nextMsg, err := nc.Request(reqMsgSubj, nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if nextMsg.Subject != "bar" {
t.Fatalf("Expected subject of %q, got %q", "bar", nextMsg.Subject)
}
if seq := o.SeqFromReply(nextMsg.Reply); seq != uint64(seqno) {
t.Fatalf("Expected sequence of %d , got %d", seqno, seq)
}
}
// Make sure we can get the messages already there.
for i := 1; i <= toSend; i++ {
getNext(i)
}
// Now we want to make sure we can get a message that is published to the message
// set as we are waiting for it.
nextDelay := 100 * time.Millisecond
go func() {
time.Sleep(nextDelay)
nc.Request(sendSubj, []byte("Hello World!"), 50*time.Millisecond)
}()
start := time.Now()
getNext(toSend + 1)
if time.Since(start) < nextDelay {
t.Fatalf("Received message too quickly")
}
// Now do same thing but combine waiting for new ones with sending.
go func() {
time.Sleep(nextDelay)
for i := 0; i < toSend; i++ {
nc.Request(sendSubj, []byte("Hello World!"), 50*time.Millisecond)
time.Sleep(5 * time.Millisecond)
}
}()
for i := toSend + 2; i < toSend*2+2; i++ {
getNext(i)
}
}
func TestJetStreamWorkQueueLoadBalance(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
mname := "MY_MSG_SET"
mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: mname, Subjects: []string{"foo", "bar"}})
if err != nil {
t.Fatalf("Unexpected error adding message set: %v", err)
}
defer s.JetStreamDeleteMsgSet(mset)
// Create basic work queue mode observable.
oname := "WQ"
o, err := mset.AddObservable(&server.ObservableConfig{Durable: oname, DeliverAll: true})
if err != nil {
t.Fatalf("Expected no error with registered interest, got %v", err)
}
defer o.Delete()
// For normal work queue semantics, you send requests to the subject with message set and observable name.
reqMsgSubj := fmt.Sprintf("%s.%s.%s", server.JsReqPre, mname, oname)
numWorkers := 25
counts := make([]int32, numWorkers)
var received int32
wg := &sync.WaitGroup{}
wg.Add(numWorkers)
dwg := &sync.WaitGroup{}
dwg.Add(numWorkers)
toSend := 1000
for i := 0; i < numWorkers; i++ {
nc := clientConnectToServer(t, s)
defer nc.Close()
go func(index int32) {
counter := &counts[index]
// Signal we are ready
wg.Done()
defer dwg.Done()
for {
if _, err := nc.Request(reqMsgSubj, nil, 50*time.Millisecond); err != nil {
return
}
atomic.AddInt32(counter, 1)
if total := atomic.AddInt32(&received, 1); total >= int32(toSend) {
return
}
}
}(int32(i))
}
// To send messages.
nc := clientConnectToServer(t, s)
defer nc.Close()
// Wait for requestors.
wg.Wait()
sendSubj := "bar"
for i := 0; i < toSend; i++ {
resp, _ := nc.Request(sendSubj, []byte("Hello World!"), 50*time.Millisecond)
expectOKResponse(t, resp)
}
// Wait for test to complete.
dwg.Wait()
target := toSend / numWorkers
delta := target / 3
low, high := int32(target-delta), int32(target+delta)
for i := 0; i < numWorkers; i++ {
if msgs := atomic.LoadInt32(&counts[i]); msgs < low || msgs > high {
t.Fatalf("Messages received for worker too far off from target of %d, got %d", target, msgs)
}
}
}