Allow to check for last sequence and last msgid for conditional publish

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-11-22 15:02:17 -08:00
parent 4d51a41dfd
commit a50f96461b
2 changed files with 203 additions and 29 deletions

View File

@@ -52,11 +52,19 @@ type StreamConfig struct {
Duplicates time.Duration `json:"duplicate_window,omitempty"`
}
const JSApiPubAckResponseType = "io.nats.jetstream.api.v1.pub_ack_response"
// JSPubAckResponse is a formal response to a publish operation.
type JSPubAckResponse struct {
ApiResponse
*PubAck
}
// PubAck is the detail you get back from a publish to a stream that was successful.
// e.g. +OK {"stream": "Orders", "seq": 22}
type PubAck struct {
Stream string `json:"stream"`
Seq uint64 `json:"seq"`
Sequence uint64 `json:"seq"`
Duplicate bool `json:"duplicate,omitempty"`
}
@@ -77,6 +85,8 @@ type Stream struct {
pubAck []byte
sendq chan *jsPubMsg
store StreamStore
lseq uint64
lmsgId string
consumers map[string]*Consumer
numFilter int
config StreamConfig
@@ -87,9 +97,13 @@ type Stream struct {
ddtmr *time.Timer
}
// JSPubId is used for identifying published messages and performing de-duplication.
const JSPubId = "Msg-Id"
const StreamDefaultDuplicatesWindow = 2 * time.Minute
// Headers for published messages.
const (
JSMsgId = "Nats-Msg-Id"
JSExpectedStream = "Nats-Expected-Stream"
JSExpectedLastSeq = "Nats-Expected-Last-Sequence"
JSExpectedLastMsgId = "Nats-Expected-Last-Msg-Id"
)
// Dedupe entry
type ddentry struct {
@@ -270,14 +284,12 @@ func (mset *Stream) autoTuneFileStorageBlockSize(fsCfg *FileStoreConfig) {
}
// rebuildDedupe will rebuild any dedupe structures needed after recovery of a stream.
// Lock not needed, only called during initialization.
// TODO(dlc) - Might be good to know if this should be checked at all for streams with no
// headers and msgId in them. Would need signaling from the storage layer.
func (mset *Stream) rebuildDedupe() {
state := mset.store.State()
if state.Msgs == 0 {
return
}
mset.lseq = state.LastSeq
// We have some messages. Lookup starting sequence by duplicate time window.
sseq := mset.store.GetSeqFromTime(time.Now().Add(-mset.config.Duplicates))
if sseq == 0 {
@@ -286,11 +298,15 @@ func (mset *Stream) rebuildDedupe() {
for seq := sseq; seq <= state.LastSeq; seq++ {
_, hdr, _, ts, err := mset.store.LoadMsg(seq)
var msgId string
if err == nil && len(hdr) > 0 {
if msgId := getMsgId(hdr); msgId != "" {
if msgId = getMsgId(hdr); msgId != _EMPTY_ {
mset.storeMsgId(&ddentry{msgId, seq, ts})
}
}
if seq == state.LastSeq {
mset.lmsgId = msgId
}
}
}
@@ -399,6 +415,9 @@ func (jsa *jsAccount) subjectsOverlap(subjects []string) bool {
return false
}
// Default duplicates window.
const StreamDefaultDuplicatesWindow = 2 * time.Minute
func checkStreamCfg(config *StreamConfig) (StreamConfig, error) {
if config == nil {
return StreamConfig{}, fmt.Errorf("stream configuration invalid")
@@ -861,7 +880,26 @@ func getHdrVal(key string, hdr []byte) []byte {
// Fast lookup of msgId.
func getMsgId(hdr []byte) string {
return string(getHdrVal(JSPubId, hdr))
return string(getHdrVal(JSMsgId, hdr))
}
// Fast lookup of expected last msgId.
func getExpectedLastMsgId(hdr []byte) string {
return string(getHdrVal(JSExpectedLastMsgId, hdr))
}
// Fast lookup of expected stream.
func getExpectedStream(hdr []byte) string {
return string(getHdrVal(JSExpectedStream, hdr))
}
// Fast lookup of expected stream.
func getExpectedLastSeq(hdr []byte) uint64 {
bseq := getHdrVal(JSExpectedLastSeq, hdr)
if len(bseq) == 0 {
return 0
}
return uint64(parseInt64(bseq))
}
// processInboundJetStreamMsg handles processing messages bound for a stream.
@@ -883,17 +921,48 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj
numConsumers := len(mset.consumers)
interestRetention := mset.config.Retention == InterestPolicy
// Process msgId if we have headers.
// Process msg headers if present.
var msgId string
if pc != nil && pc.pa.hdr > 0 {
msgId = getMsgId(msg[:pc.pa.hdr])
hdr := msg[:pc.pa.hdr]
msgId = getMsgId(hdr)
sendq := mset.sendq
if dde := mset.checkMsgId(msgId); dde != nil {
mset.mu.Unlock()
if doAck && len(reply) > 0 {
response := append(pubAck, strconv.FormatUint(dde.seq, 10)...)
response = append(response, ", \"duplicate\": true}"...)
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
}
return
}
// Expected stream.
if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name {
mset.mu.Unlock()
if doAck && len(reply) > 0 {
response := []byte("-ERR 'wrong expected stream'")
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
}
return
}
// Expected last sequence.
if seq := getExpectedLastSeq(hdr); seq > 0 && seq != mset.lseq {
lseq := mset.lseq
mset.mu.Unlock()
if doAck && len(reply) > 0 {
response := []byte(fmt.Sprintf("-ERR 'wrong last sequence: %d'", lseq))
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
}
return
}
// Expected last msgId.
if lmsgId := getExpectedLastMsgId(hdr); lmsgId != _EMPTY_ && lmsgId != mset.lmsgId {
last := mset.lmsgId
mset.mu.Unlock()
if doAck && len(reply) > 0 {
response := []byte(fmt.Sprintf("-ERR 'wrong last msg ID: %s'", last))
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
}
return
}
}
@@ -941,13 +1010,15 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj
}
}
}
mset.mu.Unlock()
// Skip here.
// Skip msg here.
if noInterest {
seq = store.SkipMsg()
mset.lseq = store.SkipMsg()
mset.lmsgId = msgId
mset.mu.Unlock()
if doAck && len(reply) > 0 {
response = append(pubAck, strconv.FormatUint(seq, 10)...)
response = append(pubAck, strconv.FormatUint(mset.lseq, 10)...)
response = append(response, '}')
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
}
@@ -959,12 +1030,20 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj
}
// If here we will attempt to store the message.
// Headers.
// Check for headers.
if pc != nil && pc.pa.hdr > 0 {
hdr = msg[:pc.pa.hdr]
msg = msg[pc.pa.hdr:]
}
seq, ts, err = store.StoreMsg(subject, hdr, msg)
if err == nil && seq > 0 {
mset.lseq = seq
mset.lmsgId = msgId
}
// We hold the lock to this point to make sure nothing gets between us since we check for pre-conditions.
mset.mu.Unlock()
if err != nil {
if err != ErrStoreClosed {
c.Errorf("JetStream failed to store a msg on account: %q stream: %q - %v", accName, name, err)
@@ -976,14 +1055,14 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj
store.RemoveMsg(seq)
seq = 0
} else {
if doAck && len(reply) > 0 {
response = append(pubAck, strconv.FormatUint(seq, 10)...)
response = append(response, '}')
}
// If we have a msgId make sure to save.
if msgId != "" {
mset.storeMsgId(&ddentry{msgId, seq, ts})
}
if doAck && len(reply) > 0 {
response = append(pubAck, strconv.FormatUint(seq, 10)...)
response = append(response, '}')
}
}
// Send response here.

View File

@@ -392,8 +392,8 @@ func TestJetStreamPubAck(t *testing.T) {
if pubAck.Stream != sname {
t.Fatalf("Expected %q for stream name, got %q", sname, pubAck.Stream)
}
if pubAck.Seq != seq {
t.Fatalf("Expected %d for sequence, got %d", seq, pubAck.Seq)
if pubAck.Sequence != seq {
t.Fatalf("Expected %d for sequence, got %d", seq, pubAck.Sequence)
}
}
@@ -3344,7 +3344,7 @@ func TestJetStreamPublishDeDupe(t *testing.T) {
sendMsg := func(seq uint64, id, msg string) *server.PubAck {
t.Helper()
m := nats.NewMsg(fmt.Sprintf("foo.%d", seq))
m.Header.Add(server.JSPubId, id)
m.Header.Add(server.JSMsgId, id)
m.Data = []byte(msg)
resp, _ := nc.RequestMsg(m, 100*time.Millisecond)
if resp == nil {
@@ -3357,8 +3357,8 @@ func TestJetStreamPublishDeDupe(t *testing.T) {
if err := json.Unmarshal(resp.Data[3:], &pubAck); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if pubAck.Seq != seq {
t.Fatalf("Did not get correct sequence in PubAck, expected %d, got %d", seq, pubAck.Seq)
if pubAck.Sequence != seq {
t.Fatalf("Did not get correct sequence in PubAck, expected %d, got %d", seq, pubAck.Sequence)
}
return &pubAck
}
@@ -3415,7 +3415,6 @@ func TestJetStreamPublishDeDupe(t *testing.T) {
if err := mset.Update(&cfg); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
mset.Purge()
// Send 5 new messages.
@@ -3465,6 +3464,102 @@ func TestJetStreamPublishDeDupe(t *testing.T) {
nmids(0)
}
func TestJetStreamPublishExpect(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
mname := "EXPECT"
mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: mname, Storage: server.FileStorage, MaxAge: time.Hour, Subjects: []string{"foo.*"}})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
defer mset.Delete()
nc := clientConnectToServer(t, s)
defer nc.Close()
// Test that we get no error when expected stream is correct.
m := nats.NewMsg("foo.bar")
m.Data = []byte("HELLO")
m.Header.Set(server.JSExpectedStream, mname)
resp, err := nc.RequestMsg(m, 100*time.Millisecond)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !bytes.HasPrefix(resp.Data, []byte("+OK {")) {
t.Fatalf("Expected a JetStreamPubAck, got %q", resp.Data)
}
// Now test that we get an error back when expecting a different stream.
m.Header.Set(server.JSExpectedStream, "ORDERS")
resp, err = nc.RequestMsg(m, 100*time.Millisecond)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !bytes.HasPrefix(resp.Data, []byte("-ERR '")) {
t.Fatalf("Expected an error, got %q", resp.Data)
}
// Now test that we get an error back when expecting a different sequence number.
m.Header.Set(server.JSExpectedStream, mname)
m.Header.Set(server.JSExpectedLastSeq, "10")
resp, err = nc.RequestMsg(m, 100*time.Millisecond)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !bytes.HasPrefix(resp.Data, []byte("-ERR '")) {
t.Fatalf("Expected an error, got %q", resp.Data)
}
// Now send a message with a message ID and make sure we can match that.
m = nats.NewMsg("foo.bar")
m.Data = []byte("HELLO")
m.Header.Set(server.JSMsgId, "AAA")
if _, err = nc.RequestMsg(m, 100*time.Millisecond); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Now try again with new message ID but require last one to be 'BBB'
m.Header.Set(server.JSMsgId, "ZZZ")
m.Header.Set(server.JSExpectedLastMsgId, "BBB")
resp, err = nc.RequestMsg(m, 100*time.Millisecond)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !bytes.HasPrefix(resp.Data, []byte("-ERR '")) {
t.Fatalf("Expected an error, got %q", resp.Data)
}
// Restart the server and make sure we remember/rebuild last seq and last msgId.
// Stop current server.
sd := s.JetStreamConfig().StoreDir
s.Shutdown()
// Restart.
s = RunJetStreamServerOnPort(-1, sd)
defer s.Shutdown()
nc = clientConnectToServer(t, s)
defer nc.Close()
// Our last sequence was 2 and last msgId was "AAA"
m = nats.NewMsg("foo.baz")
m.Data = []byte("HELLO AGAIN")
m.Header.Set(server.JSExpectedLastSeq, "2")
m.Header.Set(server.JSExpectedLastMsgId, "AAA")
m.Header.Set(server.JSMsgId, "BBB")
resp, err = nc.RequestMsg(m, 100*time.Millisecond)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !bytes.HasPrefix(resp.Data, []byte("+OK {")) {
t.Fatalf("Expected a JetStreamPubAck, got %q", resp.Data)
}
}
func TestJetStreamPullConsumerRemoveInterest(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
@@ -9860,7 +9955,7 @@ func TestJetStreamAccountImportBasics(t *testing.T) {
// Simple publish to a stream.
pubAck := sendStreamMsg(t, nc, "my.orders.foo", "ORDERS-1")
if pubAck.Stream != "ORDERS" || pubAck.Seq != 1 {
if pubAck.Stream != "ORDERS" || pubAck.Sequence != 1 {
t.Fatalf("Bad pubAck received: %+v", pubAck)
}
if msgs := mset.State().Msgs; msgs != 1 {