send stream advisories using a helper

This commit is contained in:
R.I.Pienaar
2020-06-02 08:48:02 +02:00
parent b5dfb984e9
commit 3fc5c9284a
2 changed files with 167 additions and 27 deletions

View File

@@ -183,6 +183,18 @@ const (
// JSAdvisoryConsumerDeletedPre notification that a template deleted
JSAdvisoryConsumerDeletedPre = "$JS.EVENT.ADVISORY.CONSUMER.DELETED"
// JSAdvisoryStreamSnapshotCreatePre notification that a snapshot was created
JSAdvisoryStreamSnapshotCreatePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE"
// JSAdvisoryStreamSnapshotCompletePre notification that a snapshot was completed
JSAdvisoryStreamSnapshotCompletePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE"
// JSAdvisoryStreamRestoreCreatePre notification that a restore was start
JSAdvisoryStreamRestoreCreatePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE"
// JSAdvisoryStreamRestoreCompletePre notification that a restore was completed
JSAdvisoryStreamRestoreCompletePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE"
// JSAuditAdvisory is a notification about JetStream API access.
// FIXME - Add in details about who..
JSAuditAdvisory = "$JS.EVENT.ADVISORY.API"
@@ -1127,6 +1139,21 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r
// Create our internal subscription to accept the snapshot.
restoreSubj := fmt.Sprintf(jsRestoreDeliverT, stream, nuid.Next())
s.Noticef("Starting restore for stream %q in account %q", stream, c.acc.Name)
start := time.Now()
received := 0
caudit := c.apiAuditClient()
s.publishAdvisory(c.acc, JSAdvisoryStreamRestoreCreatePre+"."+stream, &JSRestoreCreateAdvisory{
TypedEvent: TypedEvent{
Type: JSRestoreCreateAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Stream: stream,
Client: caudit,
})
// FIXME(dlc) - Can't recover well here if something goes wrong. Could use channels and at least time
// things out. Note that this is tied to the requesting client, so if it is a tool this goes away when
// the client does. Only thing leaking here is the sub on strange failure.
@@ -1138,10 +1165,28 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r
tfile.Close()
os.Remove(tfile.Name())
c.processUnsub(sub.sid)
end := time.Now()
s.publishAdvisory(c.acc, JSAdvisoryStreamRestoreCompletePre+"."+stream, &JSRestoreCompleteAdvisory{
TypedEvent: TypedEvent{
Type: JSRestoreCompleteAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Stream: stream,
Start: start.UTC(),
End: end.UTC(),
Bytes: int64(received),
Client: caudit,
})
s.Noticef("Completed %s restore for stream %q in account %q in %v", FriendlyBytes(int64(received)), stream, c.acc.Name, end.Sub(start))
return
}
// Append chunk to temp file.
tfile.Write(msg)
received += len(msg)
})
resp.DeliverSubject = restoreSubj
@@ -1190,8 +1235,12 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, subject,
// We will do the snapshot in a go routine as well since check msgs may
// stall this go routine.
go func() {
s.Noticef("Starting snapshot for stream %q in account %q", mset.Name(), mset.jsa.account.Name)
start := time.Now()
sr, err := mset.Snapshot(0, !req.NoConsumers, req.CheckMsgs)
if err != nil {
s.Noticef("Snapshot of %q in account %q failed: %s", mset.Name(), mset.jsa.account.Name, err)
resp.Error = jsError(err)
s.sendAPIResponse(c, subject, reply, smsg, s.jsonResponse(&resp))
return
@@ -1199,10 +1248,40 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, subject,
resp.NumBlks = sr.NumBlks
resp.BlkSize = sr.BlkSize
s.sendAPIResponse(c, subject, reply, smsg, s.jsonResponse(resp))
caudit := c.apiAuditClient()
s.publishAdvisory(c.acc, JSAdvisoryStreamSnapshotCreatePre+"."+mset.Name(), &JSSnapshotCreateAdvisory{
TypedEvent: TypedEvent{
Type: JSSnapshotCreatedAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Stream: mset.Name(),
NumBlks: sr.NumBlks,
BlkSize: sr.BlkSize,
Client: caudit,
})
// Now do the real streaming.
s.streamSnapshot(c, mset, sr, &req)
end := time.Now()
s.publishAdvisory(c.acc, JSAdvisoryStreamSnapshotCompletePre+"."+mset.Name(), &JSSnapshotCompleteAdvisory{
TypedEvent: TypedEvent{
Type: JSSnapshotCompleteAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Stream: mset.Name(),
Start: start.UTC(),
End: end.UTC(),
Client: caudit,
})
s.Noticef("Completed %s snapshot for stream %q in account %q in %v", FriendlyBytes(int64(resp.NumBlks*resp.BlkSize)), mset.Name(), mset.jsa.account.Name, end.Sub(start))
}()
}
@@ -1558,6 +1637,21 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, subject,
// sendJetStreamAPIAuditAdvisor will send the audit event for a given event.
func (s *Server) sendJetStreamAPIAuditAdvisory(c *client, subject, request, response string) {
s.publishAdvisory(c.acc, subject, JSAPIAudit{
TypedEvent: TypedEvent{
Type: JSAPIAuditType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Server: s.Name(),
Client: *c.apiAuditClient(),
Subject: subject,
Request: request,
Response: response,
})
}
func (c *client) apiAuditClient() *ClientAPIAudit {
c.mu.Lock()
auditUser := c.auditUser()
h, p := c.auditClient()
@@ -1567,33 +1661,15 @@ func (s *Server) sendJetStreamAPIAuditAdvisory(c *client, subject, request, resp
cid := c.cid
c.mu.Unlock()
e := JSAPIAudit{
TypedEvent: TypedEvent{
Type: JSAPIAuditType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Server: s.Name(),
Client: ClientAPIAudit{
Host: h,
Port: p,
CID: cid,
Account: c.Account().Name,
User: auditUser,
Name: appName,
Language: lang,
Version: version,
},
Subject: subject,
Request: request,
Response: response,
}
ej, err := json.MarshalIndent(e, "", " ")
if err == nil {
s.sendInternalAccountMsg(c.acc, JSAuditAdvisory, ej)
} else {
s.Warnf("JetStream could not marshal audit event for account %q: %v", c.acc.Name, err)
return &ClientAPIAudit{
Host: h,
Port: p,
CID: cid,
Account: c.Account().Name,
User: auditUser,
Name: appName,
Language: lang,
Version: version,
}
}

View File

@@ -1,5 +1,22 @@
package server
import (
"encoding/json"
"time"
)
func (s *Server) publishAdvisory(acc *Account, subject string, adv interface{}) {
ej, err := json.MarshalIndent(adv, "", " ")
if err == nil {
err = s.sendInternalAccountMsg(acc, subject, ej)
if err != nil {
s.Warnf("Advisory could not be sent for account %q: %v", acc.Name, err)
}
} else {
s.Warnf("Advisory could not be serialized for account %q: %v", acc.Name, err)
}
}
// ClientAPIAudit is for identifying a client who initiated an API call to the system.
type ClientAPIAudit struct {
Host string `json:"host"`
@@ -94,3 +111,50 @@ type JSConsumerDeliveryTerminatedAdvisory struct {
// JSConsumerDeliveryTerminatedAdvisoryType is the schema type for JSConsumerDeliveryTerminatedAdvisory
const JSConsumerDeliveryTerminatedAdvisoryType = "io.nats.jetstream.advisory.v1.terminated"
// JSSnapshotCreateAdvisory is an advisory sent after a snapshot is successfully started
type JSSnapshotCreateAdvisory struct {
TypedEvent
Stream string `json:"stream"`
NumBlks int `json:"blocks"`
BlkSize int `json:"block_size"`
Client *ClientAPIAudit `json:"client"`
}
// JSSnapshotCreatedAdvisoryType is the schema type for JSSnapshotCreateAdvisory
const JSSnapshotCreatedAdvisoryType = "io.nats.jetstream.advisory.v1.snapshot_create"
// JSSnapshotCompleteAdvisory is an advisory sent after a snapshot is successfully started
type JSSnapshotCompleteAdvisory struct {
TypedEvent
Stream string `json:"stream"`
Start time.Time `json:"start"`
End time.Time `json:"end"`
Client *ClientAPIAudit `json:"client"`
}
// JSSnapshotCompleteAdvisoryType is the schema type for JSSnapshotCreateAdvisory
const JSSnapshotCompleteAdvisoryType = "io.nats.jetstream.advisory.v1.snapshot_complete"
// JSRestoreCreateAdvisory is an advisory sent after a snapshot is successfully started
type JSRestoreCreateAdvisory struct {
TypedEvent
Stream string `json:"stream"`
Client *ClientAPIAudit `json:"client"`
}
// JSRestoreCreateAdvisory is the schema type for JSSnapshotCreateAdvisory
const JSRestoreCreateAdvisoryType = "io.nats.jetstream.advisory.v1.restore_create"
// JSRestoreCompleteAdvisory is an advisory sent after a snapshot is successfully started
type JSRestoreCompleteAdvisory struct {
TypedEvent
Stream string `json:"stream"`
Start time.Time `json:"start"`
End time.Time `json:"end"`
Bytes int64 `json:"bytes"`
Client *ClientAPIAudit `json:"client"`
}
// JSRestoreCompleteAdvisoryType is the schema type for JSSnapshotCreateAdvisory
const JSRestoreCompleteAdvisoryType = "io.nats.jetstream.advisory.v1.restore_complete"