Make sure to suppress duplicate create/delete audit events

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-01-31 06:05:49 -08:00
parent 230c17ee48
commit 095a83bc2a
6 changed files with 125 additions and 40 deletions

View File

@@ -1,3 +1,6 @@
dist: focal
language: go
go:
- 1.15.x

View File

@@ -582,14 +582,22 @@ func (mset *Stream) addConsumer(config *ConsumerConfig, oname string, ca *consum
mset.setConsumer(o)
mset.mu.Unlock()
if !s.JetStreamIsClustered() {
if !s.JetStreamIsClustered() && s.standAloneMode() {
o.setLeader(true)
}
// This is always true in single server mode.
if o.isLeader() {
// Send advisory.
o.sendCreateAdvisory()
var suppress bool
if !s.standAloneMode() && ca == nil {
suppress = true
} else if ca != nil {
suppress = ca.responded
}
if !suppress {
o.sendCreateAdvisory()
}
}
return o, nil
@@ -2321,7 +2329,10 @@ func (o *Consumer) Stream() string {
o.mu.RLock()
mset := o.mset
o.mu.RUnlock()
return mset.Name()
if mset != nil {
return mset.Name()
}
return _EMPTY_
}
// Active indicates if this consumer is still active.

View File

@@ -1038,7 +1038,7 @@ func (jsa *jsAccount) delete() {
jsa.mu.Unlock()
for _, ms := range streams {
ms.stop(false)
ms.stop(false, false)
}
for _, t := range ts {

View File

@@ -1368,6 +1368,9 @@ func (js *jetStream) processStreamLeaderChange(mset *Stream, sa *streamAssignmen
} else {
resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: mset.Config(), Cluster: s.clusterInfo(mset.raftNode())}
s.sendAPIResponse(client, acc, _EMPTY_, reply, _EMPTY_, s.jsonResponse(&resp))
if node := mset.raftNode(); node != nil {
mset.sendCreateAdvisory()
}
}
}
@@ -1719,7 +1722,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember,
if mset.Config().internal {
err = errors.New("not allowed to delete internal stream")
} else {
err = mset.Delete()
err = mset.stop(true, wasLeader)
}
}
@@ -1934,8 +1937,8 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb
} else if mset != nil {
if mset.Config().internal {
err = errors.New("not allowed to delete internal consumer")
} else if obs := mset.LookupConsumer(ca.Name); obs != nil {
err = obs.Delete()
} else if o := mset.LookupConsumer(ca.Name); o != nil {
err = o.stop(true, true, wasLeader)
} else {
resp.Error = jsNoConsumerErr
}
@@ -1945,12 +1948,7 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb
ca.Group.node.Delete()
}
if !isMember || !wasLeader && ca.Group.node != nil && ca.Group.node.GroupLeader() != noLeader {
return
}
// Just return if no reply subject.
if ca.Reply == _EMPTY_ {
if !wasLeader || ca.Reply == _EMPTY_ {
return
}
@@ -2193,6 +2191,9 @@ func (js *jetStream) processConsumerLeaderChange(o *Consumer, ca *consumerAssign
} else {
resp.ConsumerInfo = o.Info()
s.sendAPIResponse(client, acc, _EMPTY_, reply, _EMPTY_, s.jsonResponse(&resp))
if node := o.raftNode(); node != nil {
o.sendCreateAdvisory()
}
}
}

View File

