From cd3c1c7a3f769cf28493e4236d690eefee23dae5 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 8 Oct 2019 18:09:17 -0700 Subject: [PATCH] Basic partitioning Signed-off-by: Derek Collison --- server/observable.go | 71 ++++++++++++++++++---- test/jetstream_test.go | 134 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 189 insertions(+), 16 deletions(-) diff --git a/server/observable.go b/server/observable.go index db764fbc..ae3d4ff0 100644 --- a/server/observable.go +++ b/server/observable.go @@ -30,6 +30,7 @@ type ObservableConfig struct { DeliverAll bool `json:"deliver_all,omitempty"` DeliverLast bool `json:"deliver_last,omitempty"` AckPolicy AckPolicy `json:"ack_policy"` + Partition string `json:"partition"` } // AckPolicy determines how the observable shoulc acknowledge delivered messages. @@ -50,6 +51,7 @@ type Observable struct { name string mset *MsgSet seq uint64 + dseq uint64 dsubj string reqSub *subscription ackSub *subscription @@ -72,6 +74,17 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error) } } + // Make sure any partition subject is also a literal. + if config.Partition != "" { + if !subjectIsLiteral(config.Partition) { + return nil, fmt.Errorf("observable partition subject has wildcards") + } + // Make sure this is a valid partition of the interest subjects. + if !mset.validPartition(config.Partition) { + return nil, fmt.Errorf("observable partition not a valid subset of the interest subjects") + } + } + // Check on start position conflicts. noTime := time.Time{} if config.StartSeq > 0 && (config.StartTime != noTime || config.DeliverAll || config.DeliverLast) { @@ -166,14 +179,24 @@ func (o *Observable) processObservableMsgRequest(_ *subscription, _ *client, sub seq = o.seq } // FIXME(dlc) - do actual sequence numbers. - subj, msg, _, err := mset.store.Lookup(seq) - if err == nil { - o.deliverMsgRequest(mset, reply, subj, msg, seq) - if seq == o.seq { - o.seq++ + // We do loop here in case we are partitioned. + for { + subj, msg, _, err := mset.store.Lookup(seq) + if err == nil { + if o.config.Partition != "" && subj != o.config.Partition { + o.seq++ + seq = o.seq + continue + } + o.deliverMsgRequest(mset, reply, subj, msg, o.dseq) + if wantNextMsg { + o.incSeqs() + } + break + } else if wantNextMsg { + o.waiting = append(o.waiting, reply) + break } - } else if wantNextMsg { - o.waiting = append(o.waiting, reply) } o.mu.Unlock() } @@ -186,12 +209,13 @@ func (o *Observable) loopAndDeliverMsgs(s *Server, a *Account) { return } - // Deliver all the msgs we have now, once done or on a condition, we wait. + // Deliver all the msgs we have now, once done or on a condition, we wait for new ones. for { o.mu.Lock() seq := o.seq subj, msg, _, err := mset.store.Lookup(seq) + // On error either break or return. if err != nil { o.mu.Unlock() if err != ErrStoreMsgNotFound { @@ -202,14 +226,21 @@ func (o *Observable) loopAndDeliverMsgs(s *Server, a *Account) { } // We have the message. We need to check if we are in push mode or pull mode. - if o.config.Delivery != "" { - o.deliverMsg(mset, subj, msg, seq) + // Also need to check if we have a partition filter. + if o.config.Partition != "" && subj != o.config.Partition { o.seq++ + o.mu.Unlock() + continue + } + + if o.config.Delivery != "" { + o.deliverMsg(mset, subj, msg, o.dseq) + o.incSeqs() } else if len(o.waiting) > 0 { reply := o.waiting[0] o.waiting = append(o.waiting[:0], o.waiting[1:]...) - o.deliverMsgRequest(mset, reply, subj, msg, seq) - o.seq++ + o.deliverMsgRequest(mset, reply, subj, msg, o.dseq) + o.incSeqs() } else { // No one waiting, let's break out and wait. o.mu.Unlock() @@ -222,6 +253,13 @@ func (o *Observable) loopAndDeliverMsgs(s *Server, a *Account) { } } +// Advance the sequence numbers. +// Lock should be held. +func (o *Observable) incSeqs() { + o.seq++ + o.dseq++ +} + // Deliver a msg to the observable push delivery subject. func (o *Observable) deliverMsg(mset *MsgSet, subj string, msg []byte, seq uint64) { mset.sendq <- &jsPubMsg{o.dsubj, subj, fmt.Sprintf(o.ackReply, seq), msg} @@ -274,6 +312,8 @@ func (o *Observable) selectStartingSeqNo() { } else if o.seq > stats.LastSeq { o.seq = stats.LastSeq + 1 } + // Set deliveryt sequence to be the same to start. + o.dseq = o.seq } // Test whether a config represents a durable subscriber. @@ -346,6 +386,8 @@ func (mset *MsgSet) noInterest(delivery string) bool { return len(r.psubs)+len(r.qsubs) == 0 } +// Check that we do not form a cycle by delivering to a delivery subject +// that is part of the interest group. func (mset *MsgSet) deliveryFormsCycle(deliverySubject string) bool { mset.mu.Lock() defer mset.mu.Unlock() @@ -357,3 +399,8 @@ func (mset *MsgSet) deliveryFormsCycle(deliverySubject string) bool { } return false } + +// This is same as check for delivery cycle. +func (mset *MsgSet) validPartition(partitionSubject string) bool { + return mset.deliveryFormsCycle(partitionSubject) +} diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 4369e580..958c3547 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -537,6 +537,10 @@ func TestJetStreamWorkQueueLoadBalance(t *testing.T) { } defer o.Delete() + // To send messages. + nc := clientConnectToServer(t, s) + defer nc.Close() + // For normal work queue semantics, you send requests to the subject with message set and observable name. reqMsgSubj := fmt.Sprintf("%s.%s.%s", server.JsReqPre, mname, oname) @@ -574,10 +578,6 @@ func TestJetStreamWorkQueueLoadBalance(t *testing.T) { }(int32(i)) } - // To send messages. - nc := clientConnectToServer(t, s) - defer nc.Close() - // Wait for requestors. wg.Wait() @@ -600,3 +600,129 @@ func TestJetStreamWorkQueueLoadBalance(t *testing.T) { } } } + +func TestJetStreamPartitioning(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: "MSET", Subjects: []string{"foo.*"}}) + if err != nil { + t.Fatalf("Unexpected error adding message set: %v", err) + } + defer s.JetStreamDeleteMsgSet(mset) + + nc := clientConnectToServer(t, s) + defer nc.Close() + + toSend := 50 + subjA := "foo.A" + subjB := "foo.B" + + for i := 0; i < toSend; i++ { + resp, _ := nc.Request(subjA, []byte("Hello World!"), 50*time.Millisecond) + expectOKResponse(t, resp) + resp, _ = nc.Request(subjB, []byte("Hello World!"), 50*time.Millisecond) + expectOKResponse(t, resp) + } + stats := mset.Stats() + if stats.Msgs != uint64(toSend*2) { + t.Fatalf("Expected %d messages, got %d", toSend*2, stats.Msgs) + } + + delivery := nats.NewInbox() + sub, _ := nc.SubscribeSync(delivery) + defer sub.Unsubscribe() + nc.Flush() + + o, err := mset.AddObservable(&server.ObservableConfig{Delivery: delivery, Partition: subjB, DeliverAll: true}) + if err != nil { + t.Fatalf("Expected no error with registered interest, got %v", err) + } + defer o.Delete() + + // Now let's check the messages + for i := 1; i <= toSend; i++ { + m, err := sub.NextMsg(time.Millisecond) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // JetStream will have the subject match the stream subject, not delivery subject. + // We want these to only be subjB. + if m.Subject != subjB { + t.Fatalf("Expected original subject of %q, but got %q", subjB, m.Subject) + } + // Now check that reply subject exists and has a sequence as the last token. + if seq := o.SeqFromReply(m.Reply); seq != uint64(i) { + t.Fatalf("Expected sequence of %d , got %d", i, seq) + } + // Ack the message here. + m.Respond(nil) + } + + if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != 0 { + t.Fatalf("Expected sub to have no pending") + } +} + +func TestJetStreamWorkQueuePartitioning(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + mname := "MY_MSG_SET" + mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: mname, Subjects: []string{"foo.*"}}) + if err != nil { + t.Fatalf("Unexpected error adding message set: %v", err) + } + defer s.JetStreamDeleteMsgSet(mset) + + nc := clientConnectToServer(t, s) + defer nc.Close() + + toSend := 50 + subjA := "foo.A" + subjB := "foo.B" + + for i := 0; i < toSend; i++ { + resp, _ := nc.Request(subjA, []byte("Hello World!"), 50*time.Millisecond) + expectOKResponse(t, resp) + resp, _ = nc.Request(subjB, []byte("Hello World!"), 50*time.Millisecond) + expectOKResponse(t, resp) + } + stats := mset.Stats() + if stats.Msgs != uint64(toSend*2) { + t.Fatalf("Expected %d messages, got %d", toSend*2, stats.Msgs) + } + + oname := "WQ" + o, err := mset.AddObservable(&server.ObservableConfig{Durable: oname, Partition: subjA, DeliverAll: true}) + if err != nil { + t.Fatalf("Expected no error with registered interest, got %v", err) + } + defer o.Delete() + + if o.NextSeq() != 1 { + t.Fatalf("Expected to be starting at sequence 1") + } + + // For normal work queue semantics, you send requests to the subject with message set and observable name. + reqMsgSubj := fmt.Sprintf("%s.%s.%s", server.JsReqPre, mname, oname) + + getNext := func(seqno int) { + t.Helper() + nextMsg, err := nc.Request(reqMsgSubj, nil, time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if nextMsg.Subject != subjA { + t.Fatalf("Expected subject of %q, got %q", subjA, nextMsg.Subject) + } + if seq := o.SeqFromReply(nextMsg.Reply); seq != uint64(seqno) { + t.Fatalf("Expected sequence of %d , got %d", seqno, seq) + } + } + + // Make sure we can get the messages already there. + for i := 1; i <= toSend; i++ { + getNext(i) + } +}