diff --git a/server/jetstream_api.go b/server/jetstream_api.go index d3d84547..b2d401c9 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -183,6 +183,18 @@ const ( // JSAdvisoryConsumerDeletedPre notification that a template deleted JSAdvisoryConsumerDeletedPre = "$JS.EVENT.ADVISORY.CONSUMER.DELETED" + // JSAdvisoryStreamSnapshotCreatePre notification that a snapshot was created + JSAdvisoryStreamSnapshotCreatePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE" + + // JSAdvisoryStreamSnapshotCompletePre notification that a snapshot was completed + JSAdvisoryStreamSnapshotCompletePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE" + + // JSAdvisoryStreamRestoreCreatePre notification that a restore was started + JSAdvisoryStreamRestoreCreatePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE" + + // JSAdvisoryStreamRestoreCompletePre notification that a restore was completed + JSAdvisoryStreamRestoreCompletePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE" + // JSAuditAdvisory is a notification about JetStream API access. // FIXME - Add in details about who.. JSAuditAdvisory = "$JS.EVENT.ADVISORY.API" @@ -1127,6 +1139,21 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r // Create our internal subscription to accept the snapshot. restoreSubj := fmt.Sprintf(jsRestoreDeliverT, stream, nuid.Next()) + s.Noticef("Starting restore for stream %q in account %q", stream, c.acc.Name) + start := time.Now() + received := 0 + + caudit := c.apiAuditClient() + s.publishAdvisory(c.acc, JSAdvisoryStreamRestoreCreatePre+"."+stream, &JSRestoreCreateAdvisory{ + TypedEvent: TypedEvent{ + Type: JSRestoreCreateAdvisoryType, + ID: nuid.Next(), + Time: time.Now().UTC(), + }, + Stream: stream, + Client: caudit, + }) + // FIXME(dlc) - Can't recover well here if something goes wrong. Could use channels and at least time // things out. Note that this is tied to the requesting client, so if it is a tool this goes away when // the client does. 
Only thing leaking here is the sub on strange failure. @@ -1138,10 +1165,28 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r tfile.Close() os.Remove(tfile.Name()) c.processUnsub(sub.sid) + + end := time.Now() + s.publishAdvisory(c.acc, JSAdvisoryStreamRestoreCompletePre+"."+stream, &JSRestoreCompleteAdvisory{ + TypedEvent: TypedEvent{ + Type: JSRestoreCompleteAdvisoryType, + ID: nuid.Next(), + Time: time.Now().UTC(), + }, + Stream: stream, + Start: start.UTC(), + End: end.UTC(), + Bytes: int64(received), + Client: caudit, + }) + + s.Noticef("Completed %s restore for stream %q in account %q in %v", FriendlyBytes(int64(received)), stream, c.acc.Name, end.Sub(start)) + return } // Append chunk to temp file. tfile.Write(msg) + received += len(msg) }) resp.DeliverSubject = restoreSubj @@ -1190,8 +1235,17 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, subject, // We will do the snapshot in a go routine as well since check msgs may // stall this go routine. 
go func() { - sr, err := mset.Snapshot(0, !req.NoConsumers, req.CheckMsgs) + if req.CheckMsgs { + s.Noticef("Starting health check and snapshot for stream %q in account %q", mset.Name(), mset.jsa.account.Name) + } else { + s.Noticef("Starting snapshot for stream %q in account %q", mset.Name(), mset.jsa.account.Name) + } + + start := time.Now() + + sr, err := mset.Snapshot(0, req.CheckMsgs, !req.NoConsumers) if err != nil { + s.Noticef("Snapshot of %q in account %q failed: %s", mset.Name(), mset.jsa.account.Name, err) resp.Error = jsError(err) s.sendAPIResponse(c, subject, reply, smsg, s.jsonResponse(&resp)) return @@ -1199,10 +1253,40 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, subject, resp.NumBlks = sr.NumBlks resp.BlkSize = sr.BlkSize + s.sendAPIResponse(c, subject, reply, smsg, s.jsonResponse(resp)) + caudit := c.apiAuditClient() + s.publishAdvisory(c.acc, JSAdvisoryStreamSnapshotCreatePre+"."+mset.Name(), &JSSnapshotCreateAdvisory{ + TypedEvent: TypedEvent{ + Type: JSSnapshotCreatedAdvisoryType, + ID: nuid.Next(), + Time: time.Now().UTC(), + }, + Stream: mset.Name(), + NumBlks: sr.NumBlks, + BlkSize: sr.BlkSize, + Client: caudit, + }) + // Now do the real streaming. s.streamSnapshot(c, mset, sr, &req) + + end := time.Now() + + s.publishAdvisory(c.acc, JSAdvisoryStreamSnapshotCompletePre+"."+mset.Name(), &JSSnapshotCompleteAdvisory{ + TypedEvent: TypedEvent{ + Type: JSSnapshotCompleteAdvisoryType, + ID: nuid.Next(), + Time: time.Now().UTC(), + }, + Stream: mset.Name(), + Start: start.UTC(), + End: end.UTC(), + Client: caudit, + }) + + s.Noticef("Completed %s snapshot for stream %q in account %q in %v", FriendlyBytes(int64(resp.NumBlks*resp.BlkSize)), mset.Name(), mset.jsa.account.Name, end.Sub(start)) }() } @@ -1558,6 +1642,21 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, subject, // sendJetStreamAPIAuditAdvisor will send the audit event for a given event. 
func (s *Server) sendJetStreamAPIAuditAdvisory(c *client, subject, request, response string) { + s.publishAdvisory(c.acc, JSAuditAdvisory, JSAPIAudit{ + TypedEvent: TypedEvent{ + Type: JSAPIAuditType, + ID: nuid.Next(), + Time: time.Now().UTC(), + }, + Server: s.Name(), + Client: *c.apiAuditClient(), + Subject: subject, + Request: request, + Response: response, + }) +} + +func (c *client) apiAuditClient() *ClientAPIAudit { c.mu.Lock() auditUser := c.auditUser() h, p := c.auditClient() @@ -1567,33 +1666,15 @@ func (s *Server) sendJetStreamAPIAuditAdvisory(c *client, subject, request, resp cid := c.cid c.mu.Unlock() - e := JSAPIAudit{ - TypedEvent: TypedEvent{ - Type: JSAPIAuditType, - ID: nuid.Next(), - Time: time.Now().UTC(), - }, - Server: s.Name(), - Client: ClientAPIAudit{ - Host: h, - Port: p, - CID: cid, - Account: c.Account().Name, - User: auditUser, - Name: appName, - Language: lang, - Version: version, - }, - Subject: subject, - Request: request, - Response: response, - } - - ej, err := json.MarshalIndent(e, "", " ") - if err == nil { - s.sendInternalAccountMsg(c.acc, JSAuditAdvisory, ej) - } else { - s.Warnf("JetStream could not marshal audit event for account %q: %v", c.acc.Name, err) + return &ClientAPIAudit{ + Host: h, + Port: p, + CID: cid, + Account: c.Account().Name, + User: auditUser, + Name: appName, + Language: lang, + Version: version, } } diff --git a/server/jetstream_events.go b/server/jetstream_events.go index 32044abe..f72f5d90 100644 --- a/server/jetstream_events.go +++ b/server/jetstream_events.go @@ -1,5 +1,22 @@ package server +import ( + "encoding/json" + "time" +) + +func (s *Server) publishAdvisory(acc *Account, subject string, adv interface{}) { + ej, err := json.MarshalIndent(adv, "", " ") + if err == nil { + err = s.sendInternalAccountMsg(acc, subject, ej) + if err != nil { + s.Warnf("Advisory could not be sent for account %q: %v", acc.Name, err) + } + } else { + s.Warnf("Advisory could not be serialized for account %q: %v", 
acc.Name, err) + } +} + // ClientAPIAudit is for identifying a client who initiated an API call to the system. type ClientAPIAudit struct { Host string `json:"host"` @@ -94,3 +111,50 @@ type JSConsumerDeliveryTerminatedAdvisory struct { // JSConsumerDeliveryTerminatedAdvisoryType is the schema type for JSConsumerDeliveryTerminatedAdvisory const JSConsumerDeliveryTerminatedAdvisoryType = "io.nats.jetstream.advisory.v1.terminated" + +// JSSnapshotCreateAdvisory is an advisory sent after a snapshot is successfully started +type JSSnapshotCreateAdvisory struct { + TypedEvent + Stream string `json:"stream"` + NumBlks int `json:"blocks"` + BlkSize int `json:"block_size"` + Client *ClientAPIAudit `json:"client"` +} + +// JSSnapshotCreatedAdvisoryType is the schema type for JSSnapshotCreateAdvisory +const JSSnapshotCreatedAdvisoryType = "io.nats.jetstream.advisory.v1.snapshot_create" + +// JSSnapshotCompleteAdvisory is an advisory sent after a snapshot is completed +type JSSnapshotCompleteAdvisory struct { + TypedEvent + Stream string `json:"stream"` + Start time.Time `json:"start"` + End time.Time `json:"end"` + Client *ClientAPIAudit `json:"client"` +} + +// JSSnapshotCompleteAdvisoryType is the schema type for JSSnapshotCompleteAdvisory +const JSSnapshotCompleteAdvisoryType = "io.nats.jetstream.advisory.v1.snapshot_complete" + +// JSRestoreCreateAdvisory is an advisory sent when a stream restore is started +type JSRestoreCreateAdvisory struct { + TypedEvent + Stream string `json:"stream"` + Client *ClientAPIAudit `json:"client"` +} + +// JSRestoreCreateAdvisoryType is the schema type for JSRestoreCreateAdvisory +const JSRestoreCreateAdvisoryType = "io.nats.jetstream.advisory.v1.restore_create" + +// JSRestoreCompleteAdvisory is an advisory sent after a stream restore is completed +type JSRestoreCompleteAdvisory struct { + TypedEvent + Stream string `json:"stream"` + Start time.Time `json:"start"` + End time.Time `json:"end"` + Bytes int64 
`json:"bytes"` + Client *ClientAPIAudit `json:"client"` +} + +// JSRestoreCompleteAdvisoryType is the schema type for JSRestoreCompleteAdvisory +const JSRestoreCompleteAdvisoryType = "io.nats.jetstream.advisory.v1.restore_complete" diff --git a/server/stream.go b/server/stream.go index be40d5c8..7aa68daf 100644 --- a/server/stream.go +++ b/server/stream.go @@ -978,7 +978,7 @@ func (mset *Stream) Snapshot(deadline time.Duration, checkMsgs, includeConsumers o.writeState() } - return store.Snapshot(deadline, checkMsgs, includeConsumers) + return store.Snapshot(deadline, includeConsumers, checkMsgs) } const snapsDir = "__snapshots__" diff --git a/test/jetstream_test.go b/test/jetstream_test.go index a525d615..09d1ab74 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -2810,7 +2810,7 @@ func TestJetStreamSnapshots(t *testing.T) { // Snapshot state of the stream and consumers. info := info{mset.Config(), mset.State(), obs} - sr, err := mset.Snapshot(5*time.Second, true, false) + sr, err := mset.Snapshot(5*time.Second, false, true) if err != nil { t.Fatalf("Error getting snapshot: %v", err) }