diff --git a/server/observable.go b/server/observable.go index cc2aa790..e423c4db 100644 --- a/server/observable.go +++ b/server/observable.go @@ -240,10 +240,12 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error) // Check on msgset type conflicts. switch mset.config.Retention { case WorkQueuePolicy: - if config.Delivery != "" { + // Force explicit acks here. + if config.AckPolicy != AckExplicit { mset.mu.Unlock() - return nil, fmt.Errorf("delivery subject not allowed on workqueue message set") + return nil, fmt.Errorf("workqueue message set requires explicit ack") } + if len(mset.obs) > 0 { if config.FilterSubject == _EMPTY_ { mset.mu.Unlock() @@ -1418,7 +1420,6 @@ func (o *Observable) stop(dflag bool) error { mset.mu.Unlock() var err error - if store != nil { if dflag { err = store.Delete() @@ -1426,7 +1427,6 @@ func (o *Observable) stop(dflag bool) error { err = store.Stop() } } - return err } diff --git a/test/jetstream_test.go b/test/jetstream_test.go index dbdfcec6..bb1fd48c 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -1078,20 +1078,7 @@ func TestJetStreamWorkQueueRetentionMsgSet(t *testing.T) { } defer mset.Delete() - nc := clientConnectToServer(t, s) - defer nc.Close() - - sub, _ := nc.SubscribeSync(nats.NewInbox()) - defer sub.Unsubscribe() - nc.Flush() - // This type of message set has restrictions which we will test here. - - // Push based not allowed. - if _, err := mset.AddObservable(&server.ObservableConfig{Delivery: sub.Subject}); err == nil { - t.Fatalf("Expected an error on delivery subject") - } - // DeliverAll is only start mode allowed. if _, err := mset.AddObservable(&server.ObservableConfig{DeliverLast: true}); err == nil { t.Fatalf("Expected an error with anything but DeliverAll") @@ -1149,7 +1136,44 @@ func TestJetStreamWorkQueueRetentionMsgSet(t *testing.T) { if err != nil { t.Fatalf("Expected no error, got %v", err) } - defer o3.Delete() + + o.Delete() + o2.Delete() + o3.Delete() + + // Push based will be allowed now, including ephemerals. + // They can not overlap etc meaning same rules as above apply. + o4, err := mset.AddObservable(&server.ObservableConfig{ + Durable: "DURABLE", + Delivery: "SOME.SUBJ", + DeliverAll: true, + AckPolicy: server.AckExplicit, + }) + if err != nil { + t.Fatalf("Unexpected Error: %v", err) + } + defer o4.Delete() + + // Now try to create an ephemeral + nc := clientConnectToServer(t, s) + defer nc.Close() + + sub, _ := nc.SubscribeSync(nats.NewInbox()) + defer sub.Unsubscribe() + nc.Flush() + + // This should fail at first due to conflict above. + ephCfg := &server.ObservableConfig{Delivery: sub.Subject, DeliverAll: true, AckPolicy: server.AckExplicit} + if _, err := mset.AddObservable(ephCfg); err == nil { + t.Fatalf("Expected an error ") + } + // Delete of o4 should clear. + o4.Delete() + o5, err := mset.AddObservable(ephCfg) + if err != nil { + t.Fatalf("Unexpected Error: %v", err) + } + defer o5.Delete() }) } }