mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #1439 from ripienaar/snapshot_advisories
add advisories and logging for snapshots
This commit is contained in:
@@ -183,6 +183,18 @@ const (
|
||||
// JSAdvisoryConsumerDeletedPre notification that a consumer was deleted
|
||||
JSAdvisoryConsumerDeletedPre = "$JS.EVENT.ADVISORY.CONSUMER.DELETED"
|
||||
|
||||
// JSAdvisoryStreamSnapshotCreatePre notification that a snapshot was created
|
||||
JSAdvisoryStreamSnapshotCreatePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE"
|
||||
|
||||
// JSAdvisoryStreamSnapshotCompletePre notification that a snapshot was completed
|
||||
JSAdvisoryStreamSnapshotCompletePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE"
|
||||
|
||||
// JSAdvisoryStreamRestoreCreatePre notification that a restore was started
|
||||
JSAdvisoryStreamRestoreCreatePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE"
|
||||
|
||||
// JSAdvisoryStreamRestoreCompletePre notification that a restore was completed
|
||||
JSAdvisoryStreamRestoreCompletePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE"
|
||||
|
||||
// JSAuditAdvisory is a notification about JetStream API access.
|
||||
// FIXME - Add in details about who..
|
||||
JSAuditAdvisory = "$JS.EVENT.ADVISORY.API"
|
||||
@@ -1127,6 +1139,21 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r
|
||||
// Create our internal subscription to accept the snapshot.
|
||||
restoreSubj := fmt.Sprintf(jsRestoreDeliverT, stream, nuid.Next())
|
||||
|
||||
s.Noticef("Starting restore for stream %q in account %q", stream, c.acc.Name)
|
||||
start := time.Now()
|
||||
received := 0
|
||||
|
||||
caudit := c.apiAuditClient()
|
||||
s.publishAdvisory(c.acc, JSAdvisoryStreamRestoreCreatePre+"."+stream, &JSRestoreCreateAdvisory{
|
||||
TypedEvent: TypedEvent{
|
||||
Type: JSRestoreCreateAdvisoryType,
|
||||
ID: nuid.Next(),
|
||||
Time: time.Now().UTC(),
|
||||
},
|
||||
Stream: stream,
|
||||
Client: caudit,
|
||||
})
|
||||
|
||||
// FIXME(dlc) - Can't recover well here if something goes wrong. Could use channels and at least time
|
||||
// things out. Note that this is tied to the requesting client, so if it is a tool this goes away when
|
||||
// the client does. Only thing leaking here is the sub on strange failure.
|
||||
@@ -1138,10 +1165,28 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r
|
||||
tfile.Close()
|
||||
os.Remove(tfile.Name())
|
||||
c.processUnsub(sub.sid)
|
||||
|
||||
end := time.Now()
|
||||
s.publishAdvisory(c.acc, JSAdvisoryStreamRestoreCompletePre+"."+stream, &JSRestoreCompleteAdvisory{
|
||||
TypedEvent: TypedEvent{
|
||||
Type: JSRestoreCompleteAdvisoryType,
|
||||
ID: nuid.Next(),
|
||||
Time: time.Now().UTC(),
|
||||
},
|
||||
Stream: stream,
|
||||
Start: start.UTC(),
|
||||
End: end.UTC(),
|
||||
Bytes: int64(received),
|
||||
Client: caudit,
|
||||
})
|
||||
|
||||
s.Noticef("Completed %s restore for stream %q in account %q in %v", FriendlyBytes(int64(received)), stream, c.acc.Name, end.Sub(start))
|
||||
|
||||
return
|
||||
}
|
||||
// Append chunk to temp file.
|
||||
tfile.Write(msg)
|
||||
received += len(msg)
|
||||
})
|
||||
|
||||
resp.DeliverSubject = restoreSubj
|
||||
@@ -1190,8 +1235,17 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, subject,
|
||||
// We will do the snapshot in a go routine as well since check msgs may
|
||||
// stall this go routine.
|
||||
go func() {
|
||||
sr, err := mset.Snapshot(0, !req.NoConsumers, req.CheckMsgs)
|
||||
if req.CheckMsgs {
|
||||
s.Noticef("Starting health check and snapshot for stream %q in account %q", mset.Name(), mset.jsa.account.Name)
|
||||
} else {
|
||||
s.Noticef("Starting snapshot for stream %q in account %q", mset.Name(), mset.jsa.account.Name)
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
||||
sr, err := mset.Snapshot(0, req.CheckMsgs, !req.NoConsumers)
|
||||
if err != nil {
|
||||
s.Noticef("Snapshot of %q in account %q failed: %s", mset.Name(), mset.jsa.account.Name, err)
|
||||
resp.Error = jsError(err)
|
||||
s.sendAPIResponse(c, subject, reply, smsg, s.jsonResponse(&resp))
|
||||
return
|
||||
@@ -1199,10 +1253,40 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, subject,
|
||||
|
||||
resp.NumBlks = sr.NumBlks
|
||||
resp.BlkSize = sr.BlkSize
|
||||
|
||||
s.sendAPIResponse(c, subject, reply, smsg, s.jsonResponse(resp))
|
||||
|
||||
caudit := c.apiAuditClient()
|
||||
s.publishAdvisory(c.acc, JSAdvisoryStreamSnapshotCreatePre+"."+mset.Name(), &JSSnapshotCreateAdvisory{
|
||||
TypedEvent: TypedEvent{
|
||||
Type: JSSnapshotCreatedAdvisoryType,
|
||||
ID: nuid.Next(),
|
||||
Time: time.Now().UTC(),
|
||||
},
|
||||
Stream: mset.Name(),
|
||||
NumBlks: sr.NumBlks,
|
||||
BlkSize: sr.BlkSize,
|
||||
Client: caudit,
|
||||
})
|
||||
|
||||
// Now do the real streaming.
|
||||
s.streamSnapshot(c, mset, sr, &req)
|
||||
|
||||
end := time.Now()
|
||||
|
||||
s.publishAdvisory(c.acc, JSAdvisoryStreamSnapshotCompletePre+"."+mset.Name(), &JSSnapshotCompleteAdvisory{
|
||||
TypedEvent: TypedEvent{
|
||||
Type: JSSnapshotCompleteAdvisoryType,
|
||||
ID: nuid.Next(),
|
||||
Time: time.Now().UTC(),
|
||||
},
|
||||
Stream: mset.Name(),
|
||||
Start: start.UTC(),
|
||||
End: end.UTC(),
|
||||
Client: caudit,
|
||||
})
|
||||
|
||||
s.Noticef("Completed %s snapshot for stream %q in account %q in %v", FriendlyBytes(int64(resp.NumBlks*resp.BlkSize)), mset.Name(), mset.jsa.account.Name, end.Sub(start))
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -1558,6 +1642,21 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, subject,
|
||||
|
||||
// sendJetStreamAPIAuditAdvisor will send the audit event for a given event.
|
||||
func (s *Server) sendJetStreamAPIAuditAdvisory(c *client, subject, request, response string) {
|
||||
s.publishAdvisory(c.acc, JSAuditAdvisory, JSAPIAudit{
|
||||
TypedEvent: TypedEvent{
|
||||
Type: JSAPIAuditType,
|
||||
ID: nuid.Next(),
|
||||
Time: time.Now().UTC(),
|
||||
},
|
||||
Server: s.Name(),
|
||||
Client: *c.apiAuditClient(),
|
||||
Subject: subject,
|
||||
Request: request,
|
||||
Response: response,
|
||||
})
|
||||
}
|
||||
|
||||
func (c *client) apiAuditClient() *ClientAPIAudit {
|
||||
c.mu.Lock()
|
||||
auditUser := c.auditUser()
|
||||
h, p := c.auditClient()
|
||||
@@ -1567,33 +1666,15 @@ func (s *Server) sendJetStreamAPIAuditAdvisory(c *client, subject, request, resp
|
||||
cid := c.cid
|
||||
c.mu.Unlock()
|
||||
|
||||
e := JSAPIAudit{
|
||||
TypedEvent: TypedEvent{
|
||||
Type: JSAPIAuditType,
|
||||
ID: nuid.Next(),
|
||||
Time: time.Now().UTC(),
|
||||
},
|
||||
Server: s.Name(),
|
||||
Client: ClientAPIAudit{
|
||||
Host: h,
|
||||
Port: p,
|
||||
CID: cid,
|
||||
Account: c.Account().Name,
|
||||
User: auditUser,
|
||||
Name: appName,
|
||||
Language: lang,
|
||||
Version: version,
|
||||
},
|
||||
Subject: subject,
|
||||
Request: request,
|
||||
Response: response,
|
||||
}
|
||||
|
||||
ej, err := json.MarshalIndent(e, "", " ")
|
||||
if err == nil {
|
||||
s.sendInternalAccountMsg(c.acc, JSAuditAdvisory, ej)
|
||||
} else {
|
||||
s.Warnf("JetStream could not marshal audit event for account %q: %v", c.acc.Name, err)
|
||||
return &ClientAPIAudit{
|
||||
Host: h,
|
||||
Port: p,
|
||||
CID: cid,
|
||||
Account: c.Account().Name,
|
||||
User: auditUser,
|
||||
Name: appName,
|
||||
Language: lang,
|
||||
Version: version,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,22 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (s *Server) publishAdvisory(acc *Account, subject string, adv interface{}) {
|
||||
ej, err := json.MarshalIndent(adv, "", " ")
|
||||
if err == nil {
|
||||
err = s.sendInternalAccountMsg(acc, subject, ej)
|
||||
if err != nil {
|
||||
s.Warnf("Advisory could not be sent for account %q: %v", acc.Name, err)
|
||||
}
|
||||
} else {
|
||||
s.Warnf("Advisory could not be serialized for account %q: %v", acc.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
// ClientAPIAudit is for identifying a client who initiated an API call to the system.
|
||||
type ClientAPIAudit struct {
|
||||
Host string `json:"host"`
|
||||
@@ -94,3 +111,50 @@ type JSConsumerDeliveryTerminatedAdvisory struct {
|
||||
|
||||
// JSConsumerDeliveryTerminatedAdvisoryType is the schema type for JSConsumerDeliveryTerminatedAdvisory
|
||||
const JSConsumerDeliveryTerminatedAdvisoryType = "io.nats.jetstream.advisory.v1.terminated"
|
||||
|
||||
// JSSnapshotCreateAdvisory is an advisory sent after a snapshot is successfully started
|
||||
type JSSnapshotCreateAdvisory struct {
|
||||
TypedEvent
|
||||
Stream string `json:"stream"`
|
||||
NumBlks int `json:"blocks"`
|
||||
BlkSize int `json:"block_size"`
|
||||
Client *ClientAPIAudit `json:"client"`
|
||||
}
|
||||
|
||||
// JSSnapshotCreatedAdvisoryType is the schema type for JSSnapshotCreateAdvisory
|
||||
const JSSnapshotCreatedAdvisoryType = "io.nats.jetstream.advisory.v1.snapshot_create"
|
||||
|
||||
// JSSnapshotCompleteAdvisory is an advisory sent after a snapshot is completed
|
||||
type JSSnapshotCompleteAdvisory struct {
|
||||
TypedEvent
|
||||
Stream string `json:"stream"`
|
||||
Start time.Time `json:"start"`
|
||||
End time.Time `json:"end"`
|
||||
Client *ClientAPIAudit `json:"client"`
|
||||
}
|
||||
|
||||
// JSSnapshotCompleteAdvisoryType is the schema type for JSSnapshotCompleteAdvisory
|
||||
const JSSnapshotCompleteAdvisoryType = "io.nats.jetstream.advisory.v1.snapshot_complete"
|
||||
|
||||
// JSRestoreCreateAdvisory is an advisory sent after a restore is successfully started
|
||||
type JSRestoreCreateAdvisory struct {
|
||||
TypedEvent
|
||||
Stream string `json:"stream"`
|
||||
Client *ClientAPIAudit `json:"client"`
|
||||
}
|
||||
|
||||
// JSRestoreCreateAdvisoryType is the schema type for JSRestoreCreateAdvisory
|
||||
const JSRestoreCreateAdvisoryType = "io.nats.jetstream.advisory.v1.restore_create"
|
||||
|
||||
// JSRestoreCompleteAdvisory is an advisory sent after a restore is completed
|
||||
type JSRestoreCompleteAdvisory struct {
|
||||
TypedEvent
|
||||
Stream string `json:"stream"`
|
||||
Start time.Time `json:"start"`
|
||||
End time.Time `json:"end"`
|
||||
Bytes int64 `json:"bytes"`
|
||||
Client *ClientAPIAudit `json:"client"`
|
||||
}
|
||||
|
||||
// JSRestoreCompleteAdvisoryType is the schema type for JSRestoreCompleteAdvisory
|
||||
const JSRestoreCompleteAdvisoryType = "io.nats.jetstream.advisory.v1.restore_complete"
|
||||
|
||||
@@ -978,7 +978,7 @@ func (mset *Stream) Snapshot(deadline time.Duration, checkMsgs, includeConsumers
|
||||
o.writeState()
|
||||
}
|
||||
|
||||
return store.Snapshot(deadline, checkMsgs, includeConsumers)
|
||||
return store.Snapshot(deadline, includeConsumers, checkMsgs)
|
||||
}
|
||||
|
||||
const snapsDir = "__snapshots__"
|
||||
|
||||
@@ -2810,7 +2810,7 @@ func TestJetStreamSnapshots(t *testing.T) {
|
||||
// Snapshot state of the stream and consumers.
|
||||
info := info{mset.Config(), mset.State(), obs}
|
||||
|
||||
sr, err := mset.Snapshot(5*time.Second, true, false)
|
||||
sr, err := mset.Snapshot(5*time.Second, false, true)
|
||||
if err != nil {
|
||||
t.Fatalf("Error getting snapshot: %v", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user