@@ -282,7 +282,15 @@ func (a *Account) addStream(config *StreamConfig, fsConfig *FileStoreConfig, sa
if isLeader {
// Send advisory.
mset.sendCreateAdvisory()
var suppress bool
if !s.standAloneMode() && sa == nil {
suppress = true
} else if sa != nil {
suppress = sa.responded
}
if !suppress {
mset.sendCreateAdvisory()
}
}
return mset, nil
@@ -497,10 +505,12 @@ func (mset *Stream) sendCreateAdvisory() {
}
j, err := json.MarshalIndent(m, "", " ")
if err == nil {
subj := JSAdvisoryStreamCreatedPre + "." + name
sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0}
if err != nil {
return
}
subj := JSAdvisoryStreamCreatedPre + "." + name
sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0}
}
func (mset *Stream) sendDeleteAdvisoryLocked() {
@@ -670,16 +680,6 @@ func (mset *Stream) FileStoreConfig() (FileStoreConfig, error) {
// Delete deletes a stream from the owning account.
func (mset *Stream) Delete() error {
mset.mu.Lock()
jsa := mset.jsa
mset.mu.Unlock()
if jsa == nil {
return ErrJetStreamNotEnabledForAccount
}
jsa.mu.Lock()
delete(jsa.streams, mset.config.Name)
jsa.mu.Unlock()
return mset.delete()
}
@@ -1401,6 +1401,9 @@ func (mset *Stream) setupSendCapabilities() {
// Name returns the stream name.
func (mset *Stream) Name() string {
if mset == nil {
return _EMPTY_
}
mset.mu.Lock()
defer mset.mu.Unlock()
return mset.config.Name
@@ -1468,11 +1471,24 @@ func (mset *Stream) internalSendLoop() {
// Internal function to delete a stream.
func (mset *Stream) delete() error {
return mset.stop(true)
return mset.stop(true, true)
}
// Internal function to stop or delete the stream.
func (mset *Stream) stop(delete bool) error {
func (mset *Stream) stop(deleteFlag, advisory bool) error {
mset.mu.RLock()
jsa := mset.jsa
mset.mu.RUnlock()
if jsa == nil {
return ErrJetStreamNotEnabledForAccount
}
// Remove from our account map.
jsa.mu.Lock()
delete(jsa.streams, mset.config.Name)
jsa.mu.Unlock()
// Clean up consumers.
mset.mu.Lock()
var obs []*Consumer
@@ -1486,7 +1502,7 @@ func (mset *Stream) stop(delete bool) error {
// Second flag says do not broadcast to signal.
// TODO(dlc) - If we have an err here we don't want to stop
// but should we log?
o.stop(delete, false, delete)
o.stop(deleteFlag, false, advisory)
}
mset.mu.Lock()
@@ -1499,7 +1515,7 @@ func (mset *Stream) stop(delete bool) error {
// Cluster cleanup
if n := mset.node; n != nil {
if delete {
if deleteFlag {
n.Delete()
} else {
n.Stop()
@@ -1508,7 +1524,7 @@ func (mset *Stream) stop(delete bool) error {
}
// Send stream delete advisory after the consumers.
if delete {
if deleteFlag && advisory {
mset.sendDeleteAdvisoryLocked()
}
@@ -1547,7 +1563,7 @@ func (mset *Stream) stop(delete bool) error {
return nil
}
if delete {
if deleteFlag {
if err := mset.store.Delete(); err != nil {
return err
}

View File

@@ -1930,7 +1930,15 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) {
}
nc.Request(rresp.DeliverSubject, chunk[:n], time.Second)
}
nc.Request(rresp.DeliverSubject, nil, time.Second)
rmsg, err = nc.Request(rresp.DeliverSubject, nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
rresp.Error = nil
json.Unmarshal(rmsg.Data, &rresp)
if rresp.Error != nil {
t.Fatalf("Got an unexpected error response: %+v", rresp.Error)
}
si, err := js.StreamInfo("TEST")
if err != nil {
@@ -2532,7 +2540,7 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
}
}
func TestJetStreamRestartAdvisories(t *testing.T) {
func TestJetStreamClusterRestartAndRemoveAdvisories(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
@@ -2546,6 +2554,12 @@ func TestJetStreamRestartAdvisories(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()
csub, err := nc.SubscribeSync("$JS.EVENT.ADVISORY.*.CREATED.>")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer csub.Unsubscribe()
nc.Flush()
sendBatch := func(subject string, n int) {
@@ -2582,10 +2596,18 @@ func TestJetStreamRestartAdvisories(t *testing.T) {
}
sendBatch("TEST-3", 100)
drainSub := func(sub *nats.Subscription) {
for _, err := sub.NextMsg(0); err == nil; _, err = sub.NextMsg(0) {
}
}
// Wait for the advisories for all streams and consumers.
checkSubsPending(t, sub, 9) // 3 streams, 3 consumers, 3 stream names lookups for creating consumers.
for _, err := sub.NextMsg(0); err == nil; _, err = sub.NextMsg(0) {
}
drainSub(sub)
// Created audit events.
checkSubsPending(t, csub, 6)
drainSub(csub)
usub, err := nc.SubscribeSync("$JS.EVENT.ADVISORY.*.UPDATED.>")
if err != nil {
@@ -2604,12 +2626,38 @@ func TestJetStreamRestartAdvisories(t *testing.T) {
c.restartServer(cs)
}
}
for _, cs := range c.servers {
c.waitOnServerCurrent(cs)
}
c.waitOnAllCurrent()
checkSubsPending(t, csub, 0)
checkSubsPending(t, sub, 0)
checkSubsPending(t, usub, 0)
dsub, err := nc.SubscribeSync("$JS.EVENT.ADVISORY.*.DELETED.>")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer dsub.Unsubscribe()
nc.Flush()
c.waitOnConsumerLeader("$G", "TEST-1", "DC")
// Now check delete advisories as well.
if err := js.DeleteConsumer("TEST-1", "DC"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkSubsPending(t, csub, 0)
checkSubsPending(t, dsub, 1)
checkSubsPending(t, sub, 1)
checkSubsPending(t, usub, 0)
drainSub(dsub)
if err := js.DeleteStream("TEST-3"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkSubsPending(t, dsub, 2) // Stream and the consumer underneath.
checkSubsPending(t, sub, 4)
}
func TestJetStreamClusterNoDuplicateOnNodeRestart(t *testing.T) {
@@ -2998,6 +3046,12 @@ func (c *cluster) waitOnServerCurrent(s *server.Server) {
c.t.Fatalf("Expected server %q to eventually be current", s)
}
func (c *cluster) waitOnAllCurrent() {
for _, cs := range c.servers {
c.waitOnServerCurrent(cs)
}
}
func (c *cluster) randomNonLeader() *server.Server {
// range should randomize.. but..
for _, s := range c.servers {