More cleanup and stabilization for consumers and failing when sending messages.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-01-22 10:09:30 -08:00
parent e203b1b568
commit 227901a56b
4 changed files with 198 additions and 16 deletions

View File

@@ -2636,6 +2636,7 @@ func (mset *Stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
mset.mu.RLock()
canRespond := !mset.config.NoAck && len(reply) > 0
s, jsa, st, rf, sendq := mset.srv, mset.jsa, mset.config.Storage, mset.config.Replicas, mset.sendq
maxMsgSize := int(mset.config.MaxMsgSize)
mset.mu.RUnlock()
// Check here pre-emptively if we have exceeded our account limits.
@@ -2667,16 +2668,30 @@ func (mset *Stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
return err
}
// Check msgSize if we have a limit set there. Again this works if it goes through but better to be pre-emptive.
if maxMsgSize >= 0 && (len(hdr)+len(msg)) > maxMsgSize {
err := fmt.Errorf("JetStream message size exceeds limits for '%s > %s'", jsa.acc().Name, mset.config.Name)
s.Warnf(err.Error())
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: mset.Name()}}
resp.Error = &ApiError{Code: 400, Description: "message size exceeds maximum allowed"}
response, _ = json.Marshal(resp)
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
}
return err
}
// Proceed with proposing this message.
mset.mu.Lock()
// We only use mset.nlseq for clustering and in case we run ahead of actual commits.
// We only use mset.clseq for clustering and in case we run ahead of actual commits.
// Check if we need to set initial value here
if mset.nlseq < mset.lseq {
mset.nlseq = mset.lseq
if mset.clseq < mset.lseq {
mset.clseq = mset.lseq
}
err := mset.node.Propose(encodeStreamMsg(subject, reply, hdr, msg, mset.nlseq, time.Now().UnixNano()))
// Do proposal.
err := mset.node.Propose(encodeStreamMsg(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano()))
if err != nil {
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: mset.config.Name}}
@@ -2684,7 +2699,7 @@ func (mset *Stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
response, _ = json.Marshal(resp)
}
} else {
mset.nlseq++
mset.clseq++
}
mset.mu.Unlock()

View File

