Pull-based observables must be durable

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-01-07 06:55:40 -08:00
parent 1d4f6402ba
commit 7e8c74fdbd
2 changed files with 22 additions and 5 deletions

View File

@@ -177,6 +177,11 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error)
if config.AckPolicy != AckExplicit {
return nil, fmt.Errorf("observable in pull mode requires explicit ack policy")
}
// They are also required to be durable since otherwise we will not know when to
// clean them up.
if config.Durable == _EMPTY_ {
return nil, fmt.Errorf("observable in pull mode requires a durable name")
}
}
// Setup proper default for ack wait if we are in explicit ack mode.

View File

@@ -364,6 +364,15 @@ func TestJetStreamCreateObservable(t *testing.T) {
t.Fatalf("Expected an error on unsubscribed delivery subject")
}
// Pull-based observables are required to be durable since we do not know when they should
// be cleaned up.
if _, err := mset.AddObservable(&server.ObservableConfig{
DeliverAll: true,
AckPolicy: server.AckExplicit,
}); err == nil {
t.Fatalf("Expected an error on pull-based that is non-durable.")
}
nc := clientConnectToServer(t, s)
defer nc.Close()
sub, _ := nc.SubscribeSync(delivery)
@@ -965,7 +974,7 @@ func TestJetStreamWorkQueueRetentionMsgSet(t *testing.T) {
}
// We will create a non-partitioned observable. This should succeed.
o, err := mset.AddObservable(&server.ObservableConfig{DeliverAll: true, AckPolicy: server.AckExplicit})
o, err := mset.AddObservable(&server.ObservableConfig{Durable: "PBO", DeliverAll: true, AckPolicy: server.AckExplicit})
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
@@ -982,8 +991,11 @@ func TestJetStreamWorkQueueRetentionMsgSet(t *testing.T) {
}
// Now add in an observable that has a partition.
pindex := 1
pConfig := func(pname string) *server.ObservableConfig {
return &server.ObservableConfig{DeliverAll: true, Subject: pname, AckPolicy: server.AckExplicit}
dname := fmt.Sprintf("PPBO-%d", pindex)
pindex += 1
return &server.ObservableConfig{Durable: dname, DeliverAll: true, Subject: pname, AckPolicy: server.AckExplicit}
}
o, err = mset.AddObservable(pConfig("MY_WORK_QUEUE.A"))
if err != nil {
@@ -1053,7 +1065,7 @@ func TestJetStreamWorkQueueAckWaitRedelivery(t *testing.T) {
ackWait := 100 * time.Millisecond
o, err := mset.AddObservable(&server.ObservableConfig{DeliverAll: true, AckPolicy: server.AckExplicit, AckWait: ackWait})
o, err := mset.AddObservable(&server.ObservableConfig{Durable: "PBO", DeliverAll: true, AckPolicy: server.AckExplicit, AckWait: ackWait})
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
@@ -1152,7 +1164,7 @@ func TestJetStreamWorkQueueNakRedelivery(t *testing.T) {
t.Fatalf("Expected %d messages, got %d", toSend, stats.Msgs)
}
o, err := mset.AddObservable(&server.ObservableConfig{DeliverAll: true, AckPolicy: server.AckExplicit})
o, err := mset.AddObservable(&server.ObservableConfig{Durable: "PBO", DeliverAll: true, AckPolicy: server.AckExplicit})
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
@@ -1229,7 +1241,7 @@ func TestJetStreamWorkQueueWorkingIndicator(t *testing.T) {
ackWait := 50 * time.Millisecond
o, err := mset.AddObservable(&server.ObservableConfig{DeliverAll: true, AckPolicy: server.AckExplicit, AckWait: ackWait})
o, err := mset.AddObservable(&server.ObservableConfig{Durable: "PBO", DeliverAll: true, AckPolicy: server.AckExplicit, AckWait: ackWait})
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}