mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Added support for stream and consumer lists.
This utilizes a scatter and gather approach. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -220,6 +220,10 @@ func NewAccount(name string) *Account {
|
||||
return a
|
||||
}
|
||||
|
||||
func (a *Account) String() string {
|
||||
return a.Name
|
||||
}
|
||||
|
||||
// Used to create shallow copies of accounts for transfer
|
||||
// from opts to real accounts in server struct.
|
||||
func (a *Account) shallowCopy() *Account {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2012-2020 The NATS Authors
|
||||
// Copyright 2012-2021 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -40,7 +40,7 @@ var (
|
||||
|
||||
const (
|
||||
// VERSION is the current version for the server.
|
||||
VERSION = "2.2.0-beta.42"
|
||||
VERSION = "2.2.0-beta.44"
|
||||
|
||||
// PROTO is the currently supported protocol.
|
||||
// 0 was the original
|
||||
|
||||
@@ -171,6 +171,7 @@ type Consumer struct {
|
||||
mset *Stream
|
||||
acc *Account
|
||||
client *client
|
||||
sysc *client
|
||||
sid int
|
||||
name string
|
||||
stream string
|
||||
@@ -211,7 +212,8 @@ type Consumer struct {
|
||||
closed bool
|
||||
|
||||
// Clustered.
|
||||
node RaftNode
|
||||
node RaftNode
|
||||
infoSub *subscription
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -433,6 +435,7 @@ func (mset *Stream) addConsumer(config *ConsumerConfig, oname string, node RaftN
|
||||
mset: mset,
|
||||
acc: a,
|
||||
client: s.createInternalJetStreamClient(),
|
||||
sysc: s.createInternalJetStreamClient(),
|
||||
config: *config,
|
||||
dsubj: config.DeliverSubject,
|
||||
active: true,
|
||||
@@ -444,8 +447,10 @@ func (mset *Stream) addConsumer(config *ConsumerConfig, oname string, node RaftN
|
||||
created: time.Now().UTC(),
|
||||
}
|
||||
|
||||
// Bind internal client
|
||||
// Bind internal client to the user account.
|
||||
o.client.registerWithAccount(a)
|
||||
// Bind to the system account.
|
||||
o.sysc.registerWithAccount(s.SystemAccount())
|
||||
|
||||
if isDurableConsumer(config) {
|
||||
if len(config.Durable) > JSMaxNameLen {
|
||||
@@ -603,6 +608,16 @@ func (o *Consumer) setLeader(isLeader bool) {
|
||||
// Restore our saved state. During non-leader status we just update our underlying store.
|
||||
o.readStoredState()
|
||||
|
||||
// Do info sub.
|
||||
if o.infoSub == nil && mset != nil && mset.jsa != nil {
|
||||
mset.mu.RLock()
|
||||
s, jsa, stream := mset.srv, mset.jsa, mset.config.Name
|
||||
mset.mu.RUnlock()
|
||||
isubj := fmt.Sprintf("$JSC.CI.%s.%s.%s", jsa.acc(), stream, o.name)
|
||||
// Note below the way we subscribe here is so that we can send requests to ourselves.
|
||||
o.infoSub, _ = s.systemSubscribe(isubj, _EMPTY_, false, o.sysc, o.handleClusterConsumerInfoRequest)
|
||||
}
|
||||
|
||||
var err error
|
||||
if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.processAck); err != nil {
|
||||
o.mu.Unlock()
|
||||
@@ -651,8 +666,10 @@ func (o *Consumer) setLeader(isLeader bool) {
|
||||
o.mu.Lock()
|
||||
o.unsubscribe(o.ackSub)
|
||||
o.unsubscribe(o.reqSub)
|
||||
o.unsubscribe(o.infoSub)
|
||||
o.ackSub = nil
|
||||
o.reqSub = nil
|
||||
o.infoSub = nil
|
||||
o.sendq = nil
|
||||
close(o.qch)
|
||||
o.qch = nil
|
||||
@@ -660,6 +677,21 @@ func (o *Consumer) setLeader(isLeader bool) {
|
||||
}
|
||||
}
|
||||
|
||||
func (o *Consumer) handleClusterConsumerInfoRequest(sub *subscription, c *client, subject, reply string, msg []byte) {
|
||||
var s *Server
|
||||
o.mu.RLock()
|
||||
if o.client != nil {
|
||||
s = o.client.srv
|
||||
}
|
||||
o.mu.RUnlock()
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
ci := o.Info()
|
||||
b, _ := json.Marshal(ci)
|
||||
s.sendInternalMsgLocked(reply, _EMPTY_, nil, b)
|
||||
}
|
||||
|
||||
// Lock should be held.
|
||||
func (o *Consumer) subscribeInternal(subject string, cb msgHandler) (*subscription, error) {
|
||||
c := o.client
|
||||
@@ -2273,10 +2305,14 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error {
|
||||
o.active = false
|
||||
o.unsubscribe(o.ackSub)
|
||||
o.unsubscribe(o.reqSub)
|
||||
o.unsubscribe(o.infoSub)
|
||||
o.ackSub = nil
|
||||
o.reqSub = nil
|
||||
o.infoSub = nil
|
||||
c := o.client
|
||||
o.client = nil
|
||||
sysc := o.sysc
|
||||
o.sysc = nil
|
||||
stopAndClearTimer(&o.ptmr)
|
||||
stopAndClearTimer(&o.dtmr)
|
||||
delivery := o.config.DeliverSubject
|
||||
@@ -2291,6 +2327,9 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error {
|
||||
if c != nil {
|
||||
c.closeConnection(ClientClosed)
|
||||
}
|
||||
if sysc != nil {
|
||||
sysc.closeConnection(ClientClosed)
|
||||
}
|
||||
|
||||
if delivery != "" {
|
||||
a.sl.ClearNotification(delivery, o.inch)
|
||||
|
||||
@@ -994,9 +994,10 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, rep
|
||||
// Request for the list of all detailed stream info.
|
||||
// TODO(dlc) - combine with above long term
|
||||
func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
|
||||
if c == nil {
|
||||
if c == nil || !s.JetStreamIsLeader() {
|
||||
return
|
||||
}
|
||||
|
||||
ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
|
||||
if err != nil {
|
||||
s.Warnf(badAPIRequestT, msg)
|
||||
@@ -1025,6 +1026,12 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, repl
|
||||
offset = req.Offset
|
||||
}
|
||||
|
||||
// Clustered mode will invoke a scatter and gather.
|
||||
if s.JetStreamIsClustered() {
|
||||
s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, offset, subject, reply, msg) })
|
||||
return
|
||||
}
|
||||
|
||||
// TODO(dlc) - Maybe hold these results for large results that we expect to be paged.
|
||||
// TODO(dlc) - If this list is long maybe do this in a Go routine?
|
||||
msets := acc.Streams()
|
||||
@@ -1143,8 +1150,9 @@ func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, subject, re
|
||||
}
|
||||
stream := streamNameFromSubject(subject)
|
||||
|
||||
// Clustered.
|
||||
if s.JetStreamIsClustered() {
|
||||
s.jsClusteredStreamDeleteRequest(ci, stream, subject, reply, rmsg)
|
||||
s.jsClusteredStreamDeleteRequest(ci, stream, reply, msg)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1814,9 +1822,10 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r
|
||||
|
||||
// Request for the list of all detailed consumer information.
|
||||
func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
|
||||
if c == nil {
|
||||
if c == nil || !s.JetStreamIsLeader() {
|
||||
return
|
||||
}
|
||||
|
||||
ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
|
||||
if err != nil {
|
||||
s.Warnf(badAPIRequestT, msg)
|
||||
@@ -1846,6 +1855,13 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, re
|
||||
}
|
||||
|
||||
streamName := streamNameFromSubject(subject)
|
||||
|
||||
// Clustered mode will invoke a scatter and gather.
|
||||
if s.JetStreamIsClustered() {
|
||||
s.startGoRoutine(func() { s.jsClusteredConsumerListRequest(acc, ci, offset, streamName, subject, reply, msg) })
|
||||
return
|
||||
}
|
||||
|
||||
mset, err := acc.LookupStream(streamName)
|
||||
if err != nil {
|
||||
resp.Error = jsNotFoundError(err)
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"path"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@@ -1147,7 +1148,10 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) {
|
||||
accStreams[stream] = sa
|
||||
cc.streams[acc.Name] = accStreams
|
||||
|
||||
isMember := sa.Group.isMember(cc.meta.ID())
|
||||
var isMember bool
|
||||
if sa.Group != nil && cc.meta != nil {
|
||||
isMember = sa.Group.isMember(cc.meta.ID())
|
||||
}
|
||||
js.mu.Unlock()
|
||||
|
||||
// Check if this is for us..
|
||||
@@ -1895,7 +1899,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, subject, reply string,
|
||||
cc.meta.Propose(encodeAddStreamAssignment(sa))
|
||||
}
|
||||
|
||||
func (s *Server) jsClusteredStreamDeleteRequest(ci *ClientInfo, stream, subject, reply string, rmsg []byte) {
|
||||
func (s *Server) jsClusteredStreamDeleteRequest(ci *ClientInfo, stream, reply string, rmsg []byte) {
|
||||
js, cc := s.getJetStreamCluster()
|
||||
if js == nil || cc == nil {
|
||||
return
|
||||
@@ -1932,6 +1936,207 @@ func (s *Server) jsClusteredStreamPurgeRequest(ci *ClientInfo, stream, subject,
|
||||
n.Propose(encodeStreamPurge(sp))
|
||||
}
|
||||
|
||||
// This will do a scatter and gather operation for all streams for this account.
|
||||
// This will be running in a separate Go routine.
|
||||
func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offset int, subject, reply string, rmsg []byte) {
|
||||
defer s.grWG.Done()
|
||||
|
||||
js, cc := s.getJetStreamCluster()
|
||||
if js == nil || cc == nil {
|
||||
return
|
||||
}
|
||||
|
||||
js.mu.Lock()
|
||||
defer js.mu.Unlock()
|
||||
|
||||
var streams []*streamAssignment
|
||||
for _, sa := range cc.streams[acc.Name] {
|
||||
streams = append(streams, sa)
|
||||
}
|
||||
// Needs to be sorted.
|
||||
if len(streams) > 1 {
|
||||
sort.Slice(streams, func(i, j int) bool {
|
||||
return strings.Compare(streams[i].Config.Name, streams[j].Config.Name) < 0
|
||||
})
|
||||
}
|
||||
|
||||
scnt := len(streams)
|
||||
if offset > scnt {
|
||||
offset = scnt
|
||||
}
|
||||
if offset > 0 {
|
||||
streams = streams[offset:]
|
||||
}
|
||||
if len(streams) > JSApiListLimit {
|
||||
streams = streams[:JSApiListLimit]
|
||||
}
|
||||
|
||||
rc := make(chan *StreamInfo, len(streams))
|
||||
|
||||
// Create an inbox for our responses and send out requests.
|
||||
inbox := infoReplySubject()
|
||||
rsub, _ := s.systemSubscribe(inbox, _EMPTY_, false, cc.c, func(_ *subscription, _ *client, _, reply string, msg []byte) {
|
||||
var si StreamInfo
|
||||
if err := json.Unmarshal(msg, &si); err != nil {
|
||||
s.Warnf("Error unmarshaling clustered stream info response:%v", err)
|
||||
return
|
||||
}
|
||||
select {
|
||||
case rc <- &si:
|
||||
default:
|
||||
s.Warnf("Failed placing stream info result on internal chan")
|
||||
}
|
||||
})
|
||||
defer s.sysUnsubscribe(rsub)
|
||||
|
||||
// Send out our requests here.
|
||||
for _, sa := range streams {
|
||||
isubj := fmt.Sprintf("$JSC.SI.%s.%s", sa.Client.Account, sa.Config.Name)
|
||||
s.sendInternalMsgLocked(isubj, inbox, nil, nil)
|
||||
}
|
||||
|
||||
const timeout = 2 * time.Second
|
||||
notActive := time.NewTimer(timeout)
|
||||
defer notActive.Stop()
|
||||
|
||||
var resp = JSApiStreamListResponse{
|
||||
ApiResponse: ApiResponse{Type: JSApiStreamListResponseType},
|
||||
Streams: make([]*StreamInfo, 0, len(streams)),
|
||||
}
|
||||
|
||||
LOOP:
|
||||
for {
|
||||
select {
|
||||
case <-s.quitCh:
|
||||
return
|
||||
case <-notActive.C:
|
||||
s.Warnf("Did not receive all stream info results for %q", acc)
|
||||
break LOOP
|
||||
case si := <-rc:
|
||||
resp.Streams = append(resp.Streams, si)
|
||||
// Check to see if we are done.
|
||||
if len(resp.Streams) == len(streams) {
|
||||
break LOOP
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Needs to be sorted as well.
|
||||
if len(resp.Streams) > 1 {
|
||||
sort.Slice(resp.Streams, func(i, j int) bool {
|
||||
return strings.Compare(resp.Streams[i].Config.Name, resp.Streams[j].Config.Name) < 0
|
||||
})
|
||||
}
|
||||
|
||||
resp.Total = len(resp.Streams)
|
||||
resp.Limit = JSApiListLimit
|
||||
resp.Offset = offset
|
||||
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp))
|
||||
}
|
||||
|
||||
// This will do a scatter and gather operation for all consumers for this stream and account.
|
||||
// This will be running in a separate Go routine.
|
||||
func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, offset int, stream, subject, reply string, rmsg []byte) {
|
||||
defer s.grWG.Done()
|
||||
|
||||
js, cc := s.getJetStreamCluster()
|
||||
if js == nil || cc == nil {
|
||||
return
|
||||
}
|
||||
|
||||
js.mu.Lock()
|
||||
defer js.mu.Unlock()
|
||||
|
||||
var consumers []*consumerAssignment
|
||||
if sas := cc.streams[acc.Name]; sas != nil {
|
||||
if sa := sas[stream]; sa != nil {
|
||||
// Copy over since we need to sort etc.
|
||||
for _, ca := range sa.consumers {
|
||||
consumers = append(consumers, ca)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Needs to be sorted.
|
||||
if len(consumers) > 1 {
|
||||
sort.Slice(consumers, func(i, j int) bool {
|
||||
return strings.Compare(consumers[i].Name, consumers[j].Name) < 0
|
||||
})
|
||||
}
|
||||
|
||||
ocnt := len(consumers)
|
||||
if offset > ocnt {
|
||||
offset = ocnt
|
||||
}
|
||||
if offset > 0 {
|
||||
consumers = consumers[offset:]
|
||||
}
|
||||
if len(consumers) > JSApiListLimit {
|
||||
consumers = consumers[:JSApiListLimit]
|
||||
}
|
||||
|
||||
rc := make(chan *ConsumerInfo, len(consumers))
|
||||
|
||||
// Create an inbox for our responses and send out requests.
|
||||
inbox := infoReplySubject()
|
||||
rsub, _ := s.systemSubscribe(inbox, _EMPTY_, false, cc.c, func(_ *subscription, _ *client, _, reply string, msg []byte) {
|
||||
var ci ConsumerInfo
|
||||
if err := json.Unmarshal(msg, &ci); err != nil {
|
||||
s.Warnf("Error unmarshaling clustered consumer info response:%v", err)
|
||||
return
|
||||
}
|
||||
select {
|
||||
case rc <- &ci:
|
||||
default:
|
||||
s.Warnf("Failed placing consumer info result on internal chan")
|
||||
}
|
||||
})
|
||||
defer s.sysUnsubscribe(rsub)
|
||||
|
||||
// Send out our requests here.
|
||||
for _, ca := range consumers {
|
||||
isubj := fmt.Sprintf("$JSC.CI.%s.%s.%s", ca.Client.Account, stream, ca.Name)
|
||||
s.sendInternalMsgLocked(isubj, inbox, nil, nil)
|
||||
}
|
||||
|
||||
const timeout = 2 * time.Second
|
||||
notActive := time.NewTimer(timeout)
|
||||
defer notActive.Stop()
|
||||
|
||||
var resp = JSApiConsumerListResponse{
|
||||
ApiResponse: ApiResponse{Type: JSApiConsumerListResponseType},
|
||||
Consumers: []*ConsumerInfo{},
|
||||
}
|
||||
|
||||
LOOP:
|
||||
for {
|
||||
select {
|
||||
case <-s.quitCh:
|
||||
return
|
||||
case <-notActive.C:
|
||||
s.Warnf("Did not receive all stream info results for %q", acc)
|
||||
break LOOP
|
||||
case ci := <-rc:
|
||||
resp.Consumers = append(resp.Consumers, ci)
|
||||
// Check to see if we are done.
|
||||
if len(resp.Consumers) == len(consumers) {
|
||||
break LOOP
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Needs to be sorted as well.
|
||||
if len(resp.Consumers) > 1 {
|
||||
sort.Slice(resp.Consumers, func(i, j int) bool {
|
||||
return strings.Compare(resp.Consumers[i].Name, resp.Consumers[j].Name) < 0
|
||||
})
|
||||
}
|
||||
|
||||
resp.Total = len(resp.Consumers)
|
||||
resp.Limit = JSApiListLimit
|
||||
resp.Offset = offset
|
||||
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp))
|
||||
}
|
||||
|
||||
func encodeStreamPurge(sp *streamPurge) []byte {
|
||||
var bb bytes.Buffer
|
||||
bb.WriteByte(byte(purgeStreamOp))
|
||||
@@ -2447,6 +2652,21 @@ func (mset *Stream) handleClusterSyncRequest(sub *subscription, c *client, subje
|
||||
mset.srv.startGoRoutine(func() { mset.runCatchup(reply, &sreq) })
|
||||
}
|
||||
|
||||
func (mset *Stream) handleClusterStreamInfoRequest(sub *subscription, c *client, subject, reply string, msg []byte) {
|
||||
mset.mu.RLock()
|
||||
if mset.client == nil {
|
||||
mset.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
s := mset.srv
|
||||
config := mset.config
|
||||
mset.mu.RUnlock()
|
||||
|
||||
si := &StreamInfo{Created: mset.Created(), State: mset.State(), Config: config}
|
||||
b, _ := json.Marshal(si)
|
||||
s.sendInternalMsgLocked(reply, _EMPTY_, nil, b)
|
||||
}
|
||||
|
||||
func (mset *Stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
|
||||
s := mset.srv
|
||||
defer s.grWG.Done()
|
||||
@@ -2539,15 +2759,19 @@ func (mset *Stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
|
||||
}
|
||||
|
||||
func syncSubjForStream() string {
|
||||
return syncSubject("$SYS.JSC.SYNC")
|
||||
return syncSubject("$JSC.SYNC")
|
||||
}
|
||||
|
||||
func syncReplySubject() string {
|
||||
return syncSubject("$SYS.JSC.R")
|
||||
return syncSubject("$JSC.R")
|
||||
}
|
||||
|
||||
func infoReplySubject() string {
|
||||
return syncSubject("$JSC.R")
|
||||
}
|
||||
|
||||
func syncAckSubject() string {
|
||||
return syncSubject("$SYS.JSC.ACK") + ".*"
|
||||
return syncSubject("$JSC.ACK") + ".*"
|
||||
}
|
||||
|
||||
func syncSubject(pre string) string {
|
||||
|
||||
@@ -89,6 +89,7 @@ type Stream struct {
|
||||
jsa *jsAccount
|
||||
srv *Server
|
||||
client *client
|
||||
sysc *client
|
||||
sid int
|
||||
pubAck []byte
|
||||
sendq chan *jsPubMsg
|
||||
@@ -110,6 +111,7 @@ type Stream struct {
|
||||
node RaftNode
|
||||
catchup bool
|
||||
syncSub *subscription
|
||||
infoSub *subscription
|
||||
nlseq uint64
|
||||
}
|
||||
|
||||
@@ -192,16 +194,19 @@ func (a *Account) addStream(config *StreamConfig, fsConfig *FileStoreConfig, sa
|
||||
return nil, fmt.Errorf("subjects overlap with an existing stream")
|
||||
}
|
||||
|
||||
// Setup the internal client.
|
||||
// Setup the internal clients.
|
||||
c := s.createInternalJetStreamClient()
|
||||
mset := &Stream{jsa: jsa, config: cfg, srv: s, client: c, consumers: make(map[string]*Consumer), qch: make(chan struct{})}
|
||||
ic := s.createInternalJetStreamClient()
|
||||
mset := &Stream{jsa: jsa, config: cfg, srv: s, client: c, sysc: ic, consumers: make(map[string]*Consumer), qch: make(chan struct{})}
|
||||
|
||||
jsa.streams[cfg.Name] = mset
|
||||
storeDir := path.Join(jsa.storeDir, streamsDir, cfg.Name)
|
||||
jsa.mu.Unlock()
|
||||
|
||||
// Bind to the account.
|
||||
// Bind to the user account.
|
||||
c.registerWithAccount(a)
|
||||
// Bind to the system account.
|
||||
ic.registerWithAccount(s.SystemAccount())
|
||||
|
||||
// Create the appropriate storage
|
||||
fsCfg := fsConfig
|
||||
@@ -285,10 +290,10 @@ func (mset *Stream) setLeader(isLeader bool) error {
|
||||
}
|
||||
// Make sure we are listening for sync requests.
|
||||
// TODO(dlc) - Original design was that all in sync members of the group would do DQ.
|
||||
mset.startSyncSub()
|
||||
mset.startClusterSubs()
|
||||
} else {
|
||||
// Stop responding to sync requests.
|
||||
mset.stopSyncSub()
|
||||
mset.stopClusterSubs()
|
||||
// Unsubscribe from direct stream.
|
||||
mset.unsubscribeToStream()
|
||||
}
|
||||
@@ -297,14 +302,25 @@ func (mset *Stream) setLeader(isLeader bool) error {
|
||||
}
|
||||
|
||||
// Lock should be held.
|
||||
func (mset *Stream) startSyncSub() {
|
||||
func (mset *Stream) startClusterSubs() {
|
||||
if mset.infoSub == nil {
|
||||
if jsa := mset.jsa; jsa != nil {
|
||||
isubj := fmt.Sprintf("$JSC.SI.%s.%s", jsa.acc(), mset.config.Name)
|
||||
// Note below the way we subscribe here is so that we can send requests to ourselves.
|
||||
mset.infoSub, _ = mset.srv.systemSubscribe(isubj, _EMPTY_, false, mset.sysc, mset.handleClusterStreamInfoRequest)
|
||||
}
|
||||
}
|
||||
if mset.isClustered() && mset.syncSub == nil {
|
||||
mset.syncSub, _ = mset.srv.sysSubscribe(mset.sa.Sync, mset.handleClusterSyncRequest)
|
||||
}
|
||||
}
|
||||
|
||||
// Lock should be held.
|
||||
func (mset *Stream) stopSyncSub() {
|
||||
func (mset *Stream) stopClusterSubs() {
|
||||
if mset.infoSub != nil {
|
||||
mset.srv.sysUnsubscribe(mset.infoSub)
|
||||
mset.infoSub = nil
|
||||
}
|
||||
if mset.syncSub != nil {
|
||||
mset.srv.sysUnsubscribe(mset.syncSub)
|
||||
mset.syncSub = nil
|
||||
@@ -319,10 +335,7 @@ func (mset *Stream) account() *Account {
|
||||
if jsa == nil {
|
||||
return nil
|
||||
}
|
||||
jsa.mu.RLock()
|
||||
acc := jsa.account
|
||||
jsa.mu.RUnlock()
|
||||
return acc
|
||||
return jsa.acc()
|
||||
}
|
||||
|
||||
// Helper to determine the max msg size for this stream if file based.
|
||||
@@ -1440,7 +1453,7 @@ func (mset *Stream) stop(delete bool) error {
|
||||
} else {
|
||||
n.Stop()
|
||||
}
|
||||
mset.stopSyncSub()
|
||||
mset.stopClusterSubs()
|
||||
}
|
||||
|
||||
// Send stream delete advisory after the consumers.
|
||||
@@ -1467,11 +1480,18 @@ func (mset *Stream) stop(delete bool) error {
|
||||
mset.ddmap = nil
|
||||
}
|
||||
|
||||
sysc := mset.sysc
|
||||
mset.sysc = nil
|
||||
|
||||
// Clustered cleanup.
|
||||
mset.mu.Unlock()
|
||||
|
||||
c.closeConnection(ClientClosed)
|
||||
|
||||
if sysc != nil {
|
||||
sysc.closeConnection(ClientClosed)
|
||||
}
|
||||
|
||||
if mset.store == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -737,15 +737,168 @@ func TestJetStreamClusterStreamOverlapSubjects(t *testing.T) {
|
||||
}
|
||||
|
||||
// Now do detailed version.
|
||||
resp, _ = nc.Request(server.JSApiStreamList, nil, time.Second)
|
||||
resp, err = nc.Request(server.JSApiStreamList, nil, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
var listResponse server.JSApiStreamListResponse
|
||||
if err = json.Unmarshal(resp.Data, &listResponse); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamClusterStreamInfoList(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "R3S", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
// Client based API
|
||||
s := c.randomServer()
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
createStream := func(name string) {
|
||||
t.Helper()
|
||||
if _, err := js.AddStream(&nats.StreamConfig{Name: name}); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
createStream("foo")
|
||||
createStream("bar")
|
||||
createStream("baz")
|
||||
|
||||
sendBatch := func(subject string, n int) {
|
||||
t.Helper()
|
||||
// Send a batch to a given subject.
|
||||
for i := 0; i < n; i++ {
|
||||
if _, err := js.Publish(subject, []byte("OK")); err != nil {
|
||||
t.Fatalf("Unexpected publish error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sendBatch("foo", 10)
|
||||
sendBatch("bar", 22)
|
||||
sendBatch("baz", 33)
|
||||
|
||||
// Now get the stream list info.
|
||||
sl := js.NewStreamLister()
|
||||
if !sl.Next() {
|
||||
t.Fatalf("Unexpected error: %v", sl.Err())
|
||||
}
|
||||
p := sl.Page()
|
||||
if len(p) != 3 {
|
||||
t.Fatalf("StreamInfo expected 3 results, got %d", len(p))
|
||||
}
|
||||
for _, si := range p {
|
||||
switch si.Config.Name {
|
||||
case "foo":
|
||||
if si.State.Msgs != 10 {
|
||||
t.Fatalf("Expected %d msgs but got %d", 10, si.State.Msgs)
|
||||
}
|
||||
case "bar":
|
||||
if si.State.Msgs != 22 {
|
||||
t.Fatalf("Expected %d msgs but got %d", 22, si.State.Msgs)
|
||||
}
|
||||
case "baz":
|
||||
if si.State.Msgs != 33 {
|
||||
t.Fatalf("Expected %d msgs but got %d", 33, si.State.Msgs)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamClusterConsumerInfoList(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "R3S", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
// Client based API
|
||||
s := c.randomServer()
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 3}); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Place messages so we can generate consumer state.
|
||||
for i := 0; i < 10; i++ {
|
||||
if _, err := js.Publish("TEST", []byte("OK")); err != nil {
|
||||
t.Fatalf("Unexpected publish error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
createConsumer := func(name string) *nats.Subscription {
|
||||
t.Helper()
|
||||
sub, err := js.SubscribeSync("TEST", nats.Durable(name), nats.Pull(2))
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
checkSubsPending(t, sub, 2)
|
||||
return sub
|
||||
}
|
||||
|
||||
subFoo := createConsumer("foo")
|
||||
subBar := createConsumer("bar")
|
||||
subBaz := createConsumer("baz")
|
||||
|
||||
// Place consumers in various states.
|
||||
for _, ss := range []struct {
|
||||
sub *nats.Subscription
|
||||
fetch int
|
||||
ack int
|
||||
}{
|
||||
{subFoo, 4, 2},
|
||||
{subBar, 2, 0},
|
||||
{subBaz, 8, 6},
|
||||
} {
|
||||
for i := 0; i < ss.fetch; i++ {
|
||||
if m, err := ss.sub.NextMsg(time.Second); err != nil {
|
||||
t.Fatalf("Unexpected error getting message %d: %v", i, err)
|
||||
} else if i < ss.ack {
|
||||
m.Ack()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Now get the consumer list info.
|
||||
cl := js.NewConsumerLister("TEST")
|
||||
if !cl.Next() {
|
||||
t.Fatalf("Unexpected error: %v", cl.Err())
|
||||
}
|
||||
p := cl.Page()
|
||||
if len(p) != 3 {
|
||||
t.Fatalf("ConsumerInfo expected 3 results, got %d", len(p))
|
||||
}
|
||||
for _, ci := range p {
|
||||
switch ci.Name {
|
||||
case "foo":
|
||||
if ci.Delivered.Consumer != 4 {
|
||||
t.Fatalf("Expected %d delivered but got %d", 4, ci.Delivered.Consumer)
|
||||
}
|
||||
if ci.AckFloor.Consumer != 2 {
|
||||
t.Fatalf("Expected %d for ack floor but got %d", 2, ci.AckFloor.Consumer)
|
||||
}
|
||||
case "bar":
|
||||
if ci.Delivered.Consumer != 2 {
|
||||
t.Fatalf("Expected %d delivered but got %d", 2, ci.Delivered.Consumer)
|
||||
}
|
||||
if ci.AckFloor.Consumer != 0 {
|
||||
t.Fatalf("Expected %d for ack floor but got %d", 0, ci.AckFloor.Consumer)
|
||||
}
|
||||
case "baz":
|
||||
if ci.Delivered.Consumer != 8 {
|
||||
t.Fatalf("Expected %d delivered but got %d", 8, ci.Delivered.Consumer)
|
||||
}
|
||||
if ci.AckFloor.Consumer != 6 {
|
||||
t.Fatalf("Expected %d for ack floor but got %d", 6, ci.AckFloor.Consumer)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamClusterStreamUpdate(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "R32", 3)
|
||||
c := createJetStreamClusterExplicit(t, "R3S", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
// Client based API
|
||||
|
||||
Reference in New Issue
Block a user