When a stream or consumer was offline we would not properly respond to a delete.
We also would hang if no stream info requests were sent during a stream list due to the asset being offline.

Signed-off-by: Derek Collison <derek@nats.io>
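The stream and consumer list hang came from the scatter/gather loops in the clustered handlers: they waited on a 4-second inactivity timer even when every asset was offline and no info requests had been sent. The hunks below change those loops to run only while requests are outstanding. A minimal, self-contained sketch of that loop shape, using illustrative names rather than the server's actual types:

package main

import (
	"fmt"
	"time"
)

// gather waits for one result per outstanding request, but only while there
// are outstanding requests; with nothing sent it returns immediately instead
// of sitting on the timer, which mirrors the `for {}` -> `for len(sent) > 0 {}`
// change in the hunks below.
func gather(sent map[string]struct{}, results <-chan string, timeout time.Duration) (got, missing []string) {
	notActive := time.NewTimer(timeout)
	defer notActive.Stop()
	for len(sent) > 0 {
		select {
		case name := <-results:
			delete(sent, name)
			got = append(got, name)
		case <-notActive.C:
			for name := range sent {
				missing = append(missing, name)
			}
			return got, missing
		}
	}
	return got, missing
}

func main() {
	// No requests were sent (e.g. every stream's peers are offline), so this
	// returns at once rather than after the timeout.
	got, missing := gather(map[string]struct{}{}, make(chan string), 4*time.Second)
	fmt.Println(got, missing)
}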
@@ -1158,5 +1158,25 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSStreamOfflineErr",
"code": 500,
"error_code": 10118,
"description": "stream is offline",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSConsumerOfflineErr",
"code": 500,
"error_code": 10119,
"description": "consumer is offline",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
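The entries above define two new JetStream API error codes: 10118 (stream is offline) and 10119 (consumer is offline), both carried with code 500. A hedged client-side sketch of detecting them, assuming the nats.go client surfaces JetStream API errors as *nats.APIError with an ErrorCode field:

package main

import (
	"errors"
	"fmt"
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// "ORDERS" is a placeholder stream name.
	_, err = js.StreamInfo("ORDERS")

	var apiErr *nats.APIError
	if errors.As(err, &apiErr) {
		switch apiErr.ErrorCode {
		case 10118:
			fmt.Println("stream is offline") // JSStreamOfflineErr
		case 10119:
			fmt.Println("consumer is offline") // JSConsumerOfflineErr
		default:
			fmt.Println("JetStream API error:", apiErr)
		}
	} else if err != nil {
		log.Fatal(err)
	}
}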
@@ -1609,9 +1609,8 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s

// Clustered mode will invoke a scatter and gather.
if s.JetStreamIsClustered() {
// Need to copy these off before sending..
msg = copyBytes(msg)
s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, filter, offset, subject, reply, msg) })
// Need to copy the msg before sending..
s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, filter, offset, subject, reply, copyBytes(msg)) })
return
}
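The request payload is handed to a goroutine, so it has to be copied first; the change above simply folds the copy into the call. A small sketch of what a clone helper like copyBytes is assumed to do (illustrative, not the server's actual implementation):

package main

import "fmt"

// cloneBytes mimics what a helper like copyBytes is assumed to do: clone the
// payload so the goroutine is not reading from a buffer the server reuses.
func cloneBytes(b []byte) []byte {
	if len(b) == 0 {
		return nil
	}
	c := make([]byte, len(b))
	copy(c, b)
	return c
}

func main() {
	buf := []byte("request payload")
	cp := cloneBytes(buf)
	buf[0] = 'X' // mutating the original buffer no longer affects the copy
	fmt.Println(string(cp))
}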
@@ -1684,8 +1683,10 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s

js.mu.RLock()
isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, streamName)
var offline bool
if sa != nil {
clusterWideConsCount = len(sa.consumers)
offline = s.allPeersOffline(sa.Group)
}
js.mu.RUnlock()

@@ -1709,6 +1710,10 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil)
}
return
} else if isLeader && offline {
resp.Error = NewJSStreamOfflineError()
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil)
return
}

// Check to see if we are a member of the group and if the group has no leader.
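With this change the stream info handler records, under the read lock, whether every peer of the stream's group is offline; if so and this node is the JetStream meta leader, it replies immediately with the new offline error instead of letting the request time out. The consumer info handler below follows the same pattern. A self-contained sketch of what an "all peers offline" check amounts to, using illustrative types rather than the server's:

package main

import "fmt"

// raftGroup and nodeOnline stand in for the server's internal types; the real
// check consults cluster state for each peer of the asset's Raft group.
type raftGroup struct{ Peers []string }

func allPeersOffline(g *raftGroup, nodeOnline map[string]bool) bool {
	if g == nil || len(g.Peers) == 0 {
		return false
	}
	for _, p := range g.Peers {
		if nodeOnline[p] {
			return false
		}
	}
	return true
}

func main() {
	g := &raftGroup{Peers: []string{"S1", "S2", "S3"}}
	online := map[string]bool{"S1": false, "S2": false, "S3": false}
	if allPeersOffline(g, online) {
		// The meta leader would respond with NewJSStreamOfflineError() here.
		fmt.Println("stream is offline")
	}
}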
@@ -3461,9 +3466,8 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account,

// Clustered mode will invoke a scatter and gather.
if s.JetStreamIsClustered() {
msg = copyBytes(msg)
s.startGoRoutine(func() {
s.jsClusteredConsumerListRequest(acc, ci, offset, streamName, subject, reply, msg)
s.jsClusteredConsumerListRequest(acc, ci, offset, streamName, subject, reply, copyBytes(msg))
})
return
}

@@ -3530,6 +3534,10 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
js.mu.RLock()
isLeader, sa, ca := cc.isLeader(), js.streamAssignment(acc.Name, streamName), js.consumerAssignment(acc.Name, streamName, consumerName)
ourID := cc.meta.ID()
var offline bool
if ca != nil {
offline = s.allPeersOffline(ca.Group)
}
js.mu.RUnlock()

if isLeader && ca == nil {

@@ -3557,6 +3565,10 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil)
}
return
} else if isLeader && offline {
resp.Error = NewJSConsumerOfflineError()
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil)
return
}

// Check to see if we are a member of the group and if the group has no leader.
@@ -2617,36 +2617,37 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember,
js.mu.RLock()
s := js.srv
hadLeader := sa.Group.node == nil || sa.Group.node.GroupLeader() != noLeader
offline := s.allPeersOffline(sa.Group)
var isMetaLeader bool
if cc := js.cluster; cc != nil {
isMetaLeader = cc.isLeader()
}
js.mu.RUnlock()

acc, err := s.LookupAccount(sa.Client.serviceAccount())
if err != nil {
s.Debugf("JetStream cluster failed to lookup account %q: %v", sa.Client.serviceAccount(), err)
return
}

var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}}
var err error
var acc *Account

// Go ahead and delete the stream.
mset, err := acc.lookupStream(sa.Config.Name)
if err != nil {
resp.Error = NewJSStreamNotFoundError(Unless(err))
} else if mset != nil {
err = mset.stop(true, wasLeader)
// Go ahead and delete the stream if we have it and the account here.
if acc, _ = s.LookupAccount(sa.Client.serviceAccount()); acc != nil {
if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil {
err = mset.stop(true, wasLeader)
}
}

// Always delete the node if present.
if sa.Group.node != nil {
sa.Group.node.Delete()
}

if !isMember || !wasLeader && hadLeader {
return
if !(offline && isMetaLeader) {
return
}
}

if err != nil {
if resp.Error == nil {
resp.Error = NewJSStreamGeneralError(err, Unless(err))
}
resp.Error = NewJSStreamGeneralError(err, Unless(err))
s.sendAPIErrResponse(sa.Client, acc, sa.Subject, sa.Reply, _EMPTY_, s.jsonResponse(resp))
} else {
resp.Success = true
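Previously this delete handler returned without replying unless the node was (or had just been) the asset's leader, so deleting a fully offline stream left the client waiting. The new guard lets the meta leader respond when the entire group is offline; the consumer delete path below gets the same fallback. A boolean restatement of the guard, with illustrative parameter names:

package main

import "fmt"

// skipDeleteResponse mirrors the shape of the new check: the early return is
// taken only if we are neither the responsible leader nor a meta leader
// looking at a completely offline group.
func skipDeleteResponse(isMember, wasLeader, hadLeader, offline, isMetaLeader bool) bool {
	if !isMember || (!wasLeader && hadLeader) {
		// Old behavior: always return here. New behavior: still respond when
		// every peer of the group is offline and we hold meta leadership.
		return !(offline && isMetaLeader)
	}
	return false
}

func main() {
	// A stream whose peers are all down: the meta leader now answers the delete.
	fmt.Println(skipDeleteResponse(false, false, true, true, true)) // false -> respond
}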
@@ -2942,40 +2943,39 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb
}
js.mu.RLock()
s := js.srv
offline := s.allPeersOffline(ca.Group)
var isMetaLeader bool
if cc := js.cluster; cc != nil {
isMetaLeader = cc.isLeader()
}
js.mu.RUnlock()

acc, err := s.LookupAccount(ca.Client.serviceAccount())
if err != nil {
s.Warnf("JetStream cluster failed to lookup account %q: %v", ca.Client.serviceAccount(), err)
return
}

var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}}
var err error
var acc *Account

// Go ahead and delete the consumer.
mset, err := acc.lookupStream(ca.Stream)
if err != nil {
resp.Error = NewJSStreamNotFoundError(Unless(err))
} else if mset != nil {
if o := mset.lookupConsumer(ca.Name); o != nil {
err = o.stopWithFlags(true, false, true, wasLeader)
} else {
resp.Error = NewJSConsumerNotFoundError()
// Go ahead and delete the consumer if we have it and the account.
if acc, _ = s.LookupAccount(ca.Client.serviceAccount()); acc != nil {
if mset, _ := acc.lookupStream(ca.Stream); mset != nil {
if o := mset.lookupConsumer(ca.Name); o != nil {
err = o.stopWithFlags(true, false, true, wasLeader)
}
}
}

// Always delete the node if present.
if ca.Group.node != nil {
ca.Group.node.Delete()
}

if !wasLeader || ca.Reply == _EMPTY_ {
return
if !(offline && isMetaLeader) {
return
}
}

if err != nil {
if resp.Error == nil {
resp.Error = NewJSStreamNotFoundError(Unless(err))
}
resp.Error = NewJSStreamNotFoundError(Unless(err))
s.sendAPIErrResponse(ca.Client, acc, ca.Subject, ca.Reply, _EMPTY_, s.jsonResponse(resp))
} else {
resp.Success = true
@@ -4233,7 +4233,7 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt
return
}

js.mu.Lock()
js.mu.RLock()

var streams []*streamAssignment
for _, sa := range cc.streams[acc.Name] {

@@ -4283,8 +4283,9 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt
Streams: make([]*StreamInfo, 0, len(streams)),
}

js.mu.RUnlock()

if len(streams) == 0 {
js.mu.Unlock()
resp.Limit = JSApiListLimit
resp.Offset = offset
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp))

@@ -4322,7 +4323,9 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt

var missingNames []string
sent := map[string]int{}

// Send out our requests here.
js.mu.RLock()
for _, sa := range streams {
if s.allPeersOffline(sa.Group) {
// Place offline onto our results by hand here.

@@ -4336,14 +4339,14 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt
}
}
// Don't hold lock.
js.mu.Unlock()
js.mu.RUnlock()

const timeout = 4 * time.Second
notActive := time.NewTimer(timeout)
defer notActive.Stop()

LOOP:
for {
for len(sent) > 0 {
select {
case <-s.quitCh:
return

@@ -4352,7 +4355,6 @@ LOOP:
for sName := range sent {
missingNames = append(missingNames, sName)
}
resp.Missing = missingNames
break LOOP
case si := <-rc:
consCount := sent[si.Config.Name]

@@ -4378,6 +4380,7 @@ LOOP:
resp.Total = len(resp.Streams)
resp.Limit = JSApiListLimit
resp.Offset = offset
resp.Missing = missingNames
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp))
}
@@ -4391,7 +4394,7 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of
return
}

js.mu.Lock()
js.mu.RLock()

var consumers []*consumerAssignment
if sas := cc.streams[acc.Name]; sas != nil {

@@ -4426,8 +4429,9 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of
Consumers: []*ConsumerInfo{},
}

js.mu.RUnlock()

if len(consumers) == 0 {
js.mu.Unlock()
resp.Limit = JSApiListLimit
resp.Offset = offset
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp))

@@ -4465,35 +4469,38 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of

var missingNames []string
sent := map[string]struct{}{}

// Send out our requests here.
js.mu.RLock()
for _, ca := range consumers {
if s.allPeersOffline(ca.Group) {
// Place offline onto our results by hand here.
ci := &ConsumerInfo{Config: ca.Config, Created: ca.Created, Cluster: js.offlineClusterInfo(ca.Group)}
resp.Consumers = append(resp.Consumers, ci)
missingNames = append(missingNames, ci.Name)
missingNames = append(missingNames, ca.Name)
} else {
isubj := fmt.Sprintf(clusterConsumerInfoT, ca.Client.serviceAccount(), stream, ca.Name)
s.sendInternalMsgLocked(isubj, inbox, nil, nil)
sent[ca.Name] = struct{}{}
}
}
js.mu.Unlock()
// Don't hold lock.
js.mu.RUnlock()

const timeout = 4 * time.Second
notActive := time.NewTimer(timeout)
defer notActive.Stop()

LOOP:
for {
for len(sent) > 0 {
select {
case <-s.quitCh:
return
case <-notActive.C:
s.Warnf("Did not receive all consumer info results for %q", acc)
s.Warnf("Did not receive all consumer info results for '%s > %s'", acc, stream)
for cName := range sent {
missingNames = append(missingNames, cName)
}
resp.Missing = missingNames
break LOOP
case ci := <-rc:
delete(sent, ci.Name)

@@ -4515,6 +4522,7 @@ LOOP:
resp.Total = len(resp.Consumers)
resp.Limit = JSApiListLimit
resp.Offset = offset
resp.Missing = missingNames
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp))
}
@@ -19,6 +19,7 @@ import (
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math/rand"
@@ -11108,6 +11109,103 @@ func TestJetStreamStreamAdvisories(t *testing.T) {
t.Run("Clustered_R3", func(t *testing.T) { checkAdvisories(t, c.randomServer(), 3) })
}

func TestJetStreamRemovedPeersAndStreamsListAndDelete(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 3)
defer sc.shutdown()

pcn := "C2"
sc.waitOnLeader()
ml := sc.leader()
if ml.ClusterName() == pcn {
pcn = "C1"
}

// Client based API
nc, js := jsClientConnect(t, ml)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "GONE",
Replicas: 3,
Placement: &nats.Placement{Cluster: pcn},
})
require_NoError(t, err)

_, err = js.AddConsumer("GONE", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Replicas: 3,
Placement: &nats.Placement{Cluster: ml.ClusterName()},
})
require_NoError(t, err)

// Put messages in..
num := 100
for i := 0; i < num; i++ {
js.PublishAsync("GONE", []byte("SLS"))
js.PublishAsync("TEST", []byte("SLS"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

c := sc.clusterForName(pcn)
c.shutdown()

// Grab Stream List..
start := time.Now()
resp, err := nc.Request(JSApiStreamList, nil, 2*time.Second)
require_NoError(t, err)
if delta := time.Since(start); delta > 100*time.Millisecond {
t.Fatalf("Stream list call took too long to return: %v", delta)
}
var list JSApiStreamListResponse
err = json.Unmarshal(resp.Data, &list)
require_NoError(t, err)

if len(list.Missing) != 1 || list.Missing[0] != "GONE" {
t.Fatalf("Wrong Missing: %+v", list)
}

// Check behavior of stream info as well. We want it to return the stream is offline and not just timeout.
_, err = js.StreamInfo("GONE")
// FIXME(dlc) - Go client not putting nats: prefix on for stream but does for consumer.
require_Error(t, err, NewJSStreamOfflineError())

// Same for Consumer
start = time.Now()
resp, err = nc.Request("$JS.API.CONSUMER.LIST.GONE", nil, 2*time.Second)
require_NoError(t, err)
if delta := time.Since(start); delta > 100*time.Millisecond {
t.Fatalf("Consumer list call took too long to return: %v", delta)
}
var clist JSApiConsumerListResponse
err = json.Unmarshal(resp.Data, &clist)
require_NoError(t, err)

if len(clist.Missing) != 1 || clist.Missing[0] != "dlc" {
t.Fatalf("Wrong Missing: %+v", clist)
}

_, err = js.ConsumerInfo("GONE", "dlc")
require_Error(t, err, NewJSConsumerOfflineError(), errors.New("nats: consumer is offline"))

// Make sure delete works.
err = js.DeleteConsumer("GONE", "dlc")
require_NoError(t, err)

err = js.DeleteStream("GONE")
require_NoError(t, err)

// Test it is really gone.
_, err = js.StreamInfo("GONE")
require_Error(t, err, nats.ErrStreamNotFound)
}
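The new supercluster test exercises both fixes end to end: it shuts down the cluster hosting the GONE stream, then verifies that list calls return quickly with the asset reported under Missing, that info calls return the new offline errors, and that the consumer and stream can still be deleted. Assuming the test lives in the server package as the surrounding code suggests, it can be run on its own with:

go test -run TestJetStreamRemovedPeersAndStreamsListAndDelete ./server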
// Support functions

// Used to setup superclusters for tests.
@@ -128,6 +128,9 @@ const (
// JSConsumerNotFoundErr consumer not found
JSConsumerNotFoundErr ErrorIdentifier = 10014

// JSConsumerOfflineErr consumer is offline
JSConsumerOfflineErr ErrorIdentifier = 10119

// JSConsumerOnMappedErr consumer direct on a mapped consumer
JSConsumerOnMappedErr ErrorIdentifier = 10092

@@ -299,6 +302,9 @@ const (
// JSStreamNotMatchErr expected stream does not match
JSStreamNotMatchErr ErrorIdentifier = 10060

// JSStreamOfflineErr stream is offline
JSStreamOfflineErr ErrorIdentifier = 10118

// JSStreamPurgeFailedF Generic stream purge failure error string ({err})
JSStreamPurgeFailedF ErrorIdentifier = 10110

@@ -397,6 +403,7 @@ var (
JSConsumerNameExistErr: {Code: 400, ErrCode: 10013, Description: "consumer name already in use"},
JSConsumerNameTooLongErrF: {Code: 400, ErrCode: 10102, Description: "consumer name is too long, maximum allowed is {max}"},
JSConsumerNotFoundErr: {Code: 404, ErrCode: 10014, Description: "consumer not found"},
JSConsumerOfflineErr: {Code: 500, ErrCode: 10119, Description: "consumer is offline"},
JSConsumerOnMappedErr: {Code: 400, ErrCode: 10092, Description: "consumer direct on a mapped consumer"},
JSConsumerPullNotDurableErr: {Code: 400, ErrCode: 10085, Description: "consumer in pull mode requires a durable name"},
JSConsumerPullRequiresAckErr: {Code: 400, ErrCode: 10084, Description: "consumer in pull mode requires ack policy"},

@@ -454,6 +461,7 @@ var (
JSStreamNameExistErr: {Code: 400, ErrCode: 10058, Description: "stream name already in use"},
JSStreamNotFoundErr: {Code: 404, ErrCode: 10059, Description: "stream not found"},
JSStreamNotMatchErr: {Code: 400, ErrCode: 10060, Description: "expected stream does not match"},
JSStreamOfflineErr: {Code: 500, ErrCode: 10118, Description: "stream is offline"},
JSStreamPurgeFailedF: {Code: 500, ErrCode: 10110, Description: "{err}"},
JSStreamReplicasNotSupportedErr: {Code: 500, ErrCode: 10074, Description: "replicas > 1 not supported in non-clustered mode"},
JSStreamReplicasNotUpdatableErr: {Code: 400, ErrCode: 10061, Description: "Replicas configuration can not be updated"},

@@ -937,6 +945,16 @@ func NewJSConsumerNotFoundError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSConsumerNotFoundErr]
}

// NewJSConsumerOfflineError creates a new JSConsumerOfflineErr error: "consumer is offline"
func NewJSConsumerOfflineError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSConsumerOfflineErr]
}

// NewJSConsumerOnMappedError creates a new JSConsumerOnMappedErr error: "consumer direct on a mapped consumer"
func NewJSConsumerOnMappedError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)

@@ -1603,6 +1621,16 @@ func NewJSStreamNotMatchError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSStreamNotMatchErr]
}

// NewJSStreamOfflineError creates a new JSStreamOfflineErr error: "stream is offline"
func NewJSStreamOfflineError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSStreamOfflineErr]
}

// NewJSStreamPurgeFailedError creates a new JSStreamPurgeFailedF error: "{err}"
func NewJSStreamPurgeFailedError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)