From 0a36706958b635bec592c9d893f0e0507f19daec Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 13 Apr 2020 17:20:24 -0700 Subject: [PATCH] PubAck details that provide stream name and sequence assigned Signed-off-by: Derek Collison --- server/stream.go | 24 ++++++++++++++++++- test/jetstream_test.go | 52 +++++++++++++++++++++++++++++++++++++++++- test/norace_test.go | 3 +-- 3 files changed, 75 insertions(+), 4 deletions(-) diff --git a/server/stream.go b/server/stream.go index b3b3de63..7b8f05e0 100644 --- a/server/stream.go +++ b/server/stream.go @@ -16,6 +16,7 @@ package server import ( "encoding/json" "fmt" + "math" "path" "strconv" "sync" @@ -39,6 +40,14 @@ type StreamConfig struct { Template string `json:"template_owner,omitempty"` } +// PubAck is the detail you get back from a publish to a stream that was successful. +// e.g. +OK {"stream": "my_stream", "seq": 22} +type PubAck struct { + Stream string `json:"stream"` + Seq uint64 `json:"seq"` +} + +// StreamInfo shows config and current state for this stream. type StreamInfo struct { Config StreamConfig `json:"config"` State StreamState `json:"state"` @@ -53,6 +62,7 @@ type Stream struct { jsa *jsAccount client *client sid int + pubAck []byte sendq chan *jsPubMsg store StreamStore consumers map[string]*Consumer @@ -137,6 +147,14 @@ func (a *Account) AddStreamWithStore(config *StreamConfig, fsConfig *FileStoreCo return nil, err } + // Create our pubAck here. This will be reused and for +OK will contain JSON + // for stream name and sequence. + longestSeq := strconv.FormatUint(math.MaxUint64, 10) + lpubAck := len(AckAck) + len(cfg.Name) + len("{\"stream\": ,\"seq\": }") + len(longestSeq) + mset.pubAck = make([]byte, 0, lpubAck) + mset.pubAck = append(mset.pubAck, AckAck...) + mset.pubAck = append(mset.pubAck, fmt.Sprintf(" {\"stream\": %q, \"seq\": ", cfg.Name)...) + return mset, nil } @@ -507,6 +525,7 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, _ *client, subje accName = c.acc.Name } doAck := !mset.config.NoAck + pubAck := mset.pubAck jsa := mset.jsa stype := mset.config.Storage name := mset.config.Name @@ -519,7 +538,7 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, _ *client, subje } // Response to send. - response := AckAck + var response []byte var seq uint64 var err error @@ -538,6 +557,9 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, _ *client, subje response = []byte("-ERR 'resource limits exceeded for account'") store.RemoveMsg(seq) seq = 0 + } else if err == nil && doAck && len(reply) > 0 { + response = append(pubAck, strconv.FormatUint(seq, 10)...) + response = append(response, '}') } } diff --git a/test/jetstream_test.go b/test/jetstream_test.go index fac0f7aa..0ebe074e 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -14,6 +14,7 @@ package test import ( + "bytes" "encoding/json" "fmt" "io/ioutil" @@ -221,6 +222,55 @@ func TestJetStreamAddStream(t *testing.T) { } } +func TestJetStreamPubAck(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + sname := "PUBACK" + acc := s.GlobalAccount() + mconfig := &server.StreamConfig{Name: sname, Subjects: []string{"foo"}, Storage: server.MemoryStorage} + mset, err := acc.AddStream(mconfig) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + defer mset.Delete() + + nc := clientConnectToServer(t, s) + defer nc.Close() + + checkRespDetails := func(resp *nats.Msg, err error, seq uint64) { + if err != nil { + t.Fatalf("Unexpected error from send stream msg: %v", err) + } + if resp == nil { + t.Fatalf("No response from send stream msg") + } + if !bytes.HasPrefix(resp.Data, []byte("+OK {")) { + t.Fatalf("Did not get a correct response: %q", resp.Data) + } + var pubAck server.PubAck + if err := json.Unmarshal(resp.Data[3:], &pubAck); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if pubAck.Stream != sname { + t.Fatalf("Expected %q for stream name, got %q", sname, pubAck.Stream) + } + if pubAck.Seq != seq { + t.Fatalf("Expected %d for sequence, got %d", seq, pubAck.Seq) + } + } + + // Send messages and make sure pubAck details are correct. + for i := uint64(1); i <= 1000; i++ { + resp, err := nc.Request("foo", []byte("HELLO"), 100*time.Millisecond) + checkRespDetails(resp, err, i) + } +} + func TestJetStreamConsumerWithStartTime(t *testing.T) { subj := "my_stream" cases := []struct { @@ -588,7 +638,7 @@ func sendStreamMsg(t *testing.T, nc *nats.Conn, subject, msg string) { if resp == nil { t.Fatalf("No response, possible timeout?") } - if string(resp.Data) != server.OK { + if !bytes.HasPrefix(resp.Data, []byte("+OK {")) { t.Fatalf("Expected a JetStreamPubAck, got %q", resp.Data) } } diff --git a/test/norace_test.go b/test/norace_test.go index 39ec0506..04bcf816 100644 --- a/test/norace_test.go +++ b/test/norace_test.go @@ -604,8 +604,7 @@ func TestJetStreamWorkQueueLoadBalance(t *testing.T) { sendSubj := "bar" for i := 0; i < toSend; i++ { - resp, _ := nc.Request(sendSubj, []byte("Hello World!"), 50*time.Millisecond) - expectOKResponse(t, resp) + sendStreamMsg(t, nc, sendSubj, "Hello World!") } // Wait for test to complete.