@@ -129,7 +129,8 @@ type Stream struct {
catchup bool
syncSub *subscription
infoSub *subscription
nlseq uint64
clseq uint64
clfs uint64
}
// Headers for published messages.
@@ -1139,8 +1140,15 @@ func (mset *Stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// For clustering the lower layers will pass our expected lseq. If it is present check for that here.
// This is from the clustering layers so will not respond here.
if lseq > 0 && lseq != mset.lseq {
if lseq > 0 && lseq != (mset.lseq+mset.clfs) {
sendq := mset.sendq
mset.mu.Unlock()
if canRespond && sendq != nil {
resp.PubAck = &PubAck{Stream: name}
resp.Error = &ApiError{Code: 503, Description: "expected stream sequence does not match"}
b, _ := json.Marshal(resp)
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0}
}
return errLastSeqMismatch
}
@@ -1150,6 +1158,7 @@ func (mset *Stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
msgId = getMsgId(hdr)
sendq := mset.sendq
if dde := mset.checkMsgId(msgId); dde != nil {
mset.clfs++
mset.mu.Unlock()
if canRespond {
response := append(pubAck, strconv.FormatUint(dde.seq, 10)...)
@@ -1161,6 +1170,7 @@ func (mset *Stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Expected stream.
if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name {
mset.clfs++
mset.mu.Unlock()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
@@ -1173,6 +1183,7 @@ func (mset *Stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Expected last sequence.
if seq := getExpectedLastSeq(hdr); seq > 0 && seq != mset.lseq {
mlseq := mset.lseq
mset.clfs++
mset.mu.Unlock()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
@@ -1185,6 +1196,7 @@ func (mset *Stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Expected last msgId.
if lmsgId := getExpectedLastMsgId(hdr); lmsgId != _EMPTY_ && lmsgId != mset.lmsgId {
last := mset.lmsgId
mset.clfs++
mset.mu.Unlock()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
@@ -1206,6 +1218,7 @@ func (mset *Stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Check to see if we are over the max msg size.
if maxMsgSize >= 0 && (len(hdr)+len(msg)) > maxMsgSize {
mset.mu.Unlock()
mset.clfs++
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = &ApiError{Code: 400, Description: "message size exceeds maximum allowed"}
@@ -1276,6 +1289,7 @@ func (mset *Stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// If we did not succeed put those values back.
if err != nil {
// FIXME(dlc) - This most likely is asymmetric under clustered scenarios.
mset.mu.Lock()
mset.lseq = olseq
mset.lmsgId = olmsgId
@@ -1316,21 +1330,21 @@ func (mset *Stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
}
// FIXME(dlc) - Check leader status?
if err == nil && seq > 0 && numConsumers > 0 {
var _obs [4]*Consumer
obs := _obs[:0]
mset.mu.Lock()
for _, o := range mset.consumers {
obs = append(obs, o)
if o.isLeader() {
obs = append(obs, o)
}
}
mset.mu.Unlock()
for _, o := range obs {
o.incStreamPending(seq, subject)
if !o.deliverCurrentMsg(subject, hdr, msg, seq, ts) && o.isLeader() {
if !o.deliverCurrentMsg(subject, hdr, msg, seq, ts) {
o.signalNewMessages()
}
}

View File

@@ -20,6 +20,7 @@ import (
"io/ioutil"
"math/rand"
"os"
"reflect"
"strings"
"sync"
"testing"
@@ -707,6 +708,104 @@ func TestJetStreamClusterStreamSynchedTimeStamps(t *testing.T) {
}
}
func TestJetStreamClusterStreamPublishWithActiveConsumers(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
s := c.randomServer()
// Client based API
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{Name: "foo", Replicas: 3})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err = js.Publish("foo", []byte("TSS")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
sub, err := js.SubscribeSync("foo", nats.Durable("dlc"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if m, err := sub.NextMsg(time.Second); err != nil {
t.Fatalf("Unexpected error: %v", err)
} else {
m.Ack()
}
// Send 10 messages.
for i := 1; i <= 10; i++ {
payload := []byte(fmt.Sprintf("MSG-%d", i))
if _, err = js.Publish("foo", payload); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
checkSubsPending(t, sub, 10)
// Sanity check for duplicate deliveries..
if nmsgs, _, _ := sub.Pending(); nmsgs > 10 {
t.Fatalf("Expected only %d responses, got %d more", 10, nmsgs)
}
for i := 1; i <= 10; i++ {
m, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
payload := []byte(fmt.Sprintf("MSG-%d", i))
if !bytes.Equal(m.Data, payload) {
t.Fatalf("Did not get expected msg, expected %q, got %q", payload, m.Data)
}
}
ci, err := sub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error getting consumer info: %v", err)
}
c.consumerLeader("$G", "foo", "dlc").Shutdown()
c.waitOnNewConsumerLeader("$G", "foo", "dlc")
ci2, err := sub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error getting consumer info: %v", err)
}
// For slight skew in creation time.
ci.Created = ci.Created.Round(time.Second)
ci2.Created = ci2.Created.Round(time.Second)
if !reflect.DeepEqual(ci, ci2) {
t.Fatalf("Consumer info did not match: %+v vs %+v", ci, ci2)
}
// Now send more..
// Send 10 more messages.
for i := 10; i <= 20; i++ {
payload := []byte(fmt.Sprintf("MSG-%d", i))
if _, err = js.Publish("foo", payload); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
checkSubsPending(t, sub, 10)
// Sanity check for duplicate deliveries..
if nmsgs, _, _ := sub.Pending(); nmsgs > 10 {
t.Fatalf("Expected only %d responses, got %d more", 10, nmsgs)
}
for i := 10; i <= 20; i++ {
m, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
payload := []byte(fmt.Sprintf("MSG-%d", i))
if !bytes.Equal(m.Data, payload) {
t.Fatalf("Did not get expected msg, expected %q, got %q", payload, m.Data)
}
}
}
func TestJetStreamClusterStreamOverlapSubjects(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R32", 2)
defer c.shutdown()
@@ -1508,7 +1607,7 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) {
}
func TestJetStreamClusterAccountInfoAndLimits(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 5)
c := createJetStreamClusterExplicit(t, "R5S", 5)
defer c.shutdown()
// Adjust our limits.
@@ -1587,6 +1686,64 @@ func TestJetStreamClusterAccountInfoAndLimits(t *testing.T) {
if _, err := js.Publish("baz", []byte("JSC-NOT-OK")); err == nil {
t.Fatalf("Expected publish error but got none")
}
// Check consumers
_, err := js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// This should fail.
_, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc22", AckPolicy: nats.AckExplicitPolicy})
if err == nil {
t.Fatalf("Expected error but got none")
}
}
func TestJetStreamClusterStreamLimits(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
// Client based API
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
// Check that large R will fail.
if _, err := js.AddStream(&nats.StreamConfig{Name: "foo", Replicas: 5}); err == nil {
t.Fatalf("Expected error but got none")
}
maxMsgs := 5
_, err := js.AddStream(&nats.StreamConfig{
Name: "foo",
Replicas: 3,
Retention: nats.LimitsPolicy,
Discard: server.DiscardNew,
MaxMsgSize: 11,
MaxMsgs: int64(maxMsgs),
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Large message should fail.
if _, err := js.Publish("foo", []byte("0123456789ZZZ")); err == nil {
t.Fatalf("Expected publish to fail")
}
for i := 0; i < maxMsgs; i++ {
if _, err := js.Publish("foo", []byte("JSC-OK")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
// These should fail.
if _, err := js.Publish("foo", []byte("JSC-OK")); err == nil {
t.Fatalf("Expected publish to fail")
}
}
func TestJetStreamClusterStreamPerf(t *testing.T) {

View File

@@ -3461,10 +3461,6 @@ func TestJetStreamPullConsumerRemoveInterest(t *testing.T) {
defer os.RemoveAll(config.StoreDir)
}
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
mname := "MYS-PULL"
mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: mname, Storage: server.MemoryStorage})
if err != nil {